test_status_updater.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. import os
  2. import boto3
  3. import pytest
  4. from botocore.exceptions import ClientError
  5. from mock import patch, Mock
  6. with patch.dict(os.environ, {"JobTable": "test", "DeletionQueueTable": "test"}):
  7. from backend.lambdas.jobs.status_updater import (
  8. update_status,
  9. determine_status,
  10. job_has_errors,
  11. )
  12. pytestmark = [pytest.mark.unit, pytest.mark.jobs]
  13. @patch("backend.lambdas.jobs.status_updater.job_has_errors", Mock(return_value=False))
  14. def test_it_determines_basic_statuses():
  15. assert "FIND_FAILED" == determine_status("123", "FindPhaseFailed")
  16. assert "FORGET_FAILED" == determine_status("123", "ForgetPhaseFailed")
  17. assert "FAILED" == determine_status("123", "Exception")
  18. assert "RUNNING" == determine_status("123", "JobStarted")
  19. assert "FORGET_COMPLETED_CLEANUP_IN_PROGRESS" == determine_status(
  20. "123", "ForgetPhaseEnded"
  21. )
  22. assert "COMPLETED_CLEANUP_FAILED" == determine_status("123", "CleanupFailed")
  23. assert "COMPLETED" == determine_status("123", "CleanupSucceeded")
  24. @patch("backend.lambdas.jobs.status_updater.job_has_errors", Mock(return_value=True))
  25. def test_it_determines_completed_with_errors():
  26. assert "FORGET_PARTIALLY_FAILED" == determine_status("123", "ForgetPhaseEnded")
  27. @patch("backend.lambdas.jobs.status_updater.table")
  28. def test_it_determines_job_has_errors_for_failed_object_updates(table):
  29. table.get_item.return_value = {"Item": {"TotalObjectUpdateFailedCount": 1}}
  30. assert job_has_errors("test")
  31. @patch("backend.lambdas.jobs.status_updater.table")
  32. def test_it_determines_job_has_errors_for_failed_queries(table):
  33. table.get_item.return_value = {"Item": {"TotalQueryFailedCount": 1}}
  34. assert job_has_errors("test")
  35. @patch("backend.lambdas.jobs.status_updater.table")
  36. def test_it_determines_job_does_not_have_errors_for_failed_object_updates(table):
  37. table.get_item.return_value = {
  38. "Item": {
  39. "TotalObjectUpdateFailedCount": 0,
  40. "TotalQueryFailedCount": 0,
  41. }
  42. }
  43. assert not job_has_errors("test")
  44. @patch(
  45. "backend.lambdas.jobs.status_updater.determine_status", Mock(return_value="RUNNING")
  46. )
  47. @patch("backend.lambdas.jobs.status_updater.table")
  48. def test_it_handles_job_started(table):
  49. update_status(
  50. "job123",
  51. [
  52. {
  53. "Id": "job123",
  54. "Sk": "123456",
  55. "Type": "JobEvent",
  56. "CreatedAt": 123.0,
  57. "EventName": "JobStarted",
  58. "EventData": {},
  59. }
  60. ],
  61. )
  62. table.update_item.assert_called_with(
  63. Key={
  64. "Id": "job123",
  65. "Sk": "job123",
  66. },
  67. UpdateExpression="set #JobStatus = :JobStatus, #JobStartTime = :JobStartTime",
  68. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  69. ExpressionAttributeNames={
  70. "#Id": "Id",
  71. "#Sk": "Sk",
  72. "#JobStatus": "JobStatus",
  73. "#JobStartTime": "JobStartTime",
  74. },
  75. ExpressionAttributeValues={
  76. ":Id": "job123",
  77. ":Sk": "job123",
  78. ":RUNNING": "RUNNING",
  79. ":QUEUED": "QUEUED",
  80. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  81. ":JobStatus": "RUNNING",
  82. ":JobStartTime": 123.0,
  83. },
  84. ReturnValues="ALL_NEW",
  85. )
  86. assert 1 == table.update_item.call_count
  87. @patch(
  88. "backend.lambdas.jobs.status_updater.determine_status",
  89. Mock(return_value="FORGET_COMPLETED_CLEANUP_IN_PROGRESS"),
  90. )
  91. @patch("backend.lambdas.jobs.status_updater.table")
  92. def test_it_handles_forget_finished(table):
  93. update_status(
  94. "job123",
  95. [
  96. {
  97. "Id": "job123",
  98. "Sk": "123456",
  99. "Type": "JobEvent",
  100. "CreatedAt": 123,
  101. "EventName": "ForgetPhaseEnded",
  102. "EventData": {},
  103. }
  104. ],
  105. )
  106. table.update_item.assert_called_with(
  107. Key={
  108. "Id": "job123",
  109. "Sk": "job123",
  110. },
  111. UpdateExpression="set #JobStatus = :JobStatus",
  112. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  113. ExpressionAttributeNames={
  114. "#Id": "Id",
  115. "#Sk": "Sk",
  116. "#JobStatus": "JobStatus",
  117. },
  118. ExpressionAttributeValues={
  119. ":Id": "job123",
  120. ":Sk": "job123",
  121. ":RUNNING": "RUNNING",
  122. ":QUEUED": "QUEUED",
  123. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  124. ":JobStatus": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  125. },
  126. ReturnValues="ALL_NEW",
  127. )
  128. assert 1 == table.update_item.call_count
  129. @patch(
  130. "backend.lambdas.jobs.status_updater.determine_status",
  131. Mock(return_value="COMPLETED"),
  132. )
  133. @patch("backend.lambdas.jobs.status_updater.table")
  134. def test_it_handles_cleanup_success(table):
  135. update_status(
  136. "job123",
  137. [
  138. {
  139. "Id": "job123",
  140. "Sk": "123456",
  141. "Type": "JobEvent",
  142. "CreatedAt": 123,
  143. "EventName": "CleanupSucceeded",
  144. "EventData": {},
  145. }
  146. ],
  147. )
  148. table.update_item.assert_called_with(
  149. Key={
  150. "Id": "job123",
  151. "Sk": "job123",
  152. },
  153. UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
  154. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  155. ExpressionAttributeNames={
  156. "#Id": "Id",
  157. "#Sk": "Sk",
  158. "#JobStatus": "JobStatus",
  159. "#JobFinishTime": "JobFinishTime",
  160. },
  161. ExpressionAttributeValues={
  162. ":Id": "job123",
  163. ":Sk": "job123",
  164. ":RUNNING": "RUNNING",
  165. ":QUEUED": "QUEUED",
  166. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  167. ":JobStatus": "COMPLETED",
  168. ":JobFinishTime": 123.0,
  169. },
  170. ReturnValues="ALL_NEW",
  171. )
  172. assert 1 == table.update_item.call_count
  173. @patch(
  174. "backend.lambdas.jobs.status_updater.determine_status",
  175. Mock(return_value="COMPLETED_CLEANUP_FAILED"),
  176. )
  177. @patch("backend.lambdas.jobs.status_updater.table")
  178. def test_it_handles_cleanup_failed(table):
  179. update_status(
  180. "job123",
  181. [
  182. {
  183. "Id": "job123",
  184. "Sk": "123456",
  185. "Type": "JobEvent",
  186. "CreatedAt": 123,
  187. "EventName": "CleanupFailed",
  188. "EventData": {},
  189. }
  190. ],
  191. )
  192. table.update_item.assert_called_with(
  193. Key={
  194. "Id": "job123",
  195. "Sk": "job123",
  196. },
  197. UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
  198. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  199. ExpressionAttributeNames={
  200. "#Id": "Id",
  201. "#Sk": "Sk",
  202. "#JobStatus": "JobStatus",
  203. "#JobFinishTime": "JobFinishTime",
  204. },
  205. ExpressionAttributeValues={
  206. ":Id": "job123",
  207. ":Sk": "job123",
  208. ":RUNNING": "RUNNING",
  209. ":QUEUED": "QUEUED",
  210. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  211. ":JobStatus": "COMPLETED_CLEANUP_FAILED",
  212. ":JobFinishTime": 123.0,
  213. },
  214. ReturnValues="ALL_NEW",
  215. )
  216. assert 1 == table.update_item.call_count
  217. @patch(
  218. "backend.lambdas.jobs.status_updater.determine_status",
  219. Mock(return_value="FIND_FAILED"),
  220. )
  221. @patch("backend.lambdas.jobs.status_updater.table")
  222. def test_it_handles_find_failed(table):
  223. update_status(
  224. "job123",
  225. [
  226. {
  227. "Id": "job123",
  228. "Sk": "123456",
  229. "Type": "JobEvent",
  230. "CreatedAt": 123.0,
  231. "EventName": "FindPhaseFailed",
  232. "EventData": {},
  233. }
  234. ],
  235. )
  236. table.update_item.assert_called_with(
  237. Key={
  238. "Id": "job123",
  239. "Sk": "job123",
  240. },
  241. UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
  242. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  243. ExpressionAttributeNames={
  244. "#Id": "Id",
  245. "#Sk": "Sk",
  246. "#JobStatus": "JobStatus",
  247. "#JobFinishTime": "JobFinishTime",
  248. },
  249. ExpressionAttributeValues={
  250. ":Id": "job123",
  251. ":Sk": "job123",
  252. ":JobStatus": "FIND_FAILED",
  253. ":RUNNING": "RUNNING",
  254. ":QUEUED": "QUEUED",
  255. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  256. ":JobFinishTime": 123.0,
  257. },
  258. ReturnValues="ALL_NEW",
  259. )
  260. assert 1 == table.update_item.call_count
  261. @patch(
  262. "backend.lambdas.jobs.status_updater.determine_status",
  263. Mock(return_value="FORGET_FAILED"),
  264. )
  265. @patch("backend.lambdas.jobs.status_updater.table")
  266. def test_it_handles_forget_failed(table):
  267. update_status(
  268. "job123",
  269. [
  270. {
  271. "Id": "job123",
  272. "Sk": "123456",
  273. "Type": "JobEvent",
  274. "CreatedAt": 123.0,
  275. "EventName": "ForgetPhaseFailed",
  276. "EventData": {},
  277. }
  278. ],
  279. )
  280. table.update_item.assert_called_with(
  281. Key={
  282. "Id": "job123",
  283. "Sk": "job123",
  284. },
  285. UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
  286. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  287. ExpressionAttributeNames={
  288. "#Id": "Id",
  289. "#Sk": "Sk",
  290. "#JobStatus": "JobStatus",
  291. "#JobFinishTime": "JobFinishTime",
  292. },
  293. ExpressionAttributeValues={
  294. ":Id": "job123",
  295. ":Sk": "job123",
  296. ":JobStatus": "FORGET_FAILED",
  297. ":RUNNING": "RUNNING",
  298. ":QUEUED": "QUEUED",
  299. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  300. ":JobFinishTime": 123.0,
  301. },
  302. ReturnValues="ALL_NEW",
  303. )
  304. assert 1 == table.update_item.call_count
  305. @patch(
  306. "backend.lambdas.jobs.status_updater.determine_status", Mock(return_value="FAILED")
  307. )
  308. @patch("backend.lambdas.jobs.status_updater.table")
  309. def test_it_handles_exception(table):
  310. update_status(
  311. "job123",
  312. [
  313. {
  314. "Id": "job123",
  315. "Sk": "123456",
  316. "Type": "JobEvent",
  317. "CreatedAt": 123.0,
  318. "EventName": "Exception",
  319. "EventData": {},
  320. }
  321. ],
  322. )
  323. table.update_item.assert_called_with(
  324. Key={
  325. "Id": "job123",
  326. "Sk": "job123",
  327. },
  328. UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
  329. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  330. ExpressionAttributeNames={
  331. "#Id": "Id",
  332. "#Sk": "Sk",
  333. "#JobStatus": "JobStatus",
  334. "#JobFinishTime": "JobFinishTime",
  335. },
  336. ExpressionAttributeValues={
  337. ":Id": "job123",
  338. ":Sk": "job123",
  339. ":JobStatus": "FAILED",
  340. ":RUNNING": "RUNNING",
  341. ":QUEUED": "QUEUED",
  342. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  343. ":JobFinishTime": 123.0,
  344. },
  345. ReturnValues="ALL_NEW",
  346. )
  347. assert 1 == table.update_item.call_count
  348. @patch("backend.lambdas.jobs.status_updater.ddb")
  349. @patch("backend.lambdas.jobs.status_updater.table")
  350. def test_it_handles_already_failed_jobs(table, ddb):
  351. e = boto3.client("dynamodb").exceptions.ConditionalCheckFailedException
  352. ddb.meta.client.exceptions.ConditionalCheckFailedException = e
  353. table.update_item.side_effect = e({}, "ConditionalCheckFailedException")
  354. update_status(
  355. "job123",
  356. [
  357. {
  358. "Id": "job123",
  359. "Sk": "123456",
  360. "Type": "JobEvent",
  361. "CreatedAt": 123.0,
  362. "EventName": "Exception",
  363. "EventData": {},
  364. }
  365. ],
  366. )
  367. table.update_item.assert_called()
  368. @patch("backend.lambdas.jobs.status_updater.table")
  369. def test_it_throws_for_non_condition_errors(table):
  370. table.update_item.side_effect = ClientError(
  371. {"Error": {"Code": "AnError"}}, "update_item"
  372. )
  373. with pytest.raises(ClientError):
  374. update_status(
  375. "job123",
  376. [
  377. {
  378. "Id": "job123",
  379. "Sk": "123456",
  380. "Type": "JobEvent",
  381. "CreatedAt": 123.0,
  382. "EventName": "Exception",
  383. "EventData": {},
  384. }
  385. ],
  386. )
  387. @patch("backend.lambdas.jobs.status_updater.table")
  388. def test_it_ignores_none_status_events(table):
  389. update_status(
  390. "job123",
  391. [
  392. {
  393. "Id": "job123",
  394. "Sk": "123456",
  395. "Type": "JobEvent",
  396. "CreatedAt": 123.0,
  397. "EventName": "SomeEvent",
  398. "EventData": {},
  399. }
  400. ],
  401. )
  402. table.update_item.assert_not_called()
  403. @patch("backend.lambdas.jobs.status_updater.table")
  404. def test_it_handles_query_planning_complete(table):
  405. update_status(
  406. "job123",
  407. [
  408. {
  409. "Id": "job123",
  410. "Sk": "123456",
  411. "Type": "JobEvent",
  412. "CreatedAt": 123.0,
  413. "EventName": "QueryPlanningComplete",
  414. "EventData": {
  415. "GeneratedQueries": 123,
  416. "DeletionQueueSize": 3456,
  417. "Manifests": [
  418. "s3://temp-bucket/manifests/job123/dm-123/manifest.json"
  419. ],
  420. },
  421. }
  422. ],
  423. )
  424. table.update_item.assert_called_with(
  425. Key={
  426. "Id": "job123",
  427. "Sk": "job123",
  428. },
  429. UpdateExpression="set #GeneratedQueries = :GeneratedQueries, #DeletionQueueSize = :DeletionQueueSize, #Manifests = :Manifests",
  430. ConditionExpression="#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)",
  431. ExpressionAttributeNames={
  432. "#Id": "Id",
  433. "#Sk": "Sk",
  434. "#JobStatus": "JobStatus",
  435. "#GeneratedQueries": "GeneratedQueries",
  436. "#DeletionQueueSize": "DeletionQueueSize",
  437. "#Manifests": "Manifests",
  438. },
  439. ExpressionAttributeValues={
  440. ":Id": "job123",
  441. ":Sk": "job123",
  442. ":GeneratedQueries": 123,
  443. ":DeletionQueueSize": 3456,
  444. ":Manifests": ["s3://temp-bucket/manifests/job123/dm-123/manifest.json"],
  445. ":RUNNING": "RUNNING",
  446. ":QUEUED": "QUEUED",
  447. ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
  448. },
  449. ReturnValues="ALL_NEW",
  450. )
  451. assert 1 == table.update_item.call_count