# test_job_cognito.py

import json
import tempfile
import uuid
import mock
import pytest
from decimal import Decimal
from tests.acceptance import query_json_file, query_parquet_file, download_and_decrypt
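
# Helper functions imported from the tests.acceptance package. Judging by how they
# are used below (their definitions are not shown here):
#   query_parquet_file(file_obj, column, value)   -> rows of the Parquet file matching value
#   query_json_file(path, column, value)          -> rows of the JSON-lines file matching value
#   download_and_decrypt(bucket, key, kms_client) -> (plaintext bytes, object metadata)
#                                                    for a client-side encrypted object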

pytestmark = [
    pytest.mark.acceptance_cognito,
    pytest.mark.jobs,
    pytest.mark.usefixtures("empty_jobs"),
]
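
# Fixtures such as api_client_cognito, jobs_endpoint, job_factory, job_table,
# dummy_lake, data_loader and the various *_waiter fixtures are not defined in this
# module; they are assumed to be provided by the suite's conftest.py. The "empty_jobs"
# fixture applied above presumably clears the jobs table between tests.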


@pytest.mark.auth
@pytest.mark.api
def test_auth(api_client_cognito, jobs_endpoint):
    headers = {"Authorization": None}
    assert (
        401
        == api_client_cognito.get(
            "{}/{}".format(jobs_endpoint, "a"), headers=headers
        ).status_code
    )
    assert 401 == api_client_cognito.get(jobs_endpoint, headers=headers).status_code
    assert (
        401
        == api_client_cognito.get(
            "{}/{}/events".format(jobs_endpoint, "a"), headers=headers
        ).status_code
    )


@pytest.mark.api
def test_it_gets_jobs(
    api_client_cognito, jobs_endpoint, job_factory, stack, job_table, job_exists_waiter
):
    # Arrange
    job_id = job_factory()["Id"]
    job_exists_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_cognito.get("{}/{}".format(jobs_endpoint, job_id))
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    assert {
        "Id": job_id,
        "Sk": job_id,
        "Type": "Job",
        "JobStatus": mock.ANY,
        "GSIBucket": mock.ANY,
        "CreatedAt": mock.ANY,
        "AthenaConcurrencyLimit": mock.ANY,
        "AthenaQueryMaxRetries": mock.ANY,
        "DeletionTasksMaxNumber": mock.ANY,
        "QueryExecutionWaitSeconds": mock.ANY,
        "QueryQueueWaitSeconds": mock.ANY,
        "ForgetQueueWaitSeconds": mock.ANY,
    } == response_body
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )
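

# Note: the *_waiter fixtures appear to wrap the low-level DynamoDB client, so keys
# are passed in attribute-value form ({"S": ...}); job_table, by contrast, behaves
# like a boto3 Table resource and takes plain Python values in get_item.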


@pytest.mark.api
def test_it_handles_unknown_jobs(api_client_cognito, jobs_endpoint, stack):
    # Arrange
    job_id = "invalid"
    # Act
    response = api_client_cognito.get("{}/{}".format(jobs_endpoint, job_id))
    # Assert
    assert response.status_code == 404
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )


@pytest.mark.api
def test_it_lists_jobs_by_date(
    api_client_cognito, jobs_endpoint, job_factory, stack, job_table, job_exists_waiter
):
    # Arrange
    job_id_1 = job_factory(job_id=str(uuid.uuid4()), created_at=1576861489)["Id"]
    job_id_2 = job_factory(job_id=str(uuid.uuid4()), created_at=1576861490)["Id"]
    job_exists_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id_1}, "Sk": {"S": job_id_1}}
    )
    job_exists_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id_2}, "Sk": {"S": job_id_2}}
    )
    # Act
    response = api_client_cognito.get(jobs_endpoint)
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    assert (
        response_body["Jobs"][0]["CreatedAt"] >= response_body["Jobs"][1]["CreatedAt"]
    )
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )


@pytest.mark.api
def test_it_returns_summary_fields_in_list(
    api_client_cognito, jobs_endpoint, job_factory, job_table, job_finished_waiter
):
    # Arrange
    job_id = job_factory(job_id=str(uuid.uuid4()), created_at=1576861489)["Id"]
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_cognito.get(jobs_endpoint)
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    for job in response_body["Jobs"]:
        assert all(
            [
                k in job
                for k in [
                    "Id",
                    "CreatedAt",
                    "JobStatus",
                    "JobFinishTime",
                    "JobStartTime",
                    "TotalObjectRollbackFailedCount",
                    "TotalObjectUpdatedCount",
                    "TotalObjectUpdateSkippedCount",
                    "TotalObjectUpdateFailedCount",
                    "TotalQueryCount",
                    "TotalQueryScannedInBytes",
                    "TotalQuerySucceededCount",
                    "TotalQueryTimeInMillis",
                ]
            ]
        )


@pytest.mark.api
def test_it_lists_job_events_by_date(
    api_client_cognito,
    jobs_endpoint,
    job_factory,
    stack,
    job_table,
    job_finished_waiter,
):
    # Arrange
    job_id = str(uuid.uuid4())
    job_id = job_factory(job_id=job_id, created_at=1576861489)["Id"]
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_cognito.get("{}/{}/events".format(jobs_endpoint, job_id))
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    job_events = response_body["JobEvents"]
    assert len(job_events) > 0
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )
    assert all(
        job_events[i]["CreatedAt"] <= job_events[i + 1]["CreatedAt"]
        for i in range(len(job_events) - 1)
    )


@pytest.mark.api
def test_it_filters_job_events_by_event_name(
    api_client_cognito,
    jobs_endpoint,
    job_factory,
    stack,
    job_table,
    job_finished_waiter,
):
    # Arrange
    job_id = str(uuid.uuid4())
    job_id = job_factory(job_id=job_id, created_at=1576861489)["Id"]
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_cognito.get(
        "{}/{}/events?filter=EventName%3DFindPhaseStarted".format(jobs_endpoint, job_id)
    )
    response_body = response.json()
    job_events = response_body["JobEvents"]
    # Assert
    assert response.status_code == 200
    assert len(job_events) == 1
    assert "FindPhaseStarted" == job_events[0]["EventName"]
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )


def test_it_runs_for_parquet_happy_path(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control
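

# The CSE-KMS tests load objects encrypted client-side with a KMS-wrapped data key.
# "x-amz-cek-alg" is the S3 encryption client's metadata entry recording the content
# encryption algorithm, so the assertions below also check that each object was
# re-encrypted with the same algorithm after the job rewrote it.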
def test_it_runs_for_parquet_cse_kms(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    encrypted_data_loader,
    job_complete_waiter,
    job_table,
    kms_factory,
    kms_client,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        encrypted=True,
    )
    item = del_queue_factory("12345")
    encryption_key = kms_factory
    object_key_cbc = "test/2019/08/20/test_cbc.parquet"
    object_key_gcm = "test/2019/08/20/test_gcm.parquet"
    encrypted_data_loader(
        "basic.parquet",
        object_key_cbc,
        encryption_key,
        "AES/CBC/PKCS5Padding",
        CacheControl="cache",
    )
    encrypted_data_loader(
        "basic.parquet",
        object_key_gcm,
        encryption_key,
        "AES/GCM/NoPadding",
        CacheControl="cache",
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    decrypted_cbc, metadata_cbc = download_and_decrypt(
        bucket, object_key_cbc, kms_client
    )
    decrypted_gcm, metadata_gcm = download_and_decrypt(
        bucket, object_key_gcm, kms_client
    )
    tmp_cbc = tempfile.NamedTemporaryFile()
    tmp_gcm = tempfile.NamedTemporaryFile()
    open(tmp_cbc.name, "wb").write(decrypted_cbc)
    open(tmp_gcm.name, "wb").write(decrypted_gcm)
    # Assert
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp_cbc, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp_cbc, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp_cbc, "customer_id", "34567"))
    assert metadata_cbc["x-amz-cek-alg"] == "AES/CBC/PKCS5Padding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_cbc)))
    assert "cache" == bucket.Object(object_key_cbc).cache_control
    assert 0 == len(query_parquet_file(tmp_gcm, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp_gcm, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp_gcm, "customer_id", "34567"))
    assert metadata_gcm["x-amz-cek-alg"] == "AES/GCM/NoPadding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_gcm)))
    assert "cache" == bucket.Object(object_key_gcm).cache_control


def test_it_runs_for_parquet_backwards_compatible_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    # MatchId Type was only introduced in 0.19, so older queue items without it
    # should default to Simple
    item = del_queue_factory("12345", matchid_type=None)
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control
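

# A Composite match identifies a record by several columns at once (here first_name
# and last_name from a nested struct), whereas a Simple match is a single value; the
# composite/mixed tests below queue one or both kinds against the same data mapper.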
def test_it_runs_for_parquet_composite_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_parquet_mixed_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    composite_item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    simple_item = del_queue_factory("23456", "id234")
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[composite_item, simple_item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 0 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_json_happy_path(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_json_cse_kms(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    encrypted_data_loader,
    job_complete_waiter,
    job_table,
    kms_factory,
    kms_client,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
        encrypted=True,
    )
    item = del_queue_factory("12345")
    encryption_key = kms_factory
    object_key_cbc = "test/2019/08/20/test_cbc.json"
    object_key_gcm = "test/2019/08/20/test_gcm.json"
    encrypted_data_loader(
        "basic.json",
        object_key_cbc,
        encryption_key,
        "AES/CBC/PKCS5Padding",
        CacheControl="cache",
    )
    encrypted_data_loader(
        "basic.json",
        object_key_gcm,
        encryption_key,
        "AES/GCM/NoPadding",
        CacheControl="cache",
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    decrypted_cbc, metadata_cbc = download_and_decrypt(
        bucket, object_key_cbc, kms_client
    )
    decrypted_gcm, metadata_gcm = download_and_decrypt(
        bucket, object_key_gcm, kms_client
    )
    tmp_cbc = tempfile.NamedTemporaryFile()
    tmp_gcm = tempfile.NamedTemporaryFile()
    open(tmp_cbc.name, "wb").write(decrypted_cbc)
    open(tmp_gcm.name, "wb").write(decrypted_gcm)
    # Assert
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp_cbc.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp_cbc.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp_cbc.name, "customer_id", "34567"))
    assert metadata_cbc["x-amz-cek-alg"] == "AES/CBC/PKCS5Padding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_cbc)))
    assert "cache" == bucket.Object(object_key_cbc).cache_control
    assert 0 == len(query_json_file(tmp_gcm.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp_gcm.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp_gcm.name, "customer_id", "34567"))
    assert metadata_gcm["x-amz-cek-alg"] == "AES/GCM/NoPadding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_gcm)))
    assert "cache" == bucket.Object(object_key_gcm).cache_control


def test_it_runs_for_json_composite_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    composite_item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[composite_item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_json_mixed_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    composite_item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    simple_item = del_queue_factory("23456", "id234")
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[composite_item, simple_item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
    assert 0 == len(query_json_file(tmp.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_unpartitioned_data(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory("test")
    item = del_queue_factory("12345")
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_complex_types(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test", column_identifiers=["user_info.personal_information.first_name"]
    )
    item = del_queue_factory("John")
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_partitioned_data_with_non_string_partitions(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "10", "20"]],
        partition_key_types="int",
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/10/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_caselowered_identifier_parquet(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory("test", column_identifiers=["customerid"])
    item = del_queue_factory(12345)
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customerId", 12345))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_decimal_identifier_parquet(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory("test", column_identifiers=["customer_id_decimal"])
    item = del_queue_factory("123.450")
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id_decimal", Decimal("123.450")))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_does_not_permit_unversioned_buckets(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    s3_resource,
):
    try:
        # Arrange
        s3_resource.BucketVersioning(dummy_lake["bucket_name"]).suspend()
        glue_data_mapper_factory(
            "test",
            partition_keys=["year", "month", "day"],
            partitions=[["2019", "08", "20"]],
        )
        item = del_queue_factory("12345")
        object_key = "test/2019/08/20/test.parquet"
        data_loader("basic.parquet", object_key)
        bucket = dummy_lake["bucket"]
        job_id = job_factory(del_queue_items=[item], delete_previous_versions=False)[
            "Id"
        ]
        # Act
        job_finished_waiter.wait(
            TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
        )
        # Assert
        tmp = tempfile.NamedTemporaryFile()
        bucket.download_fileobj(object_key, tmp)
        assert (
            "FORGET_PARTIALLY_FAILED"
            == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
        )
        assert 1 == len(query_parquet_file(tmp, "customer_id", "12345"))
    finally:
        s3_resource.BucketVersioning(dummy_lake["bucket_name"]).enable()


def test_it_executes_successfully_for_empty_queue(
    job_factory, job_finished_waiter, job_table
):
    # Arrange
    job_id = job_factory()["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )


def test_it_supports_data_access_roles(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
    data_access_role,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        delete_old_versions=False,
        role_arn=data_access_role["Arn"],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))


def test_it_deletes_old_versions(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
    data_access_role,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        delete_old_versions=True,
        role_arn=data_access_role["Arn"],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    bucket = dummy_lake["bucket"]
    # Create the object, add a deletion marker, then recreate it
    data_loader("basic.parquet", object_key)
    bucket.Object(object_key).delete()
    data_loader("basic.parquet", object_key)
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


def test_it_ignores_not_found_exceptions(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
    data_access_role,
    find_phase_ended_waiter,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        ignore_object_not_found_exceptions=True,
        role_arn=data_access_role["Arn"],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    bucket = dummy_lake["bucket"]
    data_loader("basic.parquet", object_key)
    # Start job, wait for find phase to end, delete object
    job_id = job_factory(del_queue_items=[item])["Id"]
    find_phase_ended_waiter.wait(job_id)
    bucket.Object(key=object_key).delete()
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    job = job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]
    assert "COMPLETED" == job["JobStatus"]
    assert 0 == job["TotalObjectUpdatedCount"]
    assert 1 == job["TotalObjectUpdateSkippedCount"]
    assert 0 == job["TotalObjectUpdateFailedCount"]
    assert 0 == job["TotalObjectRollbackFailedCount"]
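

# The next two tests add an explicit Deny for the solution's Athena execution role
# (find phase) or delete task role (forget phase) to the lake bucket policy, and
# expect the job to end in FIND_FAILED or FORGET_PARTIALLY_FAILED respectively.
# policy_changer is assumed to restore the original bucket policy after the test.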
def test_it_handles_find_permission_issues(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    policy_changer,
    stack,
    arn_partition,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    bucket_name = dummy_lake["bucket_name"]
    policy_changer(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Deny",
                    "Principal": {"AWS": [stack["AthenaExecutionRoleArn"]]},
                    "Action": "s3:*",
                    "Resource": [
                        "arn:{}:s3:::{}".format(arn_partition, bucket_name),
                        "arn:{}:s3:::{}/*".format(arn_partition, bucket_name),
                    ],
                }
            ],
        }
    )
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    assert (
        "FIND_FAILED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


def test_it_handles_forget_permission_issues(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    policy_changer,
    stack,
    arn_partition,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    bucket_name = dummy_lake["bucket_name"]
    policy = json.loads(dummy_lake["policy"].policy)
    policy["Statement"].append(
        {
            "Effect": "Deny",
            "Principal": {"AWS": [stack["DeleteTaskRoleArn"]]},
            "Action": "s3:*",
            "Resource": [
                "arn:{}:s3:::{}".format(arn_partition, bucket_name),
                "arn:{}:s3:::{}/*".format(arn_partition, bucket_name),
            ],
        }
    )
    policy_changer(policy)
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    assert (
        "FORGET_PARTIALLY_FAILED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


def test_it_handles_forget_invalid_role(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    arn_partition,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        role_arn="arn:{}:iam::invalid:role/DoesntExist".format(arn_partition),
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "FORGET_PARTIALLY_FAILED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


def test_it_handles_json_all_rows_deleted(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    item = del_queue_factory("12345", "id1")
    item2 = del_queue_factory("23456", "id2")
    item3 = del_queue_factory("34567", "id3")
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item, item2, item3])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    content = open(tmp.name, "rb").read()
    assert content == b""  # JSON is empty
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_handles_parquet_all_rows_deleted(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345", "id1")
    item2 = del_queue_factory("23456", "id2")
    item3 = del_queue_factory("34567", "id3")
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item, item2, item3])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 0 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 0 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control