handlers.py 5.2 KB

  1. """
  2. Queue handlers
  3. """
  4. import random
  5. import json
  6. import os
  7. import uuid
  8. import boto3
  9. from decimal import Decimal
  10. from boto_utils import (
  11. DecimalEncoder,
  12. get_config,
  13. get_user_info,
  14. paginate,
  15. running_job_exists,
  16. utc_timestamp,
  17. deserialize_item,
  18. )
  19. from decorators import (
  20. with_logging,
  21. catch_errors,
  22. add_cors_headers,
  23. json_body_loader,
  24. load_schema,
  25. request_validator,
  26. )
  27. sfn_client = boto3.client("stepfunctions")
  28. ddb_client = boto3.client("dynamodb")
  29. ddb_resource = boto3.resource("dynamodb")
  30. deletion_queue_table_name = os.getenv("DeletionQueueTable", "S3F2_DeletionQueue")
  31. deletion_queue_table = ddb_resource.Table(deletion_queue_table_name)
  32. jobs_table = ddb_resource.Table(os.getenv("JobTable", "S3F2_Jobs"))
  33. bucket_count = int(os.getenv("GSIBucketCount", 1))
  34. max_size_bytes = 375000
  35. @with_logging
  36. @add_cors_headers
  37. @json_body_loader
  38. @catch_errors
  39. def enqueue_handler(event, context):
  40. body = event["body"]
  41. validate_queue_items([body])
  42. user_info = get_user_info(event)
  43. item = enqueue_items([body], user_info)[0]
  44. deletion_queue_table.put_item(Item=item)
  45. return {"statusCode": 201, "body": json.dumps(item, cls=DecimalEncoder)}
  46. @with_logging
  47. @add_cors_headers
  48. @json_body_loader
  49. @catch_errors
  50. def enqueue_batch_handler(event, context):
  51. body = event["body"]
  52. matches = body["Matches"]
  53. validate_queue_items(matches)
  54. user_info = get_user_info(event)
  55. items = enqueue_items(matches, user_info)
  56. return {
  57. "statusCode": 201,
  58. "body": json.dumps({"Matches": items}, cls=DecimalEncoder),
  59. }
  60. @with_logging
  61. @add_cors_headers
  62. @request_validator(load_schema("list_queue_items"))
  63. @catch_errors
  64. def get_handler(event, context):
  65. defaults = {"Type": "Simple"}
  66. qs = event.get("queryStringParameters")
  67. if not qs:
  68. qs = {}
  69. page_size = int(qs.get("page_size", 10))
  70. scan_params = {"Limit": page_size}
  71. start_at = qs.get("start_at")
  72. if start_at:
  73. scan_params["ExclusiveStartKey"] = {"DeletionQueueItemId": start_at}
  74. items = deletion_queue_table.scan(**scan_params).get("Items", [])
  75. if len(items) < page_size:
  76. next_start = None
  77. else:
  78. next_start = items[-1]["DeletionQueueItemId"]
  79. return {
  80. "statusCode": 200,
  81. "body": json.dumps(
  82. {
  83. "MatchIds": list(map(lambda item: dict(defaults, **item), items)),
  84. "NextStart": next_start,
  85. },
  86. cls=DecimalEncoder,
  87. ),
  88. "headers": {"Access-Control-Expose-Headers": "content-length"},
  89. }
  90. @with_logging
  91. @add_cors_headers
  92. @json_body_loader
  93. @catch_errors
  94. def cancel_handler(event, context):
  95. if running_job_exists():
  96. raise ValueError("Cannot delete matches whilst there is a job in progress")
  97. body = event["body"]
  98. matches = body["Matches"]
  99. with deletion_queue_table.batch_writer() as batch:
  100. for match in matches:
  101. batch.delete_item(Key={"DeletionQueueItemId": match["DeletionQueueItemId"]})
  102. return {"statusCode": 204}
  103. @with_logging
  104. @add_cors_headers
  105. @catch_errors
  106. def process_handler(event, context):
  107. if running_job_exists():
  108. raise ValueError("There is already a job in progress")
  109. job_id = str(uuid.uuid4())
  110. config = get_config()
  111. item = {
  112. "Id": job_id,
  113. "Sk": job_id,
  114. "Type": "Job",
  115. "JobStatus": "QUEUED",
  116. "GSIBucket": str(random.randint(0, bucket_count - 1)),
  117. "CreatedAt": utc_timestamp(),
  118. "CreatedBy": get_user_info(event),
  119. **{k: v for k, v in config.items() if k not in ["JobDetailsRetentionDays"]},
  120. }
  121. if int(config.get("JobDetailsRetentionDays", 0)) > 0:
  122. item["Expires"] = utc_timestamp(days=config["JobDetailsRetentionDays"])
  123. jobs_table.put_item(Item=item)
  124. return {"statusCode": 202, "body": json.dumps(item, cls=DecimalEncoder)}
  125. def validate_queue_items(items):
  126. for item in items:
  127. if item.get("Type", "Simple") == "Composite":
  128. is_array = isinstance(item["MatchId"], list)
  129. enough_columns = is_array and len(item["MatchId"]) > 0
  130. just_one_mapper = len(item["DataMappers"]) == 1
  131. if not is_array:
  132. raise ValueError(
  133. "MatchIds of Composite type need to be specified as array"
  134. )
  135. if not enough_columns:
  136. raise ValueError(
  137. "MatchIds of Composite type need to have a value for at least one column"
  138. )
  139. if not just_one_mapper:
  140. raise ValueError(
  141. "MatchIds of Composite type need to be associated to exactly one Data Mapper"
  142. )
  143. def enqueue_items(matches, user_info):
  144. items = []
  145. with deletion_queue_table.batch_writer() as batch:
  146. for match in matches:
  147. match_id = match["MatchId"]
  148. data_mappers = match.get("DataMappers", [])
  149. item = {
  150. "DeletionQueueItemId": str(uuid.uuid4()),
  151. "Type": match.get("Type", "Simple"),
  152. "MatchId": match_id,
  153. "CreatedAt": utc_timestamp(),
  154. "DataMappers": data_mappers,
  155. "CreatedBy": user_info,
  156. }
  157. batch.put_item(Item=item)
  158. items.append(item)
  159. return items