__init__.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import logging
  2. import tempfile
  3. from pathlib import Path
  4. import pandas as pd
  5. import pyarrow as pa
  6. import pyarrow.json as pj
  7. import pyarrow.parquet as pq
  8. from cfn_flip import load
  9. from backend.ecs_tasks.delete_files.cse import decrypt
  10. logger = logging.getLogger()
  11. def load_template(template_name):
  12. project_root = Path(__file__).parent.parent.absolute()
  13. with open(project_root.joinpath("templates", template_name)) as f:
  14. return load(f.read())[0]
  15. def get_resources_from_template(template, resource_type=None):
  16. resources = template["Resources"]
  17. if not resource_type:
  18. return resources
  19. return {k: v for k, v in resources.items() if v["Type"] == resource_type}
  20. def get_schema_from_template(ddb_template, logical_identifier):
  21. resource = ddb_template["Resources"].get(logical_identifier)
  22. if not resource:
  23. raise KeyError("Unable to find resource with identifier %s", logical_identifier)
  24. return {
  25. k["KeyType"]: k["AttributeName"] for k in resource["Properties"]["KeySchema"]
  26. }
  27. def generate_parquet_file(items, columns):
  28. df = pd.DataFrame(items, columns=columns)
  29. table = pa.Table.from_pandas(df)
  30. tmp = tempfile.TemporaryFile()
  31. pq.write_table(table, tmp)
  32. return tmp
  33. def query_parquet_file(f, column, val):
  34. table = pq.read_table(f)
  35. return [i for i in table.column(column) if i.as_py() == val]
  36. def query_json_file(f, column, val):
  37. table = pj.read_json(f)
  38. return [i for i in table.column(column) if i.as_py() == val]
  39. def empty_table(table, pk, sk=None):
  40. items = table.scan()["Items"]
  41. with table.batch_writer() as batch:
  42. for item in items:
  43. key = {
  44. pk: item[pk],
  45. }
  46. if sk:
  47. key[sk] = item[sk]
  48. batch.delete_item(Key=key)
  49. def download_and_decrypt(bucket, object_key, kms_client):
  50. encrypted = tempfile.NamedTemporaryFile()
  51. bucket.download_file(object_key, encrypted.name)
  52. metadata = bucket.Object(object_key).metadata
  53. with open(encrypted.name, "rb") as f:
  54. content = decrypt(f, metadata, kms_client)
  55. return content.read(), metadata