events.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
import json
import logging
import os
import re
import urllib.error
import urllib.request
from collections.abc import Iterable
from functools import lru_cache

from boto_utils import emit_event
  9. logger = logging.getLogger(__name__)
  10. def emit_deletion_event(message_body, stats):
  11. job_id = message_body["JobId"]
  12. event_data = {
  13. "Statistics": stats,
  14. "Object": message_body["Object"],
  15. }
  16. emit_event(job_id, "ObjectUpdated", event_data, get_emitter_id())
  17. def emit_skipped_event(message_body, skip_reason):
  18. job_id = message_body["JobId"]
  19. event_data = {
  20. "Object": message_body["Object"],
  21. "Reason": skip_reason,
  22. }
  23. emit_event(job_id, "ObjectUpdateSkipped", event_data, get_emitter_id())
  24. def emit_failure_event(message_body, err_message, event_name):
  25. json_body = json.loads(message_body)
  26. job_id = json_body.get("JobId")
  27. if not job_id:
  28. raise ValueError("Message missing Job ID")
  29. event_data = {
  30. "Error": err_message,
  31. "Message": json_body,
  32. }
  33. emit_event(job_id, event_name, event_data, get_emitter_id())
  34. def sanitize_message(err_message, message_body):
  35. """
  36. Obtain all the known match IDs from the original message and ensure
  37. they are masked in the given err message
  38. """
  39. try:
  40. sanitised = err_message
  41. if not isinstance(message_body, dict):
  42. message_body = json.loads(message_body)
  43. matches = []
  44. cols = message_body.get("Columns", [])
  45. for col in cols:
  46. match_ids = col.get("MatchIds")
  47. if isinstance(match_ids, Iterable):
  48. matches.extend(match_ids)
  49. for m in matches:
  50. sanitised = sanitised.replace(str(m), "*** MATCH ID ***")
  51. return sanitised
  52. except (json.decoder.JSONDecodeError, ValueError):
  53. return err_message
  54. @lru_cache()
  55. def get_emitter_id():
  56. metadata_endpoint = os.getenv("ECS_CONTAINER_METADATA_URI")
  57. if metadata_endpoint:
  58. res = ""
  59. try:
  60. res = urllib.request.urlopen(metadata_endpoint, timeout=1).read()
  61. metadata = json.loads(res)
  62. return "ECSTask_{}".format(
  63. metadata["Labels"]["com.amazonaws.ecs.task-arn"].rsplit("/", 1)[1]
  64. )
  65. except urllib.error.URLError as e:
  66. logger.warning(
  67. "Error when accessing the metadata service: {}".format(e.reason)
  68. )
  69. except (AttributeError, KeyError, IndexError) as e:
  70. logger.warning(
  71. "Malformed response from the metadata service: {}".format(res)
  72. )
  73. except Exception as e:
  74. logger.warning(
  75. "Error when getting emitter id from metadata service: {}".format(str(e))
  76. )
  77. return "ECSTask"