test_download.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import copy
  14. import glob
  15. import os
  16. import shutil
  17. import tempfile
  18. import time
  19. from io import BytesIO
  20. from botocore.exceptions import ClientError
  21. from s3transfer.compat import SOCKET_ERROR
  22. from s3transfer.exceptions import RetriesExceededError
  23. from s3transfer.manager import TransferConfig, TransferManager
  24. from tests import (
  25. BaseGeneralInterfaceTest,
  26. FileSizeProvider,
  27. NonSeekableWriter,
  28. RecordingOSUtils,
  29. RecordingSubscriber,
  30. StreamWithError,
  31. skip_if_using_serial_implementation,
  32. skip_if_windows,
  33. )
  34. class BaseDownloadTest(BaseGeneralInterfaceTest):
  35. def setUp(self):
  36. super().setUp()
  37. self.config = TransferConfig(max_request_concurrency=1)
  38. self._manager = TransferManager(self.client, self.config)
  39. # Create a temporary directory to write to
  40. self.tempdir = tempfile.mkdtemp()
  41. self.filename = os.path.join(self.tempdir, 'myfile')
  42. # Initialize some default arguments
  43. self.bucket = 'mybucket'
  44. self.key = 'mykey'
  45. self.extra_args = {}
  46. self.subscribers = []
  47. # Create a stream to read from
  48. self.content = b'my content'
  49. self.stream = BytesIO(self.content)
  50. def tearDown(self):
  51. super().tearDown()
  52. shutil.rmtree(self.tempdir)
  53. @property
  54. def manager(self):
  55. return self._manager
  56. @property
  57. def method(self):
  58. return self.manager.download
  59. def create_call_kwargs(self):
  60. return {
  61. 'bucket': self.bucket,
  62. 'key': self.key,
  63. 'fileobj': self.filename,
  64. }
  65. def create_invalid_extra_args(self):
  66. return {'Foo': 'bar'}
  67. def create_stubbed_responses(self):
  68. # We want to make sure the beginning of the stream is always used
  69. # in case this gets called twice.
  70. self.stream.seek(0)
  71. return [
  72. {
  73. 'method': 'head_object',
  74. 'service_response': {'ContentLength': len(self.content)},
  75. },
  76. {
  77. 'method': 'get_object',
  78. 'service_response': {'Body': self.stream},
  79. },
  80. ]
  81. def create_expected_progress_callback_info(self):
  82. # Note that last read is from the empty sentinel indicating
  83. # that the stream is done.
  84. return [{'bytes_transferred': 10}]
  85. def add_head_object_response(self, expected_params=None):
  86. head_response = self.create_stubbed_responses()[0]
  87. if expected_params:
  88. head_response['expected_params'] = expected_params
  89. self.stubber.add_response(**head_response)
  90. def add_successful_get_object_responses(
  91. self, expected_params=None, expected_ranges=None
  92. ):
  93. # Add all get_object responses needed to complete the download.
  94. # Should account for both ranged and nonranged downloads.
  95. for i, stubbed_response in enumerate(
  96. self.create_stubbed_responses()[1:]
  97. ):
  98. if expected_params:
  99. stubbed_response['expected_params'] = copy.deepcopy(
  100. expected_params
  101. )
  102. if expected_ranges:
  103. stubbed_response['expected_params'][
  104. 'Range'
  105. ] = expected_ranges[i]
  106. self.stubber.add_response(**stubbed_response)
  107. def add_n_retryable_get_object_responses(self, n, num_reads=0):
  108. for _ in range(n):
  109. self.stubber.add_response(
  110. method='get_object',
  111. service_response={
  112. 'Body': StreamWithError(
  113. copy.deepcopy(self.stream), SOCKET_ERROR, num_reads
  114. )
  115. },
  116. )
  117. def test_download_temporary_file_does_not_exist(self):
  118. self.add_head_object_response()
  119. self.add_successful_get_object_responses()
  120. future = self.manager.download(**self.create_call_kwargs())
  121. future.result()
  122. # Make sure the file exists
  123. self.assertTrue(os.path.exists(self.filename))
  124. # Make sure the random temporary file does not exist
  125. possible_matches = glob.glob('%s*' % self.filename + os.extsep)
  126. self.assertEqual(possible_matches, [])
  127. def test_download_for_fileobj(self):
  128. self.add_head_object_response()
  129. self.add_successful_get_object_responses()
  130. with open(self.filename, 'wb') as f:
  131. future = self.manager.download(
  132. self.bucket, self.key, f, self.extra_args
  133. )
  134. future.result()
  135. # Ensure that the contents are correct
  136. with open(self.filename, 'rb') as f:
  137. self.assertEqual(self.content, f.read())
  138. def test_download_for_seekable_filelike_obj(self):
  139. self.add_head_object_response()
  140. self.add_successful_get_object_responses()
  141. # Create a file-like object to test. In this case, it is a BytesIO
  142. # object.
  143. bytes_io = BytesIO()
  144. future = self.manager.download(
  145. self.bucket, self.key, bytes_io, self.extra_args
  146. )
  147. future.result()
  148. # Ensure that the contents are correct
  149. bytes_io.seek(0)
  150. self.assertEqual(self.content, bytes_io.read())
  151. def test_download_for_nonseekable_filelike_obj(self):
  152. self.add_head_object_response()
  153. self.add_successful_get_object_responses()
  154. with open(self.filename, 'wb') as f:
  155. future = self.manager.download(
  156. self.bucket, self.key, NonSeekableWriter(f), self.extra_args
  157. )
  158. future.result()
  159. # Ensure that the contents are correct
  160. with open(self.filename, 'rb') as f:
  161. self.assertEqual(self.content, f.read())
  162. def test_download_cleanup_on_failure(self):
  163. self.add_head_object_response()
  164. # Throw an error on the download
  165. self.stubber.add_client_error('get_object')
  166. future = self.manager.download(**self.create_call_kwargs())
  167. with self.assertRaises(ClientError):
  168. future.result()
  169. # Make sure the actual file and the temporary do not exist
  170. # by globbing for the file and any of its extensions
  171. possible_matches = glob.glob('%s*' % self.filename)
  172. self.assertEqual(possible_matches, [])
  173. def test_download_with_nonexistent_directory(self):
  174. self.add_head_object_response()
  175. self.add_successful_get_object_responses()
  176. call_kwargs = self.create_call_kwargs()
  177. call_kwargs['fileobj'] = os.path.join(
  178. self.tempdir, 'missing-directory', 'myfile'
  179. )
  180. future = self.manager.download(**call_kwargs)
  181. with self.assertRaises(IOError):
  182. future.result()
  183. def test_retries_and_succeeds(self):
  184. self.add_head_object_response()
  185. # Insert a response that will trigger a retry.
  186. self.add_n_retryable_get_object_responses(1)
  187. # Add the normal responses to simulate the download proceeding
  188. # as normal after the retry.
  189. self.add_successful_get_object_responses()
  190. future = self.manager.download(**self.create_call_kwargs())
  191. future.result()
  192. # The retry should have been consumed and the process should have
  193. # continued using the successful responses.
  194. self.stubber.assert_no_pending_responses()
  195. with open(self.filename, 'rb') as f:
  196. self.assertEqual(self.content, f.read())
  197. def test_retry_failure(self):
  198. self.add_head_object_response()
  199. max_retries = 3
  200. self.config.num_download_attempts = max_retries
  201. self._manager = TransferManager(self.client, self.config)
  202. # Add responses that fill up the maximum number of retries.
  203. self.add_n_retryable_get_object_responses(max_retries)
  204. future = self.manager.download(**self.create_call_kwargs())
  205. # A retry exceeded error should have happened.
  206. with self.assertRaises(RetriesExceededError):
  207. future.result()
  208. # All of the retries should have been used up.
  209. self.stubber.assert_no_pending_responses()
  210. def test_retry_rewinds_callbacks(self):
  211. self.add_head_object_response()
  212. # Insert a response that will trigger a retry after one read of the
  213. # stream has been made.
  214. self.add_n_retryable_get_object_responses(1, num_reads=1)
  215. # Add the normal responses to simulate the download proceeding
  216. # as normal after the retry.
  217. self.add_successful_get_object_responses()
  218. recorder_subscriber = RecordingSubscriber()
  219. # Set the streaming to a size that is smaller than the data we
  220. # currently provide to it to simulate rewinds of callbacks.
  221. self.config.io_chunksize = 3
  222. future = self.manager.download(
  223. subscribers=[recorder_subscriber], **self.create_call_kwargs()
  224. )
  225. future.result()
  226. # Ensure that there is no more remaining responses and that contents
  227. # are correct.
  228. self.stubber.assert_no_pending_responses()
  229. with open(self.filename, 'rb') as f:
  230. self.assertEqual(self.content, f.read())
  231. # Assert that the number of bytes seen is equal to the length of
  232. # downloaded content.
  233. self.assertEqual(
  234. recorder_subscriber.calculate_bytes_seen(), len(self.content)
  235. )
  236. # Also ensure that the second progress invocation was negative three
  237. # because a retry happened on the second read of the stream and we
  238. # know that the chunk size for each read is 3.
  239. progress_byte_amts = [
  240. call['bytes_transferred']
  241. for call in recorder_subscriber.on_progress_calls
  242. ]
  243. self.assertEqual(-3, progress_byte_amts[1])
  244. def test_can_provide_file_size(self):
  245. self.add_successful_get_object_responses()
  246. call_kwargs = self.create_call_kwargs()
  247. call_kwargs['subscribers'] = [FileSizeProvider(len(self.content))]
  248. future = self.manager.download(**call_kwargs)
  249. future.result()
  250. # The HeadObject should have not happened and should have been able
  251. # to successfully download the file.
  252. self.stubber.assert_no_pending_responses()
  253. with open(self.filename, 'rb') as f:
  254. self.assertEqual(self.content, f.read())
  255. def test_uses_provided_osutil(self):
  256. osutil = RecordingOSUtils()
  257. # Use the recording os utility for the transfer manager
  258. self._manager = TransferManager(self.client, self.config, osutil)
  259. self.add_head_object_response()
  260. self.add_successful_get_object_responses()
  261. future = self.manager.download(**self.create_call_kwargs())
  262. future.result()
  263. # The osutil should have had its open() method invoked when opening
  264. # a temporary file and its rename_file() method invoked when the
  265. # the temporary file was moved to its final location.
  266. self.assertEqual(len(osutil.open_records), 1)
  267. self.assertEqual(len(osutil.rename_records), 1)
  268. @skip_if_windows('Windows does not support UNIX special files')
  269. @skip_if_using_serial_implementation(
  270. 'A separate thread is needed to read from the fifo'
  271. )
  272. def test_download_for_fifo_file(self):
  273. self.add_head_object_response()
  274. self.add_successful_get_object_responses()
  275. # Create the fifo file
  276. os.mkfifo(self.filename)
  277. future = self.manager.download(
  278. self.bucket, self.key, self.filename, self.extra_args
  279. )
  280. # The call to open a fifo will block until there is both a reader
  281. # and a writer, so we need to open it for reading after we've
  282. # started the transfer.
  283. with open(self.filename, 'rb') as fifo:
  284. future.result()
  285. self.assertEqual(fifo.read(), self.content)
  286. def test_raise_exception_on_s3_object_lambda_resource(self):
  287. s3_object_lambda_arn = (
  288. 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  289. 'accesspoint:my-accesspoint'
  290. )
  291. with self.assertRaisesRegex(ValueError, 'methods do not support'):
  292. self.manager.download(
  293. s3_object_lambda_arn, self.key, self.filename, self.extra_args
  294. )
  295. class TestNonRangedDownload(BaseDownloadTest):
  296. # TODO: If you want to add tests outside of this test class and still
  297. # subclass from BaseDownloadTest you need to set ``__test__ = True``. If
  298. # you do not, your tests will not get picked up by the test runner! This
  299. # needs to be done until we find a better way to ignore running test cases
  300. # from the general test base class, which we do not want ran.
  301. __test__ = True
  302. def test_download(self):
  303. self.extra_args['RequestPayer'] = 'requester'
  304. expected_params = {
  305. 'Bucket': self.bucket,
  306. 'Key': self.key,
  307. 'RequestPayer': 'requester',
  308. }
  309. self.add_head_object_response(expected_params)
  310. self.add_successful_get_object_responses(expected_params)
  311. future = self.manager.download(
  312. self.bucket, self.key, self.filename, self.extra_args
  313. )
  314. future.result()
  315. # Ensure that the contents are correct
  316. with open(self.filename, 'rb') as f:
  317. self.assertEqual(self.content, f.read())
  318. def test_download_with_checksum_enabled(self):
  319. self.extra_args['ChecksumMode'] = 'ENABLED'
  320. expected_params = {
  321. 'Bucket': self.bucket,
  322. 'Key': self.key,
  323. 'ChecksumMode': 'ENABLED',
  324. }
  325. self.add_head_object_response(expected_params)
  326. self.add_successful_get_object_responses(expected_params)
  327. future = self.manager.download(
  328. self.bucket, self.key, self.filename, self.extra_args
  329. )
  330. future.result()
  331. # Ensure that the contents are correct
  332. with open(self.filename, 'rb') as f:
  333. self.assertEqual(self.content, f.read())
  334. def test_allowed_copy_params_are_valid(self):
  335. op_model = self.client.meta.service_model.operation_model('GetObject')
  336. for allowed_upload_arg in self._manager.ALLOWED_DOWNLOAD_ARGS:
  337. self.assertIn(allowed_upload_arg, op_model.input_shape.members)
  338. def test_download_empty_object(self):
  339. self.content = b''
  340. self.stream = BytesIO(self.content)
  341. self.add_head_object_response()
  342. self.add_successful_get_object_responses()
  343. future = self.manager.download(
  344. self.bucket, self.key, self.filename, self.extra_args
  345. )
  346. future.result()
  347. # Ensure that the empty file exists
  348. with open(self.filename, 'rb') as f:
  349. self.assertEqual(b'', f.read())
  350. def test_uses_bandwidth_limiter(self):
  351. self.content = b'a' * 1024 * 1024
  352. self.stream = BytesIO(self.content)
  353. self.config = TransferConfig(
  354. max_request_concurrency=1, max_bandwidth=len(self.content) / 2
  355. )
  356. self._manager = TransferManager(self.client, self.config)
  357. self.add_head_object_response()
  358. self.add_successful_get_object_responses()
  359. start = time.time()
  360. future = self.manager.download(
  361. self.bucket, self.key, self.filename, self.extra_args
  362. )
  363. future.result()
  364. # This is just a smoke test to make sure that the limiter is
  365. # being used and not necessary its exactness. So we set the maximum
  366. # bandwidth to len(content)/2 per sec and make sure that it is
  367. # noticeably slower. Ideally it will take more than two seconds, but
  368. # given tracking at the beginning of transfers are not entirely
  369. # accurate setting at the initial start of a transfer, we give us
  370. # some flexibility by setting the expected time to half of the
  371. # theoretical time to take.
  372. self.assertGreaterEqual(time.time() - start, 1)
  373. # Ensure that the contents are correct
  374. with open(self.filename, 'rb') as f:
  375. self.assertEqual(self.content, f.read())
  376. class TestRangedDownload(BaseDownloadTest):
  377. # TODO: If you want to add tests outside of this test class and still
  378. # subclass from BaseDownloadTest you need to set ``__test__ = True``. If
  379. # you do not, your tests will not get picked up by the test runner! This
  380. # needs to be done until we find a better way to ignore running test cases
  381. # from the general test base class, which we do not want ran.
  382. __test__ = True
  383. def setUp(self):
  384. super().setUp()
  385. self.config = TransferConfig(
  386. max_request_concurrency=1,
  387. multipart_threshold=1,
  388. multipart_chunksize=4,
  389. )
  390. self._manager = TransferManager(self.client, self.config)
  391. def create_stubbed_responses(self):
  392. return [
  393. {
  394. 'method': 'head_object',
  395. 'service_response': {'ContentLength': len(self.content)},
  396. },
  397. {
  398. 'method': 'get_object',
  399. 'service_response': {'Body': BytesIO(self.content[0:4])},
  400. },
  401. {
  402. 'method': 'get_object',
  403. 'service_response': {'Body': BytesIO(self.content[4:8])},
  404. },
  405. {
  406. 'method': 'get_object',
  407. 'service_response': {'Body': BytesIO(self.content[8:])},
  408. },
  409. ]
  410. def create_expected_progress_callback_info(self):
  411. return [
  412. {'bytes_transferred': 4},
  413. {'bytes_transferred': 4},
  414. {'bytes_transferred': 2},
  415. ]
  416. def test_download(self):
  417. self.extra_args['RequestPayer'] = 'requester'
  418. expected_params = {
  419. 'Bucket': self.bucket,
  420. 'Key': self.key,
  421. 'RequestPayer': 'requester',
  422. }
  423. expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-']
  424. self.add_head_object_response(expected_params)
  425. self.add_successful_get_object_responses(
  426. expected_params, expected_ranges
  427. )
  428. future = self.manager.download(
  429. self.bucket, self.key, self.filename, self.extra_args
  430. )
  431. future.result()
  432. # Ensure that the contents are correct
  433. with open(self.filename, 'rb') as f:
  434. self.assertEqual(self.content, f.read())
  435. def test_download_with_checksum_enabled(self):
  436. self.extra_args['ChecksumMode'] = 'ENABLED'
  437. expected_params = {
  438. 'Bucket': self.bucket,
  439. 'Key': self.key,
  440. 'ChecksumMode': 'ENABLED',
  441. }
  442. expected_ranges = ['bytes=0-3', 'bytes=4-7', 'bytes=8-']
  443. self.add_head_object_response(expected_params)
  444. self.add_successful_get_object_responses(
  445. expected_params, expected_ranges
  446. )
  447. future = self.manager.download(
  448. self.bucket, self.key, self.filename, self.extra_args
  449. )
  450. future.result()
  451. # Ensure that the contents are correct
  452. with open(self.filename, 'rb') as f:
  453. self.assertEqual(self.content, f.read())