test_job.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. import datetime
  2. import json
  3. from types import SimpleNamespace
  4. import mock
  5. import pytest
  6. from boto3.dynamodb.conditions import And, Attr
  7. from botocore.exceptions import ClientError
  8. from mock import patch, ANY
  9. from backend.lambdas.jobs import handlers
  10. pytestmark = [pytest.mark.unit, pytest.mark.api, pytest.mark.jobs]
  11. class TestGetJob:
  12. @patch("backend.lambdas.jobs.handlers.table")
  13. def test_it_retrieves_job(self, table):
  14. mock_job = {"Id": "test"}
  15. table.get_item.return_value = {"Item": mock_job}
  16. response = handlers.get_job_handler(
  17. {"pathParameters": {"job_id": "test"}}, SimpleNamespace()
  18. )
  19. assert 200 == response["statusCode"]
  20. assert mock_job == json.loads(response["body"])
  21. assert ANY == response["headers"]
  22. @patch("backend.lambdas.jobs.handlers.table")
  23. def test_it_returns_404_for_job_not_found(self, table):
  24. table.get_item.side_effect = ClientError(
  25. {"ResponseMetadata": {"HTTPStatusCode": 404}}, "get_item"
  26. )
  27. response = handlers.get_job_handler(
  28. {"pathParameters": {"job_id": "test"}}, SimpleNamespace()
  29. )
  30. assert 404 == response["statusCode"]
  31. assert ANY == response["headers"]
  32. class TestListJobs:
  33. @patch("backend.lambdas.jobs.handlers.table")
  34. def test_it_lists_jobs(self, table):
  35. stub = job_stub()
  36. table.query.return_value = {"Items": [stub]}
  37. response = handlers.list_jobs_handler(
  38. {"queryStringParameters": None}, SimpleNamespace()
  39. )
  40. resp_body = json.loads(response["body"])
  41. assert 200 == response["statusCode"]
  42. assert 1 == len(resp_body["Jobs"])
  43. assert stub == resp_body["Jobs"][0]
  44. @patch("backend.lambdas.jobs.handlers.bucket_count", 3)
  45. @patch("backend.lambdas.jobs.handlers.table")
  46. def test_it_queries_all_gsi_buckets(self, table):
  47. stub = job_stub()
  48. table.query.return_value = {"Items": [stub]}
  49. handlers.list_jobs_handler({"queryStringParameters": None}, SimpleNamespace())
  50. assert 3 == table.query.call_count
  51. @patch("backend.lambdas.jobs.handlers.Key")
  52. @patch("backend.lambdas.jobs.handlers.table")
  53. def test_it_handles_list_job_start_at_qs(self, table, k):
  54. stub = job_stub()
  55. table.query.return_value = {"Items": [stub]}
  56. handlers.list_jobs_handler(
  57. {"queryStringParameters": {"start_at": "12345"}}, SimpleNamespace()
  58. )
  59. k.assert_called_with("CreatedAt")
  60. k().lt.assert_called_with(12345)
  61. @patch("backend.lambdas.jobs.handlers.table")
  62. def test_it_returns_requested_page_size_for_jobs(self, table):
  63. stub = job_stub()
  64. table.query.return_value = {"Items": [stub for _ in range(0, 3)]}
  65. handlers.list_jobs_handler(
  66. {"queryStringParameters": {"page_size": "3"}}, SimpleNamespace()
  67. )
  68. table.query.assert_called_with(
  69. IndexName=ANY,
  70. KeyConditionExpression=ANY,
  71. ScanIndexForward=ANY,
  72. Limit=3,
  73. ProjectionExpression=ANY,
  74. )
  75. @patch("backend.lambdas.jobs.handlers.bucket_count", 3)
  76. @patch("backend.lambdas.jobs.handlers.table")
  77. def test_it_returns_requested_page_size_for_jobs_with_multiple_gsi_buckets(
  78. self, table
  79. ):
  80. table.query.return_value = {"Items": [job_stub() for _ in range(0, 5)]}
  81. resp = handlers.list_jobs_handler(
  82. {"queryStringParameters": {"page_size": "5"}}, SimpleNamespace()
  83. )
  84. assert 3 == table.query.call_count
  85. assert 5 == len(json.loads(resp["body"])["Jobs"])
  86. def test_it_rejects_invalid_page_size_for_list_jobs(self):
  87. response = handlers.list_jobs_handler(
  88. {"queryStringParameters": {"page_size": "NaN"}}, SimpleNamespace()
  89. )
  90. assert 422 == response["statusCode"]
  91. response = handlers.list_jobs_handler(
  92. {"queryStringParameters": {"page_size": "0"}}, SimpleNamespace()
  93. )
  94. assert 422 == response["statusCode"]
  95. response = handlers.list_jobs_handler(
  96. {"queryStringParameters": {"page_size": "1001"}}, SimpleNamespace()
  97. )
  98. assert 422 == response["statusCode"]
  99. def test_it_rejects_invalid_start_at_for_list_jobs(self):
  100. response = handlers.list_jobs_handler(
  101. {"queryStringParameters": {"start_at": "badformat"}}, SimpleNamespace()
  102. )
  103. assert 422 == response["statusCode"]
  104. @patch("backend.lambdas.jobs.handlers.table")
  105. def test_it_rejects_invalid_start_at_for_list_jobs(self, table):
  106. stub = job_stub()
  107. table.query.return_value = {"Items": [stub for _ in range(0, 3)]}
  108. response = handlers.list_jobs_handler(
  109. {"queryStringParameters": {"page_size": "badformat"}}, SimpleNamespace()
  110. )
  111. assert 422 == response["statusCode"]
  112. class TestListJobEvents:
  113. @patch("backend.lambdas.jobs.handlers.table")
  114. def test_it_lists_jobs_events(self, table):
  115. stub = job_event_stub()
  116. table.get_item.return_value = {"Item": job_stub()}
  117. table.query.return_value = {"Items": [stub]}
  118. response = handlers.list_job_events_handler(
  119. {
  120. "queryStringParameters": None,
  121. "pathParameters": {"job_id": "test"},
  122. "multiValueQueryStringParameters": None,
  123. },
  124. SimpleNamespace(),
  125. )
  126. resp_body = json.loads(response["body"])
  127. assert 200 == response["statusCode"]
  128. assert 1 == len(resp_body["JobEvents"])
  129. assert stub == resp_body["JobEvents"][0]
  130. @patch("backend.lambdas.jobs.handlers.table")
  131. def test_it_errors_if_job_not_found(self, table):
  132. table.get_item.side_effect = ClientError(
  133. {"ResponseMetadata": {"HTTPStatusCode": 404}}, "get_item"
  134. )
  135. response = handlers.list_job_events_handler(
  136. {
  137. "pathParameters": {"job_id": "test"},
  138. "queryStringParameters": {"start_at": "12345#test"},
  139. "multiValueQueryStringParameters": {},
  140. },
  141. SimpleNamespace(),
  142. )
  143. assert 404 == response["statusCode"]
  144. @patch("backend.lambdas.jobs.handlers.table")
  145. def test_it_returns_requested_page_size_for_jobs_events(self, table):
  146. table.get_item.return_value = {"Item": job_stub()}
  147. table.query.return_value = {
  148. "Items": [job_event_stub(job_id="job123", sk=str(i)) for i in range(1, 5)],
  149. "LastEvaluatedKey": {"Id": "test", "Sk": "12345"},
  150. }
  151. response = handlers.list_job_events_handler(
  152. {
  153. "pathParameters": {"job_id": "test"},
  154. "queryStringParameters": {"page_size": "3"},
  155. "multiValueQueryStringParameters": {"page_size": ["3"]},
  156. },
  157. SimpleNamespace(),
  158. )
  159. resp_body = json.loads(response["body"])
  160. table.query.assert_called_with(
  161. KeyConditionExpression=mock.ANY,
  162. ScanIndexForward=True,
  163. Limit=4,
  164. FilterExpression=mock.ANY,
  165. ExclusiveStartKey=mock.ANY,
  166. )
  167. assert 200 == response["statusCode"]
  168. assert 3 == len(resp_body["JobEvents"])
  169. assert "3" == resp_body["JobEvents"][len(resp_body["JobEvents"]) - 1]["Sk"]
  170. assert "NextStart" in resp_body
  171. @patch("backend.lambdas.jobs.handlers.table")
  172. def test_it_starts_at_earliest_by_default(self, table):
  173. stub = job_event_stub()
  174. table.get_item.return_value = {"Item": job_stub()}
  175. table.query.return_value = {"Items": [stub]}
  176. response = handlers.list_job_events_handler(
  177. {
  178. "queryStringParameters": None,
  179. "pathParameters": {"job_id": "test"},
  180. "multiValueQueryStringParameters": None,
  181. },
  182. SimpleNamespace(),
  183. )
  184. assert 200 == response["statusCode"]
  185. table.query.assert_called_with(
  186. KeyConditionExpression=mock.ANY,
  187. ScanIndexForward=True,
  188. Limit=mock.ANY,
  189. ExclusiveStartKey={"Id": "test", "Sk": "0"},
  190. FilterExpression=mock.ANY,
  191. )
  192. @patch("backend.lambdas.jobs.handlers.table")
  193. def test_it_accepts_start_at_earliest_watermark(self, table):
  194. stub = job_event_stub()
  195. table.get_item.return_value = {"Item": job_stub()}
  196. table.query.return_value = {
  197. "Items": [stub],
  198. "LastEvaluatedKey": {"Id": "test", "Sk": "12345#test"},
  199. }
  200. response = handlers.list_job_events_handler(
  201. {
  202. "pathParameters": {"job_id": "test"},
  203. "queryStringParameters": {"start_at": "0"},
  204. "multiValueQueryStringParameters": {"start_at": ["0"]},
  205. },
  206. SimpleNamespace(),
  207. )
  208. resp_body = json.loads(response["body"])
  209. assert 200 == response["statusCode"]
  210. assert "NextStart" in resp_body
  211. assert {
  212. "KeyConditionExpression": mock.ANY,
  213. "ScanIndexForward": mock.ANY,
  214. "Limit": mock.ANY,
  215. "FilterExpression": mock.ANY,
  216. "ExclusiveStartKey": {"Id": "test", "Sk": "0"},
  217. } == table.query.call_args_list[0][1]
  218. @patch("backend.lambdas.jobs.handlers.table")
  219. def test_it_starts_at_watermark(self, table):
  220. stub = job_event_stub()
  221. table.get_item.return_value = {"Item": job_stub()}
  222. table.query.return_value = {
  223. "Items": [stub],
  224. "LastEvaluatedKey": {"Id": "test", "Sk": "12345#test"},
  225. }
  226. response = handlers.list_job_events_handler(
  227. {
  228. "pathParameters": {"job_id": "test"},
  229. "queryStringParameters": {"start_at": "12345#test"},
  230. "multiValueQueryStringParameters": {"start_at": ["12345#test"]},
  231. },
  232. SimpleNamespace(),
  233. )
  234. resp_body = json.loads(response["body"])
  235. assert 200 == response["statusCode"]
  236. assert "NextStart" in resp_body
  237. table.query.assert_called_with(
  238. KeyConditionExpression=mock.ANY,
  239. ScanIndexForward=mock.ANY,
  240. Limit=mock.ANY,
  241. FilterExpression=mock.ANY,
  242. ExclusiveStartKey={"Id": "test", "Sk": "12345#test"},
  243. )
  244. @patch("backend.lambdas.jobs.handlers.table")
  245. def test_it_handles_watermark_with_microseconds_in_same_second(self, table):
  246. stub = job_event_stub()
  247. table.get_item.return_value = {"Item": job_stub(JobFinishTime=12345)}
  248. table.query.return_value = {"Items": [stub]}
  249. response = handlers.list_job_events_handler(
  250. {
  251. "pathParameters": {"job_id": "test"},
  252. "queryStringParameters": {"start_at": "12345001#test"},
  253. "multiValueQueryStringParameters": {"start_at": ["12345001#test"]},
  254. },
  255. SimpleNamespace(),
  256. )
  257. resp_body = json.loads(response["body"])
  258. assert 200 == response["statusCode"]
  259. assert "NextStart" in resp_body
  260. table.query.assert_called_with(
  261. KeyConditionExpression=mock.ANY,
  262. ScanIndexForward=mock.ANY,
  263. Limit=mock.ANY,
  264. FilterExpression=mock.ANY,
  265. ExclusiveStartKey={"Id": "test", "Sk": "12345001#test"},
  266. )
  267. @patch("backend.lambdas.jobs.handlers.table")
  268. def test_it_returns_page_size_where_job_item_in_initial_query_response(self, table):
  269. job = job_stub(JobFinishTime=12345, JobStatus="COMPLETED")
  270. table.get_item.return_value = {"Item": job}
  271. table.query.side_effect = [
  272. {
  273. "Items": [job_event_stub(Sk=str(i)) for i in range(0, 19)],
  274. "LastEvaluatedKey": {"Id": job["Id"], "Sk": "19"},
  275. },
  276. {"Items": [job_event_stub(Sk="20")]},
  277. ]
  278. response = handlers.list_job_events_handler(
  279. {
  280. "queryStringParameters": None,
  281. "pathParameters": {"job_id": "test"},
  282. "multiValueQueryStringParameters": None,
  283. },
  284. SimpleNamespace(),
  285. )
  286. resp_body = json.loads(response["body"])
  287. assert 2 == table.query.call_count
  288. assert 200 == response["statusCode"]
  289. assert 20 == len(resp_body["JobEvents"])
  290. assert "NextStart" not in resp_body
  291. # Running jobs
  292. @patch("backend.lambdas.jobs.handlers.table")
  293. def test_it_returns_last_item_watermark_for_incomplete_job(self, table):
  294. stub = job_event_stub()
  295. table.get_item.return_value = {"Item": job_stub(JobStatus="RUNNING")}
  296. table.query.return_value = {"Items": [stub]}
  297. response = handlers.list_job_events_handler(
  298. {
  299. "pathParameters": {"job_id": "test"},
  300. },
  301. SimpleNamespace(),
  302. )
  303. resp_body = json.loads(response["body"])
  304. assert 200 == response["statusCode"]
  305. assert stub["Sk"] == resp_body["NextStart"]
  306. @patch("backend.lambdas.jobs.handlers.table")
  307. def test_it_returns_last_item_watermark_for_incomplete_job_where_page_size_is_fulfilled(
  308. self, table
  309. ):
  310. stub = job_event_stub()
  311. table.get_item.return_value = {"Item": job_stub(JobStatus="RUNNING")}
  312. table.query.return_value = {"Items": [stub]}
  313. response = handlers.list_job_events_handler(
  314. {
  315. "pathParameters": {"job_id": "test"},
  316. "queryStringParameters": {"page_size": "1"},
  317. "multiValueQueryStringParameters": {"page_size": ["1"]},
  318. },
  319. SimpleNamespace(),
  320. )
  321. resp_body = json.loads(response["body"])
  322. assert 200 == response["statusCode"]
  323. assert stub["Sk"] == resp_body["NextStart"]
  324. @patch("backend.lambdas.jobs.handlers.table")
  325. def test_it_returns_provided_watermark_for_no_events_for_incomplete_job(
  326. self, table
  327. ):
  328. table.get_item.return_value = {"Item": job_stub(JobStatus="RUNNING")}
  329. table.query.return_value = {"Items": []}
  330. response = handlers.list_job_events_handler(
  331. {
  332. "pathParameters": {"job_id": "test"},
  333. "queryStringParameters": {"start_at": "111111#trgwtrwgergewrgwgrw"},
  334. "multiValueQueryStringParameters": {
  335. "start_at": ["111111#trgwtrwgergewrgwgrw"]
  336. },
  337. },
  338. SimpleNamespace(),
  339. )
  340. resp_body = json.loads(response["body"])
  341. assert 200 == response["statusCode"]
  342. assert "111111#trgwtrwgergewrgwgrw" == resp_body["NextStart"]
  343. # Completed jobs
  344. @patch("backend.lambdas.jobs.handlers.table")
  345. def test_it_returns_last_item_watermark_where_not_last_page_and_job_complete(
  346. self, table
  347. ):
  348. job = job_stub(JobFinishTime=12345, JobStatus="COMPLETED")
  349. table.get_item.return_value = {"Item": job}
  350. table.query.return_value = {
  351. "Items": [job_event_stub(Sk=str(i)) for i in range(0, 20)],
  352. "LastEvaluatedKey": {"Id": job["Id"], "Sk": "19"},
  353. }
  354. response = handlers.list_job_events_handler(
  355. {
  356. "queryStringParameters": None,
  357. "pathParameters": {"job_id": "test"},
  358. "multiValueQueryStringParameters": None,
  359. },
  360. SimpleNamespace(),
  361. )
  362. resp_body = json.loads(response["body"])
  363. assert 200 == response["statusCode"]
  364. assert "19" == resp_body["NextStart"]
  365. @patch("backend.lambdas.jobs.handlers.table")
  366. def test_it_does_not_return_watermark_if_last_page_reached_on_complete_job(
  367. self, table
  368. ):
  369. events = [
  370. "COMPLETED_CLEANUP_FAILED",
  371. "COMPLETED",
  372. "FAILED",
  373. "FIND_FAILED",
  374. "FORGET_FAILED",
  375. "FORGET_PARTIALLY_FAILED",
  376. ]
  377. for s in events:
  378. event_stub = job_event_stub()
  379. table.get_item.return_value = {"Item": job_stub(JobStatus=s)}
  380. table.query.return_value = {"Items": [event_stub]}
  381. response = handlers.list_job_events_handler(
  382. {
  383. "pathParameters": {"job_id": "test"},
  384. "queryStringParameters": None,
  385. "multiValueQueryStringParameters": None,
  386. },
  387. SimpleNamespace(),
  388. )
  389. resp_body = json.loads(response["body"])
  390. assert 200 == response["statusCode"]
  391. assert "NextStart" not in resp_body
  392. # Errors
  393. @patch("backend.lambdas.jobs.handlers.table")
  394. def test_it_returns_error_if_invalid_watermark_supplied_for_completed_job(
  395. self, table
  396. ):
  397. stub = job_event_stub()
  398. table.get_item.return_value = {"Item": job_stub(JobFinishTime=12345)}
  399. table.query.return_value = {"Items": [stub]}
  400. response = handlers.list_job_events_handler(
  401. {
  402. "pathParameters": {"job_id": "test"},
  403. "queryStringParameters": {"start_at": "12346001#test"},
  404. "multiValueQueryStringParameters": {"start_at": ["12346001#test"]},
  405. },
  406. SimpleNamespace(),
  407. )
  408. assert 400 == response["statusCode"]
  409. @patch("backend.lambdas.jobs.handlers.table")
  410. def test_it_returns_error_if_invalid_watermark_supplied_for_running_job(
  411. self, table
  412. ):
  413. table.get_item.return_value = {"Item": job_stub()}
  414. response = handlers.list_job_events_handler(
  415. {
  416. "pathParameters": {"job_id": "test"},
  417. "queryStringParameters": {"start_at": "999999999999999#test"},
  418. "multiValueQueryStringParameters": {},
  419. },
  420. SimpleNamespace(),
  421. )
  422. assert 400 == response["statusCode"]
  423. def test_it_rejects_invalid_page_size_for_list_job_events(self):
  424. response = handlers.list_job_events_handler(
  425. {"queryStringParameters": {"page_size": "NaN"}}, SimpleNamespace()
  426. )
  427. assert 422 == response["statusCode"]
  428. response = handlers.list_job_events_handler(
  429. {"queryStringParameters": {"page_size": "0"}}, SimpleNamespace()
  430. )
  431. assert 422 == response["statusCode"]
  432. response = handlers.list_job_events_handler(
  433. {"queryStringParameters": {"page_size": "1001"}}, SimpleNamespace()
  434. )
  435. assert 422 == response["statusCode"]
  436. class TestListJobEventFilters:
  437. @patch("backend.lambdas.jobs.handlers.table")
  438. def test_it_applies_filters(self, table):
  439. stub = job_event_stub()
  440. table.get_item.return_value = {"Item": job_stub()}
  441. table.query.return_value = {"Items": [stub]}
  442. response = handlers.list_job_events_handler(
  443. {
  444. "pathParameters": {"job_id": "test"},
  445. "queryStringParameters": {"start_at": "0"},
  446. "multiValueQueryStringParameters": {
  447. "filter": ["EventName=QuerySucceeded"]
  448. },
  449. },
  450. SimpleNamespace(),
  451. )
  452. resp_body = json.loads(response["body"])
  453. assert 200 == response["statusCode"]
  454. assert "NextStart" in resp_body
  455. assert 1 == table.query.call_count
  456. assert 1 == len(resp_body["JobEvents"])
  457. # Filter expression should be an And condition with 2 components
  458. # get_expression()["values"] returns the filters being applied as part of the And condition
  459. filter_expression = table.query.call_args_list[0][1]["FilterExpression"]
  460. assert isinstance(table.query.call_args_list[0][1]["FilterExpression"], And)
  461. assert (
  462. Attr("Type").eq("JobEvent") in filter_expression.get_expression()["values"]
  463. )
  464. assert (
  465. Attr("EventName").begins_with("QuerySucceeded")
  466. in filter_expression.get_expression()["values"]
  467. )
  468. assert 2 == len(filter_expression.get_expression()["values"])
  469. # Running jobs
  470. @patch("backend.lambdas.jobs.handlers.table")
  471. def test_it_returns_ddb_watermark_where_ddb_response_is_less_than_page_size(
  472. self, table
  473. ):
  474. job = job_stub(JobStatus="RUNNING")
  475. table.get_item.return_value = {"Item": job}
  476. # LastEvaluatedKey is determined before the Filter Expression is applied
  477. # so LastEvaluatedKey can still be present
  478. table.query.side_effect = [
  479. {
  480. "Items": [job_event_stub(Sk=str(i)) for i in range(0, 10)],
  481. "LastEvaluatedKey": {"Id": job["Id"], "Sk": "40"},
  482. },
  483. {"Items": []},
  484. ]
  485. response = handlers.list_job_events_handler(
  486. {
  487. "pathParameters": {"job_id": "test"},
  488. "queryStringParameters": {"start_at": "0"},
  489. "multiValueQueryStringParameters": {
  490. "filter": ["EventName=QuerySucceeded"]
  491. },
  492. },
  493. SimpleNamespace(),
  494. )
  495. resp_body = json.loads(response["body"])
  496. assert 200 == response["statusCode"]
  497. assert "40" == resp_body["NextStart"]
  498. @patch("backend.lambdas.jobs.handlers.table")
  499. def test_it_returns_last_item_watermark_where_ddb_response_is_greater_than_page_size(
  500. self, table
  501. ):
  502. job = job_stub(JobStatus="RUNNING")
  503. table.get_item.return_value = {"Item": job}
  504. # LastEvaluatedKey is determined before the Filter Expression is applied
  505. table.query.return_value = {
  506. "Items": [job_event_stub(Sk=str(i)) for i in range(0, 100)],
  507. "LastEvaluatedKey": {"Id": job["Id"], "Sk": "99"},
  508. }
  509. response = handlers.list_job_events_handler(
  510. {
  511. "pathParameters": {"job_id": "test"},
  512. "queryStringParameters": {"start_at": "0"},
  513. "multiValueQueryStringParameters": {
  514. "filter": ["EventName=QuerySucceeded"]
  515. },
  516. },
  517. SimpleNamespace(),
  518. )
  519. resp_body = json.loads(response["body"])
  520. assert 200 == response["statusCode"]
  521. assert "19" == resp_body["NextStart"]
  522. # Completed jobs
  523. @patch("backend.lambdas.jobs.handlers.table")
  524. def test_it_returns_last_item_watermark_where_not_last_page_and_job_complete(
  525. self, table
  526. ):
  527. job = job_stub(JobFinishTime=12345, JobStatus="COMPLETED")
  528. table.get_item.return_value = {"Item": job}
  529. table.query.return_value = {
  530. "Items": [job_event_stub(Sk=str(i)) for i in range(0, 20)],
  531. "LastEvaluatedKey": {"Id": job["Id"], "Sk": "19"},
  532. }
  533. response = handlers.list_job_events_handler(
  534. {
  535. "queryStringParameters": {},
  536. "pathParameters": {"job_id": "test"},
  537. "multiValueQueryStringParameters": {
  538. "filter": ["EventName=QuerySucceeded"]
  539. },
  540. },
  541. SimpleNamespace(),
  542. )
  543. resp_body = json.loads(response["body"])
  544. assert 200 == response["statusCode"]
  545. assert "19" == resp_body["NextStart"]
  546. @patch("backend.lambdas.jobs.handlers.table")
  547. def test_it_does_not_return_watermark_if_last_page_reached_on_complete_job(
  548. self, table
  549. ):
  550. events = [
  551. "COMPLETED_CLEANUP_FAILED",
  552. "COMPLETED",
  553. "FAILED",
  554. "FIND_FAILED",
  555. "FORGET_FAILED",
  556. "FORGET_PARTIALLY_FAILED",
  557. ]
  558. for s in events:
  559. event_stub = job_event_stub()
  560. table.get_item.return_value = {"Item": job_stub(JobStatus=s)}
  561. table.query.return_value = {"Items": [event_stub]}
  562. response = handlers.list_job_events_handler(
  563. {
  564. "pathParameters": {"job_id": "test"},
  565. "queryStringParameters": {"start_at": "0"},
  566. "multiValueQueryStringParameters": {
  567. "filter": ["EventName=QuerySucceeded"]
  568. },
  569. },
  570. SimpleNamespace(),
  571. )
  572. resp_body = json.loads(response["body"])
  573. assert 200 == response["statusCode"]
  574. assert "NextStart" not in resp_body
  575. # Errors
  576. def test_it_rejects_invalid_filters(self):
  577. response = handlers.list_job_events_handler(
  578. {
  579. "pathParameters": {"job_id": "test"},
  580. "queryStringParameters": {"start_at": "0"},
  581. "multiValueQueryStringParameters": {"filter": ["Invalid=Filter"]},
  582. },
  583. SimpleNamespace(),
  584. )
  585. assert 422 == response["statusCode"]
  586. def job_stub(
  587. job_id="test", created_at=round(datetime.datetime.utcnow().timestamp()), **kwargs
  588. ):
  589. return {
  590. "Id": job_id,
  591. "Sk": job_id,
  592. "CreatedAt": created_at,
  593. "Type": "Job",
  594. "JobStatus": "RUNNING",
  595. **kwargs,
  596. }
  597. def job_event_stub(job_id="test", sk=None, **kwargs):
  598. now = round(datetime.datetime.utcnow().timestamp())
  599. if not sk:
  600. sk = "{}#{}".format(str(now), "12345")
  601. return {
  602. "Id": job_id,
  603. "Sk": sk,
  604. "Type": "JobEvent",
  605. "CreatedAt": now,
  606. "EventName": "QuerySucceeded",
  607. **kwargs,
  608. }