test_download.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import glob
  14. import os
  15. import threading
  16. import time
  17. from concurrent.futures import CancelledError
  18. from s3transfer.manager import TransferConfig
  19. from tests import (
  20. NonSeekableWriter,
  21. RecordingSubscriber,
  22. assert_files_equal,
  23. skip_if_using_serial_implementation,
  24. skip_if_windows,
  25. )
  26. from tests.integration import (
  27. BaseTransferManagerIntegTest,
  28. WaitForTransferStart,
  29. )
  30. class TestDownload(BaseTransferManagerIntegTest):
  def setUp(self):
      """Build the shared TransferConfig with a 5 MiB multipart threshold.

      The 1 MiB objects used by the "below threshold" tests stay under this
      limit, while the 20 MiB and 60 MiB objects exceed it.
      """
      super().setUp()
      mebibyte = 1024 * 1024
      self.multipart_threshold = 5 * mebibyte
      self.config = TransferConfig(multipart_threshold=self.multipart_threshold)
  def test_below_threshold(self):
      """Round-trip a 1 MiB object, below the configured multipart threshold."""
      object_key = '1mb.txt'
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=1024 * 1024
      )
      self.upload_file(source_path, object_key)
      destination = os.path.join(self.files.rootdir, object_key)
      # Block until the download finishes (or re-raise its error).
      manager.download(self.bucket_name, object_key, destination).result()
      assert_files_equal(source_path, destination)
  def test_above_threshold(self):
      """Round-trip a 20 MiB object, above the configured multipart threshold."""
      object_key = '20mb.txt'
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=20 * 1024 * 1024
      )
      self.upload_file(source_path, object_key)
      destination = os.path.join(self.files.rootdir, object_key)
      # Block until the download finishes (or re-raise its error).
      manager.download(self.bucket_name, object_key, destination).result()
      assert_files_equal(source_path, destination)
  @skip_if_using_serial_implementation(
      'Exception is thrown once the transfer is submitted. '
      'However for the serial implementation, transfers are performed '
      'in main thread meaning the transfer will complete before the '
      'KeyboardInterrupt being thrown.'
  )
  def test_large_download_exits_quicky_on_exception(self):
      """Interrupting an in-flight 60 MiB download cancels it promptly.

      The download is started inside the transfer manager's ``with`` block.
      Once the subscriber signals that bytes are transferring, a
      KeyboardInterrupt is raised in that block. The test then asserts:

      * leaving the context takes less than ``max_allowed_exit_time`` seconds;
      * the future fails with a CancelledError whose message is
        ``KeyboardInterrupt()``;
      * neither the downloaded file nor any temporary sibling of it remains.

      Raises:
          RuntimeError: if the download has not started within ``timeout``
              seconds.
      """
      transfer_manager = self.create_transfer_manager(self.config)
      filename = self.files.create_file_with_size(
          'foo.txt', filesize=60 * 1024 * 1024
      )
      self.upload_file(filename, '60mb.txt')
      download_path = os.path.join(self.files.rootdir, '60mb.txt')
      timeout = 10
      bytes_transferring = threading.Event()
      subscriber = WaitForTransferStart(bytes_transferring)
      try:
          with transfer_manager:
              future = transfer_manager.download(
                  self.bucket_name,
                  '60mb.txt',
                  download_path,
                  subscribers=[subscriber],
              )
              if not bytes_transferring.wait(timeout):
                  future.cancel()
                  raise RuntimeError(
                      "Download transfer did not start after waiting for "
                      "%s seconds." % timeout
                  )
              # Raise an exception which should cause the preceding
              # download to cancel and exit quickly. A monotonic clock is
              # used because wall-clock time (time.time) can jump when the
              # system clock is adjusted, which would skew the duration.
              start_time = time.monotonic()
              raise KeyboardInterrupt()
      except KeyboardInterrupt:
          pass
      end_time = time.monotonic()
      # The maximum time allowed for the transfer manager to exit after the
      # KeyboardInterrupt is raised inside the ``with`` block.
      max_allowed_exit_time = 5
      actual_time_to_exit = end_time - start_time
      self.assertLess(
          actual_time_to_exit,
          max_allowed_exit_time,
          "Failed to exit under {}. Instead exited in {}.".format(
              max_allowed_exit_time, actual_time_to_exit
          ),
      )
      # Make sure the future was cancelled because of the KeyboardInterrupt.
      # The parentheses are escaped so they are matched literally; unescaped
      # they form an empty regex group and are silently ignored.
      with self.assertRaisesRegex(CancelledError, r'KeyboardInterrupt\(\)'):
          future.result()
      # Make sure the actual file and the temporary do not exist by globbing
      # for the file and any of its extensions. glob.escape stops glob
      # metacharacters in the path from being treated as wildcards.
      possible_matches = glob.glob(glob.escape(download_path) + '*')
      self.assertEqual(possible_matches, [])
  @skip_if_using_serial_implementation(
      'Exception is thrown once the transfer is submitted. '
      'However for the serial implementation, transfers are performed '
      'in main thread meaning the transfer will complete before the '
      'KeyboardInterrupt being thrown.'
  )
  def test_many_files_exits_quicky_on_exception(self):
      """Interrupting a backed-up queue of downloads exits promptly.

      Ten downloads of the same 1 MiB object are submitted with a request
      queue size and submission concurrency of 1. A KeyboardInterrupt is
      then raised inside the transfer manager's ``with`` block. The test
      asserts:

      * the manager exits within ``max_allowed_exit_time`` seconds of the
        first submission;
      * at least one future fails with a CancelledError whose message is
        ``KeyboardInterrupt()``;
      * that cancelled transfer left no file (or temporary file) behind.
      """
      # Set the max request queue size and number of submission threads
      # to something small to simulate having a large queue
      # of transfer requests to complete and it is backed up.
      self.config.max_request_queue_size = 1
      self.config.max_submission_concurrency = 1
      transfer_manager = self.create_transfer_manager(self.config)
      filename = self.files.create_file_with_size(
          'foo.txt', filesize=1024 * 1024
      )
      self.upload_file(filename, '1mb.txt')
      download_paths = [
          os.path.join(self.files.rootdir, 'file' + str(i)) for i in range(10)
      ]
      futures = []
      try:
          with transfer_manager:
              # Monotonic clock: immune to system clock adjustments that
              # could distort the measured exit time.
              start_time = time.monotonic()
              for download_path in download_paths:
                  futures.append(
                      transfer_manager.download(
                          self.bucket_name, '1mb.txt', download_path
                      )
                  )
              # Raise an exception which should cause the preceding
              # transfer to cancel and exit quickly
              raise KeyboardInterrupt()
      except KeyboardInterrupt:
          pass
      end_time = time.monotonic()
      # The maximum time allowed for the transfer manager to exit.
      # This means that it should take less than a couple seconds to exit.
      max_allowed_exit_time = 5
      actual_time_to_exit = end_time - start_time
      self.assertLess(
          actual_time_to_exit,
          max_allowed_exit_time,
          "Failed to exit under {}. Instead exited in {}.".format(
              max_allowed_exit_time, actual_time_to_exit
          ),
      )
      # Make sure at least one of the futures got cancelled. The loop stops
      # at the first future whose result() raises, so ``future`` is left
      # bound to that cancelled transfer. The parentheses are escaped so
      # they are matched literally rather than as an empty regex group.
      with self.assertRaisesRegex(CancelledError, r'KeyboardInterrupt\(\)'):
          for future in futures:
              future.result()
      # For the transfer that did get cancelled, make sure the temporary
      # file got removed. glob.escape keeps path metacharacters literal.
      possible_matches = glob.glob(
          glob.escape(future.meta.call_args.fileobj) + '*'
      )
      self.assertEqual(possible_matches, [])
  def test_progress_subscribers_on_download(self):
      """A subscriber's recorded progress adds up to the full 20 MiB object."""
      expected_bytes = 20 * 1024 * 1024
      recorder = RecordingSubscriber()
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=expected_bytes
      )
      self.upload_file(source_path, '20mb.txt')
      destination = os.path.join(self.files.rootdir, '20mb.txt')
      pending = manager.download(
          self.bucket_name, '20mb.txt', destination, subscribers=[recorder]
      )
      pending.result()
      self.assertEqual(recorder.calculate_bytes_seen(), expected_bytes)
  def test_below_threshold_for_fileobj(self):
      """Download a 1 MiB object into an already-open binary file object."""
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=1024 * 1024
      )
      self.upload_file(source_path, '1mb.txt')
      destination = os.path.join(self.files.rootdir, '1mb.txt')
      with open(destination, 'wb') as handle:
          manager.download(self.bucket_name, '1mb.txt', handle).result()
      # Compare only after the handle is closed, so buffered writes are flushed.
      assert_files_equal(source_path, destination)
  def test_above_threshold_for_fileobj(self):
      """Download a 20 MiB (multipart) object into an open binary file object."""
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=20 * 1024 * 1024
      )
      self.upload_file(source_path, '20mb.txt')
      destination = os.path.join(self.files.rootdir, '20mb.txt')
      with open(destination, 'wb') as handle:
          manager.download(self.bucket_name, '20mb.txt', handle).result()
      # Compare only after the handle is closed, so buffered writes are flushed.
      assert_files_equal(source_path, destination)
  def test_below_threshold_for_nonseekable_fileobj(self):
      """Download a 1 MiB object through a NonSeekableWriter wrapper."""
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=1024 * 1024
      )
      self.upload_file(source_path, '1mb.txt')
      destination = os.path.join(self.files.rootdir, '1mb.txt')
      with open(destination, 'wb') as handle:
          writer = NonSeekableWriter(handle)
          manager.download(self.bucket_name, '1mb.txt', writer).result()
      # Compare only after the handle is closed, so buffered writes are flushed.
      assert_files_equal(source_path, destination)
  def test_above_threshold_for_nonseekable_fileobj(self):
      """Download a 20 MiB (multipart) object through a NonSeekableWriter."""
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=20 * 1024 * 1024
      )
      self.upload_file(source_path, '20mb.txt')
      destination = os.path.join(self.files.rootdir, '20mb.txt')
      with open(destination, 'wb') as handle:
          writer = NonSeekableWriter(handle)
          manager.download(self.bucket_name, '20mb.txt', writer).result()
      # Compare only after the handle is closed, so buffered writes are flushed.
      assert_files_equal(source_path, destination)
  @skip_if_windows('Windows does not support UNIX special files')
  def test_download_to_special_file(self):
      """Downloading a 1 MiB object to ``/dev/null`` completes without error."""
      manager = self.create_transfer_manager(self.config)
      source_path = self.files.create_file_with_size(
          'foo.txt', filesize=1024 * 1024
      )
      self.upload_file(source_path, '1mb.txt')
      pending = manager.download(self.bucket_name, '1mb.txt', '/dev/null')
      try:
          pending.result()
      except Exception as error:
          # Report any failure as a test failure instead of a test error.
          failure_message = (
              'Should have been able to download to /dev/null but received '
              'following exception %s' % error
          )
          self.fail(failure_message)