execute_query.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import os
  2. from operator import itemgetter
  3. import boto3
  4. from decorators import with_logging
# Athena client created once at module import and reused by handler().
client = boto3.client("athena")
# Separator placed between the column names (and, inside the SQL concat(),
# between the column values) of a composite match in make_query().
COMPOSITE_JOIN_TOKEN = "_S3F2COMP_"
# Glue database and table of the manifests that make_query() joins against;
# both are read from the environment and fall back to the defaults below.
glue_db = os.getenv("GlueDatabase", "s3f2_manifests_database")
glue_table = os.getenv("JobManifestsGlueTable", "s3f2_manifests_table")
  9. @with_logging
  10. def handler(event, context):
  11. response = client.start_query_execution(
  12. QueryString=make_query(event["QueryData"]),
  13. ResultConfiguration={
  14. "OutputLocation": "s3://{bucket}/{prefix}/".format(
  15. bucket=event["Bucket"], prefix=event["Prefix"]
  16. )
  17. },
  18. WorkGroup=os.getenv("WorkGroup", "primary"),
  19. )
  20. return response["QueryExecutionId"]
  21. def make_query(query_data):
  22. """
  23. Returns a query which will look like
  24. SELECT DISTINCT "$path" FROM (
  25. SELECT t."$path"
  26. FROM "db"."table" t,
  27. "manifests_db"."manifests_table" m
  28. WHERE
  29. m."jobid"='job1234' AND
  30. m."datamapperid"='dm123' AND
  31. cast(t."customer_id" as varchar)=m."queryablematchid" AND
  32. m."queryablecolumns"='customer_id'
  33. AND partition_key = value
  34. UNION ALL
  35. SELECT t."$path"
  36. FROM "db"."table" t,
  37. "manifests_db"."manifests_table" m
  38. WHERE
  39. m."jobid"='job1234' AND
  40. m."datamapperid"='dm123' AND
  41. cast(t."other_customer_id" as varchar)=m."queryablematchid" AND
  42. m."queryablecolumns"='other_customer_id'
  43. AND partition_key = value
  44. )
  45. Note: 'queryablematchid' and 'queryablecolumns' is a convenience
  46. stringified value of match_id and its column when the match is simple,
  47. or a stringified joint value when composite (for instance,
  48. "John_S3F2COMP_Doe" and "first_name_S3F2COMP_last_name").
  49. JobId and DataMapperId are both used as partitions for the manifest to
  50. optimize query execution time.
  51. :param query_data: a dict which looks like
  52. {
  53. "Database":"db",
  54. "Table": "table",
  55. "Columns": [
  56. {"Column": "col", "Type": "Simple"},
  57. {
  58. "Columns": ["first_name", "last_name"],
  59. "Type": "Composite"
  60. }
  61. ],
  62. "PartitionKeys": [{"Key":"k", "Value":"val"}]
  63. }
  64. """
  65. distinct_template = """SELECT DISTINCT "$path" FROM ({column_unions})"""
  66. single_column_template = """
  67. SELECT t."$path"
  68. FROM "{db}"."{table}" t,
  69. "{manifest_db}"."{manifest_table}" m
  70. WHERE
  71. m."jobid"='{job_id}' AND
  72. m."datamapperid"='{data_mapper_id}' AND
  73. {queryable_matches}=m."queryablematchid" AND m."queryablecolumns"=\'{queryable_columns}\'
  74. {partition_filters}
  75. """
  76. indent = " " * 4
  77. cast_as_str = "cast(t.{} as varchar)"
  78. columns_composite_join_token = ", '{}', ".format(COMPOSITE_JOIN_TOKEN)
  79. db, table, columns, data_mapper_id, job_id = itemgetter(
  80. "Database", "Table", "Columns", "DataMapperId", "JobId"
  81. )(query_data)
  82. partitions = query_data.get("PartitionKeys", [])
  83. partition_filters = ""
  84. for partition in partitions:
  85. partition_filters += " AND {key} = {value} ".format(
  86. key=escape_column(partition["Key"]),
  87. value=escape_item(partition["Value"]),
  88. )
  89. column_unions = ""
  90. for i, col in enumerate(columns):
  91. if i > 0:
  92. column_unions += "\n" + indent + "UNION ALL\n"
  93. is_simple = col["Type"] == "Simple"
  94. queryable_matches = (
  95. cast_as_str.format(escape_column(col["Column"]))
  96. if is_simple
  97. else cast_as_str.format(escape_column(col["Columns"][0]))
  98. if len(col["Columns"]) == 1
  99. else "concat({})".format(
  100. columns_composite_join_token.join(
  101. "t.{0}".format(escape_column(c)) for c in col["Columns"]
  102. )
  103. )
  104. )
  105. queryable_columns = (
  106. col["Column"] if is_simple else COMPOSITE_JOIN_TOKEN.join(col["Columns"])
  107. )
  108. column_unions += single_column_template.format(
  109. db=db,
  110. table=table,
  111. manifest_db=glue_db,
  112. manifest_table=glue_table,
  113. job_id=job_id,
  114. data_mapper_id=data_mapper_id,
  115. queryable_matches=queryable_matches,
  116. queryable_columns=queryable_columns,
  117. partition_filters=partition_filters,
  118. )
  119. return distinct_template.format(column_unions=column_unions)
  120. def escape_column(item):
  121. return '"{}"'.format(item.replace('"', '""').replace(".", '"."'))
  122. def escape_item(item):
  123. if item is None:
  124. return "NULL"
  125. elif isinstance(item, (int, float)):
  126. return escape_number(item)
  127. elif isinstance(item, str):
  128. return escape_string(item)
  129. else:
  130. raise ValueError("Unable to process supplied value")
  131. def escape_number(item):
  132. return item
  133. def escape_string(item):
  134. return "'{}'".format(item.replace("'", "''"))