stats_updater.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. """
  2. Job Stats Updater
  3. """
  4. import json
  5. import logging
  6. import boto3
  7. from os import getenv
  8. from collections import Counter
  9. from boto_utils import DecimalEncoder
  10. logger = logging.getLogger()
  11. logger.setLevel(logging.INFO)
  12. ddb = boto3.resource("dynamodb")
  13. table = ddb.Table(getenv("JobTable", "S3F2_Jobs"))
  14. def update_stats(job_id, events):
  15. stats = _aggregate_stats(events)
  16. job = _update_job(job_id, stats)
  17. logger.info("Updated Stats for Job ID %s: %s", job_id, stats)
  18. return job
  19. def _aggregate_stats(events):
  20. stats = Counter({})
  21. for event in events:
  22. event_name = event["EventName"]
  23. event_data = event.get("EventData", {})
  24. if event_name in ["QuerySucceeded", "QueryFailed"]:
  25. stats += Counter(
  26. {
  27. "TotalQueryCount": 1,
  28. "TotalQuerySucceededCount": 1
  29. if event_name == "QuerySucceeded"
  30. else 0,
  31. "TotalQueryFailedCount": 1 if event_name == "QueryFailed" else 0,
  32. "TotalQueryScannedInBytes": event_data.get("Statistics", {}).get(
  33. "DataScannedInBytes", 0
  34. ),
  35. "TotalQueryTimeInMillis": event_data.get("Statistics", {}).get(
  36. "EngineExecutionTimeInMillis", 0
  37. ),
  38. }
  39. )
  40. if event_name in [
  41. "ObjectUpdated",
  42. "ObjectUpdateSkipped",
  43. "ObjectUpdateFailed",
  44. "ObjectRollbackFailed",
  45. ]:
  46. stats += Counter(
  47. {
  48. "TotalObjectUpdatedCount": 1
  49. if event_name == "ObjectUpdated"
  50. else 0,
  51. "TotalObjectUpdateSkippedCount": 1
  52. if event_name == "ObjectUpdateSkipped"
  53. else 0,
  54. "TotalObjectUpdateFailedCount": 1
  55. if event_name == "ObjectUpdateFailed"
  56. else 0,
  57. "TotalObjectRollbackFailedCount": 1
  58. if event_name == "ObjectRollbackFailed"
  59. else 0,
  60. }
  61. )
  62. return stats
  63. def _update_job(job_id, stats):
  64. try:
  65. return table.update_item(
  66. Key={
  67. "Id": job_id,
  68. "Sk": job_id,
  69. },
  70. ConditionExpression="#Id = :Id AND #Sk = :Sk",
  71. UpdateExpression="set #qt = if_not_exists(#qt, :z) + :qt, "
  72. "#qs = if_not_exists(#qs, :z) + :qs, "
  73. "#qf = if_not_exists(#qf, :z) + :qf, "
  74. "#qb = if_not_exists(#qb, :z) + :qb, "
  75. "#qm = if_not_exists(#qm, :z) + :qm, "
  76. "#ou = if_not_exists(#ou, :z) + :ou, "
  77. "#os = if_not_exists(#os, :z) + :os, "
  78. "#of = if_not_exists(#of, :z) + :of, "
  79. "#or = if_not_exists(#or, :z) + :or",
  80. ExpressionAttributeNames={
  81. "#Id": "Id",
  82. "#Sk": "Sk",
  83. "#qt": "TotalQueryCount",
  84. "#qs": "TotalQuerySucceededCount",
  85. "#qf": "TotalQueryFailedCount",
  86. "#qb": "TotalQueryScannedInBytes",
  87. "#qm": "TotalQueryTimeInMillis",
  88. "#ou": "TotalObjectUpdatedCount",
  89. "#os": "TotalObjectUpdateSkippedCount",
  90. "#of": "TotalObjectUpdateFailedCount",
  91. "#or": "TotalObjectRollbackFailedCount",
  92. },
  93. ExpressionAttributeValues={
  94. ":Id": job_id,
  95. ":Sk": job_id,
  96. ":qt": stats.get("TotalQueryCount", 0),
  97. ":qs": stats.get("TotalQuerySucceededCount", 0),
  98. ":qf": stats.get("TotalQueryFailedCount", 0),
  99. ":qb": stats.get("TotalQueryScannedInBytes", 0),
  100. ":qm": stats.get("TotalQueryTimeInMillis", 0),
  101. ":ou": stats.get("TotalObjectUpdatedCount", 0),
  102. ":os": stats.get("TotalObjectUpdateSkippedCount", 0),
  103. ":of": stats.get("TotalObjectUpdateFailedCount", 0),
  104. ":or": stats.get("TotalObjectRollbackFailedCount", 0),
  105. ":z": 0,
  106. },
  107. ReturnValues="ALL_NEW",
  108. )["Attributes"]
  109. except ddb.meta.client.exceptions.ConditionalCheckFailedException:
  110. logger.warning("Job %s does not exist", job_id)