12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- import logging
- import tempfile
- from pathlib import Path
- import pandas as pd
- import pyarrow as pa
- import pyarrow.json as pj
- import pyarrow.parquet as pq
- from cfn_flip import load
- from backend.ecs_tasks.delete_files.cse import decrypt
- logger = logging.getLogger()
- def load_template(template_name):
- project_root = Path(__file__).parent.parent.absolute()
- with open(project_root.joinpath("templates", template_name)) as f:
- return load(f.read())[0]
- def get_resources_from_template(template, resource_type=None):
- resources = template["Resources"]
- if not resource_type:
- return resources
- return {k: v for k, v in resources.items() if v["Type"] == resource_type}
- def get_schema_from_template(ddb_template, logical_identifier):
- resource = ddb_template["Resources"].get(logical_identifier)
- if not resource:
- raise KeyError("Unable to find resource with identifier %s", logical_identifier)
- return {
- k["KeyType"]: k["AttributeName"] for k in resource["Properties"]["KeySchema"]
- }
- def generate_parquet_file(items, columns):
- df = pd.DataFrame(items, columns=columns)
- table = pa.Table.from_pandas(df)
- tmp = tempfile.TemporaryFile()
- pq.write_table(table, tmp)
- return tmp
- def query_parquet_file(f, column, val):
- table = pq.read_table(f)
- return [i for i in table.column(column) if i.as_py() == val]
- def query_json_file(f, column, val):
- table = pj.read_json(f)
- return [i for i in table.column(column) if i.as_py() == val]
- def empty_table(table, pk, sk=None):
- items = table.scan()["Items"]
- with table.batch_writer() as batch:
- for item in items:
- key = {
- pk: item[pk],
- }
- if sk:
- key[sk] = item[sk]
- batch.delete_item(Key=key)
- def download_and_decrypt(bucket, object_key, kms_client):
- encrypted = tempfile.NamedTemporaryFile()
- bucket.download_file(object_key, encrypted.name)
- metadata = bucket.Object(object_key).metadata
- with open(encrypted.name, "rb") as f:
- content = decrypt(f, metadata, kms_client)
- return content.read(), metadata
|