test_job_iam.py

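"""
Acceptance tests for the deletion Jobs API and for end-to-end deletion job runs,
exercised through an IAM-authenticated API client. The API tests cover job
retrieval, listing and event filtering; the remaining tests run full jobs against
Parquet and JSON objects in the dummy data lake and assert on the rewritten
objects and the final job status recorded in the job table.
"""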
import json
import tempfile
import uuid

import mock
import pytest

from decimal import Decimal

from tests.acceptance import query_json_file, query_parquet_file, download_and_decrypt

pytestmark = [
    pytest.mark.acceptance_iam,
    pytest.mark.jobs,
    pytest.mark.usefixtures("empty_jobs"),
]


@pytest.mark.auth
@pytest.mark.api
def test_auth(api_client_iam, jobs_endpoint):
    headers = {"Authorization": None}
    assert (
        403
        == api_client_iam.get(
            "{}/{}".format(jobs_endpoint, "a"), headers=headers
        ).status_code
    )
    assert 403 == api_client_iam.get(jobs_endpoint, headers=headers).status_code
    assert (
        403
        == api_client_iam.get(
            "{}/{}/events".format(jobs_endpoint, "a"), headers=headers
        ).status_code
    )


@pytest.mark.api
def test_it_gets_jobs(
    api_client_iam, jobs_endpoint, job_factory, stack, job_table, job_exists_waiter
):
    # Arrange
    job_id = job_factory()["Id"]
    job_exists_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_iam.get("{}/{}".format(jobs_endpoint, job_id))
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    assert {
        "Id": job_id,
        "Sk": job_id,
        "Type": "Job",
        "JobStatus": mock.ANY,
        "GSIBucket": mock.ANY,
        "CreatedAt": mock.ANY,
        "AthenaConcurrencyLimit": mock.ANY,
        "AthenaQueryMaxRetries": mock.ANY,
        "DeletionTasksMaxNumber": mock.ANY,
        "QueryExecutionWaitSeconds": mock.ANY,
        "QueryQueueWaitSeconds": mock.ANY,
        "ForgetQueueWaitSeconds": mock.ANY,
    } == response_body
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )


@pytest.mark.api
def test_it_handles_unknown_jobs(api_client_iam, jobs_endpoint, stack):
    # Arrange
    job_id = "invalid"
    # Act
    response = api_client_iam.get("{}/{}".format(jobs_endpoint, job_id))
    # Assert
    assert response.status_code == 404
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )


@pytest.mark.api
def test_it_lists_jobs_by_date(
    api_client_iam, jobs_endpoint, job_factory, stack, job_table, job_exists_waiter
):
    # Arrange
    job_id_1 = job_factory(job_id=str(uuid.uuid4()), created_at=1576861489)["Id"]
    job_id_2 = job_factory(job_id=str(uuid.uuid4()), created_at=1576861490)["Id"]
    job_exists_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id_1}, "Sk": {"S": job_id_1}}
    )
    job_exists_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id_2}, "Sk": {"S": job_id_2}}
    )
    # Act
    response = api_client_iam.get(jobs_endpoint)
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    assert (
        response_body["Jobs"][0]["CreatedAt"] >= response_body["Jobs"][1]["CreatedAt"]
    )
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )


@pytest.mark.api
def test_it_returns_summary_fields_in_list(
    api_client_iam, jobs_endpoint, job_factory, job_table, job_finished_waiter
):
    # Arrange
    job_id = job_factory(job_id=str(uuid.uuid4()), created_at=1576861489)["Id"]
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_iam.get(jobs_endpoint)
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    for job in response_body["Jobs"]:
        assert all(
            [
                k in job
                for k in [
                    "Id",
                    "CreatedAt",
                    "JobStatus",
                    "JobFinishTime",
                    "JobStartTime",
                    "TotalObjectRollbackFailedCount",
                    "TotalObjectUpdatedCount",
                    "TotalObjectUpdateSkippedCount",
                    "TotalObjectUpdateFailedCount",
                    "TotalQueryCount",
                    "TotalQueryScannedInBytes",
                    "TotalQuerySucceededCount",
                    "TotalQueryTimeInMillis",
                ]
            ]
        )


@pytest.mark.api
def test_it_lists_job_events_by_date(
    api_client_iam, jobs_endpoint, job_factory, stack, job_table, job_finished_waiter
):
    # Arrange
    job_id = str(uuid.uuid4())
    job_id = job_factory(job_id=job_id, created_at=1576861489)["Id"]
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_iam.get("{}/{}/events".format(jobs_endpoint, job_id))
    response_body = response.json()
    # Assert
    assert response.status_code == 200
    job_events = response_body["JobEvents"]
    assert len(job_events) > 0
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )
    assert all(
        job_events[i]["CreatedAt"] <= job_events[i + 1]["CreatedAt"]
        for i in range(len(job_events) - 1)
    )


@pytest.mark.api
def test_it_filters_job_events_by_event_name(
    api_client_iam, jobs_endpoint, job_factory, stack, job_table, job_finished_waiter
):
    # Arrange
    job_id = str(uuid.uuid4())
    job_id = job_factory(job_id=job_id, created_at=1576861489)["Id"]
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Act
    response = api_client_iam.get(
        "{}/{}/events?filter=EventName%3DFindPhaseStarted".format(jobs_endpoint, job_id)
    )
    response_body = response.json()
    job_events = response_body["JobEvents"]
    # Assert
    assert response.status_code == 200
    assert len(job_events) == 1
    assert "FindPhaseStarted" == job_events[0]["EventName"]
    assert (
        response.headers.get("Access-Control-Allow-Origin")
        == stack["APIAccessControlAllowOriginHeader"]
    )


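# The tests below exercise the deletion job flow end to end: load a fixture file into
# the dummy lake bucket, queue one or more deletion matches, start a job via
# job_factory, wait on the job table, then assert on the rewritten object and the
# recorded job status. The fixtures used here (data_loader, del_queue_factory, the
# waiters, etc.) are assumed to be provided by the shared acceptance-test conftest.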
def test_it_runs_for_parquet_happy_path(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


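# The CSE-KMS variants assume the encrypted_data_loader fixture uploads the fixture
# file client-side encrypted with the supplied KMS key and content-encryption
# algorithm (AES/CBC or AES/GCM), and that download_and_decrypt returns the decrypted
# bytes together with the object's S3 metadata (including the x-amz-cek-alg entry).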
def test_it_runs_for_parquet_cse_kms(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    encrypted_data_loader,
    job_complete_waiter,
    job_table,
    kms_factory,
    kms_client,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        encrypted=True,
    )
    item = del_queue_factory("12345")
    encryption_key = kms_factory
    object_key_cbc = "test/2019/08/20/test_cbc.parquet"
    object_key_gcm = "test/2019/08/20/test_gcm.parquet"
    encrypted_data_loader(
        "basic.parquet",
        object_key_cbc,
        encryption_key,
        "AES/CBC/PKCS5Padding",
        CacheControl="cache",
    )
    encrypted_data_loader(
        "basic.parquet",
        object_key_gcm,
        encryption_key,
        "AES/GCM/NoPadding",
        CacheControl="cache",
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    decrypted_cbc, metadata_cbc = download_and_decrypt(
        bucket, object_key_cbc, kms_client
    )
    decrypted_gcm, metadata_gcm = download_and_decrypt(
        bucket, object_key_gcm, kms_client
    )
    tmp_cbc = tempfile.NamedTemporaryFile()
    tmp_gcm = tempfile.NamedTemporaryFile()
    open(tmp_cbc.name, "wb").write(decrypted_cbc)
    open(tmp_gcm.name, "wb").write(decrypted_gcm)
    # Assert
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp_cbc, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp_cbc, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp_cbc, "customer_id", "34567"))
    assert metadata_cbc["x-amz-cek-alg"] == "AES/CBC/PKCS5Padding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_cbc)))
    assert "cache" == bucket.Object(object_key_cbc).cache_control
    assert 0 == len(query_parquet_file(tmp_gcm, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp_gcm, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp_gcm, "customer_id", "34567"))
    assert metadata_gcm["x-amz-cek-alg"] == "AES/GCM/NoPadding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_gcm)))
    assert "cache" == bucket.Object(object_key_gcm).cache_control


def test_it_runs_for_parquet_backwards_compatible_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    # MatchId Type was introduced in 0.19 only and it should default to Simple
    item = del_queue_factory("12345", matchid_type=None)
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


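# Composite match items pair multiple column/value conditions (here first and last
# name) under a single match and are targeted at specific data mappers; the tests
# below combine them with simple matches for both Parquet and JSON formats.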
def test_it_runs_for_parquet_composite_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_parquet_mixed_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    composite_item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    simple_item = del_queue_factory("23456", "id234")
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[composite_item, simple_item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 0 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_json_happy_path(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_json_cse_kms(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    encrypted_data_loader,
    job_complete_waiter,
    job_table,
    kms_factory,
    kms_client,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
        encrypted=True,
    )
    item = del_queue_factory("12345")
    encryption_key = kms_factory
    object_key_cbc = "test/2019/08/20/test_cbc.json"
    object_key_gcm = "test/2019/08/20/test_gcm.json"
    encrypted_data_loader(
        "basic.json",
        object_key_cbc,
        encryption_key,
        "AES/CBC/PKCS5Padding",
        CacheControl="cache",
    )
    encrypted_data_loader(
        "basic.json",
        object_key_gcm,
        encryption_key,
        "AES/GCM/NoPadding",
        CacheControl="cache",
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    decrypted_cbc, metadata_cbc = download_and_decrypt(
        bucket, object_key_cbc, kms_client
    )
    decrypted_gcm, metadata_gcm = download_and_decrypt(
        bucket, object_key_gcm, kms_client
    )
    tmp_cbc = tempfile.NamedTemporaryFile()
    tmp_gcm = tempfile.NamedTemporaryFile()
    open(tmp_cbc.name, "wb").write(decrypted_cbc)
    open(tmp_gcm.name, "wb").write(decrypted_gcm)
    # Assert
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp_cbc.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp_cbc.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp_cbc.name, "customer_id", "34567"))
    assert metadata_cbc["x-amz-cek-alg"] == "AES/CBC/PKCS5Padding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_cbc)))
    assert "cache" == bucket.Object(object_key_cbc).cache_control
    assert 0 == len(query_json_file(tmp_gcm.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp_gcm.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp_gcm.name, "customer_id", "34567"))
    assert metadata_gcm["x-amz-cek-alg"] == "AES/GCM/NoPadding"
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key_gcm)))
    assert "cache" == bucket.Object(object_key_gcm).cache_control


def test_it_runs_for_json_composite_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    composite_item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[composite_item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_json_mixed_matches(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    composite_item = del_queue_factory(
        [
            {"Column": "user_info.personal_information.first_name", "Value": "John"},
            {"Column": "user_info.personal_information.last_name", "Value": "Doe"},
        ],
        "id123",
        matchid_type="Composite",
        data_mappers=["test"],
    )
    simple_item = del_queue_factory("23456", "id234")
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[composite_item, simple_item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_json_file(tmp.name, "customer_id", "12345"))
    assert 0 == len(query_json_file(tmp.name, "customer_id", "23456"))
    assert 1 == len(query_json_file(tmp.name, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_unpartitioned_data(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory("test")
    item = del_queue_factory("12345")
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_complex_types(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test", column_identifiers=["user_info.personal_information.first_name"]
    )
    item = del_queue_factory("John")
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_partitioned_data_with_non_string_partitions(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "10", "20"]],
        partition_key_types="int",
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/10/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 1 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_caselowered_identifier_parquet(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory("test", column_identifiers=["customerid"])
    item = del_queue_factory(12345)
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customerId", 12345))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_runs_for_decimal_identifier_parquet(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory("test", column_identifiers=["customer_id_decimal"])
    item = del_queue_factory("123.450")
    object_key = "test/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id_decimal", Decimal("123.450")))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


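# Jobs must not rewrite objects in unversioned buckets: the next test suspends bucket
# versioning before running the job and restores it in the finally block, expecting
# the job to end as FORGET_PARTIALLY_FAILED with the original object left untouched.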
def test_it_does_not_permit_unversioned_buckets(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    s3_resource,
):
    try:
        # Arrange
        s3_resource.BucketVersioning(dummy_lake["bucket_name"]).suspend()
        glue_data_mapper_factory(
            "test",
            partition_keys=["year", "month", "day"],
            partitions=[["2019", "08", "20"]],
        )
        item = del_queue_factory("12345")
        object_key = "test/2019/08/20/test.parquet"
        data_loader("basic.parquet", object_key)
        bucket = dummy_lake["bucket"]
        job_id = job_factory(del_queue_items=[item], delete_previous_versions=False)[
            "Id"
        ]
        # Act
        job_finished_waiter.wait(
            TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
        )
        # Assert
        tmp = tempfile.NamedTemporaryFile()
        bucket.download_fileobj(object_key, tmp)
        assert (
            "FORGET_PARTIALLY_FAILED"
            == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
        )
        assert 1 == len(query_parquet_file(tmp, "customer_id", "12345"))
    finally:
        s3_resource.BucketVersioning(dummy_lake["bucket_name"]).enable()


def test_it_executes_successfully_for_empty_queue(
    job_factory, job_finished_waiter, job_table
):
    # Arrange
    job_id = job_factory()["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )


def test_it_supports_data_access_roles(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
    data_access_role,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        delete_old_versions=False,
        role_arn=data_access_role["Arn"],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))


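# With delete_old_versions=True the job is expected to clean up superseded object
# versions (including delete markers) after rewriting, so only the newly written
# version of the object remains.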
def test_it_deletes_old_versions(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
    data_access_role,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        delete_old_versions=True,
        role_arn=data_access_role["Arn"],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    bucket = dummy_lake["bucket"]
    # Create the object, add a deletion marker, then recreate it
    data_loader("basic.parquet", object_key)
    bucket.Object(object_key).delete()
    data_loader("basic.parquet", object_key)
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


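# This test assumes find_phase_ended_waiter blocks until the job's find phase has
# finished, so the object can be deleted from the bucket before the forget phase
# tries to fetch it; with ignore_object_not_found_exceptions enabled the object is
# counted as skipped rather than failing the job.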
def test_it_ignores_not_found_exceptions(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
    data_access_role,
    find_phase_ended_waiter,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        ignore_object_not_found_exceptions=True,
        role_arn=data_access_role["Arn"],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    bucket = dummy_lake["bucket"]
    data_loader("basic.parquet", object_key)
    # Start job, wait for find phase to end, delete object
    job_id = job_factory(del_queue_items=[item])["Id"]
    find_phase_ended_waiter.wait(job_id)
    bucket.Object(key=object_key).delete()
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    job = job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]
    assert "COMPLETED" == job["JobStatus"]
    assert 0 == job["TotalObjectUpdatedCount"]
    assert 1 == job["TotalObjectUpdateSkippedCount"]
    assert 0 == job["TotalObjectUpdateFailedCount"]
    assert 0 == job["TotalObjectRollbackFailedCount"]


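# The two permission tests below attach an explicit Deny for s3:* to the lake bucket
# policy, targeting the Athena execution role (find phase) and the delete task role
# (forget phase) respectively, and assert the resulting FIND_FAILED and
# FORGET_PARTIALLY_FAILED job statuses.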
def test_it_handles_find_permission_issues(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    policy_changer,
    stack,
    arn_partition,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    bucket_name = dummy_lake["bucket_name"]
    policy_changer(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Deny",
                    "Principal": {"AWS": [stack["AthenaExecutionRoleArn"]]},
                    "Action": "s3:*",
                    "Resource": [
                        "arn:{}:s3:::{}".format(arn_partition, bucket_name),
                        "arn:{}:s3:::{}/*".format(arn_partition, bucket_name),
                    ],
                }
            ],
        }
    )
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    assert (
        "FIND_FAILED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


def test_it_handles_forget_permission_issues(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    policy_changer,
    stack,
    arn_partition,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    bucket_name = dummy_lake["bucket_name"]
    policy = json.loads(dummy_lake["policy"].policy)
    policy["Statement"].append(
        {
            "Effect": "Deny",
            "Principal": {"AWS": [stack["DeleteTaskRoleArn"]]},
            "Action": "s3:*",
            "Resource": [
                "arn:{}:s3:::{}".format(arn_partition, bucket_name),
                "arn:{}:s3:::{}/*".format(arn_partition, bucket_name),
            ],
        }
    )
    policy_changer(policy)
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    assert (
        "FORGET_PARTIALLY_FAILED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


def test_it_handles_forget_invalid_role(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_finished_waiter,
    job_table,
    arn_partition,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        role_arn="arn:{}:iam::invalid:role/DoesntExist".format(arn_partition),
    )
    item = del_queue_factory("12345")
    object_key = "test/2019/08/20/test.parquet"
    data_loader("basic.parquet", object_key)
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item])["Id"]
    # Act
    job_finished_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "FORGET_PARTIALLY_FAILED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 1 == len(list(bucket.object_versions.filter(Prefix=object_key)))


def test_it_handles_json_all_rows_deleted(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
        fmt="json",
    )
    item = del_queue_factory("12345", "id1")
    item2 = del_queue_factory("23456", "id2")
    item3 = del_queue_factory("34567", "id3")
    object_key = "test/2019/08/20/test.json"
    data_loader("basic.json", object_key, Metadata={"foo": "bar"}, CacheControl="cache")
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item, item2, item3])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_file(object_key, tmp.name)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    content = open(tmp.name, "rb").read()
    assert content == b""  # JSON is empty
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control


def test_it_handles_parquet_all_rows_deleted(
    del_queue_factory,
    job_factory,
    dummy_lake,
    glue_data_mapper_factory,
    data_loader,
    job_complete_waiter,
    job_table,
):
    # Arrange
    glue_data_mapper_factory(
        "test",
        partition_keys=["year", "month", "day"],
        partitions=[["2019", "08", "20"]],
    )
    item = del_queue_factory("12345", "id1")
    item2 = del_queue_factory("23456", "id2")
    item3 = del_queue_factory("34567", "id3")
    object_key = "test/2019/08/20/test.parquet"
    data_loader(
        "basic.parquet", object_key, Metadata={"foo": "bar"}, CacheControl="cache"
    )
    bucket = dummy_lake["bucket"]
    job_id = job_factory(del_queue_items=[item, item2, item3])["Id"]
    # Act
    job_complete_waiter.wait(
        TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
    )
    # Assert
    tmp = tempfile.NamedTemporaryFile()
    bucket.download_fileobj(object_key, tmp)
    assert (
        "COMPLETED"
        == job_table.get_item(Key={"Id": job_id, "Sk": job_id})["Item"]["JobStatus"]
    )
    assert 0 == len(query_parquet_file(tmp, "customer_id", "12345"))
    assert 0 == len(query_parquet_file(tmp, "customer_id", "23456"))
    assert 0 == len(query_parquet_file(tmp, "customer_id", "34567"))
    assert 2 == len(list(bucket.object_versions.filter(Prefix=object_key)))
    assert {"foo": "bar"} == bucket.Object(object_key).metadata
    assert "cache" == bucket.Object(object_key).cache_control