123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- import os
- from operator import itemgetter
- import boto3
- from decorators import with_logging
- client = boto3.client("athena")
- COMPOSITE_JOIN_TOKEN = "_S3F2COMP_"
- glue_db = os.getenv("GlueDatabase", "s3f2_manifests_database")
- glue_table = os.getenv("JobManifestsGlueTable", "s3f2_manifests_table")
- @with_logging
- def handler(event, context):
- response = client.start_query_execution(
- QueryString=make_query(event["QueryData"]),
- ResultConfiguration={
- "OutputLocation": "s3://{bucket}/{prefix}/".format(
- bucket=event["Bucket"], prefix=event["Prefix"]
- )
- },
- WorkGroup=os.getenv("WorkGroup", "primary"),
- )
- return response["QueryExecutionId"]
- def make_query(query_data):
- """
- Returns a query which will look like
- SELECT DISTINCT "$path" FROM (
- SELECT t."$path"
- FROM "db"."table" t,
- "manifests_db"."manifests_table" m
- WHERE
- m."jobid"='job1234' AND
- m."datamapperid"='dm123' AND
- cast(t."customer_id" as varchar)=m."queryablematchid" AND
- m."queryablecolumns"='customer_id'
- AND partition_key = value
- UNION ALL
- SELECT t."$path"
- FROM "db"."table" t,
- "manifests_db"."manifests_table" m
- WHERE
- m."jobid"='job1234' AND
- m."datamapperid"='dm123' AND
- cast(t."other_customer_id" as varchar)=m."queryablematchid" AND
- m."queryablecolumns"='other_customer_id'
- AND partition_key = value
- )
- Note: 'queryablematchid' and 'queryablecolumns' is a convenience
- stringified value of match_id and its column when the match is simple,
- or a stringified joint value when composite (for instance,
- "John_S3F2COMP_Doe" and "first_name_S3F2COMP_last_name").
- JobId and DataMapperId are both used as partitions for the manifest to
- optimize query execution time.
- :param query_data: a dict which looks like
- {
- "Database":"db",
- "Table": "table",
- "Columns": [
- {"Column": "col", "Type": "Simple"},
- {
- "Columns": ["first_name", "last_name"],
- "Type": "Composite"
- }
- ],
- "PartitionKeys": [{"Key":"k", "Value":"val"}]
- }
- """
- distinct_template = """SELECT DISTINCT "$path" FROM ({column_unions})"""
- single_column_template = """
- SELECT t."$path"
- FROM "{db}"."{table}" t,
- "{manifest_db}"."{manifest_table}" m
- WHERE
- m."jobid"='{job_id}' AND
- m."datamapperid"='{data_mapper_id}' AND
- {queryable_matches}=m."queryablematchid" AND m."queryablecolumns"=\'{queryable_columns}\'
- {partition_filters}
- """
- indent = " " * 4
- cast_as_str = "cast(t.{} as varchar)"
- columns_composite_join_token = ", '{}', ".format(COMPOSITE_JOIN_TOKEN)
- db, table, columns, data_mapper_id, job_id = itemgetter(
- "Database", "Table", "Columns", "DataMapperId", "JobId"
- )(query_data)
- partitions = query_data.get("PartitionKeys", [])
- partition_filters = ""
- for partition in partitions:
- partition_filters += " AND {key} = {value} ".format(
- key=escape_column(partition["Key"]),
- value=escape_item(partition["Value"]),
- )
- column_unions = ""
- for i, col in enumerate(columns):
- if i > 0:
- column_unions += "\n" + indent + "UNION ALL\n"
- is_simple = col["Type"] == "Simple"
- queryable_matches = (
- cast_as_str.format(escape_column(col["Column"]))
- if is_simple
- else cast_as_str.format(escape_column(col["Columns"][0]))
- if len(col["Columns"]) == 1
- else "concat({})".format(
- columns_composite_join_token.join(
- "t.{0}".format(escape_column(c)) for c in col["Columns"]
- )
- )
- )
- queryable_columns = (
- col["Column"] if is_simple else COMPOSITE_JOIN_TOKEN.join(col["Columns"])
- )
- column_unions += single_column_template.format(
- db=db,
- table=table,
- manifest_db=glue_db,
- manifest_table=glue_table,
- job_id=job_id,
- data_mapper_id=data_mapper_id,
- queryable_matches=queryable_matches,
- queryable_columns=queryable_columns,
- partition_filters=partition_filters,
- )
- return distinct_template.format(column_unions=column_unions)
- def escape_column(item):
- return '"{}"'.format(item.replace('"', '""').replace(".", '"."'))
- def escape_item(item):
- if item is None:
- return "NULL"
- elif isinstance(item, (int, float)):
- return escape_number(item)
- elif isinstance(item, str):
- return escape_string(item)
- else:
- raise ValueError("Unable to process supplied value")
- def escape_number(item):
- return item
- def escape_string(item):
- return "'{}'".format(item.replace("'", "''"))
|