# stream_processor.py

import logging
from datetime import datetime, timezone
from os import getenv
import json

import boto3
from boto3.dynamodb.types import TypeDeserializer
from botocore.exceptions import ClientError
from itertools import groupby
from operator import itemgetter

from stats_updater import update_stats
from status_updater import update_status, skip_cleanup_states
from boto_utils import (
    DecimalEncoder,
    deserialize_item,
    emit_event,
    fetch_job_manifest,
    json_lines_iterator,
    utc_timestamp,
)
from decorators import with_logging

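# Module-level AWS clients, table handles and configuration resolved from the
# Lambda environment.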
deserializer = TypeDeserializer()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
client = boto3.client("stepfunctions")
ddb = boto3.resource("dynamodb")
glue = boto3.client("glue")
state_machine_arn = getenv("StateMachineArn")
q_table = ddb.Table(getenv("DeletionQueueTable"))
glue_db = getenv("GlueDatabase", "s3f2_manifests_database")
glue_table = getenv("JobManifestsGlueTable", "s3f2_manifests_table")
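

# Entry point for the DynamoDB stream of the jobs table. Newly inserted "Job"
# items start the deletion state machine, removed "Job" items have their
# manifest partitions cleaned up, and "JobEvent" items are grouped per job to
# update stats and status, triggering deletion queue cleanup once a job's
# forget phase has completed.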
@with_logging
def handler(event, context):
    records = event["Records"]
    new_jobs = get_records(records, "Job", "INSERT")
    deleted_jobs = get_records(records, "Job", "REMOVE", new_image=False)
    events = get_records(records, "JobEvent", "INSERT")
    grouped_events = groupby(sorted(events, key=itemgetter("Id")), key=itemgetter("Id"))
    for job in new_jobs:
        process_job(job)
    for job in deleted_jobs:
        cleanup_manifests(job)
    for job_id, group in grouped_events:
        group = [i for i in group]
        update_stats(job_id, group)
        updated_job = update_status(job_id, group)
        # Perform cleanup if required
        if (
            updated_job
            and updated_job.get("JobStatus") == "FORGET_COMPLETED_CLEANUP_IN_PROGRESS"
        ):
            try:
                clear_deletion_queue(updated_job)
                emit_event(
                    job_id, "CleanupSucceeded", utc_timestamp(), "StreamProcessor"
                )
            except Exception as e:
                emit_event(
                    job_id,
                    "CleanupFailed",
                    {"Error": "Unable to clear deletion queue: {}".format(str(e))},
                    "StreamProcessor",
                )
        elif updated_job and updated_job.get("JobStatus") in skip_cleanup_states:
            emit_event(job_id, "CleanupSkipped", utc_timestamp(), "StreamProcessor")
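

# Start a Step Functions execution for a newly inserted job. The job Id is used
# as the execution name, so replayed stream records cannot start duplicate
# executions; failures to start are reported back to the job as an "Exception"
# event.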
def process_job(job):
    job_id = job["Id"]
    state = {
        k: job[k]
        for k in [
            "AthenaConcurrencyLimit",
            "AthenaQueryMaxRetries",
            "DeletionTasksMaxNumber",
            "ForgetQueueWaitSeconds",
            "Id",
            "QueryExecutionWaitSeconds",
            "QueryQueueWaitSeconds",
        ]
    }

    try:
        client.start_execution(
            stateMachineArn=state_machine_arn,
            name=job_id,
            input=json.dumps(state, cls=DecimalEncoder),
        )
    except client.exceptions.ExecutionAlreadyExists:
        logger.warning("Execution %s already exists", job_id)
    except (ClientError, ValueError) as e:
        emit_event(
            job_id,
            "Exception",
            {
                "Error": "ExecutionFailure",
                "Cause": "Unable to start StepFunction execution: {}".format(str(e)),
            },
            "StreamProcessor",
        )
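

# Remove the Glue partitions registered for a deleted job's manifests. The data
# mapper id is taken from its position in the manifest object path, and
# partitions are deleted in batches of 25 per BatchDeletePartition call.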
def cleanup_manifests(job):
    logger.info("Removing job manifest partitions")
    job_id = job["Id"]
    partitions = []
    for manifest in job.get("Manifests", []):
        data_mapper_id = manifest.split("/")[5]
        partitions.append([job_id, data_mapper_id])
    max_deletion_batch_size = 25
    for i in range(0, len(partitions), max_deletion_batch_size):
        glue.batch_delete_partition(
            DatabaseName=glue_db,
            TableName=glue_table,
            PartitionsToDelete=[
                {"Values": partition_tuple}
                for partition_tuple in partitions[i : i + max_deletion_batch_size]
            ],
        )
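

# Delete every match recorded in the job's manifests from the deletion queue
# table once the forget phase has completed, using a batch writer to group the
# DynamoDB deletes.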
def clear_deletion_queue(job):
    logger.info("Clearing successfully deleted matches")
    to_delete = set()
    for manifest_object in job.get("Manifests", []):
        manifest = fetch_job_manifest(manifest_object)
        for line in json_lines_iterator(manifest):
            to_delete.add(line["DeletionQueueItemId"])

    with q_table.batch_writer() as batch:
        for item_id in to_delete:
            batch.delete_item(Key={"DeletionQueueItemId": item_id})
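

# Helpers for filtering and deserialising the raw DynamoDB stream records.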
def is_operation(record, operation):
    return record.get("eventName") == operation


def is_record_type(record, record_type, new_image):
    image = record["dynamodb"].get("NewImage" if new_image else "OldImage")
    if not image:
        return False
    item = deserialize_item(image)
    return item.get("Type") and item.get("Type") == record_type


def get_records(records, record_type, operation, new_image=True):
    return [
        deserialize_item(r["dynamodb"].get("NewImage" if new_image else "OldImage", {}))
        for r in records
        if is_record_type(r, record_type, new_image) and is_operation(r, operation)
    ]
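

# Illustrative shape of a stream record as consumed by get_records. Values use
# the DynamoDB type-annotated form that deserialize_item unwraps; the Id value
# below is a made-up example, and real Job and JobEvent items carry more
# attributes than shown here.
#
# {
#     "eventName": "INSERT",
#     "dynamodb": {
#         "NewImage": {
#             "Id": {"S": "example-job-id"},
#             "Type": {"S": "Job"},
#         }
#     }
# }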