test_queue.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. import json
  2. import os
  3. from types import SimpleNamespace
  4. from decimal import Decimal
  5. import pytest
  6. from mock import patch, ANY
  7. with patch.dict(os.environ, {"DeletionQueueTable": "DeletionQueueTable"}):
  8. from backend.lambdas.queue import handlers
  9. pytestmark = [pytest.mark.unit, pytest.mark.api, pytest.mark.queue]
# Stand-in request context holding authorizer claims ("sub" and
# "cognito:username"); tests in this module pass it as the event's
# "requestContext" value.
autorization_mock = {
    "authorizer": {
        "claims": {"sub": "cognitoSub", "cognito:username": "cognitoUsername"}
    }
}
  15. @patch("backend.lambdas.queue.handlers.deletion_queue_table")
  16. def test_it_retrieves_all_items(table):
  17. table.scan.return_value = {"Items": []}
  18. response = handlers.get_handler({}, SimpleNamespace())
  19. assert {
  20. "statusCode": 200,
  21. "body": json.dumps({"MatchIds": [], "NextStart": None}),
  22. "headers": ANY,
  23. } == response
  24. table.scan.assert_called_with(Limit=10)
  25. @patch("backend.lambdas.queue.handlers.deletion_queue_table")
  26. def test_it_retrieves_all_items_with_size_and_pagination(table):
  27. table.scan.return_value = {
  28. "Items": [
  29. {
  30. "DeletionQueueItemId": "id123",
  31. "MatchId": "foo",
  32. "DataMappers": [],
  33. "CreatedAt": 123456789,
  34. }
  35. ]
  36. }
  37. response = handlers.get_handler(
  38. {"queryStringParameters": {"page_size": "1", "start_at": "id000"}},
  39. SimpleNamespace(),
  40. )
  41. assert {
  42. "statusCode": 200,
  43. "body": json.dumps(
  44. {
  45. "MatchIds": [
  46. {
  47. "Type": "Simple",
  48. "DeletionQueueItemId": "id123",
  49. "MatchId": "foo",
  50. "DataMappers": [],
  51. "CreatedAt": 123456789,
  52. }
  53. ],
  54. "NextStart": "id123",
  55. }
  56. ),
  57. "headers": ANY,
  58. } == response
  59. table.scan.assert_called_with(
  60. Limit=1, ExclusiveStartKey={"DeletionQueueItemId": "id000"}
  61. )
  62. @patch("backend.lambdas.queue.handlers.deletion_queue_table")
  63. def test_it_adds_to_queue(table):
  64. response = handlers.enqueue_handler(
  65. {
  66. "body": json.dumps({"MatchId": "test", "DataMappers": ["a"]}),
  67. "requestContext": autorization_mock,
  68. },
  69. SimpleNamespace(),
  70. )
  71. assert 201 == response["statusCode"]
  72. assert {
  73. "DeletionQueueItemId": ANY,
  74. "MatchId": "test",
  75. "Type": "Simple",
  76. "CreatedAt": ANY,
  77. "DataMappers": ["a"],
  78. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  79. } == json.loads(response["body"])
  80. @patch("backend.lambdas.queue.handlers.deletion_queue_table")
  81. def test_it_adds_composite_to_queue(table):
  82. mid = [{"Column": "first_name", "Value": "test"}]
  83. response = handlers.enqueue_handler(
  84. {
  85. "body": json.dumps(
  86. {
  87. "MatchId": mid,
  88. "Type": "Composite",
  89. "DataMappers": ["a"],
  90. }
  91. ),
  92. "requestContext": autorization_mock,
  93. },
  94. SimpleNamespace(),
  95. )
  96. assert 201 == response["statusCode"]
  97. assert {
  98. "DeletionQueueItemId": ANY,
  99. "MatchId": mid,
  100. "Type": "Composite",
  101. "CreatedAt": ANY,
  102. "DataMappers": ["a"],
  103. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  104. } == json.loads(response["body"])
  105. @patch("backend.lambdas.queue.handlers.deletion_queue_table")
  106. def test_it_adds_batch_to_queue(table):
  107. response = handlers.enqueue_batch_handler(
  108. {
  109. "body": json.dumps(
  110. {
  111. "Matches": [
  112. {"MatchId": "test", "DataMappers": ["a"]},
  113. {"MatchId": "test2", "DataMappers": ["a"]},
  114. ]
  115. }
  116. ),
  117. "requestContext": autorization_mock,
  118. },
  119. SimpleNamespace(),
  120. )
  121. assert 201 == response["statusCode"]
  122. assert {
  123. "Matches": [
  124. {
  125. "DeletionQueueItemId": ANY,
  126. "MatchId": "test",
  127. "Type": "Simple",
  128. "CreatedAt": ANY,
  129. "DataMappers": ["a"],
  130. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  131. },
  132. {
  133. "DeletionQueueItemId": ANY,
  134. "MatchId": "test2",
  135. "Type": "Simple",
  136. "CreatedAt": ANY,
  137. "DataMappers": ["a"],
  138. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  139. },
  140. ]
  141. } == json.loads(response["body"])
  142. @patch("backend.lambdas.queue.handlers.deletion_queue_table")
  143. def test_it_provides_default_data_mappers(table):
  144. response = handlers.enqueue_handler(
  145. {
  146. "body": json.dumps(
  147. {
  148. "MatchId": "test",
  149. }
  150. ),
  151. "requestContext": autorization_mock,
  152. },
  153. SimpleNamespace(),
  154. )
  155. assert 201 == response["statusCode"]
  156. assert {
  157. "DeletionQueueItemId": ANY,
  158. "MatchId": "test",
  159. "Type": "Simple",
  160. "CreatedAt": ANY,
  161. "DataMappers": [],
  162. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  163. } == json.loads(response["body"])
  164. @patch("backend.lambdas.queue.handlers.running_job_exists")
  165. @patch("backend.lambdas.queue.handlers.deletion_queue_table")
  166. def test_it_cancels_deletions(table, mock_running_job):
  167. mock_running_job.return_value = False
  168. response = handlers.cancel_handler(
  169. {
  170. "body": json.dumps(
  171. {
  172. "Matches": [{"DeletionQueueItemId": "id123"}],
  173. }
  174. )
  175. },
  176. SimpleNamespace(),
  177. )
  178. assert {"statusCode": 204, "headers": ANY} == response
  179. @patch("backend.lambdas.queue.handlers.running_job_exists")
  180. def test_it_prevents_cancelling_whilst_running_jobs(mock_running_job):
  181. mock_running_job.return_value = True
  182. response = handlers.cancel_handler(
  183. {
  184. "body": json.dumps(
  185. {
  186. "Matches": [
  187. {
  188. "MatchId": "test",
  189. "CreatedAt": 123456789,
  190. }
  191. ],
  192. }
  193. )
  194. },
  195. SimpleNamespace(),
  196. )
  197. assert 400 == response["statusCode"]
  198. assert "headers" in response
  199. @patch("backend.lambdas.queue.handlers.bucket_count", 1)
  200. @patch("backend.lambdas.queue.handlers.uuid")
  201. @patch("backend.lambdas.queue.handlers.jobs_table")
  202. @patch("backend.lambdas.queue.handlers.running_job_exists")
  203. @patch("backend.lambdas.queue.handlers.get_config")
  204. def test_it_process_queue(mock_config, mock_running_job, job_table, uuid):
  205. mock_running_job.return_value = False
  206. mock_config.return_value = {
  207. "AthenaConcurrencyLimit": 15,
  208. "AthenaQueryMaxRetries": 2,
  209. "DeletionTasksMaxNumber": 50,
  210. "QueryExecutionWaitSeconds": 5,
  211. "QueryQueueWaitSeconds": 5,
  212. "ForgetQueueWaitSeconds": 30,
  213. }
  214. uuid.uuid4.return_value = 123
  215. response = handlers.process_handler(
  216. {"body": "", "requestContext": autorization_mock}, SimpleNamespace()
  217. )
  218. job_table.put_item.assert_called_with(
  219. Item={
  220. "Id": "123",
  221. "Sk": "123",
  222. "Type": "Job",
  223. "JobStatus": "QUEUED",
  224. "GSIBucket": "0",
  225. "CreatedAt": ANY,
  226. "AthenaConcurrencyLimit": 15,
  227. "AthenaQueryMaxRetries": 2,
  228. "DeletionTasksMaxNumber": 50,
  229. "QueryExecutionWaitSeconds": 5,
  230. "QueryQueueWaitSeconds": 5,
  231. "ForgetQueueWaitSeconds": 30,
  232. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  233. }
  234. )
  235. assert 202 == response["statusCode"]
  236. assert "headers" in response
  237. assert {
  238. "Id": "123",
  239. "Sk": "123",
  240. "Type": "Job",
  241. "JobStatus": "QUEUED",
  242. "GSIBucket": "0",
  243. "CreatedAt": ANY,
  244. "AthenaConcurrencyLimit": 15,
  245. "AthenaQueryMaxRetries": 2,
  246. "DeletionTasksMaxNumber": 50,
  247. "QueryExecutionWaitSeconds": 5,
  248. "QueryQueueWaitSeconds": 5,
  249. "ForgetQueueWaitSeconds": 30,
  250. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  251. } == json.loads(response["body"])
  252. @patch("backend.lambdas.queue.handlers.bucket_count", 1)
  253. @patch("backend.lambdas.queue.handlers.uuid")
  254. @patch("backend.lambdas.queue.handlers.jobs_table")
  255. @patch("backend.lambdas.queue.handlers.running_job_exists")
  256. @patch("backend.lambdas.queue.handlers.get_config")
  257. @patch("backend.lambdas.queue.handlers.utc_timestamp")
  258. def test_it_applies_expiry(mock_utc, mock_config, mock_running_job, job_table, uuid):
  259. mock_running_job.return_value = False
  260. mock_utc.return_value = 12346789
  261. mock_config.return_value = {
  262. "AthenaConcurrencyLimit": 15,
  263. "AthenaQueryMaxRetries": 2,
  264. "DeletionTasksMaxNumber": 50,
  265. "JobDetailsRetentionDays": 30,
  266. "QueryExecutionWaitSeconds": 5,
  267. "QueryQueueWaitSeconds": 5,
  268. "ForgetQueueWaitSeconds": 30,
  269. }
  270. uuid.uuid4.return_value = 123
  271. response = handlers.process_handler(
  272. {"body": "", "requestContext": autorization_mock}, SimpleNamespace()
  273. )
  274. mock_utc.assert_called_with(days=30)
  275. job_table.put_item.assert_called_with(
  276. Item={
  277. "Id": "123",
  278. "Sk": "123",
  279. "Type": "Job",
  280. "JobStatus": "QUEUED",
  281. "GSIBucket": "0",
  282. "CreatedAt": ANY,
  283. "Expires": 12346789,
  284. "AthenaConcurrencyLimit": 15,
  285. "AthenaQueryMaxRetries": 2,
  286. "DeletionTasksMaxNumber": 50,
  287. "QueryExecutionWaitSeconds": 5,
  288. "QueryQueueWaitSeconds": 5,
  289. "ForgetQueueWaitSeconds": 30,
  290. "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
  291. }
  292. )
  293. assert 202 == response["statusCode"]
  294. @patch("backend.lambdas.queue.handlers.running_job_exists")
  295. def test_it_prevents_concurrent_running_jobs(mock_running_job):
  296. mock_running_job.return_value = True
  297. response = handlers.process_handler(
  298. {"body": "", "requestContext": autorization_mock}, SimpleNamespace()
  299. )
  300. assert 400 == response["statusCode"]
  301. assert "headers" in response
  302. def test_it_validates_composite_queue_item_for_matchid_not_array():
  303. items = [
  304. {
  305. "Type": "Composite",
  306. "MatchId": "Test",
  307. "Columns": ["column"],
  308. "DataMappers": [],
  309. }
  310. ]
  311. with pytest.raises(ValueError) as e:
  312. handlers.validate_queue_items(items)
  313. assert e.value.args[0] == "MatchIds of Composite type need to be specified as array"
  314. def test_it_validates_composite_queue_item_for_matchid_empty_array():
  315. items = [
  316. {"Type": "Composite", "MatchId": [], "Columns": ["column"], "DataMappers": []}
  317. ]
  318. with pytest.raises(ValueError) as e:
  319. handlers.validate_queue_items(items)
  320. assert (
  321. e.value.args[0]
  322. == "MatchIds of Composite type need to have a value for at least one column"
  323. )
  324. def test_it_validates_composite_queue_item_for_data_mapper_empty():
  325. items = [
  326. {
  327. "Type": "Composite",
  328. "MatchId": [{"Column": "first_name", "Value": "Test"}],
  329. "Columns": ["column"],
  330. "DataMappers": [],
  331. }
  332. ]
  333. with pytest.raises(ValueError) as e:
  334. handlers.validate_queue_items(items)
  335. assert (
  336. e.value.args[0]
  337. == "MatchIds of Composite type need to be associated to exactly one Data Mapper"
  338. )
  339. def test_it_validates_composite_queue_item_for_too_many_data_mappers():
  340. items = [
  341. {
  342. "Type": "Composite",
  343. "MatchId": [{"Column": "first_name", "Value": "Test"}],
  344. "Columns": ["column"],
  345. "DataMappers": ["foo", "bar"],
  346. }
  347. ]
  348. with pytest.raises(ValueError) as e:
  349. handlers.validate_queue_items(items)
  350. assert (
  351. e.value.args[0]
  352. == "MatchIds of Composite type need to be associated to exactly one Data Mapper"
  353. )