|
- import datetime
- import json
- from mock import patch, MagicMock, call, ANY
- from io import BytesIO
- import pytest
- from botocore.exceptions import ClientError
- from backend.ecs_tasks.delete_files.s3 import (
- delete_old_versions,
- DeleteOldVersionsError,
- fetch_job_manifest,
- fetch_manifest,
- get_requester_payment,
- get_grantees,
- get_object_acl,
- get_object_info,
- get_object_tags,
- IntegrityCheckFailedError,
- rollback_object_version,
- save,
- s3transfer,
- validate_bucket_versioning,
- verify_object_versions_integrity,
- )
- pytestmark = [pytest.mark.unit, pytest.mark.ecs_tasks]
- def get_list_object_versions_error():
- return ClientError(
- {
- "Error": {
- "Code": "InvalidArgument",
- "Message": "Invalid version id specified",
- }
- },
- "ListObjectVersions",
- )
- def test_it_validates_bucket_versioning():
- validate_bucket_versioning.cache_clear()
- client = MagicMock()
- client.get_bucket_versioning.return_value = {"Status": "Enabled"}
- assert validate_bucket_versioning(client, "bucket")
- def test_it_throws_when_versioning_disabled():
- validate_bucket_versioning.cache_clear()
- client = MagicMock()
- client.get_bucket_versioning.return_value = {}
- with pytest.raises(ValueError) as e:
- validate_bucket_versioning(client, "bucket")
- assert e.value.args[0] == "Bucket bucket does not have versioning enabled"
- def test_it_throws_when_versioning_suspended():
- validate_bucket_versioning.cache_clear()
- client = MagicMock()
- client.get_bucket_versioning.return_value = {"Status": "Suspended"}
- with pytest.raises(ValueError) as e:
- validate_bucket_versioning(client, "bucket")
- assert e.value.args[0] == "Bucket bucket does not have versioning enabled"
- def test_it_throws_when_mfa_delete_enabled():
- validate_bucket_versioning.cache_clear()
- client = MagicMock()
- client.get_bucket_versioning.return_value = {
- "Status": "Enabled",
- "MFADelete": "Enabled",
- }
- with pytest.raises(ValueError) as e:
- validate_bucket_versioning(client, "bucket")
- assert e.value.args[0] == "Bucket bucket has MFA Delete enabled"
- def test_it_returns_requester_pays():
- get_requester_payment.cache_clear()
- client = MagicMock()
- client.get_bucket_request_payment.return_value = {"Payer": "Requester"}
- assert (
- {"RequestPayer": "requester"},
- {"Payer": "Requester"},
- ) == get_requester_payment(client, "bucket")
- def test_it_returns_empty_for_non_requester_pays():
- get_requester_payment.cache_clear()
- client = MagicMock()
- client.get_bucket_request_payment.return_value = {"Payer": "Owner"}
- assert ({}, {"Payer": "Owner"}) == get_requester_payment(client, "bucket")
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- def test_it_returns_standard_info(mock_requester):
- get_object_info.cache_clear()
- client = MagicMock()
- mock_requester.return_value = {}, {}
- stub = {
- "CacheControl": "cache",
- "ContentDisposition": "content_disposition",
- "ContentEncoding": "content_encoding",
- "ContentLanguage": "content_language",
- "ContentType": "ContentType",
- "Expires": "123",
- "Metadata": {"foo": "bar"},
- "ServerSideEncryption": "see",
- "StorageClass": "STANDARD",
- "SSECustomerAlgorithm": "aws:kms",
- "SSEKMSKeyId": "1234",
- "WebsiteRedirectLocation": "test",
- }
- client.head_object.return_value = stub
- assert stub == get_object_info(client, "bucket", "key")[0]
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- def test_it_strips_empty_standard_info(mock_requester):
- get_object_info.cache_clear()
- client = MagicMock()
- mock_requester.return_value = {}, {}
- stub = {
- "CacheControl": "cache",
- "ContentDisposition": "content_disposition",
- "ContentEncoding": "content_encoding",
- "ContentLanguage": "content_language",
- "ContentType": "ContentType",
- "Expires": "123",
- "Metadata": {"foo": "bar"},
- "ServerSideEncryption": "see",
- "StorageClass": "STANDARD",
- "SSECustomerAlgorithm": "aws:kms",
- "SSEKMSKeyId": "1234",
- "WebsiteRedirectLocation": None,
- }
- client.head_object.return_value = stub
- assert {
- "CacheControl": "cache",
- "ContentDisposition": "content_disposition",
- "ContentEncoding": "content_encoding",
- "ContentLanguage": "content_language",
- "ContentType": "ContentType",
- "Expires": "123",
- "Metadata": {"foo": "bar"},
- "ServerSideEncryption": "see",
- "StorageClass": "STANDARD",
- "SSECustomerAlgorithm": "aws:kms",
- "SSEKMSKeyId": "1234",
- } == get_object_info(client, "bucket", "key")[0]
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- def test_it_handles_versions_for_get_info(mock_requester):
- get_object_info.cache_clear()
- client = MagicMock()
- mock_requester.return_value = {}, {}
- client.head_object.return_value = {}
- get_object_info(client, "bucket", "key")
- client.head_object.assert_called_with(Bucket="bucket", Key="key")
- get_object_info(client, "bucket", "key", "abc123")
- client.head_object.assert_called_with(
- Bucket="bucket", Key="key", VersionId="abc123"
- )
- def test_it_gets_tagging_args():
- get_object_tags.cache_clear()
- client = MagicMock()
- client.get_object_tagging.return_value = {
- "TagSet": [{"Key": "a", "Value": "b"}, {"Key": "c", "Value": "d"}]
- }
- assert {"Tagging": "a=b&c=d",} == get_object_tags(
- client, "bucket", "key"
- )[0]
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- def test_it_handles_versions_for_get_tagging(mock_requester):
- get_object_info.cache_clear()
- client = MagicMock()
- mock_requester.return_value = {}, {}
- client.get_object_tagging.return_value = {"TagSet": []}
- get_object_tags(client, "bucket", "key")
- client.get_object_tagging.assert_called_with(Bucket="bucket", Key="key")
- get_object_tags(client, "bucket", "key", "abc123")
- client.get_object_tagging.assert_called_with(
- Bucket="bucket", Key="key", VersionId="abc123"
- )
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- def test_it_gets_acl_args(mock_requester):
- get_object_acl.cache_clear()
- client = MagicMock()
- mock_requester.return_value = {}, {}
- client.get_object_acl.return_value = {
- "Owner": {"ID": "a"},
- "Grants": [
- {"Grantee": {"ID": "b", "Type": "CanonicalUser"}, "Permission": "READ"},
- {"Grantee": {"ID": "c", "Type": "CanonicalUser"}, "Permission": "READ_ACP"},
- ],
- }
- assert {
- "GrantFullControl": "id=a",
- "GrantRead": "id=b",
- "GrantReadACP": "id=c",
- } == get_object_acl(client, "bucket", "key")[0]
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- def test_it_handles_versions_for_get_acl(mock_requester):
- get_object_info.cache_clear()
- client = MagicMock()
- mock_requester.return_value = {}, {}
- client.get_object_tagging.return_value = {
- "Owner": {"ID": "a"},
- "Grants": [
- {"Grantee": {"ID": "b", "Type": "CanonicalUser"}, "Permission": "READ"},
- {"Grantee": {"ID": "c", "Type": "CanonicalUser"}, "Permission": "READ_ACP"},
- ],
- }
- get_object_acl(client, "bucket", "key")
- client.get_object_acl.assert_called_with(Bucket="bucket", Key="key")
- get_object_acl(client, "bucket", "key", "abc123")
- client.get_object_acl.assert_called_with(
- Bucket="bucket", Key="key", VersionId="abc123"
- )
- def test_it_gets_grantees_by_type():
- acl = {
- "Owner": {"ID": "owner_id"},
- "Grants": [
- {
- "Grantee": {"ID": "grantee1", "Type": "CanonicalUser"},
- "Permission": "FULL_CONTROL",
- },
- {
- "Grantee": {"ID": "grantee2", "Type": "CanonicalUser"},
- "Permission": "FULL_CONTROL",
- },
- {
- "Grantee": {
- "EmailAddress": "grantee3",
- "Type": "AmazonCustomerByEmail",
- },
- "Permission": "READ",
- },
- {"Grantee": {"URI": "grantee4", "Type": "Group"}, "Permission": "WRITE"},
- {
- "Grantee": {"ID": "grantee5", "Type": "CanonicalUser"},
- "Permission": "READ_ACP",
- },
- {
- "Grantee": {"ID": "grantee6", "Type": "CanonicalUser"},
- "Permission": "WRITE_ACP",
- },
- ],
- }
- assert {"id=grantee1", "id=grantee2"} == get_grantees(acl, "FULL_CONTROL")
- assert {"emailAddress=grantee3"} == get_grantees(acl, "READ")
- assert {"uri=grantee4"} == get_grantees(acl, "WRITE")
- assert {"id=grantee5"} == get_grantees(acl, "READ_ACP")
- assert {"id=grantee6"} == get_grantees(acl, "WRITE_ACP")
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_info")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_tags")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_acl")
- @patch("backend.ecs_tasks.delete_files.s3.get_grantees")
- def test_it_applies_settings_when_saving(
- mock_grantees, mock_acl, mock_tagging, mock_standard, mock_requester
- ):
- mock_client = MagicMock()
- mock_requester.return_value = {"RequestPayer": "requester"}, {"Payer": "Requester"}
- mock_standard.return_value = ({"Expires": "123", "Metadata": {}}, {})
- mock_tagging.return_value = (
- {"Tagging": "a=b"},
- {"TagSet": [{"Key": "a", "Value": "b"}]},
- )
- mock_acl.return_value = (
- {
- "GrantFullControl": "id=abc",
- "GrantRead": "id=123",
- },
- {
- "Owner": {"ID": "owner_id"},
- "Grants": [
- {
- "Grantee": {"ID": "abc", "Type": "CanonicalUser"},
- "Permission": "FULL_CONTROL",
- },
- {
- "Grantee": {"ID": "123", "Type": "CanonicalUser"},
- "Permission": "READ",
- },
- ],
- },
- )
- mock_grantees.return_value = ""
- buf = BytesIO()
- mock_client.upload_fileobj.return_value = {"VersionId": "abc123"}
- resp = save(mock_client, buf, "bucket", "key", {}, "abc123")
- mock_client.upload_fileobj.assert_called_with(
- buf,
- "bucket",
- "key",
- ExtraArgs={
- "RequestPayer": "requester",
- "Expires": "123",
- "Metadata": {},
- "Tagging": "a=b",
- "GrantFullControl": "id=abc",
- "GrantRead": "id=123",
- },
- )
- assert "abc123" == resp
- mock_client.put_object_acl.assert_not_called()
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_info")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_tags")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_acl")
- @patch("backend.ecs_tasks.delete_files.s3.get_grantees")
- def test_it_passes_through_version(
- mock_grantees, mock_acl, mock_tagging, mock_standard, mock_requester
- ):
- mock_client = MagicMock()
- mock_requester.return_value = {}, {}
- mock_standard.return_value = ({}, {})
- mock_tagging.return_value = ({}, {})
- mock_acl.return_value = ({}, {})
- mock_grantees.return_value = ""
- buf = BytesIO()
- save(mock_client, buf, "bucket", "key", {}, "abc123")
- mock_acl.assert_called_with(mock_client, "bucket", "key", "abc123")
- mock_tagging.assert_called_with(mock_client, "bucket", "key", "abc123")
- mock_standard.assert_called_with(mock_client, "bucket", "key", "abc123")
- @patch("backend.ecs_tasks.delete_files.s3.get_requester_payment")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_info")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_tags")
- @patch("backend.ecs_tasks.delete_files.s3.get_object_acl")
- @patch("backend.ecs_tasks.delete_files.s3.get_grantees")
- def test_it_restores_write_permissions(
- mock_grantees, mock_acl, mock_tagging, mock_standard, mock_requester
- ):
- mock_client = MagicMock()
- mock_requester.return_value = {}, {}
- mock_standard.return_value = ({}, {})
- mock_tagging.return_value = ({}, {})
- mock_acl.return_value = (
- {
- "GrantFullControl": "id=abc",
- },
- {
- "Owner": {"ID": "owner_id"},
- "Grants": [
- {
- "Grantee": {"ID": "abc", "Type": "CanonicalUser"},
- "Permission": "FULL_CONTROL",
- },
- {
- "Grantee": {"ID": "123", "Type": "CanonicalUser"},
- "Permission": "WRITE",
- },
- ],
- },
- )
- mock_grantees.return_value = {"id=123"}
- buf = BytesIO()
- mock_client.upload_fileobj.return_value = {"VersionId": "new_version123"}
- save(mock_client, buf, "bucket", "key", "abc123")
- mock_client.put_object_acl.assert_called_with(
- Bucket="bucket",
- Key="key",
- VersionId="new_version123",
- GrantFullControl="id=abc",
- GrantWrite="id=123",
- )
- def test_it_verifies_integrity_happy_path():
- s3_mock = MagicMock()
- s3_mock.list_object_versions.return_value = {
- "VersionIdMarker": "v7",
- "Versions": [{"VersionId": "v6", "ETag": "a"}],
- }
- result = verify_object_versions_integrity(
- s3_mock, "bucket", "requirements.txt", "v6", "v7"
- )
- assert result
- s3_mock.list_object_versions.assert_called_with(
- Bucket="bucket",
- Prefix="requirements.txt",
- VersionIdMarker="v7",
- KeyMarker="requirements.txt",
- MaxKeys=1,
- )
- def test_it_fails_integrity_when_delete_marker_between():
- s3_mock = MagicMock()
- s3_mock.list_object_versions.return_value = {
- "VersionIdMarker": "v7",
- "Versions": [],
- "DeleteMarkers": [{"VersionId": "v6"}],
- }
- with pytest.raises(IntegrityCheckFailedError) as e:
- result = verify_object_versions_integrity(
- s3_mock, "bucket", "requirements.txt", "v5", "v7"
- )
- assert e.value.args == (
- "A delete marker (v6) was detected for the given object between read and write operations (v5 and v7).",
- s3_mock,
- "bucket",
- "requirements.txt",
- "v7",
- )
- def test_it_fails_integrity_when_other_version_between():
- s3_mock = MagicMock()
- s3_mock.list_object_versions.return_value = {
- "VersionIdMarker": "v7",
- "Versions": [{"VersionId": "v6", "ETag": "a"}],
- }
- with pytest.raises(IntegrityCheckFailedError) as e:
- result = verify_object_versions_integrity(
- s3_mock, "bucket", "requirements.txt", "v5", "v7"
- )
- assert e.value.args == (
- "A version (v6) was detected for the given object between read and write operations (v5 and v7).",
- s3_mock,
- "bucket",
- "requirements.txt",
- "v7",
- )
- def test_it_fails_integrity_when_no_other_version_before():
- s3_mock = MagicMock()
- s3_mock.list_object_versions.return_value = {
- "VersionIdMarker": "v7",
- "Versions": [],
- }
- with pytest.raises(IntegrityCheckFailedError) as e:
- result = verify_object_versions_integrity(
- s3_mock, "bucket", "requirements.txt", "v5", "v7"
- )
- assert e.value.args == (
- "Previous version (v5) has been deleted.",
- s3_mock,
- "bucket",
- "requirements.txt",
- "v7",
- )
- @patch("time.sleep")
- def test_it_errors_when_version_to_not_found_after_retries(sleep_mock):
- s3_mock = MagicMock()
- s3_mock.list_object_versions.side_effect = get_list_object_versions_error()
- with pytest.raises(ClientError) as e:
- result = verify_object_versions_integrity(
- s3_mock, "bucket", "requirements.txt", "v7", "v8"
- )
- assert sleep_mock.call_args_list == [call(2), call(4), call(8), call(16), call(32)]
- assert (
- e.value.args[0]
- == "An error occurred (InvalidArgument) when calling the ListObjectVersions operation: Invalid version id specified"
- )
- @patch("backend.ecs_tasks.delete_files.s3.paginate")
- def test_it_deletes_old_versions(paginate_mock):
- s3_mock = MagicMock()
- paginate_mock.return_value = iter(
- [
- (
- {
- "VersionId": "v1",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=4),
- },
- {
- "VersionId": "d2",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=3),
- },
- ),
- (
- {
- "VersionId": "v3",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=2),
- },
- None,
- ),
- ]
- )
- delete_old_versions(s3_mock, "bucket", "key", "v4")
- paginate_mock.assert_called_with(
- s3_mock,
- s3_mock.list_object_versions,
- ["Versions", "DeleteMarkers"],
- Bucket="bucket",
- Prefix="key",
- VersionIdMarker="v4",
- KeyMarker="key",
- )
- s3_mock.delete_objects.assert_called_with(
- Bucket="bucket",
- Delete={
- "Objects": [
- {"Key": "key", "VersionId": "v1"},
- {"Key": "key", "VersionId": "d2"},
- {"Key": "key", "VersionId": "v3"},
- ],
- "Quiet": True,
- },
- )
- @patch("backend.ecs_tasks.delete_files.s3.paginate")
- def test_it_handles_high_old_version_count(paginate_mock):
- s3_mock = MagicMock()
- paginate_mock.return_value = iter(
- [
- (
- {
- "VersionId": "v{}".format(i),
- "LastModified": datetime.datetime.now()
- + datetime.timedelta(minutes=i),
- },
- None,
- )
- for i in range(1, 1501)
- ]
- )
- delete_old_versions(s3_mock, "bucket", "key", "v0")
- paginate_mock.assert_called_with(
- s3_mock,
- s3_mock.list_object_versions,
- ["Versions", "DeleteMarkers"],
- Bucket="bucket",
- Prefix="key",
- VersionIdMarker="v0",
- KeyMarker="key",
- )
- assert 2 == s3_mock.delete_objects.call_count
- assert {
- "Bucket": "bucket",
- "Delete": {
- "Objects": [
- {"Key": "key", "VersionId": "v{}".format(i)} for i in range(1, 1001)
- ],
- "Quiet": True,
- },
- } == s3_mock.delete_objects.call_args_list[0][1]
- assert {
- "Bucket": "bucket",
- "Delete": {
- "Objects": [
- {"Key": "key", "VersionId": "v{}".format(i)} for i in range(1001, 1501)
- ],
- "Quiet": True,
- },
- } == s3_mock.delete_objects.call_args_list[1][1]
- @patch("backend.ecs_tasks.delete_files.s3.paginate")
- def test_it_retries_for_deletion_errors(paginate_mock):
- s3_mock = MagicMock()
- paginate_mock.return_value = iter(
- [
- (
- {
- "VersionId": "v1",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=4),
- },
- {
- "VersionId": "v2",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=3),
- },
- ),
- (
- {
- "VersionId": "v3",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=2),
- },
- None,
- ),
- ]
- )
- s3_mock.delete_objects.side_effect = [
- {
- "Errors": [
- {"VersionId": "v1", "Key": "key", "Message": "InternalServerError"}
- ]
- },
- {
- "Errors": [],
- },
- ]
- delete_old_versions(s3_mock, "bucket", "key", "v4")
- assert s3_mock.delete_objects.call_count == 2
- @patch("backend.ecs_tasks.delete_files.s3.paginate")
- @patch("backend.ecs_tasks.delete_files.s3.delete_s3_objects")
- def test_it_raises_for_deletion_errors(delete_s3_objects_mock, paginate_mock):
- s3_mock = MagicMock()
- paginate_mock.return_value = iter(
- [
- (
- {
- "VersionId": "v1",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=4),
- },
- {
- "VersionId": "v2",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=3),
- },
- ),
- (
- {
- "VersionId": "v3",
- "LastModified": datetime.datetime.now()
- - datetime.timedelta(minutes=2),
- },
- None,
- ),
- ]
- )
- delete_s3_objects_mock.return_value = {
- "Errors": [{"VersionId": "v1", "Key": "key", "Message": "Version not found"}]
- }
- with pytest.raises(DeleteOldVersionsError):
- delete_old_versions(s3_mock, "bucket", "key", "v4")
- @patch("backend.ecs_tasks.delete_files.s3.paginate")
- def test_it_handles_client_errors_as_deletion_errors(paginate_mock):
- s3_mock = MagicMock()
- paginate_mock.side_effect = get_list_object_versions_error()
- with pytest.raises(DeleteOldVersionsError):
- delete_old_versions(s3_mock, "bucket", "key", "v3")
- def test_it_deletes_new_version_during_rollback():
- s3_mock = MagicMock()
- s3_mock.delete_object.return_value = "result"
- mock_callback = MagicMock()
- result = rollback_object_version(
- s3_mock, "bucket", "requirements.txt", "version23", on_error=mock_callback
- )
- assert result == "result"
- s3_mock.delete_object.assert_called_with(
- Bucket="bucket", Key="requirements.txt", VersionId="version23"
- )
- mock_callback.assert_not_called()
- def test_it_handles_error_for_client_error():
- s3_mock = MagicMock()
- s3_mock.delete_object.side_effect = ClientError({}, "DeleteObject")
- mock_callback = MagicMock()
- result = rollback_object_version(
- s3_mock, "bucket", "requirements.txt", "version23", on_error=mock_callback
- )
- mock_callback.assert_called_with(
- "ClientError: An error occurred (Unknown) when calling the DeleteObject "
- "operation: Unknown. Version rollback caused by version integrity conflict "
- "failed"
- )
- def test_it_handles_error_for_generic_errors():
- s3_mock = MagicMock()
- s3_mock.delete_object.side_effect = RuntimeError("Some issue")
- mock_callback = MagicMock()
- result = rollback_object_version(
- s3_mock, "bucket", "requirements.txt", "version23", on_error=mock_callback
- )
- mock_callback.assert_called_with(
- "Unknown error: Some issue. Version rollback caused by version integrity "
- "conflict failed"
- )
- @patch("backend.ecs_tasks.delete_files.s3.fetch_job_manifest")
- def test_it_caches_manifests(mock_fetch):
- fetch_manifest.cache_clear()
- fetch_manifest("s3://path/to/manifest1.json")
- fetch_manifest("s3://path/to/manifest1.json")
- fetch_manifest("s3://path/to/manifest2.json")
- assert mock_fetch.call_count == 2
- mock_fetch.assert_has_calls(
- [call("s3://path/to/manifest1.json"), call("s3://path/to/manifest2.json")],
- any_order=True,
- )
- def test_s3transfer_locked_version():
- """
- https://github.com/boto/s3transfer/issues/82#issuecomment-837971614
- We have a monkey patch in place to allow us using boto3's upload_fileobj
- when we write back to S3. The issue is that while the method offers a nice
- wrapper around file operations, such as implementing multipart upload only
- when needed, we don't get the VersionId back as we would by using
- put_object. This monkey patch is in place while we wait some pull requests
- to be merged. In the meanwhile, here is a test that allow us to notice
- any change on the version we use on s3transfer, in order to add extra
- protection against automated library upgrade PRs that may silently introduce
- issues.
- """
- assert s3transfer.__version__ == "0.6.0"
- def test_s3transfer_put_object_monkeypatch_returns_response():
- put_object_task = s3transfer.upload.PutObjectTask(MagicMock())
- client_mock = MagicMock()
- client_mock.put_object.return_value = "result"
- file_mock = MagicMock()
- file_mock.__enter__.return_value = b"123"
- resp = put_object_task._main(client_mock, file_mock, "bucket", "key", {})
- client_mock.put_object.assert_called_with(Bucket="bucket", Key="key", Body=b"123")
- assert resp == "result"
- def test_s3transfer_complete_multipart_monkeypatch_returns_response():
- cmpu_task = s3transfer.upload.CompleteMultipartUploadTask(MagicMock())
- client_mock = MagicMock()
- client_mock.complete_multipart_upload.return_value = "cmpu_result"
- resp = cmpu_task._main(client_mock, "bucket", "key", "id", [{}], {})
- client_mock.complete_multipart_upload.assert_called_with(
- Bucket="bucket", Key="key", UploadId="id", MultipartUpload={"Parts": [{}]}
- )
- assert resp == "cmpu_result"
|