test_main.py 47 KB


  1. import os
  2. from io import BytesIO
  3. from argparse import Namespace
  4. from errno import ENOENT
  5. from os import strerror
  6. import boto3
  7. from botocore.exceptions import ClientError
  8. from mock import patch, MagicMock, ANY, call
  9. import pyarrow as pa
  10. import pytest
  11. from pyarrow.lib import ArrowException
  12. from s3 import DeleteOldVersionsError, IntegrityCheckFailedError
  13. with patch.dict(
  14. os.environ,
  15. {
  16. "DELETE_OBJECTS_QUEUE": "https://url/q.fifo",
  17. "DLQ": "https://url/q",
  18. },
  19. ):
  20. from backend.ecs_tasks.delete_files.main import (
  21. build_matches,
  22. kill_handler,
  23. execute,
  24. handle_error,
  25. handle_skip,
  26. get_queue,
  27. main,
  28. parse_args,
  29. delete_matches_from_file,
  30. )
# Module-level pytest markers: every test in this file is tagged with the
# "unit" and "ecs_tasks" marks.
pytestmark = [pytest.mark.unit, pytest.mark.ecs_tasks]
  32. def get_list_object_versions_error():
  33. return ClientError(
  34. {
  35. "Error": {
  36. "Code": "InvalidArgument",
  37. "Message": "Invalid version id specified",
  38. }
  39. },
  40. "ListObjectVersions",
  41. )
  42. @patch.dict(os.environ, {"JobTable": "test"})
  43. @patch(
  44. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  45. MagicMock(return_value=True),
  46. )
  47. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  48. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  49. @patch("backend.ecs_tasks.delete_files.main.verify_object_versions_integrity")
  50. @patch("backend.ecs_tasks.delete_files.main.get_session")
  51. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  52. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  53. @patch("backend.ecs_tasks.delete_files.main.emit_deletion_event")
  54. @patch("backend.ecs_tasks.delete_files.main.save")
  55. @patch("backend.ecs_tasks.delete_files.main.build_matches")
  56. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  57. def test_happy_path_when_queue_not_empty(
  58. mock_get_object_info,
  59. mock_build_matches,
  60. mock_save,
  61. mock_emit,
  62. mock_delete,
  63. mock_fs,
  64. mock_session,
  65. mock_verify_integrity,
  66. message_stub,
  67. ):
  68. column = {"Column": "customer_id", "MatchIds": ["12345", "23456"]}
  69. mock_build_matches.return_value = [column]
  70. mock_fs.S3FileSystem.return_value = mock_fs
  71. mock_file = MagicMock()
  72. mock_file.metadata.return_value = {"VersionId": b"abc123"}
  73. mock_fs.open_input_stream.return_value.__enter__.return_value = mock_file
  74. mock_get_object_info.return_value = {"Metadata": {}}, None
  75. mock_save.return_value = "new_version123"
  76. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  77. execute(
  78. "https://queue/url",
  79. message_stub(Object="s3://bucket/path/basic.parquet"),
  80. "receipt_handle",
  81. )
  82. mock_fs.open_input_stream.assert_called_with(
  83. "bucket/path/basic.parquet", buffer_size=5 * 2**20
  84. )
  85. mock_delete.assert_called_with(ANY, [column], "parquet", False)
  86. mock_save.assert_called_with(ANY, ANY, "bucket", "path/basic.parquet", {}, "abc123")
  87. mock_emit.assert_called()
  88. mock_session.assert_called_with(None, "s3f2")
  89. mock_verify_integrity.assert_called_with(
  90. ANY, "bucket", "path/basic.parquet", "abc123", "new_version123"
  91. )
  92. buf = mock_save.call_args[0][1]
  93. assert buf.read
  94. assert isinstance(buf, pa.BufferReader) # must be BufferReader for zero-copy
  95. @patch.dict(os.environ, {"JobTable": "test"})
  96. @patch(
  97. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  98. MagicMock(return_value=True),
  99. )
  100. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  101. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  102. @patch("backend.ecs_tasks.delete_files.main.verify_object_versions_integrity")
  103. @patch("backend.ecs_tasks.delete_files.main.get_session")
  104. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  105. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  106. @patch("backend.ecs_tasks.delete_files.main.emit_deletion_event")
  107. @patch("backend.ecs_tasks.delete_files.main.save")
  108. @patch("backend.ecs_tasks.delete_files.main.build_matches")
  109. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  110. def test_happy_path_when_queue_not_empty_for_compressed_json(
  111. mock_get_object_info,
  112. mock_build_matches,
  113. mock_save,
  114. mock_emit,
  115. mock_delete,
  116. mock_fs,
  117. mock_session,
  118. mock_verify_integrity,
  119. message_stub,
  120. ):
  121. column = {"Column": "customer_id", "MatchIds": ["12345", "23456"]}
  122. mock_build_matches.return_value = [column]
  123. mock_fs.S3FileSystem.return_value = mock_fs
  124. mock_file = MagicMock()
  125. mock_file.metadata.return_value = {"VersionId": b"abc123"}
  126. mock_fs.open_input_stream.return_value.__enter__.return_value = mock_file
  127. mock_get_object_info.return_value = {"Metadata": {}}, None
  128. mock_save.return_value = "new_version123"
  129. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  130. execute(
  131. "https://queue/url",
  132. message_stub(Object="s3://bucket/path/basic.json.gz", Format="json"),
  133. "receipt_handle",
  134. )
  135. mock_fs.open_input_stream.assert_called_with(
  136. "bucket/path/basic.json.gz", buffer_size=5 * 2**20
  137. )
  138. mock_delete.assert_called_with(ANY, [column], "json", True)
  139. mock_save.assert_called_with(ANY, ANY, "bucket", "path/basic.json.gz", {}, "abc123")
  140. mock_emit.assert_called()
  141. mock_session.assert_called_with(None, "s3f2")
  142. mock_verify_integrity.assert_called_with(
  143. ANY, "bucket", "path/basic.json.gz", "abc123", "new_version123"
  144. )
  145. buf = mock_save.call_args[0][1]
  146. assert buf.read
  147. assert isinstance(buf, pa.BufferReader) # must be BufferReader for zero-copy
  148. @patch.dict(os.environ, {"JobTable": "test"})
  149. @patch(
  150. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  151. MagicMock(return_value=True),
  152. )
  153. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  154. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  155. @patch("backend.ecs_tasks.delete_files.main.verify_object_versions_integrity")
  156. @patch("backend.ecs_tasks.delete_files.main.get_session")
  157. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  158. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  159. @patch("backend.ecs_tasks.delete_files.main.emit_deletion_event")
  160. @patch("backend.ecs_tasks.delete_files.main.save")
  161. @patch("backend.ecs_tasks.delete_files.main.build_matches")
  162. @patch("backend.ecs_tasks.delete_files.main.is_kms_cse_encrypted")
  163. @patch("backend.ecs_tasks.delete_files.main.encrypt")
  164. @patch("backend.ecs_tasks.delete_files.main.decrypt")
  165. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  166. def test_cse_kms_encrypted(
  167. mock_get_object_info,
  168. mock_decrypt,
  169. mock_encrypt,
  170. mock_is_encrypted,
  171. mock_build_matches,
  172. mock_save,
  173. mock_emit,
  174. mock_delete,
  175. mock_fs,
  176. mock_session,
  177. mock_verify_integrity,
  178. message_stub,
  179. ):
  180. column = {"Column": "customer_id", "MatchIds": ["12345", "23456"]}
  181. metadata = {"x-amz-wrap-alg": "kms", "x-amz-key-v2": "key123"}
  182. mock_build_matches.return_value = [column]
  183. mock_fs.S3FileSystem.return_value = mock_fs
  184. mock_file = MagicMock()
  185. mock_file.metadata.return_value = {"VersionId": b"abc123"}
  186. mock_fs.open_input_stream.return_value.__enter__.return_value = mock_file
  187. mock_get_object_info.return_value = {"Metadata": metadata}, None
  188. mock_save.return_value = "new_version123"
  189. mock_file_decrypted = BytesIO(b"")
  190. mock_is_encrypted.return_value = True
  191. redacted = pa.BufferOutputStream()
  192. redacted_encrypted = BytesIO(b"")
  193. mock_delete.return_value = redacted, {"DeletedRows": 1}
  194. mock_decrypt.return_value = mock_file_decrypted
  195. mock_encrypt.return_value = redacted_encrypted, {"new_metadata": "foo"}
  196. execute(
  197. "https://queue/url",
  198. message_stub(Object="s3://bucket/path/basic.parquet"),
  199. "receipt_handle",
  200. )
  201. mock_is_encrypted.assert_called_with(metadata)
  202. mock_decrypt.assert_called_with(mock_file, metadata, ANY)
  203. mock_fs.open_input_stream.assert_called_with(
  204. "bucket/path/basic.parquet", buffer_size=5 * 2**20
  205. )
  206. mock_delete.assert_called_with(mock_file_decrypted, [column], "parquet", False)
  207. mock_encrypt.assert_called_with(ANY, metadata, ANY)
  208. mock_save.assert_called_with(
  209. ANY,
  210. redacted_encrypted,
  211. "bucket",
  212. "path/basic.parquet",
  213. {"new_metadata": "foo"},
  214. "abc123",
  215. )
  216. mock_emit.assert_called()
  217. mock_session.assert_called_with(None, "s3f2")
  218. mock_verify_integrity.assert_called_with(
  219. ANY, "bucket", "path/basic.parquet", "abc123", "new_version123"
  220. )
  221. @patch.dict(os.environ, {"JobTable": "test"})
  222. @patch(
  223. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  224. MagicMock(return_value=True),
  225. )
  226. @patch(
  227. "backend.ecs_tasks.delete_files.main.verify_object_versions_integrity",
  228. MagicMock(return_value=True),
  229. )
  230. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  231. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  232. @patch("backend.ecs_tasks.delete_files.main.emit_deletion_event", MagicMock())
  233. @patch("backend.ecs_tasks.delete_files.main.save", MagicMock())
  234. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  235. @patch("backend.ecs_tasks.delete_files.main.get_session")
  236. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  237. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  238. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  239. def test_it_assumes_role(
  240. mock_get_object_info, mock_delete, mock_fs, mock_session, message_stub
  241. ):
  242. mock_fs.S3FileSystem.return_value = mock_fs
  243. mock_file = MagicMock()
  244. mock_file.metadata.return_value = {"VersionId": b"abc123"}
  245. mock_fs.open_input_stream.return_value.__enter__.return_value = mock_file
  246. mock_get_object_info.return_value = {"Metadata": {}}, None
  247. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  248. execute(
  249. "https://queue/url",
  250. message_stub(
  251. RoleArn="arn:aws:iam:account_id:role/rolename",
  252. Object="s3://bucket/path/basic.parquet",
  253. ),
  254. "receipt_handle",
  255. )
  256. mock_session.assert_called_with("arn:aws:iam:account_id:role/rolename", "s3f2")
  257. @patch.dict(os.environ, {"JobTable": "test"})
  258. @patch(
  259. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  260. MagicMock(return_value=True),
  261. )
  262. @patch(
  263. "backend.ecs_tasks.delete_files.main.verify_object_versions_integrity",
  264. MagicMock(return_value=True),
  265. )
  266. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  267. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  268. @patch("backend.ecs_tasks.delete_files.main.emit_deletion_event", MagicMock())
  269. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  270. @patch("backend.ecs_tasks.delete_files.main.save")
  271. @patch("backend.ecs_tasks.delete_files.main.delete_old_versions")
  272. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  273. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  274. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  275. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  276. def test_it_removes_old_versions(
  277. mock_get_object_info,
  278. mock_delete,
  279. mock_fs,
  280. mock_delete_versions,
  281. mock_save,
  282. message_stub,
  283. ):
  284. mock_fs.S3FileSystem.return_value = mock_fs
  285. mock_file = MagicMock()
  286. mock_file.metadata.return_value = {"VersionId": b"abc123"}
  287. mock_fs.open_input_stream.return_value.__enter__.return_value = mock_file
  288. mock_get_object_info.return_value = {"Metadata": {}}, None
  289. mock_save.return_value = "new_version123"
  290. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  291. execute(
  292. "https://queue/url",
  293. message_stub(
  294. RoleArn="arn:aws:iam:account_id:role/rolename",
  295. DeleteOldVersions=True,
  296. Object="s3://bucket/path/basic.parquet",
  297. ),
  298. "receipt_handle",
  299. )
  300. mock_delete_versions.assert_called_with(ANY, ANY, ANY, "new_version123")
  301. @patch.dict(os.environ, {"JobTable": "test"})
  302. @patch(
  303. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  304. MagicMock(return_value=True),
  305. )
  306. @patch(
  307. "backend.ecs_tasks.delete_files.main.verify_object_versions_integrity",
  308. MagicMock(return_value=True),
  309. )
  310. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  311. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  312. @patch("backend.ecs_tasks.delete_files.main.emit_deletion_event", MagicMock())
  313. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  314. @patch("backend.ecs_tasks.delete_files.main.save")
  315. @patch("backend.ecs_tasks.delete_files.main.delete_old_versions")
  316. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  317. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  318. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  319. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  320. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  321. def test_it_handles_old_version_delete_failures(
  322. mock_get_object_info,
  323. mock_handle,
  324. mock_delete,
  325. mock_fs,
  326. mock_delete_versions,
  327. mock_save,
  328. message_stub,
  329. ):
  330. mock_fs.S3FileSystem.return_value = mock_fs
  331. mock_file = MagicMock()
  332. mock_file.metadata.return_value = {"VersionId": b"abc123"}
  333. mock_fs.open_input_stream.return_value.__enter__.return_value = mock_file
  334. mock_get_object_info.return_value = {"Metadata": {}}, None
  335. mock_save.return_value = "new_version123"
  336. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  337. mock_delete_versions.side_effect = DeleteOldVersionsError(errors=["access denied"])
  338. execute(
  339. "https://queue/url",
  340. message_stub(
  341. RoleArn="arn:aws:iam:account_id:role/rolename",
  342. DeleteOldVersions=True,
  343. Object="s3://bucket/path/basic.parquet",
  344. ),
  345. "receipt_handle",
  346. )
  347. mock_handle.assert_called_with(
  348. ANY, ANY, "Unable to delete previous versions: access denied"
  349. )
  350. @patch.dict(os.environ, {"JobTable": "test"})
  351. @patch(
  352. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  353. MagicMock(return_value=True),
  354. )
  355. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  356. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  357. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  358. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  359. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  360. @patch("backend.ecs_tasks.delete_files.main.emit_deletion_event")
  361. @patch("backend.ecs_tasks.delete_files.main.save")
  362. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  363. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  364. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  365. def test_it_handles_no_deletions(
  366. mock_get_object_info,
  367. mock_handle,
  368. mock_save,
  369. mock_emit,
  370. mock_delete,
  371. mock_fs,
  372. message_stub,
  373. ):
  374. mock_fs.S3FileSystem.return_value = mock_fs
  375. mock_file = MagicMock()
  376. mock_file.metadata.return_value = {"VersionId": b"abc123"}
  377. mock_fs.open_input_stream.return_value.__enter__.return_value = mock_file
  378. mock_get_object_info.return_value = {"Metadata": {}}, None
  379. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 0}
  380. execute(
  381. "https://queue/url",
  382. message_stub(Object="s3://bucket/path/basic.parquet"),
  383. "receipt_handle",
  384. )
  385. mock_fs.open_input_stream.assert_called_with(
  386. "bucket/path/basic.parquet", buffer_size=5 * 2**20
  387. )
  388. mock_save.assert_not_called()
  389. mock_emit.assert_not_called()
  390. mock_handle.assert_called_with(
  391. ANY,
  392. ANY,
  393. "Unprocessable message: The object s3://bucket/path/basic.parquet "
  394. "was processed successfully but no rows required deletion",
  395. )
  396. @patch.dict(os.environ, {"JobTable": "test"})
  397. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  398. @patch(
  399. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  400. MagicMock(return_value=True),
  401. )
  402. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  403. @patch("backend.ecs_tasks.delete_files.main.pa.fs", MagicMock())
  404. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  405. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  406. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  407. @patch("backend.ecs_tasks.delete_files.main.build_matches")
  408. def test_it_handles_missing_col_exceptions(
  409. mock_build_matches, mock_error_handler, mock_delete, message_stub
  410. ):
  411. # Arrange
  412. mock_delete.side_effect = KeyError("FAIL")
  413. # Act
  414. execute("https://queue/url", message_stub(), "receipt_handle")
  415. # Assert
  416. mock_error_handler.assert_called_with(
  417. ANY, ANY, "Apache Arrow processing error: 'FAIL'"
  418. )
  419. @patch.dict(os.environ, {"JobTable": "test"})
  420. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  421. @patch(
  422. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  423. MagicMock(return_value=True),
  424. )
  425. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  426. @patch("backend.ecs_tasks.delete_files.main.pa.fs", MagicMock())
  427. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  428. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  429. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  430. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  431. def test_it_handles_arrow_exceptions(mock_error_handler, mock_delete, message_stub):
  432. # Arrange
  433. mock_delete.side_effect = ArrowException("FAIL")
  434. # Act
  435. execute("https://queue/url", message_stub(), "receipt_handle")
  436. # Assert
  437. mock_error_handler.assert_called_with(
  438. ANY, ANY, "Apache Arrow processing error: FAIL"
  439. )
  440. @patch.dict(os.environ, {"JobTable": "test"})
  441. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  442. @patch(
  443. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  444. MagicMock(return_value=True),
  445. )
  446. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  447. def test_it_validates_messages_with_missing_keys(mock_error_handler):
  448. # Act
  449. execute("https://queue/url", "{}", "receipt_handle")
  450. # Assert
  451. mock_error_handler.assert_called()
  452. @patch.dict(os.environ, {"JobTable": "test"})
  453. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  454. @patch(
  455. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  456. MagicMock(return_value=True),
  457. )
  458. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  459. def test_it_validates_messages_with_invalid_body(mock_error_handler):
  460. # Act
  461. execute("https://queue/url", "NOT JSON", "receipt_handle")
  462. mock_error_handler.assert_called()
  463. @patch.dict(os.environ, {"JobTable": "test"})
  464. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  465. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  466. @patch(
  467. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  468. MagicMock(return_value=True),
  469. )
  470. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  471. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  472. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  473. def test_it_handles_s3_permission_issues(mock_error_handler, mock_fs, message_stub):
  474. mock_fs.S3FileSystem.return_value = mock_fs
  475. mock_fs.open_input_stream.side_effect = ClientError({}, "GetObject")
  476. # Act
  477. execute("https://queue/url", message_stub(), "receipt_handle")
  478. # Assert
  479. msg = mock_error_handler.call_args[0][2]
  480. assert msg.startswith("ClientError:")
  481. @patch.dict(os.environ, {"JobTable": "test"})
  482. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  483. @patch(
  484. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  485. MagicMock(return_value=True),
  486. )
  487. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  488. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  489. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  490. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  491. def test_it_handles_io_errors(mock_error_handler, mock_fs, message_stub):
  492. # Arrange
  493. mock_fs.S3FileSystem.return_value = mock_fs
  494. mock_fs.open_input_stream.side_effect = IOError("an error")
  495. # Act
  496. execute("https://queue/url", message_stub(), "receipt_handle")
  497. # Assert
  498. mock_error_handler.assert_called_with(
  499. ANY, ANY, "Unable to retrieve object: an error"
  500. )
  501. @patch.dict(os.environ, {"JobTable": "test"})
  502. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  503. @patch(
  504. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  505. MagicMock(return_value=True),
  506. )
  507. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  508. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  509. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  510. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  511. def test_it_handles_file_too_big(mock_error_handler, mock_fs, message_stub):
  512. # Arrange
  513. mock_fs.S3FileSystem.return_value = mock_fs
  514. mock_fs.open_input_stream.side_effect = MemoryError("Too big")
  515. # Act
  516. execute("https://queue/url", message_stub(), "receipt_handle")
  517. # Assert
  518. mock_error_handler.assert_called_with(
  519. ANY, ANY, "Insufficient memory to work on object: Too big"
  520. )
  521. @patch.dict(os.environ, {"JobTable": "test"})
  522. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  523. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  524. @patch(
  525. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  526. MagicMock(return_value=True),
  527. )
  528. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  529. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  530. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  531. def test_it_does_not_ignore_not_found_error_by_default(
  532. mock_error_handler, mock_fs, message_stub
  533. ):
  534. mock_fs.S3FileSystem.return_value = mock_fs
  535. mock_fs.open_input_stream.side_effect = ClientError(
  536. {"Error": {"Code": "404"}}, "HeadObject"
  537. )
  538. # Act
  539. execute("https://queue/url", message_stub(), "receipt_handle")
  540. # Assert
  541. msg = mock_error_handler.call_args[0][2]
  542. assert msg.startswith("ClientError:")
  543. @patch.dict(os.environ, {"JobTable": "test"})
  544. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  545. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  546. @patch(
  547. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  548. MagicMock(return_value=True),
  549. )
  550. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  551. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  552. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  553. @patch("backend.ecs_tasks.delete_files.main.handle_skip")
  554. @patch("backend.ecs_tasks.delete_files.main.get_object_info")
  555. def test_it_ignores_boto_not_found_error_if_param_is_true(
  556. mock_get_object_info, mock_skip_handler, mock_error_handler, mock_fs, message_stub
  557. ):
  558. mock_fs.S3FileSystem.return_value = mock_fs
  559. mock_get_object_info.side_effect = ClientError(
  560. {"Error": {"Code": "404"}}, "HeadObject"
  561. )
  562. # Act
  563. execute(
  564. "https://queue/url",
  565. message_stub(IgnoreObjectNotFoundExceptions=True),
  566. "receipt_handle",
  567. )
  568. # Assert
  569. mock_error_handler.assert_not_called()
  570. msg = mock_skip_handler.call_args[0][2]
  571. assert msg.startswith("Ignored error: ClientError:")
  572. @patch.dict(os.environ, {"JobTable": "test"})
  573. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  574. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  575. @patch(
  576. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  577. MagicMock(return_value=True),
  578. )
  579. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  580. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  581. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  582. @patch("backend.ecs_tasks.delete_files.main.handle_skip")
  583. def test_it_ignores_arrow_not_found_error_if_param_is_true(
  584. mock_skip_handler, mock_error_handler, mock_fs, message_stub
  585. ):
  586. mock_fs.S3FileSystem.return_value = mock_fs
  587. mock_fs.open_input_stream.side_effect = FileNotFoundError(
  588. ENOENT, strerror(ENOENT), "bucket/key"
  589. )
  590. # Act
  591. execute(
  592. "https://queue/url",
  593. message_stub(IgnoreObjectNotFoundExceptions=True),
  594. "receipt_handle",
  595. )
  596. # Assert
  597. mock_error_handler.assert_not_called()
  598. msg = mock_skip_handler.call_args[0][2]
  599. assert msg.startswith("Ignored error: Apache Arrow S3FileSystem Error:")
  600. @patch.dict(os.environ, {"JobTable": "test"})
  601. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  602. @patch(
  603. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  604. MagicMock(return_value=True),
  605. )
  606. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  607. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  608. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  609. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  610. def test_it_handles_not_found_error(mock_error_handler, mock_fs, message_stub):
  611. # Arrange
  612. mock_fs.S3FileSystem.return_value = mock_fs
  613. mock_fs.open_input_stream.side_effect = FileNotFoundError(
  614. ENOENT, strerror(ENOENT), "bucket/key"
  615. )
  616. # Act
  617. execute("https://queue/url", message_stub(), "receipt_handle")
  618. # Assert
  619. mock_error_handler.assert_called_with(
  620. ANY,
  621. ANY,
  622. "Apache Arrow S3FileSystem Error: [Errno 2] No such file or directory: 'bucket/key'",
  623. )
  624. @patch.dict(os.environ, {"JobTable": "test"})
  625. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  626. @patch(
  627. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  628. MagicMock(return_value=True),
  629. )
  630. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  631. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  632. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  633. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  634. def test_it_handles_generic_error(mock_error_handler, mock_fs, message_stub):
  635. # Arrange
  636. mock_fs.S3FileSystem.return_value = mock_fs
  637. mock_fs.open_input_stream.side_effect = RuntimeError("Some Error")
  638. # Act
  639. execute("https://queue/url", message_stub(), "receipt_handle")
  640. # Assert
  641. mock_error_handler.assert_called_with(
  642. ANY, ANY, "Unknown error during message processing: Some Error"
  643. )
  644. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  645. @patch("backend.ecs_tasks.delete_files.main.validate_bucket_versioning")
  646. @patch("backend.ecs_tasks.delete_files.main.pa.fs")
  647. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  648. def test_it_handles_unversioned_buckets(
  649. mock_error_handler, mock_fs, mock_versioning, message_stub
  650. ):
  651. # Arrange
  652. mock_fs.S3FileSystem.return_value = mock_fs
  653. mock_versioning.side_effect = ValueError("Versioning validation Error")
  654. # Act
  655. execute("https://queue/url", message_stub(), "receipt_handle")
  656. # Assert
  657. mock_error_handler.assert_called_with(
  658. ANY, ANY, "Unprocessable message: Versioning validation Error"
  659. )
  660. mock_versioning.assert_called_with(ANY, "bucket")
  661. @patch.dict(os.environ, {"JobTable": "test"})
  662. @patch(
  663. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  664. MagicMock(return_value=True),
  665. )
  666. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  667. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  668. @patch("backend.ecs_tasks.delete_files.main.pa.fs", MagicMock())
  669. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  670. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  671. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  672. @patch("backend.ecs_tasks.delete_files.main.save")
  673. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  674. def test_it_provides_logs_for_acl_fail(
  675. mock_save, mock_error_handler, mock_delete, message_stub
  676. ):
  677. mock_save.side_effect = ClientError({}, "PutObjectAcl")
  678. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  679. execute("https://queue/url", message_stub(), "receipt_handle")
  680. mock_save.assert_called()
  681. mock_error_handler.assert_called_with(
  682. ANY,
  683. ANY,
  684. "ClientError: An error occurred (Unknown) when calling the PutObjectAcl "
  685. "operation: Unknown. Redacted object uploaded successfully but unable to "
  686. "restore WRITE ACL",
  687. )
  688. @patch.dict(os.environ, {"JobTable": "test"})
  689. @patch(
  690. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  691. MagicMock(return_value=True),
  692. )
  693. @patch(
  694. "backend.ecs_tasks.delete_files.main.save", MagicMock(return_value="new_version")
  695. )
  696. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  697. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  698. @patch("backend.ecs_tasks.delete_files.main.pa.fs", MagicMock())
  699. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  700. @patch("backend.ecs_tasks.delete_files.main.rollback_object_version")
  701. @patch("backend.ecs_tasks.delete_files.main.verify_object_versions_integrity")
  702. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  703. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  704. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  705. def test_it_provides_logs_for_failed_version_integrity_check_and_performs_rollback(
  706. mock_error_handler,
  707. mock_delete,
  708. mock_verify_integrity,
  709. rollback_mock,
  710. message_stub,
  711. ):
  712. mock_verify_integrity.side_effect = IntegrityCheckFailedError(
  713. "Some error", MagicMock(), "bucket", "path/basic.parquet", "new_version"
  714. )
  715. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  716. execute("https://queue/url", message_stub(), "receipt_handle")
  717. mock_verify_integrity.assert_called()
  718. mock_error_handler.assert_called_with(
  719. ANY, ANY, "Object version integrity check failed: Some error"
  720. )
  721. rollback_mock.assert_called_with(
  722. ANY, "bucket", "path/basic.parquet", "new_version", on_error=ANY
  723. )
  724. @patch.dict(os.environ, {"JobTable": "test"})
  725. @patch(
  726. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  727. MagicMock(return_value=True),
  728. )
  729. @patch(
  730. "backend.ecs_tasks.delete_files.main.save", MagicMock(return_value="new_version")
  731. )
  732. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  733. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  734. @patch("backend.ecs_tasks.delete_files.main.pa.fs", MagicMock())
  735. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  736. @patch("backend.ecs_tasks.delete_files.main.verify_object_versions_integrity")
  737. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  738. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  739. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  740. def test_it_provides_logs_for_get_latest_version_fail(
  741. mock_error_handler, mock_delete, mock_verify_integrity, message_stub
  742. ):
  743. mock_verify_integrity.side_effect = get_list_object_versions_error()
  744. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  745. execute("https://queue/url", message_stub(), "receipt_handle")
  746. mock_verify_integrity.assert_called()
  747. mock_error_handler.assert_called_with(
  748. ANY,
  749. ANY,
  750. "ClientError: An error occurred (InvalidArgument) when calling the "
  751. "ListObjectVersions operation: Invalid version id specified. Could "
  752. "not verify redacted object version integrity",
  753. )
  754. @patch.dict(os.environ, {"JobTable": "test"})
  755. @patch(
  756. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  757. MagicMock(return_value=True),
  758. )
  759. @patch(
  760. "backend.ecs_tasks.delete_files.main.save", MagicMock(return_value="new_version")
  761. )
  762. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  763. @patch("backend.ecs_tasks.delete_files.main.pa.fs", MagicMock())
  764. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  765. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  766. @patch("backend.ecs_tasks.delete_files.main.verify_object_versions_integrity")
  767. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  768. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  769. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  770. def test_it_provides_logs_for_failed_rollback_client_error(
  771. mock_error_handler, mock_delete, mock_verify_integrity, message_stub
  772. ):
  773. mock_s3 = MagicMock()
  774. mock_s3.delete_object.side_effect = ClientError({}, "DeleteObject")
  775. mock_verify_integrity.side_effect = IntegrityCheckFailedError(
  776. "Some error", mock_s3, "bucket", "test/basic.parquet", "new_version"
  777. )
  778. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  779. execute("https://queue/url", message_stub(), "receipt_handle")
  780. mock_verify_integrity.assert_called()
  781. assert mock_error_handler.call_args_list == [
  782. call(ANY, ANY, "Object version integrity check failed: Some error"),
  783. call(
  784. ANY,
  785. ANY,
  786. "ClientError: An error occurred (Unknown) when calling the DeleteObject operation: Unknown. "
  787. "Version rollback caused by version integrity conflict failed",
  788. "ObjectRollbackFailed",
  789. False,
  790. ),
  791. ]
  792. @patch.dict(os.environ, {"JobTable": "test"})
  793. @patch(
  794. "backend.ecs_tasks.delete_files.main.validate_bucket_versioning",
  795. MagicMock(return_value=True),
  796. )
  797. @patch(
  798. "backend.ecs_tasks.delete_files.main.save", MagicMock(return_value="new_version")
  799. )
  800. @patch("backend.ecs_tasks.delete_files.main.validate_message", MagicMock())
  801. @patch("backend.ecs_tasks.delete_files.main.pa.fs", MagicMock())
  802. @patch("backend.ecs_tasks.delete_files.main.get_session", MagicMock())
  803. @patch("backend.ecs_tasks.delete_files.main.get_queue", MagicMock())
  804. @patch("backend.ecs_tasks.delete_files.main.verify_object_versions_integrity")
  805. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_file")
  806. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  807. @patch("backend.ecs_tasks.delete_files.main.build_matches", MagicMock())
  808. def test_it_provides_logs_for_failed_rollback_generic_error(
  809. mock_error_handler, mock_delete, mock_verify_integrity, message_stub
  810. ):
  811. mock_s3 = MagicMock()
  812. mock_s3.delete_object.side_effect = Exception("error!!")
  813. mock_verify_integrity.side_effect = IntegrityCheckFailedError(
  814. "Some error", mock_s3, "bucket", "test/basic.parquet", "new_version"
  815. )
  816. mock_delete.return_value = pa.BufferOutputStream(), {"DeletedRows": 1}
  817. execute("https://queue/url", message_stub(), "receipt_handle")
  818. mock_verify_integrity.assert_called()
  819. assert mock_error_handler.call_args_list == [
  820. call(ANY, ANY, "Object version integrity check failed: Some error"),
  821. call(
  822. ANY,
  823. ANY,
  824. "Unknown error: error!!. Version rollback caused by version integrity conflict failed",
  825. "ObjectRollbackFailed",
  826. False,
  827. ),
  828. ]
  829. @patch("backend.ecs_tasks.delete_files.main.sanitize_message")
  830. @patch("backend.ecs_tasks.delete_files.main.emit_failure_event")
  831. def test_it_gracefully_handles_invalid_message_bodies(mock_emit, mock_sanitize):
  832. sqs_message = MagicMock()
  833. mock_emit.side_effect = ValueError("Bad message")
  834. handle_error(sqs_message, "{}", "Some error")
  835. # Verify it attempts to emit the failure
  836. mock_sanitize.assert_called()
  837. mock_emit.assert_called()
  838. # Verify even if emitting fails, the message visibility changes
  839. sqs_message.change_visibility.assert_called()
  840. @patch("backend.ecs_tasks.delete_files.main.sanitize_message")
  841. @patch("backend.ecs_tasks.delete_files.main.emit_failure_event")
  842. def test_it_gracefully_handles_invalid_job_id(mock_emit, mock_sanitize):
  843. sqs_message = MagicMock()
  844. mock_emit.side_effect = KeyError("Invalid Job ID")
  845. handle_error(sqs_message, "{}", "Some error")
  846. # Verify it attempts to emit the failure
  847. mock_sanitize.assert_called()
  848. mock_emit.assert_called()
  849. # Verify even if emitting fails, the message visibility changes
  850. sqs_message.change_visibility.assert_called()
  851. @patch("backend.ecs_tasks.delete_files.main.sanitize_message")
  852. @patch("backend.ecs_tasks.delete_files.main.emit_failure_event")
  853. def test_it_gracefully_handles_client_errors(mock_emit, mock_sanitize):
  854. sqs_message = MagicMock()
  855. mock_emit.side_effect = ClientError({}, "PutItem")
  856. handle_error(sqs_message, "{}", "Some error")
  857. # Verify it attempts to emit the failure
  858. mock_sanitize.assert_called()
  859. mock_emit.assert_called()
  860. # Verify even if emitting fails, the message visibility changes
  861. sqs_message.change_visibility.assert_called()
  862. @patch("backend.ecs_tasks.delete_files.main.sanitize_message")
  863. @patch("backend.ecs_tasks.delete_files.main.emit_failure_event")
  864. def test_it_doesnt_change_message_visibility_when_rollback_fails(
  865. mock_emit, mock_sanitize
  866. ):
  867. sqs_message = MagicMock()
  868. mock_emit.side_effect = ClientError({}, "DeleteObjectVersion")
  869. handle_error(sqs_message, "{}", "Some error", "ObjectRollbackFailed", False)
  870. # Verify it attempts to emit the failure
  871. mock_sanitize.assert_called()
  872. mock_emit.assert_called()
  873. # Verify that the visibility doesn't change for a rollback event
  874. sqs_message.change_visibility.assert_not_called()
  875. @patch("backend.ecs_tasks.delete_files.main.emit_failure_event")
  876. def test_it_gracefully_handles_change_message_visibility_failure(mock_emit):
  877. sqs_message = MagicMock()
  878. e = boto3.client("sqs").exceptions.ReceiptHandleIsInvalid
  879. sqs_message.meta.client.exceptions.MessageNotInflight = e
  880. sqs_message.meta.client.exceptions.ReceiptHandleIsInvalid = e
  881. sqs_message.change_visibility.side_effect = e({}, "ReceiptHandleIsInvalid")
  882. handle_error(sqs_message, "{}", "Some error")
  883. # Verify it attempts to emit the failure
  884. mock_emit.assert_called()
  885. sqs_message.change_visibility.assert_called() # Implicit graceful handling
  886. @patch("backend.ecs_tasks.delete_files.main.emit_failure_event")
  887. def test_error_handler(mock_emit):
  888. msg = MagicMock()
  889. handle_error(msg, "{}", "Test Error")
  890. mock_emit.assert_called_with("{}", "Test Error", "ObjectUpdateFailed")
  891. msg.change_visibility.assert_called_with(VisibilityTimeout=0)
  892. @patch("backend.ecs_tasks.delete_files.main.sanitize_message")
  893. @patch("backend.ecs_tasks.delete_files.main.emit_skipped_event")
  894. def test_skip_handler(mock_emit, mock_sanitize):
  895. sqs_message = MagicMock()
  896. handle_skip(
  897. sqs_message, {"Object": "s3://bucket/path/basic.parquet"}, "Ignored error"
  898. )
  899. # Verify it deletes the message
  900. sqs_message.delete.assert_called()
  901. # Verify it emits the skip event
  902. mock_sanitize.assert_called()
  903. mock_emit.assert_called()
  904. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  905. def test_kill_handler_cleans_up(mock_error_handler):
  906. with pytest.raises(SystemExit) as e:
  907. mock_pool = MagicMock()
  908. mock_msg = MagicMock()
  909. kill_handler([mock_msg], mock_pool)
  910. mock_pool.terminate.assert_called()
  911. mock_error_handler.assert_called()
  912. assert 1 == e.value.code
  913. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  914. def test_kill_handler_exits_successfully_when_done(mock_error_handler):
  915. with pytest.raises(SystemExit) as e:
  916. mock_pool = MagicMock()
  917. kill_handler([], mock_pool)
  918. mock_pool.terminate.assert_called()
  919. mock_error_handler.assert_not_called()
  920. assert 0 == e.value.code
  921. @patch("backend.ecs_tasks.delete_files.main.handle_error")
  922. def test_it_gracefully_handles_cleanup_issues(mock_error_handler):
  923. with pytest.raises(SystemExit):
  924. mock_pool = MagicMock()
  925. mock_msg = MagicMock()
  926. mock_error_handler.side_effect = ValueError()
  927. kill_handler([mock_msg, mock_msg], mock_pool)
  928. assert 2 == mock_error_handler.call_count
  929. mock_pool.terminate.assert_called()
  930. @patch.dict(os.environ, {"DELETE_OBJECTS_QUEUE": "https://queue/url"})
  931. def test_it_inits_arg_parser_with_defaults():
  932. res = parse_args([])
  933. assert isinstance(res, Namespace)
  934. assert all(
  935. [
  936. hasattr(res, attr)
  937. for attr in ["wait_time", "max_messages", "sleep_time", "queue_url"]
  938. ]
  939. )
  940. assert isinstance(res.wait_time, int)
  941. assert isinstance(res.max_messages, int)
  942. assert isinstance(res.sleep_time, int)
  943. assert isinstance(res.queue_url, str)
  944. @patch("backend.ecs_tasks.delete_files.main.boto3")
  945. @patch.dict(os.environ, {"AWS_DEFAULT_REGION": "eu-west-2"})
  946. @patch.dict(os.environ, {"AWS_URL_SUFFIX": "amazonaws.com"})
  947. def test_it_inits_queue_with_regional_url(mock_boto):
  948. get_queue("https://queue/rule")
  949. mock_boto.resource.assert_called_with(
  950. "sqs", endpoint_url="https://sqs.eu-west-2.amazonaws.com"
  951. )
  952. @patch("backend.ecs_tasks.delete_files.main.boto3")
  953. @patch("os.getenv", MagicMock(return_value=None))
  954. def test_it_uses_default_if_region_not_in_env(mock_boto):
  955. get_queue("https://queue/rule")
  956. mock_boto.resource.assert_called_with("sqs")
  957. @patch("backend.ecs_tasks.delete_files.main.boto3")
  958. def test_it_does_not_override_user_supplied_endpoint_url(mock_boto):
  959. get_queue("https://queue/rule", endpoint_url="https://my/url")
  960. mock_boto.resource.assert_called_with("sqs", endpoint_url="https://my/url")
  961. @patch("backend.ecs_tasks.delete_files.main.signal", MagicMock())
  962. @patch("backend.ecs_tasks.delete_files.main.Pool")
  963. @patch("backend.ecs_tasks.delete_files.main.get_queue")
  964. def test_it_starts_subprocesses(mock_queue, mock_pool):
  965. mock_queue.return_value = mock_queue
  966. mock_message = MagicMock()
  967. mock_queue.receive_messages.return_value = [mock_message]
  968. # Break out of while loop
  969. mock_pool.return_value = mock_pool
  970. mock_pool.__enter__.return_value = mock_pool
  971. mock_pool.starmap.side_effect = RuntimeError("Break loop")
  972. with pytest.raises(RuntimeError):
  973. main("https://queue/url", 1, 1, 1)
  974. mock_pool.assert_called_with(maxtasksperchild=1)
  975. mock_pool.starmap.assert_called_with(
  976. ANY, [("https://queue/url", mock_message.body, mock_message.receipt_handle)]
  977. )
  978. mock_queue.receive_messages.assert_called_with(
  979. WaitTimeSeconds=1, MaxNumberOfMessages=1
  980. )
  981. @patch("backend.ecs_tasks.delete_files.main.Pool", MagicMock())
  982. @patch("backend.ecs_tasks.delete_files.main.signal", MagicMock())
  983. @patch("backend.ecs_tasks.delete_files.main.get_queue")
  984. @patch("backend.ecs_tasks.delete_files.main.time")
  985. def test_it_sleeps_where_no_messages(mock_time, mock_queue):
  986. mock_queue.return_value = mock_queue
  987. mock_queue.receive_messages.return_value = []
  988. # Break out of while loop
  989. mock_time.sleep.side_effect = RuntimeError("Break Loop")
  990. with pytest.raises(RuntimeError):
  991. main("https://queue/url", 1, 1, 1)
  992. mock_time.sleep.assert_called_with(1)
  993. @patch("backend.ecs_tasks.delete_files.main.Pool", MagicMock())
  994. @patch("backend.ecs_tasks.delete_files.main.signal")
  995. @patch("backend.ecs_tasks.delete_files.main.get_queue")
  996. def test_it_sets_kill_handlers(mock_queue, mock_signal):
  997. mock_queue.return_value = mock_queue
  998. # Break out of while loop
  999. mock_queue.receive_messages.side_effect = RuntimeError("Break Loop")
  1000. with pytest.raises(RuntimeError):
  1001. main("https://queue/url", 1, 1, 1)
  1002. assert mock_signal.SIGINT, ANY == mock_signal.signal.call_args_list[0][0]
  1003. assert mock_signal.SIGTERM, ANY == mock_signal.signal.call_args_list[1][0]
  1004. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_json_file")
  1005. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_parquet_file")
  1006. def test_it_deletes_from_json_file(mock_parquet, mock_json):
  1007. f = MagicMock()
  1008. cols = MagicMock()
  1009. delete_matches_from_file(f, cols, "json", False)
  1010. mock_json.assert_called_with(f, cols, False)
  1011. mock_parquet.assert_not_called()
  1012. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_json_file")
  1013. @patch("backend.ecs_tasks.delete_files.main.delete_matches_from_parquet_file")
  1014. def test_it_deletes_from_parquet_file(mock_parquet, mock_json):
  1015. f = MagicMock()
  1016. cols = MagicMock()
  1017. delete_matches_from_file(f, cols, "parquet")
  1018. mock_parquet.assert_called_with(f, cols)
  1019. mock_json.assert_not_called()
  1020. @patch("backend.ecs_tasks.delete_files.main.fetch_manifest")
  1021. def test_it_builds_matches_grouped_by_column_simple(mock_fetch):
  1022. cols = [{"Column": "customer_id"}]
  1023. mock_fetch.return_value = (
  1024. '{"Columns":["customer_id"], "MatchId": ["12345"], "QueryableColumns": "customer_id"}\n'
  1025. '{"Columns":["customer_id"], "MatchId": ["23456"], "QueryableColumns": "customer_id"}\n'
  1026. )
  1027. matches = build_matches(cols, "s3://path-to-manifest.json")
  1028. assert matches == [
  1029. {"Column": "customer_id", "MatchIds": ["12345", "23456"]},
  1030. ]
  1031. @patch("backend.ecs_tasks.delete_files.main.fetch_manifest")
  1032. def test_it_builds_matches_grouped_by_column_composite(mock_fetch):
  1033. cols = [
  1034. {"Columns": ["first_name", "last_name"]},
  1035. ]
  1036. mock_fetch.return_value = (
  1037. '{"Columns":["first_name", "last_name"], "MatchId": ["john", "doe"], "QueryableColumns": "first_name_S3F2COMP_last_name"}\n'
  1038. '{"Columns":["first_name", "last_name"], "MatchId": ["jane", "doe"], "QueryableColumns": "first_name_S3F2COMP_last_name"}\n'
  1039. )
  1040. matches = build_matches(cols, "s3://path-to-manifest.json")
  1041. assert matches == [
  1042. {
  1043. "Columns": ["first_name", "last_name"],
  1044. "MatchIds": [["john", "doe"], ["jane", "doe"]],
  1045. },
  1046. ]
  1047. @patch("backend.ecs_tasks.delete_files.main.fetch_manifest")
  1048. def test_it_builds_matches_grouped_by_column_mixed(mock_fetch):
  1049. # example in which first_name and last_name are the col identifiers for given Data Mapper
  1050. cols = [
  1051. {"Columns": ["first_name", "last_name"]},
  1052. {"Column": "first_name"},
  1053. {"Column": "last_name"},
  1054. ]
  1055. # Simple => "smith" value to be searched in any column, Composite => particular tuples or single value ("parker")
  1056. mock_fetch.return_value = (
  1057. '{"Columns":["first_name", "last_name"], "MatchId": ["john", "doe"], "QueryableColumns": "first_name_S3F2COMP_last_name"}\n'
  1058. '{"Columns":["first_name", "last_name"], "MatchId": ["jane", "doe"], "QueryableColumns": "first_name_S3F2COMP_last_name"}\n'
  1059. '{"Columns":["first_name"], "MatchId": ["smith"], "QueryableColumns": "first_name"}\n'
  1060. '{"Columns":["last_name"], "MatchId": ["smith"], "QueryableColumns": "last_name"}\n'
  1061. '{"Columns":["last_name"], "MatchId": ["parker"], "QueryableColumns": "last_name"}\n'
  1062. )
  1063. matches = build_matches(cols, "s3://path-to-manifest.json")
  1064. assert matches == [
  1065. {
  1066. "Columns": ["first_name", "last_name"],
  1067. "MatchIds": [["john", "doe"], ["jane", "doe"]],
  1068. },
  1069. {"Column": "first_name", "MatchIds": ["smith"]},
  1070. {"Column": "last_name", "MatchIds": ["smith", "parker"]},
  1071. ]