123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- import os
- import boto3
- import pytest
- from botocore.exceptions import ClientError
- from mock import patch, Mock
- with patch.dict(os.environ, {"JobTable": "test", "DeletionQueueTable": "test"}):
- from backend.lambdas.jobs.status_updater import (
- update_status,
- determine_status,
- job_has_errors,
- )
- pytestmark = [pytest.mark.unit, pytest.mark.jobs]
@patch("backend.lambdas.jobs.status_updater.job_has_errors", Mock(return_value=False))
def test_it_determines_basic_statuses():
    """Check the event-name to status mapping while job_has_errors returns False."""
    job_id = "123"
    # Insertion order mirrors the order in which the statuses are checked.
    expected_by_event = {
        "FindPhaseFailed": "FIND_FAILED",
        "ForgetPhaseFailed": "FORGET_FAILED",
        "Exception": "FAILED",
        "JobStarted": "RUNNING",
        "ForgetPhaseEnded": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
        "CleanupFailed": "COMPLETED_CLEANUP_FAILED",
        "CleanupSucceeded": "COMPLETED",
    }
    for event_name, expected_status in expected_by_event.items():
        assert expected_status == determine_status(job_id, event_name)
@patch("backend.lambdas.jobs.status_updater.job_has_errors", Mock(return_value=True))
def test_it_determines_completed_with_errors():
    """ForgetPhaseEnded yields FORGET_PARTIALLY_FAILED while job_has_errors returns True."""
    resolved_status = determine_status("123", "ForgetPhaseEnded")
    assert "FORGET_PARTIALLY_FAILED" == resolved_status
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_determines_job_has_errors_for_failed_object_updates(table):
    """A job item with TotalObjectUpdateFailedCount of 1 is reported as having errors."""
    job_item = {"TotalObjectUpdateFailedCount": 1}
    table.get_item.return_value = {"Item": job_item}

    has_errors = job_has_errors("test")

    assert has_errors
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_determines_job_has_errors_for_failed_queries(table):
    """A job item with TotalQueryFailedCount of 1 is reported as having errors."""
    job_item = {"TotalQueryFailedCount": 1}
    table.get_item.return_value = {"Item": job_item}

    has_errors = job_has_errors("test")

    assert has_errors
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_determines_job_does_not_have_errors_for_failed_object_updates(table):
    """A job item whose two failure counters are both zero is not reported as having errors."""
    job_item = {"TotalObjectUpdateFailedCount": 0, "TotalQueryFailedCount": 0}
    table.get_item.return_value = {"Item": job_item}

    has_errors = job_has_errors("test")

    assert not has_errors
@patch(
    "backend.lambdas.jobs.status_updater.determine_status", Mock(return_value="RUNNING")
)
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_job_started(table):
    """A JobStarted event produces one conditional update setting JobStatus and JobStartTime."""
    job_id = "job123"
    started_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123.0,
        "EventName": "JobStarted",
        "EventData": {},
    }

    update_status(job_id, [started_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
        "#JobStartTime": "JobStartTime",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":JobStatus": "RUNNING",
        ":JobStartTime": 123.0,
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #JobStatus = :JobStatus, #JobStartTime = :JobStartTime",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
@patch(
    "backend.lambdas.jobs.status_updater.determine_status",
    Mock(return_value="FORGET_COMPLETED_CLEANUP_IN_PROGRESS"),
)
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_forget_finished(table):
    """A ForgetPhaseEnded event produces one conditional update that sets only JobStatus."""
    job_id = "job123"
    forget_ended_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123,
        "EventName": "ForgetPhaseEnded",
        "EventData": {},
    }

    update_status(job_id, [forget_ended_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":JobStatus": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #JobStatus = :JobStatus",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
@patch(
    "backend.lambdas.jobs.status_updater.determine_status",
    Mock(return_value="COMPLETED"),
)
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_cleanup_success(table):
    """A CleanupSucceeded event produces one conditional update setting JobStatus and JobFinishTime."""
    job_id = "job123"
    cleanup_succeeded_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123,
        "EventName": "CleanupSucceeded",
        "EventData": {},
    }

    update_status(job_id, [cleanup_succeeded_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
        "#JobFinishTime": "JobFinishTime",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":JobStatus": "COMPLETED",
        ":JobFinishTime": 123.0,
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
@patch(
    "backend.lambdas.jobs.status_updater.determine_status",
    Mock(return_value="COMPLETED_CLEANUP_FAILED"),
)
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_cleanup_failed(table):
    """A CleanupFailed event produces one conditional update setting JobStatus and JobFinishTime."""
    job_id = "job123"
    cleanup_failed_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123,
        "EventName": "CleanupFailed",
        "EventData": {},
    }

    update_status(job_id, [cleanup_failed_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
        "#JobFinishTime": "JobFinishTime",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":JobStatus": "COMPLETED_CLEANUP_FAILED",
        ":JobFinishTime": 123.0,
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
@patch(
    "backend.lambdas.jobs.status_updater.determine_status",
    Mock(return_value="FIND_FAILED"),
)
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_find_failed(table):
    """A FindPhaseFailed event produces one conditional update setting JobStatus and JobFinishTime."""
    job_id = "job123"
    find_failed_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123.0,
        "EventName": "FindPhaseFailed",
        "EventData": {},
    }

    update_status(job_id, [find_failed_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
        "#JobFinishTime": "JobFinishTime",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":JobStatus": "FIND_FAILED",
        ":JobFinishTime": 123.0,
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
@patch(
    "backend.lambdas.jobs.status_updater.determine_status",
    Mock(return_value="FORGET_FAILED"),
)
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_forget_failed(table):
    """A ForgetPhaseFailed event produces one conditional update setting JobStatus and JobFinishTime."""
    job_id = "job123"
    forget_failed_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123.0,
        "EventName": "ForgetPhaseFailed",
        "EventData": {},
    }

    update_status(job_id, [forget_failed_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
        "#JobFinishTime": "JobFinishTime",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":JobStatus": "FORGET_FAILED",
        ":JobFinishTime": 123.0,
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
@patch(
    "backend.lambdas.jobs.status_updater.determine_status", Mock(return_value="FAILED")
)
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_exception(table):
    """An Exception event produces one conditional update setting JobStatus and JobFinishTime."""
    job_id = "job123"
    exception_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123.0,
        "EventName": "Exception",
        "EventData": {},
    }

    update_status(job_id, [exception_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
        "#JobFinishTime": "JobFinishTime",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":JobStatus": "FAILED",
        ":JobFinishTime": 123.0,
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #JobStatus = :JobStatus, #JobFinishTime = :JobFinishTime",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
@patch("backend.lambdas.jobs.status_updater.ddb")
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_already_failed_jobs(table, ddb):
    """update_status returns normally when update_item raises ConditionalCheckFailedException.

    The real exception class is taken from a DynamoDB client and attached to
    the patched ``ddb`` so both sides refer to the same class.
    """
    # Pin a region so building the client does not depend on ambient AWS
    # configuration (no request is sent; only the exception class is needed).
    conditional_check_failed = boto3.client(
        "dynamodb", region_name="us-east-1"
    ).exceptions.ConditionalCheckFailedException
    ddb.meta.client.exceptions.ConditionalCheckFailedException = (
        conditional_check_failed
    )
    # Realistic error payload; the second argument is the operation name,
    # spelled the same way as the other ClientError built in this module.
    table.update_item.side_effect = conditional_check_failed(
        {
            "Error": {
                "Code": "ConditionalCheckFailedException",
                "Message": "The conditional request failed",
            }
        },
        "update_item",
    )

    update_status(
        "job123",
        [
            {
                "Id": "job123",
                "Sk": "123456",
                "Type": "JobEvent",
                "CreatedAt": 123.0,
                "EventName": "Exception",
                "EventData": {},
            }
        ],
    )

    table.update_item.assert_called()
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_throws_for_non_condition_errors(table):
    """A ClientError with code AnError raised by update_item propagates out of update_status."""
    unexpected_error = ClientError({"Error": {"Code": "AnError"}}, "update_item")
    table.update_item.side_effect = unexpected_error
    exception_event = {
        "Id": "job123",
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123.0,
        "EventName": "Exception",
        "EventData": {},
    }

    with pytest.raises(ClientError):
        update_status("job123", [exception_event])
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_ignores_none_status_events(table):
    """An event named SomeEvent with empty EventData triggers no update_item call."""
    unrelated_event = {
        "Id": "job123",
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123.0,
        "EventName": "SomeEvent",
        "EventData": {},
    }

    update_status("job123", [unrelated_event])

    table.update_item.assert_not_called()
@patch("backend.lambdas.jobs.status_updater.table")
def test_it_handles_query_planning_complete(table):
    """A QueryPlanningComplete event copies its three EventData fields onto the job in one update."""
    job_id = "job123"
    manifest_uri = "s3://temp-bucket/manifests/job123/dm-123/manifest.json"
    planning_event = {
        "Id": job_id,
        "Sk": "123456",
        "Type": "JobEvent",
        "CreatedAt": 123.0,
        "EventName": "QueryPlanningComplete",
        "EventData": {
            "GeneratedQueries": 123,
            "DeletionQueueSize": 3456,
            "Manifests": [manifest_uri],
        },
    }

    update_status(job_id, [planning_event])

    # Expected arguments of the single update_item call.
    expected_condition = "#Id = :Id AND #Sk = :Sk AND (#JobStatus = :RUNNING OR #JobStatus = :QUEUED OR #JobStatus = :FORGET_COMPLETED_CLEANUP_IN_PROGRESS)"
    expected_names = {
        "#Id": "Id",
        "#Sk": "Sk",
        "#JobStatus": "JobStatus",
        "#GeneratedQueries": "GeneratedQueries",
        "#DeletionQueueSize": "DeletionQueueSize",
        "#Manifests": "Manifests",
    }
    expected_values = {
        ":Id": job_id,
        ":Sk": job_id,
        ":GeneratedQueries": 123,
        ":DeletionQueueSize": 3456,
        ":Manifests": [manifest_uri],
        ":RUNNING": "RUNNING",
        ":QUEUED": "QUEUED",
        ":FORGET_COMPLETED_CLEANUP_IN_PROGRESS": "FORGET_COMPLETED_CLEANUP_IN_PROGRESS",
    }
    table.update_item.assert_called_with(
        Key={"Id": job_id, "Sk": job_id},
        UpdateExpression="set #GeneratedQueries = :GeneratedQueries, #DeletionQueueSize = :DeletionQueueSize, #Manifests = :Manifests",
        ConditionExpression=expected_condition,
        ExpressionAttributeNames=expected_names,
        ExpressionAttributeValues=expected_values,
        ReturnValues="ALL_NEW",
    )
    assert 1 == table.update_item.call_count
|