test_generate_queries.py 72 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893
  1. import json
  2. import os
  3. from types import SimpleNamespace
  4. import mock
  5. import pytest
  6. from mock import patch, MagicMock
  7. with patch.dict(os.environ, {"QueryQueue": "test"}):
  8. from backend.lambdas.tasks.generate_queries import (
  9. cast_to_type,
  10. column_mapper,
  11. generate_athena_queries,
  12. get_data_mappers,
  13. get_deletion_queue,
  14. get_inner_children,
  15. get_nested_children,
  16. get_partitions,
  17. get_table,
  18. handler,
  19. write_partitions,
  20. )
  21. pytestmark = [pytest.mark.unit, pytest.mark.task]
  22. def lists_equal_ignoring_order(a, b):
  23. a = a.copy()
  24. try:
  25. for item in b:
  26. a.remove(item)
  27. except ValueError:
  28. return False
  29. return not a
  30. @patch("backend.lambdas.tasks.generate_queries.write_partitions")
  31. @patch("backend.lambdas.tasks.generate_queries.batch_sqs_msgs")
  32. @patch("backend.lambdas.tasks.generate_queries.get_deletion_queue")
  33. @patch("backend.lambdas.tasks.generate_queries.get_data_mappers")
  34. @patch("backend.lambdas.tasks.generate_queries.generate_athena_queries")
  35. def test_it_generates_queries_writes_manifests_populates_queue_and_returns_result(
  36. gen_athena_queries,
  37. get_data_mappers,
  38. get_del_q,
  39. batch_sqs_msgs_mock,
  40. write_partitions_mock,
  41. ):
  42. queue = [{"MatchId": "hi", "DeletionQueueItemId": "id123"}]
  43. queries = [
  44. {
  45. "DataMapperId": "a",
  46. "QueryExecutor": "athena",
  47. "Format": "parquet",
  48. "Database": "test_db",
  49. "Table": "test_table",
  50. "Columns": [{"Column": "customer_id"}],
  51. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  52. "DeleteOldVersions": True,
  53. "IgnoreObjectNotFoundExceptions": False,
  54. "Manifest": "s3://S3F2-manifests-bucket/manifests/test/a/manifest.json",
  55. }
  56. ]
  57. data_mapper = {
  58. "DataMapperId": "a",
  59. "QueryExecutor": "athena",
  60. "Columns": ["customer_id"],
  61. "Format": "parquet",
  62. "QueryExecutorParameters": {
  63. "DataCatalogProvider": "glue",
  64. "Database": "test_db",
  65. "Table": "test_table",
  66. },
  67. }
  68. get_del_q.return_value = queue
  69. gen_athena_queries.return_value = queries
  70. get_data_mappers.return_value = iter([data_mapper])
  71. result = handler({"ExecutionName": "test"}, SimpleNamespace())
  72. gen_athena_queries.assert_called_with(data_mapper, queue, "test")
  73. write_partitions_mock.assert_called_with([["test", "a"]])
  74. batch_sqs_msgs_mock.assert_called_with(mock.ANY, queries)
  75. assert result == {
  76. "GeneratedQueries": 1,
  77. "DeletionQueueSize": 1,
  78. "Manifests": ["s3://S3F2-manifests-bucket/manifests/test/a/manifest.json"],
  79. }
  80. @patch("backend.lambdas.tasks.generate_queries.batch_sqs_msgs")
  81. @patch("backend.lambdas.tasks.generate_queries.get_deletion_queue")
  82. @patch("backend.lambdas.tasks.generate_queries.get_data_mappers")
  83. def test_it_raises_for_unknown_query_executor(
  84. get_data_mappers, get_del_q, batch_sqs_msgs_mock
  85. ):
  86. get_del_q.return_value = [{"MatchId": "hi"}]
  87. get_data_mappers.return_value = iter(
  88. [
  89. {
  90. "DataMapperId": "a",
  91. "QueryExecutor": "invalid",
  92. "Columns": ["customer_id"],
  93. "Format": "parquet",
  94. "QueryExecutorParameters": {
  95. "DataCatalogProvider": "glue",
  96. "Database": "test_db",
  97. "Table": "test_table",
  98. },
  99. }
  100. ]
  101. )
  102. with pytest.raises(NotImplementedError):
  103. handler({"ExecutionName": "test"}, SimpleNamespace())
  104. batch_sqs_msgs_mock.assert_not_called()
  105. class TestAthenaQueries:
  106. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  107. @patch("backend.lambdas.tasks.generate_queries.get_table")
  108. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  109. def test_it_handles_single_columns(
  110. self, get_partitions_mock, get_table_mock, bucket_mock
  111. ):
  112. put_object_mock = MagicMock()
  113. bucket_mock.return_value = put_object_mock
  114. columns = [{"Name": "customer_id"}]
  115. partition_keys = ["product_category"]
  116. partitions = [["Books"]]
  117. get_table_mock.return_value = table_stub(columns, partition_keys)
  118. get_partitions_mock.return_value = [
  119. partition_stub(p, columns) for p in partitions
  120. ]
  121. resp = generate_athena_queries(
  122. {
  123. "DataMapperId": "a",
  124. "QueryExecutor": "athena",
  125. "Columns": [col["Name"] for col in columns],
  126. "Format": "parquet",
  127. "QueryExecutorParameters": {
  128. "DataCatalogProvider": "glue",
  129. "Database": "test_db",
  130. "Table": "test_table",
  131. },
  132. },
  133. [
  134. {
  135. "MatchId": "hi",
  136. "CreatedAt": 1614698440,
  137. "DeletionQueueItemId": "id-01",
  138. }
  139. ],
  140. "job_1234567890",
  141. )
  142. assert resp == [
  143. {
  144. "DataMapperId": "a",
  145. "QueryExecutor": "athena",
  146. "Format": "parquet",
  147. "Database": "test_db",
  148. "Table": "test_table",
  149. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  150. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  151. "DeleteOldVersions": True,
  152. "IgnoreObjectNotFoundExceptions": False,
  153. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  154. }
  155. ]
  156. put_object_mock.put_object.assert_called_with(
  157. Key="manifests/job_1234567890/a/manifest.json",
  158. Body=json.dumps(
  159. {
  160. "Columns": ["customer_id"],
  161. "MatchId": ["hi"],
  162. "DeletionQueueItemId": "id-01",
  163. "CreatedAt": 1614698440,
  164. "QueryableColumns": "customer_id",
  165. "QueryableMatchId": "hi",
  166. }
  167. )
  168. + "\n",
  169. )
  170. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  171. @patch("backend.lambdas.tasks.generate_queries.get_table")
  172. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  173. def test_it_handles_int_matches(
  174. self, get_partitions_mock, get_table_mock, bucket_mock
  175. ):
  176. put_object_mock = MagicMock()
  177. bucket_mock.return_value = put_object_mock
  178. columns = [{"Name": "customer_id", "Type": "int"}]
  179. partition_keys = ["product_category"]
  180. partitions = [["Books"]]
  181. get_table_mock.return_value = table_stub(columns, partition_keys)
  182. get_partitions_mock.return_value = [
  183. partition_stub(p, columns) for p in partitions
  184. ]
  185. resp = generate_athena_queries(
  186. {
  187. "DataMapperId": "a",
  188. "QueryExecutor": "athena",
  189. "Columns": [col["Name"] for col in columns],
  190. "Format": "parquet",
  191. "QueryExecutorParameters": {
  192. "DataCatalogProvider": "glue",
  193. "Database": "test_db",
  194. "Table": "test_table",
  195. },
  196. },
  197. [
  198. {
  199. "MatchId": 12345,
  200. "CreatedAt": 1614698440,
  201. "DeletionQueueItemId": "id-01",
  202. },
  203. {
  204. "MatchId": 23456,
  205. "CreatedAt": 1614698440,
  206. "DeletionQueueItemId": "id-02",
  207. },
  208. ],
  209. "job_1234567890",
  210. )
  211. assert resp == [
  212. {
  213. "DataMapperId": "a",
  214. "QueryExecutor": "athena",
  215. "Format": "parquet",
  216. "Database": "test_db",
  217. "Table": "test_table",
  218. "Columns": [
  219. {
  220. "Column": "customer_id",
  221. "Type": "Simple",
  222. }
  223. ],
  224. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  225. "DeleteOldVersions": True,
  226. "IgnoreObjectNotFoundExceptions": False,
  227. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  228. }
  229. ]
  230. put_object_mock.put_object.assert_called_with(
  231. Key="manifests/job_1234567890/a/manifest.json",
  232. Body=(
  233. json.dumps(
  234. {
  235. "Columns": ["customer_id"],
  236. "MatchId": [12345],
  237. "DeletionQueueItemId": "id-01",
  238. "CreatedAt": 1614698440,
  239. "QueryableColumns": "customer_id",
  240. "QueryableMatchId": "12345",
  241. }
  242. )
  243. + "\n"
  244. + json.dumps(
  245. {
  246. "Columns": ["customer_id"],
  247. "MatchId": [23456],
  248. "DeletionQueueItemId": "id-02",
  249. "CreatedAt": 1614698440,
  250. "QueryableColumns": "customer_id",
  251. "QueryableMatchId": "23456",
  252. }
  253. )
  254. + "\n"
  255. ),
  256. )
  257. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  258. @patch("backend.lambdas.tasks.generate_queries.get_table")
  259. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  260. def test_it_handles_decimal_matches(
  261. self, get_partitions_mock, get_table_mock, bucket_mock
  262. ):
  263. put_object_mock = MagicMock()
  264. bucket_mock.return_value = put_object_mock
  265. columns = [{"Name": "customer_id", "Type": "decimal"}]
  266. partition_keys = ["product_category"]
  267. partitions = [["Books"]]
  268. get_table_mock.return_value = table_stub(columns, partition_keys)
  269. get_partitions_mock.return_value = [
  270. partition_stub(p, columns) for p in partitions
  271. ]
  272. resp = generate_athena_queries(
  273. {
  274. "DataMapperId": "a",
  275. "QueryExecutor": "athena",
  276. "Columns": [col["Name"] for col in columns],
  277. "Format": "parquet",
  278. "QueryExecutorParameters": {
  279. "DataCatalogProvider": "glue",
  280. "Database": "test_db",
  281. "Table": "test_table",
  282. },
  283. },
  284. [
  285. {
  286. "MatchId": "12.30",
  287. "CreatedAt": 1614698440,
  288. "DeletionQueueItemId": "id-01",
  289. },
  290. {
  291. "MatchId": "23.400",
  292. "CreatedAt": 1614698440,
  293. "DeletionQueueItemId": "id-02",
  294. },
  295. ],
  296. "job_1234567890",
  297. )
  298. assert resp == [
  299. {
  300. "DataMapperId": "a",
  301. "QueryExecutor": "athena",
  302. "Format": "parquet",
  303. "Database": "test_db",
  304. "Table": "test_table",
  305. "Columns": [
  306. {
  307. "Column": "customer_id",
  308. "Type": "Simple",
  309. }
  310. ],
  311. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  312. "DeleteOldVersions": True,
  313. "IgnoreObjectNotFoundExceptions": False,
  314. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  315. }
  316. ]
  317. put_object_mock.put_object.assert_called_with(
  318. Key="manifests/job_1234567890/a/manifest.json",
  319. Body=(
  320. json.dumps(
  321. {
  322. "Columns": ["customer_id"],
  323. "MatchId": ["12.30"],
  324. "DeletionQueueItemId": "id-01",
  325. "CreatedAt": 1614698440,
  326. "QueryableColumns": "customer_id",
  327. "QueryableMatchId": "12.30",
  328. }
  329. )
  330. + "\n"
  331. + json.dumps(
  332. {
  333. "Columns": ["customer_id"],
  334. "MatchId": ["23.400"],
  335. "DeletionQueueItemId": "id-02",
  336. "CreatedAt": 1614698440,
  337. "QueryableColumns": "customer_id",
  338. "QueryableMatchId": "23.400",
  339. }
  340. )
  341. + "\n"
  342. ),
  343. )
  344. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  345. @patch("backend.lambdas.tasks.generate_queries.get_table")
  346. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  347. def test_it_handles_int_partitions(
  348. self, get_partitions_mock, get_table_mock, bucket_mock
  349. ):
  350. put_object_mock = MagicMock()
  351. bucket_mock.return_value = put_object_mock
  352. columns = [{"Name": "customer_id"}]
  353. partition_keys = ["year"]
  354. partitions = [["2010"]]
  355. get_table_mock.return_value = table_stub(
  356. columns, partition_keys, partition_keys_type="int"
  357. )
  358. get_partitions_mock.return_value = [
  359. partition_stub(p, columns) for p in partitions
  360. ]
  361. resp = generate_athena_queries(
  362. {
  363. "DataMapperId": "a",
  364. "QueryExecutor": "athena",
  365. "Columns": [col["Name"] for col in columns],
  366. "Format": "parquet",
  367. "QueryExecutorParameters": {
  368. "DataCatalogProvider": "glue",
  369. "Database": "test_db",
  370. "Table": "test_table",
  371. },
  372. },
  373. [
  374. {
  375. "MatchId": "hi",
  376. "CreatedAt": 1614698440,
  377. "DeletionQueueItemId": "id-01",
  378. }
  379. ],
  380. "job_1234567890",
  381. )
  382. assert resp == [
  383. {
  384. "DataMapperId": "a",
  385. "QueryExecutor": "athena",
  386. "Format": "parquet",
  387. "Database": "test_db",
  388. "Table": "test_table",
  389. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  390. "PartitionKeys": [{"Key": "year", "Value": 2010}],
  391. "DeleteOldVersions": True,
  392. "IgnoreObjectNotFoundExceptions": False,
  393. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  394. }
  395. ]
  396. put_object_mock.put_object.assert_called_with(
  397. Key="manifests/job_1234567890/a/manifest.json",
  398. Body=json.dumps(
  399. {
  400. "Columns": ["customer_id"],
  401. "MatchId": ["hi"],
  402. "DeletionQueueItemId": "id-01",
  403. "CreatedAt": 1614698440,
  404. "QueryableColumns": "customer_id",
  405. "QueryableMatchId": "hi",
  406. }
  407. )
  408. + "\n",
  409. )
  410. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  411. @patch("backend.lambdas.tasks.generate_queries.get_table")
  412. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  413. def test_it_handles_multiple_columns(
  414. self, get_partitions_mock, get_table_mock, bucket_mock
  415. ):
  416. put_object_mock = MagicMock()
  417. bucket_mock.return_value = put_object_mock
  418. columns = [{"Name": "customer_id"}, {"Name": "alt_customer_id"}]
  419. partition_keys = ["product_category"]
  420. partitions = [["Books"]]
  421. get_table_mock.return_value = table_stub(columns, partition_keys)
  422. get_partitions_mock.return_value = [
  423. partition_stub(p, columns) for p in partitions
  424. ]
  425. resp = generate_athena_queries(
  426. {
  427. "DataMapperId": "a",
  428. "QueryExecutor": "athena",
  429. "Columns": [col["Name"] for col in columns],
  430. "Format": "parquet",
  431. "QueryExecutorParameters": {
  432. "DataCatalogProvider": "glue",
  433. "Database": "test_db",
  434. "Table": "test_table",
  435. },
  436. },
  437. [
  438. {
  439. "MatchId": "hi",
  440. "CreatedAt": 1614698440,
  441. "DeletionQueueItemId": "id-01",
  442. }
  443. ],
  444. "job_1234567890",
  445. )
  446. assert resp == [
  447. {
  448. "DataMapperId": "a",
  449. "QueryExecutor": "athena",
  450. "Format": "parquet",
  451. "Database": "test_db",
  452. "Table": "test_table",
  453. "Columns": [
  454. {"Column": "customer_id", "Type": "Simple"},
  455. {"Column": "alt_customer_id", "Type": "Simple"},
  456. ],
  457. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  458. "DeleteOldVersions": True,
  459. "IgnoreObjectNotFoundExceptions": False,
  460. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  461. }
  462. ]
  463. put_object_mock.put_object.assert_called_with(
  464. Key="manifests/job_1234567890/a/manifest.json",
  465. Body=(
  466. json.dumps(
  467. {
  468. "Columns": ["customer_id"],
  469. "MatchId": ["hi"],
  470. "DeletionQueueItemId": "id-01",
  471. "CreatedAt": 1614698440,
  472. "QueryableColumns": "customer_id",
  473. "QueryableMatchId": "hi",
  474. }
  475. )
  476. + "\n"
  477. + json.dumps(
  478. {
  479. "Columns": ["alt_customer_id"],
  480. "MatchId": ["hi"],
  481. "DeletionQueueItemId": "id-01",
  482. "CreatedAt": 1614698440,
  483. "QueryableColumns": "alt_customer_id",
  484. "QueryableMatchId": "hi",
  485. }
  486. )
  487. + "\n"
  488. ),
  489. )
  490. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  491. @patch("backend.lambdas.tasks.generate_queries.get_table")
  492. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  493. def test_it_handles_composite_columns(
  494. self, get_partitions_mock, get_table_mock, bucket_mock
  495. ):
  496. put_object_mock = MagicMock()
  497. bucket_mock.return_value = put_object_mock
  498. columns = [
  499. {"Name": "first_name"},
  500. {"Name": "last_name"},
  501. ]
  502. partition_keys = ["product_category"]
  503. partitions = [["Books"]]
  504. get_table_mock.return_value = table_stub(columns, partition_keys)
  505. get_partitions_mock.return_value = [
  506. partition_stub(p, columns) for p in partitions
  507. ]
  508. resp = generate_athena_queries(
  509. {
  510. "DataMapperId": "a",
  511. "QueryExecutor": "athena",
  512. "Columns": [col["Name"] for col in columns],
  513. "Format": "parquet",
  514. "QueryExecutorParameters": {
  515. "DataCatalogProvider": "glue",
  516. "Database": "test_db",
  517. "Table": "test_table",
  518. },
  519. },
  520. [
  521. {
  522. "MatchId": [
  523. {"Column": "first_name", "Value": "John"},
  524. {"Column": "last_name", "Value": "Doe"},
  525. ],
  526. "Type": "Composite",
  527. "DataMappers": ["a"],
  528. "CreatedAt": 1614698440,
  529. "DeletionQueueItemId": "id1234",
  530. }
  531. ],
  532. "job_1234567890",
  533. )
  534. assert resp == [
  535. {
  536. "DataMapperId": "a",
  537. "QueryExecutor": "athena",
  538. "Format": "parquet",
  539. "Database": "test_db",
  540. "Table": "test_table",
  541. "Columns": [
  542. {
  543. "Columns": ["first_name", "last_name"],
  544. "Type": "Composite",
  545. }
  546. ],
  547. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  548. "DeleteOldVersions": True,
  549. "IgnoreObjectNotFoundExceptions": False,
  550. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  551. }
  552. ]
  553. put_object_mock.put_object.assert_called_with(
  554. Key="manifests/job_1234567890/a/manifest.json",
  555. Body=(
  556. json.dumps(
  557. {
  558. "Columns": ["first_name", "last_name"],
  559. "MatchId": ["John", "Doe"],
  560. "DeletionQueueItemId": "id1234",
  561. "CreatedAt": 1614698440,
  562. "QueryableColumns": "first_name_S3F2COMP_last_name",
  563. "QueryableMatchId": "John_S3F2COMP_Doe",
  564. }
  565. )
  566. + "\n"
  567. ),
  568. )
  569. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  570. @patch("backend.lambdas.tasks.generate_queries.get_table")
  571. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  572. def test_it_handles_single_composite_column(
  573. self, get_partitions_mock, get_table_mock, bucket_mock
  574. ):
  575. put_object_mock = MagicMock()
  576. bucket_mock.return_value = put_object_mock
  577. columns = [{"Name": "first_name"}]
  578. partition_keys = ["product_category"]
  579. partitions = [["Books"]]
  580. get_table_mock.return_value = table_stub(columns, partition_keys)
  581. get_partitions_mock.return_value = [
  582. partition_stub(p, columns) for p in partitions
  583. ]
  584. resp = generate_athena_queries(
  585. {
  586. "DataMapperId": "a",
  587. "QueryExecutor": "athena",
  588. "Columns": [col["Name"] for col in columns],
  589. "Format": "parquet",
  590. "QueryExecutorParameters": {
  591. "DataCatalogProvider": "glue",
  592. "Database": "test_db",
  593. "Table": "test_table",
  594. },
  595. },
  596. [
  597. {
  598. "MatchId": [{"Column": "first_name", "Value": "John"}],
  599. "Type": "Composite",
  600. "DataMappers": ["a"],
  601. "CreatedAt": 1614698440,
  602. "DeletionQueueItemId": "id1234",
  603. }
  604. ],
  605. "job_1234567890",
  606. )
  607. assert resp == [
  608. {
  609. "DataMapperId": "a",
  610. "QueryExecutor": "athena",
  611. "Format": "parquet",
  612. "Database": "test_db",
  613. "Table": "test_table",
  614. "Columns": [
  615. {
  616. "Columns": ["first_name"],
  617. "Type": "Composite",
  618. }
  619. ],
  620. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  621. "DeleteOldVersions": True,
  622. "IgnoreObjectNotFoundExceptions": False,
  623. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  624. }
  625. ]
  626. put_object_mock.put_object.assert_called_with(
  627. Key="manifests/job_1234567890/a/manifest.json",
  628. Body=(
  629. json.dumps(
  630. {
  631. "Columns": ["first_name"],
  632. "MatchId": ["John"],
  633. "DeletionQueueItemId": "id1234",
  634. "CreatedAt": 1614698440,
  635. "QueryableColumns": "first_name",
  636. "QueryableMatchId": "John",
  637. }
  638. )
  639. + "\n"
  640. ),
  641. )
  642. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  643. @patch("backend.lambdas.tasks.generate_queries.get_table")
  644. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  645. def test_it_handles_mixed_columns(
  646. self, get_partitions_mock, get_table_mock, bucket_mock
  647. ):
  648. put_object_mock = MagicMock()
  649. bucket_mock.return_value = put_object_mock
  650. columns = [
  651. {"Name": "customer_id"},
  652. {"Name": "first_name"},
  653. {"Name": "last_name"},
  654. {"Name": "age", "Type": "int"},
  655. ]
  656. partition_keys = ["product_category"]
  657. partitions = [["Books"]]
  658. get_table_mock.return_value = table_stub(columns, partition_keys)
  659. get_partitions_mock.return_value = [
  660. partition_stub(p, columns) for p in partitions
  661. ]
  662. resp = generate_athena_queries(
  663. {
  664. "DataMapperId": "a",
  665. "QueryExecutor": "athena",
  666. "Columns": [col["Name"] for col in columns],
  667. "Format": "parquet",
  668. "QueryExecutorParameters": {
  669. "DataCatalogProvider": "glue",
  670. "Database": "test_db",
  671. "Table": "test_table",
  672. },
  673. },
  674. [
  675. {
  676. "MatchId": "12345",
  677. "Type": "Simple",
  678. "CreatedAt": 1614698440,
  679. "DeletionQueueItemId": "id001",
  680. },
  681. {
  682. "MatchId": "23456",
  683. "Type": "Simple",
  684. "CreatedAt": 1614698440,
  685. "DeletionQueueItemId": "id002",
  686. },
  687. {
  688. "MatchId": "23456",
  689. "Type": "Simple",
  690. "CreatedAt": 1614698440,
  691. "DeletionQueueItemId": "id003",
  692. },
  693. {
  694. "MatchId": [
  695. {"Column": "first_name", "Value": "John"},
  696. {"Column": "last_name", "Value": "Doe"},
  697. ],
  698. "Type": "Composite",
  699. "DataMappers": ["a"],
  700. "CreatedAt": 1614698440,
  701. "DeletionQueueItemId": "id004",
  702. },
  703. {
  704. "MatchId": [
  705. {"Column": "first_name", "Value": "Jane"},
  706. {"Column": "last_name", "Value": "Doe"},
  707. ],
  708. "Type": "Composite",
  709. "DataMappers": ["a"],
  710. "CreatedAt": 1614698440,
  711. "DeletionQueueItemId": "id005",
  712. },
  713. {
  714. "MatchId": [
  715. {"Column": "first_name", "Value": "Jane"},
  716. {"Column": "last_name", "Value": "Doe"},
  717. ],
  718. "Type": "Composite",
  719. "DataMappers": ["a"],
  720. "CreatedAt": 1614698440,
  721. "DeletionQueueItemId": "id006",
  722. },
  723. {
  724. "MatchId": [
  725. {"Column": "last_name", "Value": "Smith"},
  726. {"Column": "age", "Value": "28"},
  727. ],
  728. "Type": "Composite",
  729. "DataMappers": ["a"],
  730. "CreatedAt": 1614698440,
  731. "DeletionQueueItemId": "id007",
  732. },
  733. ],
  734. "job1234567890",
  735. )
  736. assert resp == [
  737. {
  738. "DataMapperId": "a",
  739. "QueryExecutor": "athena",
  740. "Format": "parquet",
  741. "Database": "test_db",
  742. "Table": "test_table",
  743. "Columns": [
  744. {
  745. "Column": "customer_id",
  746. "Type": "Simple",
  747. },
  748. {
  749. "Column": "first_name",
  750. "Type": "Simple",
  751. },
  752. {
  753. "Column": "last_name",
  754. "Type": "Simple",
  755. },
  756. {"Column": "age", "Type": "Simple"},
  757. {
  758. "Columns": ["first_name", "last_name"],
  759. "Type": "Composite",
  760. },
  761. {
  762. "Columns": ["age", "last_name"],
  763. "Type": "Composite",
  764. },
  765. ],
  766. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  767. "DeleteOldVersions": True,
  768. "IgnoreObjectNotFoundExceptions": False,
  769. "Manifest": "s3://S3F2-manifests-bucket/manifests/job1234567890/a/manifest.json",
  770. }
  771. ]
  772. put_object_mock.put_object.assert_called_with(
  773. Key="manifests/job1234567890/a/manifest.json",
  774. Body=(
  775. # id001 simple on all columns
  776. json.dumps(
  777. {
  778. "Columns": ["customer_id"],
  779. "MatchId": ["12345"],
  780. "DeletionQueueItemId": "id001",
  781. "CreatedAt": 1614698440,
  782. "QueryableColumns": "customer_id",
  783. "QueryableMatchId": "12345",
  784. }
  785. )
  786. + "\n"
  787. + json.dumps(
  788. {
  789. "Columns": ["first_name"],
  790. "MatchId": ["12345"],
  791. "DeletionQueueItemId": "id001",
  792. "CreatedAt": 1614698440,
  793. "QueryableColumns": "first_name",
  794. "QueryableMatchId": "12345",
  795. }
  796. )
  797. + "\n"
  798. + json.dumps(
  799. {
  800. "Columns": ["last_name"],
  801. "MatchId": ["12345"],
  802. "DeletionQueueItemId": "id001",
  803. "CreatedAt": 1614698440,
  804. "QueryableColumns": "last_name",
  805. "QueryableMatchId": "12345",
  806. }
  807. )
  808. + "\n"
  809. + json.dumps(
  810. {
  811. "Columns": ["age"],
  812. "MatchId": [12345],
  813. "DeletionQueueItemId": "id001",
  814. "CreatedAt": 1614698440,
  815. "QueryableColumns": "age",
  816. "QueryableMatchId": "12345",
  817. }
  818. )
  819. + "\n"
  820. # id002 simple on all columns
  821. + json.dumps(
  822. {
  823. "Columns": ["customer_id"],
  824. "MatchId": ["23456"],
  825. "DeletionQueueItemId": "id002",
  826. "CreatedAt": 1614698440,
  827. "QueryableColumns": "customer_id",
  828. "QueryableMatchId": "23456",
  829. }
  830. )
  831. + "\n"
  832. + json.dumps(
  833. {
  834. "Columns": ["first_name"],
  835. "MatchId": ["23456"],
  836. "DeletionQueueItemId": "id002",
  837. "CreatedAt": 1614698440,
  838. "QueryableColumns": "first_name",
  839. "QueryableMatchId": "23456",
  840. }
  841. )
  842. + "\n"
  843. + json.dumps(
  844. {
  845. "Columns": ["last_name"],
  846. "MatchId": ["23456"],
  847. "DeletionQueueItemId": "id002",
  848. "CreatedAt": 1614698440,
  849. "QueryableColumns": "last_name",
  850. "QueryableMatchId": "23456",
  851. }
  852. )
  853. + "\n"
  854. + json.dumps(
  855. {
  856. "Columns": ["age"],
  857. "MatchId": [23456],
  858. "DeletionQueueItemId": "id002",
  859. "CreatedAt": 1614698440,
  860. "QueryableColumns": "age",
  861. "QueryableMatchId": "23456",
  862. }
  863. )
  864. + "\n"
  865. # id003 is a id002 clone
  866. # Values are same as id002 but we cannot deduplicate
  867. # as we need id003 too for the cleanup phase
  868. + json.dumps(
  869. {
  870. "Columns": ["customer_id"],
  871. "MatchId": ["23456"],
  872. "DeletionQueueItemId": "id003",
  873. "CreatedAt": 1614698440,
  874. "QueryableColumns": "customer_id",
  875. "QueryableMatchId": "23456",
  876. }
  877. )
  878. + "\n"
  879. + json.dumps(
  880. {
  881. "Columns": ["first_name"],
  882. "MatchId": ["23456"],
  883. "DeletionQueueItemId": "id003",
  884. "CreatedAt": 1614698440,
  885. "QueryableColumns": "first_name",
  886. "QueryableMatchId": "23456",
  887. }
  888. )
  889. + "\n"
  890. + json.dumps(
  891. {
  892. "Columns": ["last_name"],
  893. "MatchId": ["23456"],
  894. "DeletionQueueItemId": "id003",
  895. "CreatedAt": 1614698440,
  896. "QueryableColumns": "last_name",
  897. "QueryableMatchId": "23456",
  898. }
  899. )
  900. + "\n"
  901. + json.dumps(
  902. {
  903. "Columns": ["age"],
  904. "MatchId": [23456],
  905. "DeletionQueueItemId": "id003",
  906. "CreatedAt": 1614698440,
  907. "QueryableColumns": "age",
  908. "QueryableMatchId": "23456",
  909. }
  910. )
  911. + "\n"
  912. # id004 composite multi-column
  913. + json.dumps(
  914. {
  915. "Columns": ["first_name", "last_name"],
  916. "MatchId": ["John", "Doe"],
  917. "DeletionQueueItemId": "id004",
  918. "CreatedAt": 1614698440,
  919. "QueryableColumns": "first_name_S3F2COMP_last_name",
  920. "QueryableMatchId": "John_S3F2COMP_Doe",
  921. }
  922. )
  923. + "\n"
  924. # id005 composite multi-column
  925. + json.dumps(
  926. {
  927. "Columns": ["first_name", "last_name"],
  928. "MatchId": ["Jane", "Doe"],
  929. "DeletionQueueItemId": "id005",
  930. "CreatedAt": 1614698440,
  931. "QueryableColumns": "first_name_S3F2COMP_last_name",
  932. "QueryableMatchId": "Jane_S3F2COMP_Doe",
  933. }
  934. )
  935. + "\n"
  936. # id006 is a id005 clone
  937. + json.dumps(
  938. {
  939. "Columns": ["first_name", "last_name"],
  940. "MatchId": ["Jane", "Doe"],
  941. "DeletionQueueItemId": "id006",
  942. "CreatedAt": 1614698440,
  943. "QueryableColumns": "first_name_S3F2COMP_last_name",
  944. "QueryableMatchId": "Jane_S3F2COMP_Doe",
  945. }
  946. )
  947. + "\n"
  948. # id007 composite multi-column with different types
  949. # note that columns are sorted alphabetically
  950. + json.dumps(
  951. {
  952. "Columns": ["age", "last_name"],
  953. "MatchId": [28, "Smith"],
  954. "DeletionQueueItemId": "id007",
  955. "CreatedAt": 1614698440,
  956. "QueryableColumns": "age_S3F2COMP_last_name",
  957. "QueryableMatchId": "28_S3F2COMP_Smith",
  958. }
  959. )
  960. + "\n"
  961. ),
  962. )
  963. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  964. @patch("backend.lambdas.tasks.generate_queries.get_table")
  965. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  966. def test_it_handles_multiple_partition_keys(
  967. self, get_partitions_mock, get_table_mock, bucket_mock
  968. ):
  969. put_object_mock = MagicMock()
  970. bucket_mock.return_value = put_object_mock
  971. columns = [{"Name": "customer_id"}]
  972. partition_keys = ["year", "month"]
  973. partitions = [["2019", "01"]]
  974. get_table_mock.return_value = table_stub(columns, partition_keys)
  975. get_partitions_mock.return_value = [
  976. partition_stub(p, columns) for p in partitions
  977. ]
  978. resp = generate_athena_queries(
  979. {
  980. "DataMapperId": "a",
  981. "QueryExecutor": "athena",
  982. "Columns": [col["Name"] for col in columns],
  983. "Format": "parquet",
  984. "QueryExecutorParameters": {
  985. "DataCatalogProvider": "glue",
  986. "Database": "test_db",
  987. "Table": "test_table",
  988. },
  989. },
  990. [
  991. {
  992. "MatchId": "hi",
  993. "CreatedAt": 1614698440,
  994. "DeletionQueueItemId": "id1234",
  995. }
  996. ],
  997. "job_1234567890",
  998. )
  999. assert resp == [
  1000. {
  1001. "DataMapperId": "a",
  1002. "QueryExecutor": "athena",
  1003. "Format": "parquet",
  1004. "Database": "test_db",
  1005. "Table": "test_table",
  1006. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1007. "PartitionKeys": [
  1008. {"Key": "year", "Value": "2019"},
  1009. {"Key": "month", "Value": "01"},
  1010. ],
  1011. "DeleteOldVersions": True,
  1012. "IgnoreObjectNotFoundExceptions": False,
  1013. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1014. }
  1015. ]
  1016. put_object_mock.put_object.assert_called_with(
  1017. Key="manifests/job_1234567890/a/manifest.json",
  1018. Body=(
  1019. json.dumps(
  1020. {
  1021. "Columns": ["customer_id"],
  1022. "MatchId": ["hi"],
  1023. "DeletionQueueItemId": "id1234",
  1024. "CreatedAt": 1614698440,
  1025. "QueryableColumns": "customer_id",
  1026. "QueryableMatchId": "hi",
  1027. }
  1028. )
  1029. + "\n"
  1030. ),
  1031. )
  1032. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1033. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1034. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1035. def test_it_handles_multiple_partition_values(
  1036. self, get_partitions_mock, get_table_mock, bucket_mock
  1037. ):
  1038. put_object_mock = MagicMock()
  1039. bucket_mock.return_value = put_object_mock
  1040. columns = [{"Name": "customer_id"}]
  1041. partition_keys = ["year", "month"]
  1042. partitions = [["2018", "12"], ["2019", "01"], ["2019", "02"]]
  1043. get_table_mock.return_value = table_stub(columns, partition_keys)
  1044. get_partitions_mock.return_value = [
  1045. partition_stub(p, columns) for p in partitions
  1046. ]
  1047. resp = generate_athena_queries(
  1048. {
  1049. "DataMapperId": "a",
  1050. "Columns": [col["Name"] for col in columns],
  1051. "Format": "parquet",
  1052. "QueryExecutor": "athena",
  1053. "QueryExecutorParameters": {
  1054. "DataCatalogProvider": "glue",
  1055. "Database": "test_db",
  1056. "Table": "test_table",
  1057. },
  1058. },
  1059. [
  1060. {
  1061. "MatchId": "hi",
  1062. "CreatedAt": 1614698440,
  1063. "DeletionQueueItemId": "item1234",
  1064. }
  1065. ],
  1066. "job_1234567890",
  1067. )
  1068. assert lists_equal_ignoring_order(
  1069. resp,
  1070. [
  1071. {
  1072. "DataMapperId": "a",
  1073. "Database": "test_db",
  1074. "Table": "test_table",
  1075. "QueryExecutor": "athena",
  1076. "Format": "parquet",
  1077. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1078. "PartitionKeys": [
  1079. {"Key": "year", "Value": "2018"},
  1080. {"Key": "month", "Value": "12"},
  1081. ],
  1082. "DeleteOldVersions": True,
  1083. "IgnoreObjectNotFoundExceptions": False,
  1084. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1085. },
  1086. {
  1087. "DataMapperId": "a",
  1088. "Database": "test_db",
  1089. "Table": "test_table",
  1090. "QueryExecutor": "athena",
  1091. "Format": "parquet",
  1092. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1093. "PartitionKeys": [
  1094. {"Key": "year", "Value": "2019"},
  1095. {"Key": "month", "Value": "01"},
  1096. ],
  1097. "DeleteOldVersions": True,
  1098. "IgnoreObjectNotFoundExceptions": False,
  1099. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1100. },
  1101. {
  1102. "DataMapperId": "a",
  1103. "Database": "test_db",
  1104. "Table": "test_table",
  1105. "QueryExecutor": "athena",
  1106. "Format": "parquet",
  1107. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1108. "PartitionKeys": [
  1109. {"Key": "year", "Value": "2019"},
  1110. {"Key": "month", "Value": "02"},
  1111. ],
  1112. "DeleteOldVersions": True,
  1113. "IgnoreObjectNotFoundExceptions": False,
  1114. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1115. },
  1116. ],
  1117. )
  1118. put_object_mock.put_object.assert_called_with(
  1119. Key="manifests/job_1234567890/a/manifest.json",
  1120. Body=(
  1121. json.dumps(
  1122. {
  1123. "Columns": ["customer_id"],
  1124. "MatchId": ["hi"],
  1125. "DeletionQueueItemId": "item1234",
  1126. "CreatedAt": 1614698440,
  1127. "QueryableColumns": "customer_id",
  1128. "QueryableMatchId": "hi",
  1129. }
  1130. )
  1131. + "\n"
  1132. ),
  1133. )
  1134. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1135. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1136. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1137. def test_it_propagates_optional_properties(
  1138. self, get_partitions_mock, get_table_mock, bucket_mock
  1139. ):
  1140. put_object_mock = MagicMock()
  1141. bucket_mock.return_value = put_object_mock
  1142. columns = [{"Name": "customer_id"}]
  1143. partition_keys = ["year", "month"]
  1144. partitions = [["2018", "12"], ["2019", "01"]]
  1145. get_table_mock.return_value = table_stub(columns, partition_keys)
  1146. get_partitions_mock.return_value = [
  1147. partition_stub(p, columns) for p in partitions
  1148. ]
  1149. resp = generate_athena_queries(
  1150. {
  1151. "DataMapperId": "a",
  1152. "QueryExecutor": "athena",
  1153. "Columns": [col["Name"] for col in columns],
  1154. "Format": "parquet",
  1155. "QueryExecutorParameters": {
  1156. "DataCatalogProvider": "glue",
  1157. "Database": "test_db",
  1158. "Table": "test_table",
  1159. },
  1160. "RoleArn": "arn:aws:iam::accountid:role/rolename",
  1161. "DeleteOldVersions": True,
  1162. "IgnoreObjectNotFoundExceptions": True,
  1163. },
  1164. [
  1165. {
  1166. "MatchId": "hi",
  1167. "CreatedAt": 1614698440,
  1168. "DeletionQueueItemId": "item1234",
  1169. }
  1170. ],
  1171. "job_1234567890",
  1172. )
  1173. assert lists_equal_ignoring_order(
  1174. resp,
  1175. [
  1176. {
  1177. "DataMapperId": "a",
  1178. "Database": "test_db",
  1179. "Table": "test_table",
  1180. "QueryExecutor": "athena",
  1181. "Format": "parquet",
  1182. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1183. "PartitionKeys": [
  1184. {"Key": "year", "Value": "2018"},
  1185. {"Key": "month", "Value": "12"},
  1186. ],
  1187. "RoleArn": "arn:aws:iam::accountid:role/rolename",
  1188. "DeleteOldVersions": True,
  1189. "IgnoreObjectNotFoundExceptions": True,
  1190. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1191. },
  1192. {
  1193. "DataMapperId": "a",
  1194. "Database": "test_db",
  1195. "Table": "test_table",
  1196. "QueryExecutor": "athena",
  1197. "Format": "parquet",
  1198. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1199. "PartitionKeys": [
  1200. {"Key": "year", "Value": "2019"},
  1201. {"Key": "month", "Value": "01"},
  1202. ],
  1203. "RoleArn": "arn:aws:iam::accountid:role/rolename",
  1204. "DeleteOldVersions": True,
  1205. "IgnoreObjectNotFoundExceptions": True,
  1206. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1207. },
  1208. ],
  1209. )
  1210. put_object_mock.put_object.assert_called_with(
  1211. Key="manifests/job_1234567890/a/manifest.json",
  1212. Body=(
  1213. json.dumps(
  1214. {
  1215. "Columns": ["customer_id"],
  1216. "MatchId": ["hi"],
  1217. "DeletionQueueItemId": "item1234",
  1218. "CreatedAt": 1614698440,
  1219. "QueryableColumns": "customer_id",
  1220. "QueryableMatchId": "hi",
  1221. }
  1222. )
  1223. + "\n"
  1224. ),
  1225. )
  1226. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1227. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1228. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1229. def test_it_filters_users_from_non_applicable_tables(
  1230. self, get_partitions_mock, get_table_mock, bucket_mock
  1231. ):
  1232. put_object_mock = MagicMock()
  1233. bucket_mock.return_value = put_object_mock
  1234. columns = [{"Name": "customer_id"}]
  1235. partition_keys = ["product_category"]
  1236. partitions = [["Books"]]
  1237. get_table_mock.return_value = table_stub(columns, partition_keys)
  1238. get_partitions_mock.return_value = [
  1239. partition_stub(p, columns) for p in partitions
  1240. ]
  1241. resp = generate_athena_queries(
  1242. {
  1243. "DataMapperId": "B",
  1244. "QueryExecutor": "athena",
  1245. "Columns": [col["Name"] for col in columns],
  1246. "Format": "parquet",
  1247. "QueryExecutorParameters": {
  1248. "DataCatalogProvider": "glue",
  1249. "Database": "test_db",
  1250. "Table": "B",
  1251. },
  1252. },
  1253. [
  1254. {
  1255. "MatchId": "123",
  1256. "CreatedAt": 1614698440,
  1257. "DataMappers": ["A"],
  1258. "DeletionQueueItemId": "id1",
  1259. },
  1260. {
  1261. "MatchId": "456",
  1262. "CreatedAt": 1614698440,
  1263. "DataMappers": [],
  1264. "DeletionQueueItemId": "id2",
  1265. },
  1266. ],
  1267. "job_1234567890",
  1268. )
  1269. assert resp == [
  1270. {
  1271. "DataMapperId": "B",
  1272. "Database": "test_db",
  1273. "Table": "B",
  1274. "QueryExecutor": "athena",
  1275. "Format": "parquet",
  1276. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1277. "PartitionKeys": [{"Key": "product_category", "Value": "Books"}],
  1278. "DeleteOldVersions": True,
  1279. "IgnoreObjectNotFoundExceptions": False,
  1280. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/B/manifest.json",
  1281. }
  1282. ]
  1283. put_object_mock.put_object.assert_called_with(
  1284. Key="manifests/job_1234567890/B/manifest.json",
  1285. Body=(
  1286. json.dumps(
  1287. {
  1288. "Columns": ["customer_id"],
  1289. "MatchId": ["456"],
  1290. "DeletionQueueItemId": "id2",
  1291. "CreatedAt": 1614698440,
  1292. "QueryableColumns": "customer_id",
  1293. "QueryableMatchId": "456",
  1294. }
  1295. )
  1296. + "\n"
  1297. ),
  1298. )
  1299. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1300. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1301. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1302. def test_it_handles_unpartitioned_data(
  1303. self, get_partitions_mock, get_table_mock, bucket_mock
  1304. ):
  1305. put_object_mock = MagicMock()
  1306. bucket_mock.return_value = put_object_mock
  1307. columns = [{"Name": "customer_id"}]
  1308. get_table_mock.return_value = table_stub(columns, [])
  1309. get_partitions_mock.return_value = []
  1310. resp = generate_athena_queries(
  1311. {
  1312. "DataMapperId": "a",
  1313. "QueryExecutor": "athena",
  1314. "Columns": [col["Name"] for col in columns],
  1315. "Format": "parquet",
  1316. "QueryExecutorParameters": {
  1317. "DataCatalogProvider": "glue",
  1318. "Database": "test_db",
  1319. "Table": "test_table",
  1320. },
  1321. },
  1322. [
  1323. {
  1324. "MatchId": "hi",
  1325. "CreatedAt": 1614698440,
  1326. "DeletionQueueItemId": "item1234",
  1327. }
  1328. ],
  1329. "job_1234567890",
  1330. )
  1331. assert resp == [
  1332. {
  1333. "DataMapperId": "a",
  1334. "Database": "test_db",
  1335. "Table": "test_table",
  1336. "QueryExecutor": "athena",
  1337. "Format": "parquet",
  1338. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1339. "PartitionKeys": [],
  1340. "DeleteOldVersions": True,
  1341. "IgnoreObjectNotFoundExceptions": False,
  1342. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1343. }
  1344. ]
  1345. put_object_mock.put_object.assert_called_with(
  1346. Key="manifests/job_1234567890/a/manifest.json",
  1347. Body=(
  1348. json.dumps(
  1349. {
  1350. "Columns": ["customer_id"],
  1351. "MatchId": ["hi"],
  1352. "DeletionQueueItemId": "item1234",
  1353. "CreatedAt": 1614698440,
  1354. "QueryableColumns": "customer_id",
  1355. "QueryableMatchId": "hi",
  1356. }
  1357. )
  1358. + "\n"
  1359. ),
  1360. )
  1361. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1362. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1363. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1364. def test_it_propagates_role_arn_for_unpartitioned_data(
  1365. self, get_partitions_mock, get_table_mock, bucket_mock
  1366. ):
  1367. put_object_mock = MagicMock()
  1368. bucket_mock.return_value = put_object_mock
  1369. columns = [{"Name": "customer_id"}]
  1370. get_table_mock.return_value = table_stub(columns, [])
  1371. get_partitions_mock.return_value = []
  1372. resp = generate_athena_queries(
  1373. {
  1374. "DataMapperId": "a",
  1375. "QueryExecutor": "athena",
  1376. "Columns": [col["Name"] for col in columns],
  1377. "Format": "parquet",
  1378. "QueryExecutorParameters": {
  1379. "DataCatalogProvider": "glue",
  1380. "Database": "test_db",
  1381. "Table": "test_table",
  1382. },
  1383. "RoleArn": "arn:aws:iam::accountid:role/rolename",
  1384. },
  1385. [
  1386. {
  1387. "MatchId": "hi",
  1388. "CreatedAt": 1614698440,
  1389. "DeletionQueueItemId": "item1234",
  1390. }
  1391. ],
  1392. "job_1234567890",
  1393. )
  1394. assert resp == [
  1395. {
  1396. "DataMapperId": "a",
  1397. "Database": "test_db",
  1398. "Table": "test_table",
  1399. "QueryExecutor": "athena",
  1400. "Format": "parquet",
  1401. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1402. "PartitionKeys": [],
  1403. "RoleArn": "arn:aws:iam::accountid:role/rolename",
  1404. "DeleteOldVersions": True,
  1405. "IgnoreObjectNotFoundExceptions": False,
  1406. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1407. }
  1408. ]
  1409. put_object_mock.put_object.assert_called_with(
  1410. Key="manifests/job_1234567890/a/manifest.json",
  1411. Body=(
  1412. json.dumps(
  1413. {
  1414. "Columns": ["customer_id"],
  1415. "MatchId": ["hi"],
  1416. "DeletionQueueItemId": "item1234",
  1417. "CreatedAt": 1614698440,
  1418. "QueryableColumns": "customer_id",
  1419. "QueryableMatchId": "hi",
  1420. }
  1421. )
  1422. + "\n"
  1423. ),
  1424. )
  1425. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1426. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1427. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1428. def test_it_removes_queries_with_no_applicable_matches(
  1429. self, get_partitions_mock, get_table_mock, bucket_mock
  1430. ):
  1431. put_object_mock = MagicMock()
  1432. bucket_mock.return_value = put_object_mock
  1433. columns = [{"Name": "customer_id"}]
  1434. get_table_mock.return_value = table_stub(columns, [])
  1435. get_partitions_mock.return_value = []
  1436. resp = generate_athena_queries(
  1437. {
  1438. "DataMapperId": "A",
  1439. "QueryExecutor": "athena",
  1440. "Columns": [col["Name"] for col in columns],
  1441. "Format": "parquet",
  1442. "QueryExecutorParameters": {
  1443. "DataCatalogProvider": "glue",
  1444. "Database": "test_db",
  1445. "Table": "test_table",
  1446. },
  1447. },
  1448. [{"MatchId": "123", "DataMappers": ["B"], "DeletionQueueItemId": "id1234"}],
  1449. "job_1234567890",
  1450. )
  1451. assert resp == []
  1452. assert not put_object_mock.put_object.called
  1453. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1454. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1455. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1456. def test_it_removes_queries_with_no_applicable_matches_for_partitioned_data(
  1457. self, get_partitions_mock, get_table_mock, bucket_mock
  1458. ):
  1459. put_object_mock = MagicMock()
  1460. bucket_mock.return_value = put_object_mock
  1461. columns = [{"Name": "customer_id"}]
  1462. partition_keys = ["product_category"]
  1463. partitions = [["Books"], ["Beauty"]]
  1464. get_table_mock.return_value = table_stub(columns, partition_keys)
  1465. get_partitions_mock.return_value = [
  1466. partition_stub(p, columns) for p in partitions
  1467. ]
  1468. resp = generate_athena_queries(
  1469. {
  1470. "DataMapperId": "A",
  1471. "QueryExecutor": "athena",
  1472. "Columns": [col["Name"] for col in columns],
  1473. "Format": "parquet",
  1474. "QueryExecutorParameters": {
  1475. "DataCatalogProvider": "glue",
  1476. "Database": "test_db",
  1477. "Table": "test_table",
  1478. },
  1479. },
  1480. [{"MatchId": "123", "DataMappers": ["C"], "DeletionQueueItemId": "id1234"}],
  1481. "job_1234567890",
  1482. )
  1483. assert resp == []
  1484. assert not put_object_mock.put_object.called
  1485. @patch("backend.lambdas.tasks.generate_queries.glue_client")
  1486. def test_it_returns_table(self, client):
  1487. client.get_table.return_value = {"Table": {"Name": "test"}}
  1488. result = get_table("test_db", "test_table")
  1489. assert {"Name": "test"} == result
  1490. client.get_table.assert_called_with(DatabaseName="test_db", Name="test_table")
  1491. @patch("backend.lambdas.tasks.generate_queries.paginate")
  1492. def test_it_returns_all_partitions(self, paginate):
  1493. paginate.return_value = iter(["blah"])
  1494. result = list(get_partitions("test_db", "test_table"))
  1495. assert ["blah"] == result
  1496. paginate.assert_called_with(
  1497. mock.ANY,
  1498. mock.ANY,
  1499. ["Partitions"],
  1500. **{
  1501. "DatabaseName": "test_db",
  1502. "TableName": "test_table",
  1503. "ExcludeColumnSchema": True,
  1504. }
  1505. )
  1506. def test_it_converts_supported_types(self):
  1507. for scenario in [
  1508. {"value": "m", "type": "char", "expected": "m"},
  1509. {"value": "mystr", "type": "string", "expected": "mystr"},
  1510. {"value": "mystr", "type": "varchar", "expected": "mystr"},
  1511. {"value": "2", "type": "bigint", "expected": 2},
  1512. {"value": "2", "type": "int", "expected": 2},
  1513. {"value": "2", "type": "smallint", "expected": 2},
  1514. {"value": "2", "type": "tinyint", "expected": 2},
  1515. {"value": "2.23", "type": "double", "expected": 2.23},
  1516. {"value": "2.23", "type": "float", "expected": 2.23},
  1517. ]:
  1518. res = cast_to_type(
  1519. scenario["value"],
  1520. "test_col",
  1521. "TableName",
  1522. [
  1523. {
  1524. "Name": "test_col",
  1525. "Type": scenario["type"],
  1526. "CanBeIdentifier": True,
  1527. }
  1528. ],
  1529. )
  1530. assert res == scenario["expected"]
  1531. def test_it_converts_supported_types_when_nested_in_struct(self):
  1532. column_type = "struct<type:int,x:map<string,struct<a:int>>,info:struct<user_id:int,name:string>>"
  1533. tree = list(map(column_mapper, [{"Name": "user", "Type": column_type}]))
  1534. for scenario in [
  1535. {"value": "john_doe", "id": "user.info.name", "expected": "john_doe"},
  1536. {"value": "1234567890", "id": "user.info.user_id", "expected": 1234567890},
  1537. {"value": "1", "id": "user.type", "expected": 1},
  1538. ]:
  1539. res = cast_to_type(scenario["value"], scenario["id"], "TableName", tree)
  1540. assert res == scenario["expected"]
  1541. def test_it_throws_for_unknown_col(self):
  1542. with pytest.raises(ValueError) as e:
  1543. cast_to_type(
  1544. "mystr",
  1545. "doesnt_exist",
  1546. "TableName",
  1547. [{"Name": "test_col", "Type": "string", "CanBeIdentifier": True}],
  1548. )
  1549. assert e.value.args[0] == "Column doesnt_exist not found at table TableName"
  1550. def test_it_throws_for_unsupported_complex_nested_types(self):
  1551. for scenario in [
  1552. "array<x:int>",
  1553. "array<struct<x:int>>",
  1554. "struct<a:array<struct<a:int,x:int>>>",
  1555. "array<struct<a:int,b:struct<x:int>>>",
  1556. "struct<a:map<string,struct<x:int>>>",
  1557. "map<string,struct<x:int>>",
  1558. ]:
  1559. with pytest.raises(ValueError):
  1560. cast_to_type(
  1561. 123,
  1562. "user.x",
  1563. "TableName",
  1564. list(map(column_mapper, [{"Name": "user", "Type": scenario}])),
  1565. )
  1566. def test_it_throws_for_unsupported_col_types(self):
  1567. with pytest.raises(ValueError) as e:
  1568. cast_to_type(
  1569. "2.56",
  1570. "test_col",
  1571. "TableName",
  1572. list(map(column_mapper, [{"Name": "test_col", "Type": "foo"}])),
  1573. )
  1574. assert (
  1575. e.value.args[0]
  1576. == "Column test_col at table TableName is not a supported column type for querying"
  1577. )
  1578. def test_it_throws_for_unconvertable_matches(self):
  1579. with pytest.raises(ValueError):
  1580. cast_to_type(
  1581. "mystr",
  1582. "test_col",
  1583. "TableName",
  1584. list(map(column_mapper, [{"Name": "test_col", "Type": "int"}])),
  1585. )
  1586. def test_it_throws_for_invalid_schema_for_inner_children(self):
  1587. with pytest.raises(ValueError) as e:
  1588. get_inner_children("struct<name:string", "struct<", ">")
  1589. assert e.value.args[0] == "Column schema is not valid"
  1590. def test_it_throws_for_invalid_schema_for_nested_children(self):
  1591. with pytest.raises(ValueError) as e:
  1592. get_nested_children(
  1593. "struct<name:string,age:int,s:struct<n:int>,b:string", "struct"
  1594. )
  1595. assert e.value.args[0] == "Column schema is not valid"
  1596. @patch("backend.lambdas.tasks.generate_queries.s3.Bucket")
  1597. @patch("backend.lambdas.tasks.generate_queries.get_table")
  1598. @patch("backend.lambdas.tasks.generate_queries.get_partitions")
  1599. def test_it_handles_partition_filtering(
  1600. self, get_partitions_mock, get_table_mock, bucket_mock
  1601. ):
  1602. put_object_mock = MagicMock()
  1603. bucket_mock.return_value = put_object_mock
  1604. columns = [{"Name": "customer_id"}]
  1605. partition_keys = ["year", "month"]
  1606. partitions = [["2018", "12"], ["2019", "01"], ["2019", "02"]]
  1607. get_table_mock.return_value = table_stub(columns, partition_keys)
  1608. get_partitions_mock.return_value = [
  1609. partition_stub(p, columns) for p in partitions
  1610. ]
  1611. resp = generate_athena_queries(
  1612. {
  1613. "DataMapperId": "a",
  1614. "QueryExecutor": "athena",
  1615. "Columns": [col["Name"] for col in columns],
  1616. "Format": "parquet",
  1617. "QueryExecutorParameters": {
  1618. "DataCatalogProvider": "glue",
  1619. "Database": "test_db",
  1620. "Table": "test_table",
  1621. "PartitionKeys": ["year"],
  1622. },
  1623. },
  1624. [
  1625. {
  1626. "MatchId": "hi",
  1627. "CreatedAt": 1614698440,
  1628. "DeletionQueueItemId": "id1234",
  1629. }
  1630. ],
  1631. "job_1234567890",
  1632. )
  1633. assert lists_equal_ignoring_order(
  1634. resp,
  1635. [
  1636. {
  1637. "DataMapperId": "a",
  1638. "QueryExecutor": "athena",
  1639. "Format": "parquet",
  1640. "Database": "test_db",
  1641. "Table": "test_table",
  1642. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1643. "PartitionKeys": [{"Key": "year", "Value": "2018"}],
  1644. "DeleteOldVersions": True,
  1645. "IgnoreObjectNotFoundExceptions": False,
  1646. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1647. },
  1648. {
  1649. "DataMapperId": "a",
  1650. "QueryExecutor": "athena",
  1651. "Format": "parquet",
  1652. "Database": "test_db",
  1653. "Table": "test_table",
  1654. "Columns": [{"Column": "customer_id", "Type": "Simple"}],
  1655. "PartitionKeys": [{"Key": "year", "Value": "2019"}],
  1656. "DeleteOldVersions": True,
  1657. "IgnoreObjectNotFoundExceptions": False,
  1658. "Manifest": "s3://S3F2-manifests-bucket/manifests/job_1234567890/a/manifest.json",
  1659. },
  1660. ],
  1661. )
  1662. put_object_mock.put_object.assert_called_with(
  1663. Key="manifests/job_1234567890/a/manifest.json",
  1664. Body=(
  1665. json.dumps(
  1666. {
  1667. "Columns": ["customer_id"],
  1668. "MatchId": ["hi"],
  1669. "DeletionQueueItemId": "id1234",
  1670. "CreatedAt": 1614698440,
  1671. "QueryableColumns": "customer_id",
  1672. "QueryableMatchId": "hi",
  1673. }
  1674. )
  1675. + "\n"
  1676. ),
  1677. )
  1678. @patch("backend.lambdas.tasks.generate_queries.deserialize_item")
  1679. @patch("backend.lambdas.tasks.generate_queries.paginate")
  1680. def test_it_fetches_deletion_queue_from_ddb(paginate_mock, deserialize_mock):
  1681. item = {"DeletionQueueItems": [{"DataMappers": [], "MatchId": "123"}]}
  1682. deserialize_mock.return_value = item
  1683. paginate_mock.return_value = iter([item])
  1684. resp = get_deletion_queue()
  1685. assert list(resp) == [item]
  1686. @patch("backend.lambdas.tasks.generate_queries.deserialize_item")
  1687. @patch("backend.lambdas.tasks.generate_queries.paginate")
  1688. def test_it_fetches_deserialized_data_mappers(paginate_mock, deserialize_mock):
  1689. dm = {
  1690. "DataMapperId": "a",
  1691. "QueryExecutor": "athena",
  1692. "Columns": ["customer_id"],
  1693. "Format": "parquet",
  1694. "QueryExecutorParameters": {
  1695. "DataCatalogProvider": "glue",
  1696. "Database": "test_db",
  1697. "Table": "test_table",
  1698. },
  1699. }
  1700. deserialize_mock.return_value = dm
  1701. paginate_mock.return_value = iter([dm])
  1702. resp = get_data_mappers()
  1703. assert list(resp) == [dm]
  1704. @patch("backend.lambdas.tasks.generate_queries.glue_client")
  1705. def test_it_writes_glue_partitions(glue_client):
  1706. write_partitions([["job_1234", "dm_0001"], ["job_1234", "dm_0003"]])
  1707. glue_client.batch_create_partition.assert_called_with(
  1708. DatabaseName="s3f2_manifests_database",
  1709. TableName="s3f2_manifests_table",
  1710. PartitionInputList=[
  1711. {
  1712. "Values": ["job_1234", "dm_0001"],
  1713. "StorageDescriptor": {
  1714. "Columns": [
  1715. {"Name": "columns", "Type": "array<string>"},
  1716. {"Name": "matchid", "Type": "array<string>"},
  1717. {"Name": "deletionqueueitemid", "Type": "string"},
  1718. {"Name": "createdat", "Type": "int"},
  1719. {"Name": "queryablecolumns", "Type": "string"},
  1720. {"Name": "queryablematchid", "Type": "string"},
  1721. ],
  1722. "Location": "s3://S3F2-manifests-bucket/manifests/job_1234/dm_0001/",
  1723. "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
  1724. "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
  1725. "Compressed": False,
  1726. "SerdeInfo": {
  1727. "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe",
  1728. },
  1729. "StoredAsSubDirectories": False,
  1730. },
  1731. },
  1732. {
  1733. "Values": ["job_1234", "dm_0003"],
  1734. "StorageDescriptor": {
  1735. "Columns": [
  1736. {"Name": "columns", "Type": "array<string>"},
  1737. {"Name": "matchid", "Type": "array<string>"},
  1738. {"Name": "deletionqueueitemid", "Type": "string"},
  1739. {"Name": "createdat", "Type": "int"},
  1740. {"Name": "queryablecolumns", "Type": "string"},
  1741. {"Name": "queryablematchid", "Type": "string"},
  1742. ],
  1743. "Location": "s3://S3F2-manifests-bucket/manifests/job_1234/dm_0003/",
  1744. "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
  1745. "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
  1746. "Compressed": False,
  1747. "SerdeInfo": {
  1748. "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe",
  1749. },
  1750. "StoredAsSubDirectories": False,
  1751. },
  1752. },
  1753. ],
  1754. )
  1755. def partition_stub(values, columns, table_name="test_table"):
  1756. return {
  1757. "Values": values,
  1758. "DatabaseName": "test",
  1759. "TableName": table_name,
  1760. "CreationTime": 1572440736.0,
  1761. "LastAccessTime": 0.0,
  1762. "StorageDescriptor": {
  1763. "Columns": [
  1764. {"Name": col["Name"], "Type": col.get("Type", "string")}
  1765. for col in columns
  1766. ],
  1767. "Location": "s3://bucket/location",
  1768. "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
  1769. "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
  1770. "Compressed": False,
  1771. "NumberOfBuckets": -1,
  1772. "SerdeInfo": {
  1773. "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
  1774. "Parameters": {"serialization.format": "1"},
  1775. },
  1776. "BucketColumns": [],
  1777. "SortColumns": [],
  1778. "Parameters": {},
  1779. "SkewedInfo": {
  1780. "SkewedColumnNames": [],
  1781. "SkewedColumnValues": [],
  1782. "SkewedColumnValueLocationMaps": {},
  1783. },
  1784. "StoredAsSubDirectories": False,
  1785. },
  1786. }
  1787. def table_stub(
  1788. columns, partition_keys, table_name="test_table", partition_keys_type="string"
  1789. ):
  1790. return {
  1791. "Name": table_name,
  1792. "DatabaseName": "test",
  1793. "Owner": "test",
  1794. "CreateTime": 1572438253.0,
  1795. "UpdateTime": 1572438253.0,
  1796. "LastAccessTime": 0.0,
  1797. "Retention": 0,
  1798. "StorageDescriptor": {
  1799. "Columns": [
  1800. {"Name": col["Name"], "Type": col.get("Type", "string")}
  1801. for col in columns
  1802. ],
  1803. "Location": "s3://bucket/location",
  1804. "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
  1805. "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
  1806. "Compressed": False,
  1807. "NumberOfBuckets": -1,
  1808. "SerdeInfo": {
  1809. "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
  1810. "Parameters": {"serialization.format": "1"},
  1811. },
  1812. "BucketColumns": [],
  1813. "SortColumns": [],
  1814. "Parameters": {},
  1815. "SkewedInfo": {
  1816. "SkewedColumnNames": [],
  1817. "SkewedColumnValues": [],
  1818. "SkewedColumnValueLocationMaps": {},
  1819. },
  1820. "StoredAsSubDirectories": False,
  1821. },
  1822. "PartitionKeys": [
  1823. {"Name": partition_key, "Type": partition_keys_type}
  1824. for partition_key in partition_keys
  1825. ],
  1826. "TableType": "EXTERNAL_TABLE",
  1827. "Parameters": {"EXTERNAL": "TRUE"},
  1828. }