test_manager.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License'). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the 'license' file accompanying this file. This file is
  10. # distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from io import BytesIO
  14. from botocore.awsrequest import create_request_object
  15. from s3transfer.exceptions import CancelledError, FatalError
  16. from s3transfer.futures import BaseExecutor
  17. from s3transfer.manager import TransferConfig, TransferManager
  18. from tests import StubbedClientTest, mock, skip_if_using_serial_implementation
  19. class ArbitraryException(Exception):
  20. pass
  21. class SignalTransferringBody(BytesIO):
  22. """A mocked body with the ability to signal when transfers occur"""
  23. def __init__(self):
  24. super().__init__()
  25. self.signal_transferring_call_count = 0
  26. self.signal_not_transferring_call_count = 0
  27. def signal_transferring(self):
  28. self.signal_transferring_call_count += 1
  29. def signal_not_transferring(self):
  30. self.signal_not_transferring_call_count += 1
  31. def seek(self, where, whence=0):
  32. pass
  33. def tell(self):
  34. return 0
  35. def read(self, amount=0):
  36. return b''
  37. class TestTransferManager(StubbedClientTest):
  38. @skip_if_using_serial_implementation(
  39. 'Exception is thrown once all transfers are submitted. '
  40. 'However for the serial implementation, transfers are performed '
  41. 'in main thread meaning all transfers will complete before the '
  42. 'exception being thrown.'
  43. )
  44. def test_error_in_context_manager_cancels_incomplete_transfers(self):
  45. # The purpose of this test is to make sure if an error is raised
  46. # in the body of the context manager, incomplete transfers will
  47. # be cancelled with value of the exception wrapped by a CancelledError
  48. # NOTE: The fact that delete() was chosen to test this is arbitrary
  49. # other than it is the easiet to set up for the stubber.
  50. # The specific operation is not important to the purpose of this test.
  51. num_transfers = 100
  52. futures = []
  53. ref_exception_msg = 'arbitrary exception'
  54. for _ in range(num_transfers):
  55. self.stubber.add_response('delete_object', {})
  56. manager = TransferManager(
  57. self.client,
  58. TransferConfig(
  59. max_request_concurrency=1, max_submission_concurrency=1
  60. ),
  61. )
  62. try:
  63. with manager:
  64. for i in range(num_transfers):
  65. futures.append(manager.delete('mybucket', 'mykey'))
  66. raise ArbitraryException(ref_exception_msg)
  67. except ArbitraryException:
  68. # At least one of the submitted futures should have been
  69. # cancelled.
  70. with self.assertRaisesRegex(FatalError, ref_exception_msg):
  71. for future in futures:
  72. future.result()
  73. @skip_if_using_serial_implementation(
  74. 'Exception is thrown once all transfers are submitted. '
  75. 'However for the serial implementation, transfers are performed '
  76. 'in main thread meaning all transfers will complete before the '
  77. 'exception being thrown.'
  78. )
  79. def test_cntrl_c_in_context_manager_cancels_incomplete_transfers(self):
  80. # The purpose of this test is to make sure if an error is raised
  81. # in the body of the context manager, incomplete transfers will
  82. # be cancelled with value of the exception wrapped by a CancelledError
  83. # NOTE: The fact that delete() was chosen to test this is arbitrary
  84. # other than it is the easiet to set up for the stubber.
  85. # The specific operation is not important to the purpose of this test.
  86. num_transfers = 100
  87. futures = []
  88. for _ in range(num_transfers):
  89. self.stubber.add_response('delete_object', {})
  90. manager = TransferManager(
  91. self.client,
  92. TransferConfig(
  93. max_request_concurrency=1, max_submission_concurrency=1
  94. ),
  95. )
  96. try:
  97. with manager:
  98. for i in range(num_transfers):
  99. futures.append(manager.delete('mybucket', 'mykey'))
  100. raise KeyboardInterrupt()
  101. except KeyboardInterrupt:
  102. # At least one of the submitted futures should have been
  103. # cancelled.
  104. with self.assertRaisesRegex(CancelledError, 'KeyboardInterrupt()'):
  105. for future in futures:
  106. future.result()
  107. def test_enable_disable_callbacks_only_ever_registered_once(self):
  108. body = SignalTransferringBody()
  109. request = create_request_object(
  110. {
  111. 'method': 'PUT',
  112. 'url': 'https://s3.amazonaws.com',
  113. 'body': body,
  114. 'headers': {},
  115. 'context': {},
  116. }
  117. )
  118. # Create two TransferManager's using the same client
  119. TransferManager(self.client)
  120. TransferManager(self.client)
  121. self.client.meta.events.emit(
  122. 'request-created.s3', request=request, operation_name='PutObject'
  123. )
  124. # The client should have only have the enable/disable callback
  125. # handlers registered once depite being used for two different
  126. # TransferManagers.
  127. self.assertEqual(
  128. body.signal_transferring_call_count,
  129. 1,
  130. 'The enable_callback() should have only ever been registered once',
  131. )
  132. self.assertEqual(
  133. body.signal_not_transferring_call_count,
  134. 1,
  135. 'The disable_callback() should have only ever been registered '
  136. 'once',
  137. )
  138. def test_use_custom_executor_implementation(self):
  139. mocked_executor_cls = mock.Mock(BaseExecutor)
  140. transfer_manager = TransferManager(
  141. self.client, executor_cls=mocked_executor_cls
  142. )
  143. transfer_manager.delete('bucket', 'key')
  144. self.assertTrue(mocked_executor_cls.return_value.submit.called)
  145. def test_unicode_exception_in_context_manager(self):
  146. with self.assertRaises(ArbitraryException):
  147. with TransferManager(self.client):
  148. raise ArbitraryException('\u2713')
  149. def test_client_property(self):
  150. manager = TransferManager(self.client)
  151. self.assertIs(manager.client, self.client)
  152. def test_config_property(self):
  153. config = TransferConfig()
  154. manager = TransferManager(self.client, config)
  155. self.assertIs(manager.config, config)
  156. def test_can_disable_bucket_validation(self):
  157. s3_object_lambda_arn = (
  158. 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  159. 'accesspoint:my-accesspoint'
  160. )
  161. config = TransferConfig()
  162. manager = TransferManager(self.client, config)
  163. manager.VALIDATE_SUPPORTED_BUCKET_VALUES = False
  164. manager.delete(s3_object_lambda_arn, 'my-key')