handlers.py

  1. """
  2. Job handlers
  3. """
  4. import json
  5. import os
  6. import boto3
  7. from boto3.dynamodb.conditions import Key, Attr
  8. from boto_utils import DecimalEncoder, utc_timestamp
  9. from decorators import (
  10. with_logging,
  11. request_validator,
  12. catch_errors,
  13. add_cors_headers,
  14. load_schema,
  15. )

ddb = boto3.resource("dynamodb")
table = ddb.Table(os.getenv("JobTable", "S3F2_Jobs"))
index = os.getenv("JobTableDateGSI", "Date-GSI")
bucket_count = int(os.getenv("GSIBucketCount", 1))
end_statuses = [
    "COMPLETED_CLEANUP_FAILED",
    "COMPLETED",
    "FAILED",
    "FIND_FAILED",
    "FORGET_FAILED",
    "FORGET_PARTIALLY_FAILED",
]
job_summary_attributes = [
    "Id",
    "CreatedAt",
    "JobStatus",
    "JobFinishTime",
    "JobStartTime",
    "TotalObjectRollbackFailedCount",
    "TotalObjectUpdatedCount",
    "TotalObjectUpdateSkippedCount",
    "TotalObjectUpdateFailedCount",
    "TotalQueryCount",
    "TotalQueryFailedCount",
    "TotalQueryScannedInBytes",
    "TotalQuerySucceededCount",
    "TotalQueryTimeInMillis",
]
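
# The table name, GSI name, and shard count above are read from environment
# variables. A hypothetical local override (values illustrative, and they must
# be set before this module is imported):
#
#   os.environ["JobTable"] = "S3F2_Jobs"
#   os.environ["JobTableDateGSI"] = "Date-GSI"
#   os.environ["GSIBucketCount"] = "2"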


@with_logging
@add_cors_headers
@request_validator(load_schema("get_job"))
@catch_errors
def get_job_handler(event, context):
    job_id = event["pathParameters"]["job_id"]
    resp = table.get_item(
        Key={
            "Id": job_id,
            "Sk": job_id,
        }
    )
    item = resp.get("Item")
    if not item:
        return {"statusCode": 404}
    return {"statusCode": 200, "body": json.dumps(item, cls=DecimalEncoder)}
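
# A minimal, hypothetical local invocation of get_job_handler with an API
# Gateway-style event (the job ID is illustrative only, and the decorators may
# impose additional validation on the event shape):
#
#   get_job_handler({"pathParameters": {"job_id": "some-job-id"}}, None)
#   # => {"statusCode": 200, "body": "{...}"} or {"statusCode": 404}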


@with_logging
@add_cors_headers
@request_validator(load_schema("list_jobs"))
@catch_errors
def list_jobs_handler(event, context):
    qs = event.get("queryStringParameters")
    if not qs:
        qs = {}
    page_size = int(qs.get("page_size", 10))
    start_at = int(qs.get("start_at", utc_timestamp()))
    items = []
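    # The Date GSI is write-sharded: jobs are spread across GSIBucketCount
    # partition key values, so we query every bucket for a page of results
    # and merge-sort them below.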
    for gsi_bucket in range(0, bucket_count):
        response = table.query(
            IndexName=index,
            KeyConditionExpression=Key("GSIBucket").eq(str(gsi_bucket))
            & Key("CreatedAt").lt(start_at),
            ScanIndexForward=False,
            Limit=page_size,
            ProjectionExpression=", ".join(job_summary_attributes),
        )
        items += response.get("Items", [])
    items = sorted(items, key=lambda i: i["CreatedAt"], reverse=True)[:page_size]
    if len(items) < page_size:
        next_start = None
    else:
        next_start = min([item["CreatedAt"] for item in items])
    return {
        "statusCode": 200,
        "body": json.dumps(
            {
                "Jobs": items,
                "NextStart": next_start,
            },
            cls=DecimalEncoder,
        ),
    }
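
# Pagination sketch: clients pass the returned NextStart back as start_at on
# the next request, e.g. GET /jobs?page_size=10&start_at=<NextStart> (the path
# is illustrative). Only jobs with CreatedAt strictly below start_at are
# returned, newest first.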


@with_logging
@add_cors_headers
@request_validator(load_schema("list_job_events"))
@catch_errors
def list_job_events_handler(event, context):
    # Input parsing
    job_id = event["pathParameters"]["job_id"]
    qs = event.get("queryStringParameters")
    mvqs = event.get("multiValueQueryStringParameters")
    if not qs:
        qs = {}
        mvqs = {}
    page_size = int(qs.get("page_size", 20))
    start_at = qs.get("start_at", "0")
    # Check the job exists
    job = table.get_item(
        Key={
            "Id": job_id,
            "Sk": job_id,
        }
    ).get("Item")
    if not job:
        return {"statusCode": 404}
    watermark_boundary_mu = (job.get("JobFinishTime", utc_timestamp()) + 1) * 1000
    # Check the watermark is not "future"
    if int(start_at.split("#")[0]) > watermark_boundary_mu:
        raise ValueError("Watermark {} is out of bounds for this job".format(start_at))
    # Apply filters
    filter_expression = Attr("Type").eq("JobEvent")
    user_filters = mvqs.get("filter", [])
    for f in user_filters:
        k, v = f.split("=")
        filter_expression = filter_expression & Attr(k).begins_with(v)
    # Because the result may contain both JobEvent and Job items, we request at
    # most page_size + 1 items, then apply the type filter as a
    # FilterExpression. We then limit the list to the requested page size in
    # case the number of items after filtering is still page_size + 1, i.e. the
    # Job item wasn't on the page.
    items = []
    query_start_key = str(start_at)
    last_evaluated = None
    last_query_size = 0
    while len(items) < page_size:
        resp = table.query(
            KeyConditionExpression=Key("Id").eq(job_id),
            ScanIndexForward=True,
            FilterExpression=filter_expression,
            Limit=100 if len(user_filters) else page_size + 1,
            ExclusiveStartKey={"Id": job_id, "Sk": query_start_key},
        )
        results = resp.get("Items", [])
        last_query_size = len(results)
        items.extend(results[: page_size - len(items)])
        query_start_key = resp.get("LastEvaluatedKey", {}).get("Sk")
        if not query_start_key:
            break
        last_evaluated = query_start_key
    next_start = _get_watermark(
        items, start_at, page_size, job["JobStatus"], last_evaluated, last_query_size
    )
    resp = {
        k: v
        for k, v in {"JobEvents": items, "NextStart": next_start}.items()
        if v is not None
    }
    return {"statusCode": 200, "body": json.dumps(resp, cls=DecimalEncoder)}
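
# Example filter usage (hypothetical values): filters arrive as repeated
# "filter" query string parameters of the form "Attribute=prefix", e.g.
# ?filter=EventName=Find would keep only events whose EventName begins
# with "Find".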


def _get_watermark(
    items,
    initial_start_key,
    page_size,
    job_status,
    last_evaluated_ddb_key,
    last_query_size,
):
    """
    Work out the watermark to return to the user using the following logic:
    1. If the job is in progress, we always return a watermark, but the source
       of the watermark is determined as follows:
       a. We've reached the last available items in DDB but filtering has left
          us with fewer than the desired page size; however, we have a
          LastEvaluatedKey that allows the client to skip the filtered items
          next time
       b. There is at least 1 event and there are (or will be) more items
          available
       c. There are no events after the supplied watermark, so just return
          whatever the user sent
    2. If the job is finished, return a watermark only if the last query
       executed indicates there *might* be more results
    """
    next_start = None
    if job_status not in end_statuses:
        # Job is in progress
        if len(items) < page_size and last_evaluated_ddb_key:
            next_start = last_evaluated_ddb_key
        elif 0 < len(items) <= page_size:
            next_start = items[len(items) - 1]["Sk"]
        else:
            next_start = str(initial_start_key)
    # Job is finished but there are potentially more results
    elif last_query_size >= page_size:
        next_start = items[len(items) - 1]["Sk"]
    return next_start
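

# A minimal, self-contained sketch (not part of the Lambda runtime) showing how
# the watermark logic above behaves. The items, Sk formats, and the "RUNNING"
# status are hypothetical, and running this requires boto3 plus AWS region
# configuration, since importing the module creates the DynamoDB resource.
if __name__ == "__main__":
    events = [{"Sk": "1577836800000#event-1"}, {"Sk": "1577836800001#event-2"}]
    # In-progress job with a full page: the watermark is the last item's Sk.
    assert (
        _get_watermark(events, "0", 2, "RUNNING", None, 2) == "1577836800001#event-2"
    )
    # Finished job whose final query came back short: no more results, so no
    # watermark is returned.
    assert _get_watermark(events, "0", 20, "COMPLETED", None, 2) is None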