status_updater.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
"""
Job Status Updater

Folds a batch of job events into a single conditional update of the job's
record in the DynamoDB table named by the ``JobTable`` environment variable.
Status-changing events are mapped to a ``JobStatus`` via ``status_map``; the
write only succeeds while the stored status is still one of
``unlocked_states``.
"""
  4. import json
  5. import logging
  6. import os
  7. import boto3
  8. from boto_utils import DecimalEncoder
  9. logger = logging.getLogger()
  10. logger.setLevel(logging.INFO)
  11. ddb = boto3.resource("dynamodb")
  12. table = ddb.Table(os.getenv("JobTable"))
  13. status_map = {
  14. "FindPhaseFailed": "FIND_FAILED",
  15. "ForgetPhaseFailed": "FORGET_FAILED",
  16. "Exception": "FAILED",
  17. "JobStarted": "RUNNING",
  18. "ForgetPhaseEnded": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  19. "CleanupFailed": "COMPLETED_CLEANUP_FAILED",
  20. "CleanupSucceeded": "COMPLETED",
  21. }
  22. unlocked_states = ["RUNNING", "QUEUED", "FORGET_COMPLETED_CLEANUP_IN_PROGRESS"]
  23. skip_cleanup_states = [
  24. "FIND_FAILED",
  25. "FORGET_FAILED",
  26. "FAILED",
  27. "FORGET_PARTIALLY_FAILED",
  28. ]
  29. time_statuses = {
  30. "JobStartTime": ["RUNNING"],
  31. "JobFinishTime": [
  32. "COMPLETED_CLEANUP_FAILED",
  33. "COMPLETED",
  34. "FAILED",
  35. "FIND_FAILED",
  36. "FORGET_FAILED",
  37. "FORGET_PARTIALLY_FAILED",
  38. ],
  39. }
  40. append_data_only_statuses = {
  41. "QueryPlanningComplete": ["GeneratedQueries", "DeletionQueueSize", "Manifests"]
  42. }
  43. def update_status(job_id, events):
  44. attr_updates = {}
  45. for event in events:
  46. # Handle non status events
  47. event_name = event["EventName"]
  48. if event_name not in status_map:
  49. if event_name in append_data_only_statuses:
  50. event_data = event.get("EventData", {})
  51. for attribute in append_data_only_statuses[event_name]:
  52. attr_updates[attribute] = event_data[attribute]
  53. continue
  54. new_status = determine_status(job_id, event_name)
  55. # Only change the status if it's still in an unlocked state
  56. if (
  57. not attr_updates.get("JobStatus")
  58. or attr_updates.get("JobStatus") in unlocked_states
  59. ):
  60. attr_updates["JobStatus"] = new_status
  61. # Update any job attributes
  62. for attr, statuses in time_statuses.items():
  63. if new_status in statuses and not attr_updates.get(attr):
  64. attr_updates[attr] = event["CreatedAt"]
  65. if len(attr_updates) > 0:
  66. job = _update_item(job_id, attr_updates)
  67. logger.info("Updated Status for Job ID %s: %s", job_id, attr_updates)
  68. return job
  69. def determine_status(job_id, event_name):
  70. new_status = status_map[event_name]
  71. if event_name == "ForgetPhaseEnded" and job_has_errors(job_id):
  72. return "FORGET_PARTIALLY_FAILED"
  73. return new_status
  74. def job_has_errors(job_id):
  75. item = table.get_item(
  76. Key={
  77. "Id": job_id,
  78. "Sk": job_id,
  79. },
  80. ConsistentRead=True,
  81. )["Item"]
  82. return (
  83. item.get("TotalObjectUpdateFailedCount", 0) > 0
  84. or item.get("TotalQueryFailedCount") > 0
  85. )
  86. def _update_item(job_id, attr_updates):
  87. try:
  88. update_expression = "set " + ", ".join(
  89. ["#{k} = :{k}".format(k=k) for k, v in attr_updates.items()]
  90. )
  91. attr_names = {}
  92. attr_values = {}
  93. for k, v in attr_updates.items():
  94. attr_names["#{}".format(k)] = k
  95. attr_values[":{}".format(k)] = v
  96. unlocked_states_condition = " OR ".join(
  97. ["#JobStatus = :{}".format(s) for s in unlocked_states]
  98. )
  99. return table.update_item(
  100. Key={
  101. "Id": job_id,
  102. "Sk": job_id,
  103. },
  104. UpdateExpression=update_expression,
  105. ConditionExpression="#Id = :Id AND #Sk = :Sk AND ({})".format(
  106. unlocked_states_condition
  107. ),
  108. ExpressionAttributeNames={
  109. "#Id": "Id",
  110. "#Sk": "Sk",
  111. "#JobStatus": "JobStatus",
  112. **attr_names,
  113. },
  114. ExpressionAttributeValues={
  115. ":Id": job_id,
  116. ":Sk": job_id,
  117. **{":{}".format(s): s for s in unlocked_states},
  118. **attr_values,
  119. },
  120. ReturnValues="ALL_NEW",
  121. )["Attributes"]
  122. except ddb.meta.client.exceptions.ConditionalCheckFailedException:
  123. logger.warning("Job %s is already in a status which cannot be updated", job_id)