123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- import json
- import os
- from types import SimpleNamespace
- from decimal import Decimal
- import pytest
- from mock import patch, ANY
- with patch.dict(os.environ, {"DeletionQueueTable": "DeletionQueueTable"}):
- from backend.lambdas.queue import handlers
# Markers applied to every test collected from this module.
pytestmark = [
    pytest.mark.unit,
    pytest.mark.api,
    pytest.mark.queue,
]

# Request context passed as "requestContext" in the test events below; the
# tests expect the handlers to report these claims back as "CreatedBy".
autorization_mock = {
    "authorizer": {
        "claims": {
            "sub": "cognitoSub",
            "cognito:username": "cognitoUsername",
        },
    },
}
@patch("backend.lambdas.queue.handlers.deletion_queue_table")
def test_it_retrieves_all_items(table):
    """An empty scan result is returned as an empty list with no next page."""
    table.scan.return_value = {"Items": []}

    response = handlers.get_handler({}, SimpleNamespace())

    expected_body = json.dumps({"MatchIds": [], "NextStart": None})
    assert {"statusCode": 200, "body": expected_body, "headers": ANY} == response
    # No query string was supplied, so the scan is expected to use Limit=10.
    table.scan.assert_called_with(Limit=10)
@patch("backend.lambdas.queue.handlers.deletion_queue_table")
def test_it_retrieves_all_items_with_size_and_pagination(table):
    """page_size and start_at are forwarded to the scan as Limit and start key."""
    stored_item = {
        "DeletionQueueItemId": "id123",
        "MatchId": "foo",
        "DataMappers": [],
        "CreatedAt": 123456789,
    }
    table.scan.return_value = {"Items": [stored_item]}
    event = {"queryStringParameters": {"page_size": "1", "start_at": "id000"}}

    response = handlers.get_handler(event, SimpleNamespace())

    # Kept as an independent literal: the serialized key order (Type first)
    # is part of what the comparison below checks.
    expected_body = json.dumps(
        {
            "MatchIds": [
                {
                    "Type": "Simple",
                    "DeletionQueueItemId": "id123",
                    "MatchId": "foo",
                    "DataMappers": [],
                    "CreatedAt": 123456789,
                }
            ],
            "NextStart": "id123",
        }
    )
    assert {"statusCode": 200, "body": expected_body, "headers": ANY} == response
    table.scan.assert_called_with(
        Limit=1, ExclusiveStartKey={"DeletionQueueItemId": "id000"}
    )
@patch("backend.lambdas.queue.handlers.deletion_queue_table")
def test_it_adds_to_queue(table):
    """Enqueuing a match without a Type returns 201 and a Simple queue item."""
    event = {
        "body": json.dumps({"MatchId": "test", "DataMappers": ["a"]}),
        "requestContext": autorization_mock,
    }

    response = handlers.enqueue_handler(event, SimpleNamespace())

    assert 201 == response["statusCode"]
    expected_item = {
        "DeletionQueueItemId": ANY,
        "MatchId": "test",
        "Type": "Simple",
        "CreatedAt": ANY,
        "DataMappers": ["a"],
        "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
    }
    assert expected_item == json.loads(response["body"])
@patch("backend.lambdas.queue.handlers.deletion_queue_table")
def test_it_adds_composite_to_queue(table):
    """A Composite match keeps its list-valued MatchId in the 201 response."""
    mid = [{"Column": "first_name", "Value": "test"}]
    payload = {"MatchId": mid, "Type": "Composite", "DataMappers": ["a"]}
    event = {"body": json.dumps(payload), "requestContext": autorization_mock}

    response = handlers.enqueue_handler(event, SimpleNamespace())

    assert 201 == response["statusCode"]
    expected_item = {
        "DeletionQueueItemId": ANY,
        "MatchId": mid,
        "Type": "Composite",
        "CreatedAt": ANY,
        "DataMappers": ["a"],
        "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
    }
    assert expected_item == json.loads(response["body"])
@patch("backend.lambdas.queue.handlers.deletion_queue_table")
def test_it_adds_batch_to_queue(table):
    """A batch of two matches returns 201 with one Simple item per match."""
    match_ids = ("test", "test2")
    payload = {
        "Matches": [{"MatchId": mid, "DataMappers": ["a"]} for mid in match_ids]
    }
    event = {"body": json.dumps(payload), "requestContext": autorization_mock}

    response = handlers.enqueue_batch_handler(event, SimpleNamespace())

    assert 201 == response["statusCode"]
    expected_matches = [
        {
            "DeletionQueueItemId": ANY,
            "MatchId": mid,
            "Type": "Simple",
            "CreatedAt": ANY,
            "DataMappers": ["a"],
            "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
        }
        for mid in match_ids
    ]
    assert {"Matches": expected_matches} == json.loads(response["body"])
@patch("backend.lambdas.queue.handlers.deletion_queue_table")
def test_it_provides_default_data_mappers(table):
    """Omitting DataMappers from the request yields an empty list in the item."""
    event = {
        "body": json.dumps({"MatchId": "test"}),
        "requestContext": autorization_mock,
    }

    response = handlers.enqueue_handler(event, SimpleNamespace())

    assert 201 == response["statusCode"]
    expected_item = {
        "DeletionQueueItemId": ANY,
        "MatchId": "test",
        "Type": "Simple",
        "CreatedAt": ANY,
        "DataMappers": [],
        "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
    }
    assert expected_item == json.loads(response["body"])
@patch("backend.lambdas.queue.handlers.running_job_exists")
@patch("backend.lambdas.queue.handlers.deletion_queue_table")
def test_it_cancels_deletions(table, mock_running_job):
    """Cancelling queue items while no job is running returns a bare 204."""
    mock_running_job.return_value = False
    payload = {"Matches": [{"DeletionQueueItemId": "id123"}]}
    event = {"body": json.dumps(payload)}

    response = handlers.cancel_handler(event, SimpleNamespace())

    assert {"statusCode": 204, "headers": ANY} == response
@patch("backend.lambdas.queue.handlers.running_job_exists")
def test_it_prevents_cancelling_whilst_running_jobs(mock_running_job):
    """Cancelling is rejected with a 400 while a job is reported as running."""
    mock_running_job.return_value = True
    payload = {"Matches": [{"MatchId": "test", "CreatedAt": 123456789}]}
    event = {"body": json.dumps(payload)}

    response = handlers.cancel_handler(event, SimpleNamespace())

    assert 400 == response["statusCode"]
    assert "headers" in response
@patch("backend.lambdas.queue.handlers.bucket_count", 1)
@patch("backend.lambdas.queue.handlers.uuid")
@patch("backend.lambdas.queue.handlers.jobs_table")
@patch("backend.lambdas.queue.handlers.running_job_exists")
@patch("backend.lambdas.queue.handlers.get_config")
def test_it_process_queue(mock_config, mock_running_job, job_table, uuid):
    """Processing the queue stores a QUEUED job and returns it with a 202."""
    job_settings = {
        "AthenaConcurrencyLimit": 15,
        "AthenaQueryMaxRetries": 2,
        "DeletionTasksMaxNumber": 50,
        "QueryExecutionWaitSeconds": 5,
        "QueryQueueWaitSeconds": 5,
        "ForgetQueueWaitSeconds": 30,
    }
    # Built before the handler runs so the expectation is an independent copy
    # of the settings handed to the mocked get_config.
    expected_job = {
        "Id": "123",
        "Sk": "123",
        "Type": "Job",
        "JobStatus": "QUEUED",
        "GSIBucket": "0",
        "CreatedAt": ANY,
        **job_settings,
        "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
    }
    mock_running_job.return_value = False
    mock_config.return_value = job_settings
    # uuid4 yields an int here; the job is expected to carry it as the string "123".
    uuid.uuid4.return_value = 123
    event = {"body": "", "requestContext": autorization_mock}

    response = handlers.process_handler(event, SimpleNamespace())

    job_table.put_item.assert_called_with(Item=expected_job)
    assert 202 == response["statusCode"]
    assert "headers" in response
    assert expected_job == json.loads(response["body"])
@patch("backend.lambdas.queue.handlers.bucket_count", 1)
@patch("backend.lambdas.queue.handlers.uuid")
@patch("backend.lambdas.queue.handlers.jobs_table")
@patch("backend.lambdas.queue.handlers.running_job_exists")
@patch("backend.lambdas.queue.handlers.get_config")
@patch("backend.lambdas.queue.handlers.utc_timestamp")
def test_it_applies_expiry(mock_utc, mock_config, mock_running_job, job_table, uuid):
    """With JobDetailsRetentionDays configured, the stored job gets an Expires value."""
    mock_running_job.return_value = False
    mock_utc.return_value = 12346789
    uuid.uuid4.return_value = 123
    mock_config.return_value = {
        "AthenaConcurrencyLimit": 15,
        "AthenaQueryMaxRetries": 2,
        "DeletionTasksMaxNumber": 50,
        "JobDetailsRetentionDays": 30,
        "QueryExecutionWaitSeconds": 5,
        "QueryQueueWaitSeconds": 5,
        "ForgetQueueWaitSeconds": 30,
    }
    event = {"body": "", "requestContext": autorization_mock}

    response = handlers.process_handler(event, SimpleNamespace())

    # The retention period is passed to utc_timestamp, whose mocked return
    # value is expected as "Expires"; JobDetailsRetentionDays itself is not
    # part of the expected item.
    mock_utc.assert_called_with(days=30)
    expected_item = {
        "Id": "123",
        "Sk": "123",
        "Type": "Job",
        "JobStatus": "QUEUED",
        "GSIBucket": "0",
        "CreatedAt": ANY,
        "Expires": 12346789,
        "AthenaConcurrencyLimit": 15,
        "AthenaQueryMaxRetries": 2,
        "DeletionTasksMaxNumber": 50,
        "QueryExecutionWaitSeconds": 5,
        "QueryQueueWaitSeconds": 5,
        "ForgetQueueWaitSeconds": 30,
        "CreatedBy": {"Username": "cognitoUsername", "Sub": "cognitoSub"},
    }
    job_table.put_item.assert_called_with(Item=expected_item)
    assert 202 == response["statusCode"]
@patch("backend.lambdas.queue.handlers.running_job_exists")
def test_it_prevents_concurrent_running_jobs(mock_running_job):
    """Starting a job is rejected with a 400 while another is reported running."""
    mock_running_job.return_value = True
    event = {"body": "", "requestContext": autorization_mock}

    response = handlers.process_handler(event, SimpleNamespace())

    assert 400 == response["statusCode"]
    assert "headers" in response
def test_it_validates_composite_queue_item_for_matchid_not_array():
    """A Composite item whose MatchId is a plain string is rejected."""
    invalid_item = {
        "Type": "Composite",
        "MatchId": "Test",
        "Columns": ["column"],
        "DataMappers": [],
    }
    expected_message = "MatchIds of Composite type need to be specified as array"

    with pytest.raises(ValueError) as e:
        handlers.validate_queue_items([invalid_item])

    # Checked after the block: nothing following the raising call inside it runs.
    assert e.value.args[0] == expected_message
def test_it_validates_composite_queue_item_for_matchid_empty_array():
    """A Composite item with an empty MatchId list is rejected."""
    invalid_item = {
        "Type": "Composite",
        "MatchId": [],
        "Columns": ["column"],
        "DataMappers": [],
    }
    expected_message = (
        "MatchIds of Composite type need to have a value for at least one column"
    )

    with pytest.raises(ValueError) as e:
        handlers.validate_queue_items([invalid_item])

    # Checked after the block: nothing following the raising call inside it runs.
    assert e.value.args[0] == expected_message
def test_it_validates_composite_queue_item_for_data_mapper_empty():
    """A Composite item with no data mappers is rejected."""
    invalid_item = {
        "Type": "Composite",
        "MatchId": [{"Column": "first_name", "Value": "Test"}],
        "Columns": ["column"],
        "DataMappers": [],
    }
    expected_message = (
        "MatchIds of Composite type need to be associated to exactly one Data Mapper"
    )

    with pytest.raises(ValueError) as e:
        handlers.validate_queue_items([invalid_item])

    # Checked after the block: nothing following the raising call inside it runs.
    assert e.value.args[0] == expected_message
def test_it_validates_composite_queue_item_for_too_many_data_mappers():
    """A Composite item with more than one data mapper is rejected."""
    invalid_item = {
        "Type": "Composite",
        "MatchId": [{"Column": "first_name", "Value": "Test"}],
        "Columns": ["column"],
        "DataMappers": ["foo", "bar"],
    }
    expected_message = (
        "MatchIds of Composite type need to be associated to exactly one Data Mapper"
    )

    with pytest.raises(ValueError) as e:
        handlers.validate_queue_items([invalid_item])

    # Checked after the block: nothing following the raising call inside it runs.
    assert e.value.args[0] == expected_message
|