submit_query_results.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. """
  2. Submits results from Athena queries to the Fargate deletion queue
  3. """
  4. import os
  5. import boto3
  6. from decorators import with_logging
  7. from boto_utils import paginate, batch_sqs_msgs
  8. athena = boto3.client("athena")
  9. sqs = boto3.resource("sqs")
  10. queue = sqs.Queue(os.getenv("QueueUrl"))
  11. MSG_BATCH_SIZE = 500
  12. @with_logging
  13. def handler(event, context):
  14. query_id = event["QueryId"]
  15. results = paginate(
  16. athena, athena.get_query_results, ["ResultSet.Rows"], QueryExecutionId=query_id
  17. )
  18. messages = []
  19. msg_count = 0
  20. path_field_index = None
  21. for result in results:
  22. is_header_row = path_field_index is None
  23. if is_header_row:
  24. path_field_index = next(
  25. (
  26. index
  27. for (index, d) in enumerate(result["Data"])
  28. if d["VarCharValue"] == "$path"
  29. ),
  30. None,
  31. )
  32. else:
  33. msg_count += 1
  34. path = result["Data"][path_field_index]["VarCharValue"]
  35. msg = {
  36. "JobId": event["JobId"],
  37. "Object": path,
  38. "Columns": event["Columns"],
  39. "RoleArn": event.get("RoleArn", None),
  40. "DeleteOldVersions": event.get("DeleteOldVersions", True),
  41. "IgnoreObjectNotFoundExceptions": event.get(
  42. "IgnoreObjectNotFoundExceptions", False
  43. ),
  44. "Format": event.get("Format"),
  45. "Manifest": event.get("Manifest"),
  46. }
  47. messages.append({k: v for k, v in msg.items() if v is not None})
  48. if len(messages) >= MSG_BATCH_SIZE:
  49. batch_sqs_msgs(queue, messages)
  50. messages = []
  51. if len(messages) > 0:
  52. batch_sqs_msgs(queue, messages)
  53. return msg_count