11-test_upload.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import threading
  14. import time
  15. from concurrent.futures import CancelledError
  16. from io import BytesIO
  17. from s3transfer.manager import TransferConfig
  18. from tests import (
  19. NonSeekableReader,
  20. RecordingSubscriber,
  21. skip_if_using_serial_implementation,
  22. )
  23. from tests.integration import (
  24. BaseTransferManagerIntegTest,
  25. WaitForTransferStart,
  26. )
  27. class TestUpload(BaseTransferManagerIntegTest):
  28. def setUp(self):
  29. super().setUp()
  30. self.multipart_threshold = 5 * 1024 * 1024
  31. self.config = TransferConfig(
  32. multipart_threshold=self.multipart_threshold
  33. )
  34. def get_input_fileobj(self, size, name=''):
  35. return self.files.create_file_with_size(name, size)
  36. def test_upload_below_threshold(self):
  37. transfer_manager = self.create_transfer_manager(self.config)
  38. file = self.get_input_fileobj(size=1024 * 1024, name='1mb.txt')
  39. future = transfer_manager.upload(file, self.bucket_name, '1mb.txt')
  40. self.addCleanup(self.delete_object, '1mb.txt')
  41. future.result()
  42. self.assertTrue(self.object_exists('1mb.txt'))
  43. def test_upload_above_threshold(self):
  44. transfer_manager = self.create_transfer_manager(self.config)
  45. file = self.get_input_fileobj(size=20 * 1024 * 1024, name='20mb.txt')
  46. future = transfer_manager.upload(file, self.bucket_name, '20mb.txt')
  47. self.addCleanup(self.delete_object, '20mb.txt')
  48. future.result()
  49. self.assertTrue(self.object_exists('20mb.txt'))
  50. @skip_if_using_serial_implementation(
  51. 'Exception is thrown once the transfer is submitted. '
  52. 'However for the serial implementation, transfers are performed '
  53. 'in main thread meaning the transfer will complete before the '
  54. 'KeyboardInterrupt being thrown.'
  55. )
  56. def test_large_upload_exits_quicky_on_exception(self):
  57. transfer_manager = self.create_transfer_manager(self.config)
  58. filename = self.get_input_fileobj(
  59. name='foo.txt', size=20 * 1024 * 1024
  60. )
  61. timeout = 10
  62. bytes_transferring = threading.Event()
  63. subscriber = WaitForTransferStart(bytes_transferring)
  64. try:
  65. with transfer_manager:
  66. future = transfer_manager.upload(
  67. filename,
  68. self.bucket_name,
  69. '20mb.txt',
  70. subscribers=[subscriber],
  71. )
  72. if not bytes_transferring.wait(timeout):
  73. future.cancel()
  74. raise RuntimeError(
  75. "Download transfer did not start after waiting for "
  76. "%s seconds." % timeout
  77. )
  78. # Raise an exception which should cause the preceding
  79. # download to cancel and exit quickly
  80. start_time = time.time()
  81. raise KeyboardInterrupt()
  82. except KeyboardInterrupt:
  83. pass
  84. end_time = time.time()
  85. # The maximum time allowed for the transfer manager to exit.
  86. # This means that it should take less than a couple second after
  87. # sleeping to exit.
  88. max_allowed_exit_time = 5
  89. actual_time_to_exit = end_time - start_time
  90. self.assertLess(
  91. actual_time_to_exit,
  92. max_allowed_exit_time,
  93. "Failed to exit under {}. Instead exited in {}.".format(
  94. max_allowed_exit_time, actual_time_to_exit
  95. ),
  96. )
  97. try:
  98. future.result()
  99. self.skipTest(
  100. 'Upload completed before interrupted and therefore '
  101. 'could not cancel the upload'
  102. )
  103. except CancelledError as e:
  104. self.assertEqual(str(e), 'KeyboardInterrupt()')
  105. # If the transfer did get cancelled,
  106. # make sure the object does not exist.
  107. self.assertTrue(self.object_not_exists('20mb.txt'))
  108. @skip_if_using_serial_implementation(
  109. 'Exception is thrown once the transfers are submitted. '
  110. 'However for the serial implementation, transfers are performed '
  111. 'in main thread meaning the transfers will complete before the '
  112. 'KeyboardInterrupt being thrown.'
  113. )
  114. def test_many_files_exits_quicky_on_exception(self):
  115. # Set the max request queue size and number of submission threads
  116. # to something small to simulate having a large queue
  117. # of transfer requests to complete and it is backed up.
  118. self.config.max_request_queue_size = 1
  119. self.config.max_submission_concurrency = 1
  120. transfer_manager = self.create_transfer_manager(self.config)
  121. fileobjs = []
  122. keynames = []
  123. futures = []
  124. for i in range(10):
  125. filename = 'file' + str(i)
  126. keynames.append(filename)
  127. fileobjs.append(
  128. self.get_input_fileobj(name=filename, size=1024 * 1024)
  129. )
  130. try:
  131. with transfer_manager:
  132. for i, fileobj in enumerate(fileobjs):
  133. futures.append(
  134. transfer_manager.upload(
  135. fileobj, self.bucket_name, keynames[i]
  136. )
  137. )
  138. # Raise an exception which should cause the preceding
  139. # transfer to cancel and exit quickly
  140. start_time = time.time()
  141. raise KeyboardInterrupt()
  142. except KeyboardInterrupt:
  143. pass
  144. end_time = time.time()
  145. # The maximum time allowed for the transfer manager to exit.
  146. # This means that it should take less than a couple seconds to exit.
  147. max_allowed_exit_time = 5
  148. self.assertLess(
  149. end_time - start_time,
  150. max_allowed_exit_time,
  151. "Failed to exit under {}. Instead exited in {}.".format(
  152. max_allowed_exit_time, end_time - start_time
  153. ),
  154. )
  155. # Make sure at least one of the futures got cancelled
  156. with self.assertRaisesRegex(CancelledError, 'KeyboardInterrupt()'):
  157. for future in futures:
  158. future.result()
  159. # For the transfer that did get cancelled, make sure the object
  160. # does not exist.
  161. self.assertTrue(self.object_not_exists(future.meta.call_args.key))
  162. def test_progress_subscribers_on_upload(self):
  163. subscriber = RecordingSubscriber()
  164. transfer_manager = self.create_transfer_manager(self.config)
  165. file = self.get_input_fileobj(size=20 * 1024 * 1024, name='20mb.txt')
  166. future = transfer_manager.upload(
  167. file, self.bucket_name, '20mb.txt', subscribers=[subscriber]
  168. )
  169. self.addCleanup(self.delete_object, '20mb.txt')
  170. future.result()
  171. # The callback should have been called enough times such that
  172. # the total amount of bytes we've seen (via the "amount"
  173. # arg to the callback function) should be the size
  174. # of the file we uploaded.
  175. self.assertEqual(subscriber.calculate_bytes_seen(), 20 * 1024 * 1024)
  176. class TestUploadSeekableStream(TestUpload):
  177. def get_input_fileobj(self, size, name=''):
  178. return BytesIO(b'0' * size)
  179. class TestUploadNonSeekableStream(TestUpload):
  180. def get_input_fileobj(self, size, name=''):
  181. return NonSeekableReader(b'0' * size)