test_queue_iam.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. import mock
  2. import pytest
  3. from boto3.dynamodb.conditions import Key
  4. pytestmark = [
  5. pytest.mark.acceptance_iam,
  6. pytest.mark.api,
  7. pytest.mark.queue,
  8. pytest.mark.usefixtures("empty_jobs"),
  9. ]
  10. @pytest.mark.auth
  11. def test_auth(api_client_iam, queue_base_endpoint):
  12. headers = {"Authorization": None}
  13. assert (
  14. 403
  15. == api_client_iam.patch(
  16. queue_base_endpoint, json={}, headers=headers
  17. ).status_code
  18. )
  19. assert 403 == api_client_iam.get(queue_base_endpoint, headers=headers).status_code
  20. assert (
  21. 403
  22. == api_client_iam.delete(
  23. "{}/matches".format(queue_base_endpoint), json={}, headers=headers
  24. ).status_code
  25. )
  26. assert (
  27. 403 == api_client_iam.delete(queue_base_endpoint, headers=headers).status_code
  28. )
  29. def test_it_adds_to_queue(
  30. api_client_iam, queue_base_endpoint, queue_table, stack, iam_arn
  31. ):
  32. # Arrange
  33. key = "test"
  34. item = {
  35. "MatchId": key,
  36. "DataMappers": ["a", "b"],
  37. }
  38. expected = {
  39. "DeletionQueueItemId": mock.ANY,
  40. "MatchId": key,
  41. "CreatedAt": mock.ANY,
  42. "DataMappers": ["a", "b"],
  43. "CreatedBy": {"Username": iam_arn, "Sub": mock.ANY},
  44. "Type": "Simple",
  45. }
  46. # Act
  47. response = api_client_iam.patch(queue_base_endpoint, json=item)
  48. response_body = response.json()
  49. # Assert
  50. # Check the response is ok
  51. assert 201 == response.status_code
  52. assert expected == response_body
  53. assert (
  54. response.headers.get("Access-Control-Allow-Origin")
  55. == stack["APIAccessControlAllowOriginHeader"]
  56. )
  57. # Check the item exists in the DDB Table
  58. query_result = queue_table.get_item(
  59. Key={"DeletionQueueItemId": response_body["DeletionQueueItemId"]}
  60. )
  61. assert query_result["Item"]
  62. assert expected == query_result["Item"]
  63. def test_it_adds_composite_to_queue(
  64. api_client_iam, queue_base_endpoint, queue_table, stack, iam_arn
  65. ):
  66. # Arrange
  67. key = [
  68. {"Column": "first_name", "Value": "John"},
  69. {"Column": "last_name", "Value": "Doe"},
  70. ]
  71. item = {
  72. "MatchId": key,
  73. "Type": "Composite",
  74. "DataMappers": ["a"],
  75. }
  76. expected = {
  77. "DeletionQueueItemId": mock.ANY,
  78. "MatchId": key,
  79. "CreatedAt": mock.ANY,
  80. "DataMappers": ["a"],
  81. "CreatedBy": {"Username": iam_arn, "Sub": mock.ANY},
  82. "Type": "Composite",
  83. }
  84. # Act
  85. response = api_client_iam.patch(queue_base_endpoint, json=item)
  86. response_body = response.json()
  87. # Assert
  88. # Check the response is ok
  89. assert 201 == response.status_code
  90. assert expected == response_body
  91. assert (
  92. response.headers.get("Access-Control-Allow-Origin")
  93. == stack["APIAccessControlAllowOriginHeader"]
  94. )
  95. # Check the item exists in the DDB Table
  96. query_result = queue_table.get_item(
  97. Key={"DeletionQueueItemId": response_body["DeletionQueueItemId"]}
  98. )
  99. assert query_result["Item"]
  100. assert expected == query_result["Item"]
  101. def test_it_adds_batch_to_queue(
  102. api_client_iam, queue_base_endpoint, queue_table, stack, iam_arn
  103. ):
  104. # Arrange
  105. items = {
  106. "Matches": [
  107. {"MatchId": "test", "DataMappers": ["a", "b"]},
  108. {"MatchId": "test1", "DataMappers": ["a", "b"], "Type": "Simple"},
  109. {
  110. "MatchId": [
  111. {"Column": "first_name", "Value": "John"},
  112. {"Column": "last_name", "Value": "Doe"},
  113. ],
  114. "DataMappers": ["a"],
  115. "Type": "Composite",
  116. },
  117. ]
  118. }
  119. created_by_mock = {
  120. "Username": iam_arn,
  121. "Sub": mock.ANY,
  122. }
  123. expected = {
  124. "Matches": [
  125. {
  126. "DeletionQueueItemId": mock.ANY,
  127. "MatchId": "test",
  128. "CreatedAt": mock.ANY,
  129. "DataMappers": ["a", "b"],
  130. "CreatedBy": created_by_mock,
  131. "Type": "Simple",
  132. },
  133. {
  134. "DeletionQueueItemId": mock.ANY,
  135. "MatchId": "test1",
  136. "CreatedAt": mock.ANY,
  137. "DataMappers": ["a", "b"],
  138. "CreatedBy": created_by_mock,
  139. "Type": "Simple",
  140. },
  141. {
  142. "DeletionQueueItemId": mock.ANY,
  143. "MatchId": [
  144. {"Column": "first_name", "Value": "John"},
  145. {"Column": "last_name", "Value": "Doe"},
  146. ],
  147. "CreatedAt": mock.ANY,
  148. "DataMappers": ["a"],
  149. "CreatedBy": created_by_mock,
  150. "Type": "Composite",
  151. },
  152. ]
  153. }
  154. # Act
  155. response = api_client_iam.patch(
  156. "{}/matches".format(queue_base_endpoint), json=items
  157. )
  158. response_body = response.json()
  159. # Assert
  160. # Check the response is ok
  161. assert 201 == response.status_code
  162. assert expected == response_body
  163. assert (
  164. response.headers.get("Access-Control-Allow-Origin")
  165. == stack["APIAccessControlAllowOriginHeader"]
  166. )
  167. # Check the items exists in the DDB Table
  168. for i, match in enumerate(expected["Matches"]):
  169. query_result = queue_table.get_item(
  170. Key={
  171. "DeletionQueueItemId": response_body["Matches"][i][
  172. "DeletionQueueItemId"
  173. ]
  174. }
  175. )
  176. assert query_result["Item"]
  177. assert match == query_result["Item"]
  178. def test_it_rejects_invalid_add_to_queue(api_client_iam, queue_base_endpoint, stack):
  179. scenarios = [
  180. {"INVALID": "PAYLOAD"},
  181. {"Type": "Composite", "DataMappers": ["a"], "MatchId": ["a"]},
  182. {"Type": "Composite", "DataMappers": ["a"], "MatchId": [{}]},
  183. {"Type": "Composite", "DataMappers": ["a"], "MatchId": [{"Column": "a"}]},
  184. {"Type": "Composite", "DataMappers": ["a"], "MatchId": [{"Value": "a"}]},
  185. ]
  186. for scenario in scenarios:
  187. response = api_client_iam.patch(queue_base_endpoint, json=scenario)
  188. assert 422 == response.status_code
  189. assert (
  190. response.headers.get("Access-Control-Allow-Origin")
  191. == stack["APIAccessControlAllowOriginHeader"]
  192. )
  193. def test_it_gets_queue(api_client_iam, queue_base_endpoint, del_queue_factory, stack):
  194. # Arrange
  195. del_queue_item = del_queue_factory()
  196. # Act
  197. response = api_client_iam.get(queue_base_endpoint)
  198. response_body = response.json()
  199. # Assert
  200. assert response.status_code == 200
  201. assert isinstance(response_body.get("MatchIds"), list)
  202. assert del_queue_item in response_body["MatchIds"]
  203. assert (
  204. response.headers.get("Access-Control-Allow-Origin")
  205. == stack["APIAccessControlAllowOriginHeader"]
  206. )
  207. assert response.headers.get("Access-Control-Expose-Headers") == "content-length"
  208. def test_it_rejects_invalid_deletion(
  209. api_client_iam, del_queue_factory, queue_base_endpoint, queue_table, stack
  210. ):
  211. # Arrange
  212. del_queue_item = del_queue_factory()
  213. match_id = del_queue_item["MatchId"]
  214. # Act
  215. response = api_client_iam.delete(
  216. "{}/matches".format(queue_base_endpoint),
  217. json={"Matches": [{"MatchId": match_id}]},
  218. )
  219. # Assert
  220. assert 422 == response.status_code
  221. assert (
  222. response.headers.get("Access-Control-Allow-Origin")
  223. == stack["APIAccessControlAllowOriginHeader"]
  224. )
  225. def test_it_cancels_deletion(
  226. api_client_iam, del_queue_factory, queue_base_endpoint, queue_table, stack
  227. ):
  228. # Arrange
  229. del_queue_item = del_queue_factory()
  230. deletion_queue_item_id = del_queue_item["DeletionQueueItemId"]
  231. # Act
  232. response = api_client_iam.delete(
  233. "{}/matches".format(queue_base_endpoint),
  234. json={"Matches": [{"DeletionQueueItemId": deletion_queue_item_id}]},
  235. )
  236. # Assert
  237. assert 204 == response.status_code
  238. assert (
  239. response.headers.get("Access-Control-Allow-Origin")
  240. == stack["APIAccessControlAllowOriginHeader"]
  241. )
  242. # Check the item doesn't exist in the DDB Table
  243. query_result = queue_table.get_item(
  244. Key={"DeletionQueueItemId": deletion_queue_item_id}
  245. )
  246. assert not "Item" in query_result
  247. def test_it_handles_not_found(
  248. api_client_iam, del_queue_factory, queue_base_endpoint, queue_table, stack
  249. ):
  250. # Arrange
  251. deletion_queue_item_id = "test"
  252. # Act
  253. response = api_client_iam.delete(
  254. "{}/matches".format(queue_base_endpoint),
  255. json={"Matches": [{"DeletionQueueItemId": deletion_queue_item_id}]},
  256. )
  257. # Assert
  258. assert 204 == response.status_code
  259. assert (
  260. response.headers.get("Access-Control-Allow-Origin")
  261. == stack["APIAccessControlAllowOriginHeader"]
  262. )
  263. # Check the item doesn't exist in the DDB Table
  264. query_result = queue_table.get_item(
  265. Key={"DeletionQueueItemId": deletion_queue_item_id}
  266. )
  267. assert not "Item" in query_result
  268. def test_it_disables_cancel_deletion_whilst_job_in_progress(
  269. api_client_iam,
  270. queue_base_endpoint,
  271. sf_client,
  272. job_table,
  273. execution_exists_waiter,
  274. job_finished_waiter,
  275. queue_table,
  276. del_queue_factory,
  277. stack,
  278. ):
  279. # Arrange
  280. del_queue_item = del_queue_factory()
  281. deletion_queue_item_id = del_queue_item["DeletionQueueItemId"]
  282. response = api_client_iam.delete(queue_base_endpoint)
  283. response_body = response.json()
  284. job_id = response_body["Id"]
  285. execution_arn = "{}:{}".format(
  286. stack["StateMachineArn"].replace("stateMachine", "execution"), job_id
  287. )
  288. # Act
  289. response = api_client_iam.delete(
  290. "{}/matches".format(queue_base_endpoint),
  291. json={"Matches": [{"DeletionQueueItemId": deletion_queue_item_id}]},
  292. )
  293. try:
  294. # Assert
  295. assert 400 == response.status_code
  296. # Check the item still exists in the DDB Table
  297. query_result = queue_table.get_item(
  298. Key={"DeletionQueueItemId": deletion_queue_item_id}
  299. )
  300. assert query_result["Item"]
  301. finally:
  302. execution_exists_waiter.wait(executionArn=execution_arn)
  303. sf_client.stop_execution(executionArn=execution_arn)
  304. job_finished_waiter.wait(
  305. TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
  306. )
  307. def test_it_processes_queue(
  308. api_client_iam,
  309. queue_base_endpoint,
  310. sf_client,
  311. job_table,
  312. stack,
  313. job_complete_waiter,
  314. config_mutator,
  315. ):
  316. # Arrange
  317. config_mutator(JobDetailsRetentionDays=0)
  318. # Act
  319. response = api_client_iam.delete(queue_base_endpoint)
  320. response_body = response.json()
  321. job_id = response_body["Id"]
  322. execution_arn = "{}:{}".format(
  323. stack["StateMachineArn"].replace("stateMachine", "execution"), job_id
  324. )
  325. try:
  326. # Assert
  327. assert 202 == response.status_code
  328. assert "Id" in response_body
  329. # Check the job was written to DynamoDB
  330. job_complete_waiter.wait(
  331. TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
  332. )
  333. query_result = job_table.query(
  334. KeyConditionExpression=Key("Id").eq(job_id) & Key("Sk").eq(job_id)
  335. )
  336. assert 1 == len(query_result["Items"])
  337. assert (
  338. response.headers.get("Access-Control-Allow-Origin")
  339. == stack["APIAccessControlAllowOriginHeader"]
  340. )
  341. finally:
  342. sf_client.stop_execution(executionArn=execution_arn)
  343. def test_it_sets_expiry(
  344. api_client_iam,
  345. queue_base_endpoint,
  346. sf_client,
  347. job_table,
  348. stack,
  349. job_complete_waiter,
  350. config_mutator,
  351. ):
  352. # Arrange
  353. config_mutator(JobDetailsRetentionDays=1)
  354. # Act
  355. response = api_client_iam.delete(queue_base_endpoint)
  356. response_body = response.json()
  357. job_id = response_body["Id"]
  358. execution_arn = "{}:{}".format(
  359. stack["StateMachineArn"].replace("stateMachine", "execution"), job_id
  360. )
  361. try:
  362. # Assert
  363. assert 202 == response.status_code
  364. assert "Id" in response_body
  365. # Check the job was written to DynamoDB
  366. job_complete_waiter.wait(
  367. TableName=job_table.name, Key={"Id": {"S": job_id}, "Sk": {"S": job_id}}
  368. )
  369. query_result = job_table.query(KeyConditionExpression=Key("Id").eq(job_id))
  370. assert len(query_result["Items"]) > 0
  371. assert all(["Expires" in i for i in query_result["Items"]])
  372. assert (
  373. response.headers.get("Access-Control-Allow-Origin")
  374. == stack["APIAccessControlAllowOriginHeader"]
  375. )
  376. finally:
  377. sf_client.stop_execution(executionArn=execution_arn)
  378. def test_it_only_allows_one_concurrent_execution(
  379. api_client_iam, queue_base_endpoint, sf_client, stack, execution_exists_waiter
  380. ):
  381. # Arrange
  382. # Start a job
  383. response = api_client_iam.delete(queue_base_endpoint)
  384. response_body = response.json()
  385. job_id = response_body["Id"]
  386. execution_arn = "{}:{}".format(
  387. stack["StateMachineArn"].replace("stateMachine", "execution"), job_id
  388. )
  389. # Act
  390. # Start a second job
  391. response = api_client_iam.delete(queue_base_endpoint)
  392. try:
  393. # Assert
  394. assert 400 == response.status_code
  395. assert (
  396. response.headers.get("Access-Control-Allow-Origin")
  397. == stack["APIAccessControlAllowOriginHeader"]
  398. )
  399. finally:
  400. execution_exists_waiter.wait(executionArn=execution_arn)
  401. sf_client.stop_execution(executionArn=execution_arn)