  1. """
  2. Queue handlers
  3. """
  4. import random
  5. import json
  6. import os
  7. import uuid
  8. import boto3
  9. from decimal import Decimal
  10. from boto_utils import (
  11. DecimalEncoder,
  12. get_config,
  13. get_user_info,
  14. paginate,
  15. running_job_exists,
  16. utc_timestamp,
  17. deserialize_item,
  18. )
  19. from decorators import (
  20. with_logging,
  21. catch_errors,
  22. add_cors_headers,
  23. json_body_loader,
  24. load_schema,
  25. request_validator,
  26. )
  27. sfn_client = boto3.client("stepfunctions")
  28. ddb_client = boto3.client("dynamodb")
  29. ddb_resource = boto3.resource("dynamodb")
  30. deletion_queue_table_name = os.getenv("DeletionQueueTable", "S3F2_DeletionQueue")
  31. deletion_queue_table = ddb_resource.Table(deletion_queue_table_name)
  32. jobs_table = ddb_resource.Table(os.getenv("JobTable", "S3F2_Jobs"))
  33. bucket_count = int(os.getenv("GSIBucketCount", 1))
  34. max_size_bytes = 375000


@with_logging
@add_cors_headers
@json_body_loader
@catch_errors
def enqueue_handler(event, context):
    """Add a single match to the deletion queue"""
    body = event["body"]
    validate_queue_items([body])
    user_info = get_user_info(event)
    item = enqueue_items([body], user_info)[0]
    deletion_queue_table.put_item(Item=item)
    return {"statusCode": 201, "body": json.dumps(item, cls=DecimalEncoder)}
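
# Illustrative request/response for enqueue_handler (a sketch built from the fields
# used above; the authoritative shapes live in the API schemas, and all values here
# are placeholders):
#   POST body: {"MatchId": "jane_doe", "DataMappers": ["my-mapper"]}
#   201 body:  {"DeletionQueueItemId": "<uuid>", "Type": "Simple",
#               "MatchId": "jane_doe", "CreatedAt": <epoch>,
#               "DataMappers": ["my-mapper"], "CreatedBy": {...}}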


@with_logging
@add_cors_headers
@json_body_loader
@catch_errors
def enqueue_batch_handler(event, context):
    """Add a batch of matches to the deletion queue"""
    body = event["body"]
    matches = body["Matches"]
    validate_queue_items(matches)
    user_info = get_user_info(event)
    items = enqueue_items(matches, user_info)
    return {
        "statusCode": 201,
        "body": json.dumps({"Matches": items}, cls=DecimalEncoder),
    }


@with_logging
@add_cors_headers
@request_validator(load_schema("list_queue_items"))
@catch_errors
def get_handler(event, context):
    """List deletion queue items, one page at a time"""
    defaults = {"Type": "Simple"}
    qs = event.get("queryStringParameters")
    if not qs:
        qs = {}
    page_size = int(qs.get("page_size", 10))
    scan_params = {"Limit": page_size}
    start_at = qs.get("start_at")
    if start_at:
        scan_params["ExclusiveStartKey"] = {"DeletionQueueItemId": start_at}
    items = deletion_queue_table.scan(**scan_params).get("Items", [])
    if len(items) < page_size:
        next_start = None
    else:
        next_start = items[-1]["DeletionQueueItemId"]
    return {
        "statusCode": 200,
        "body": json.dumps(
            {
                "MatchIds": list(map(lambda item: dict(defaults, **item), items)),
                "NextStart": next_start,
            },
            cls=DecimalEncoder,
        ),
        "headers": {"Access-Control-Expose-Headers": "content-length"},
    }
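
# Pagination sketch for get_handler (query string names taken from the code above;
# the endpoint path is omitted): call with ?page_size=10 first, then pass the
# returned NextStart value back as ?start_at=<DeletionQueueItemId> to fetch the
# next page. NextStart is null once a page comes back smaller than page_size.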


@with_logging
@add_cors_headers
@json_body_loader
@catch_errors
def cancel_handler(event, context):
    """Remove matches from the deletion queue"""
    if running_job_exists():
        raise ValueError("Cannot delete matches whilst there is a job in progress")
    body = event["body"]
    matches = body["Matches"]
    with deletion_queue_table.batch_writer() as batch:
        for match in matches:
            batch.delete_item(
                Key={"DeletionQueueItemId": match["DeletionQueueItemId"]}
            )
    return {"statusCode": 204}
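
# Illustrative request body for cancel_handler (IDs are placeholders):
#   {"Matches": [{"DeletionQueueItemId": "8b2f..."}, {"DeletionQueueItemId": "f01a..."}]}
# Removal is refused while a deletion job is in progress.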


@with_logging
@add_cors_headers
@catch_errors
def process_handler(event, context):
    """Start a deletion job for the current queue contents"""
    if running_job_exists():
        raise ValueError("There is already a job in progress")
    job_id = str(uuid.uuid4())
    config = get_config()
    item = {
        "Id": job_id,
        "Sk": job_id,
        "Type": "Job",
        "JobStatus": "QUEUED",
        "GSIBucket": str(random.randint(0, bucket_count - 1)),
        "CreatedAt": utc_timestamp(),
        "CreatedBy": get_user_info(event),
        **{k: v for k, v in config.items() if k not in ["JobDetailsRetentionDays"]},
    }
    if int(config.get("JobDetailsRetentionDays", 0)) > 0:
        item["Expires"] = utc_timestamp(days=config["JobDetailsRetentionDays"])
    jobs_table.put_item(Item=item)
    return {"statusCode": 202, "body": json.dumps(item, cls=DecimalEncoder)}


def validate_queue_items(items):
    """Validate matches before they are queued; only Composite matches need checks"""
    for item in items:
        if item.get("Type", "Simple") == "Composite":
            is_array = isinstance(item["MatchId"], list)
            enough_columns = is_array and len(item["MatchId"]) > 0
            just_one_mapper = len(item["DataMappers"]) == 1
            if not is_array:
                raise ValueError(
                    "MatchIds of Composite type need to be specified as array"
                )
            if not enough_columns:
                raise ValueError(
                    "MatchIds of Composite type need to have a value for at least one column"
                )
            if not just_one_mapper:
                raise ValueError(
                    "MatchIds of Composite type need to be associated to exactly one Data Mapper"
                )
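
# Examples of items accepted by validate_queue_items (shapes inferred from the checks
# above; the element contents of a Composite MatchId are not validated here):
#   Simple:    {"MatchId": "jane_doe", "DataMappers": []}
#   Composite: {"Type": "Composite", "MatchId": [...], "DataMappers": ["my-mapper"]}
# A Composite MatchId must be a non-empty list and the item must reference exactly
# one data mapper.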


def enqueue_items(matches, user_info):
    """Write matches to the deletion queue table and return the stored items"""
    items = []
    with deletion_queue_table.batch_writer() as batch:
        for match in matches:
            match_id = match["MatchId"]
            data_mappers = match.get("DataMappers", [])
            item = {
                "DeletionQueueItemId": str(uuid.uuid4()),
                "Type": match.get("Type", "Simple"),
                "MatchId": match_id,
                "CreatedAt": utc_timestamp(),
                "DataMappers": data_mappers,
                "CreatedBy": user_info,
            }
            batch.put_item(Item=item)
            items.append(item)
    return items