test_download.py 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import copy
  14. import os
  15. import shutil
  16. import tempfile
  17. from io import BytesIO
  18. from s3transfer.bandwidth import BandwidthLimiter
  19. from s3transfer.compat import SOCKET_ERROR
  20. from s3transfer.download import (
  21. CompleteDownloadNOOPTask,
  22. DeferQueue,
  23. DownloadChunkIterator,
  24. DownloadFilenameOutputManager,
  25. DownloadNonSeekableOutputManager,
  26. DownloadSeekableOutputManager,
  27. DownloadSpecialFilenameOutputManager,
  28. DownloadSubmissionTask,
  29. GetObjectTask,
  30. ImmediatelyWriteIOGetObjectTask,
  31. IOCloseTask,
  32. IORenameFileTask,
  33. IOStreamingWriteTask,
  34. IOWriteTask,
  35. )
  36. from s3transfer.exceptions import RetriesExceededError
  37. from s3transfer.futures import IN_MEMORY_DOWNLOAD_TAG, BoundedExecutor
  38. from s3transfer.utils import CallArgs, OSUtils
  39. from tests import (
  40. BaseSubmissionTaskTest,
  41. BaseTaskTest,
  42. FileCreator,
  43. NonSeekableWriter,
  44. RecordingExecutor,
  45. StreamWithError,
  46. mock,
  47. unittest,
  48. )
  49. class DownloadException(Exception):
  50. pass
  51. class WriteCollector:
  52. """A utility to collect information about writes and seeks"""
  53. def __init__(self):
  54. self._pos = 0
  55. self.writes = []
  56. def seek(self, pos, whence=0):
  57. self._pos = pos
  58. def write(self, data):
  59. self.writes.append((self._pos, data))
  60. self._pos += len(data)
  61. class AlwaysIndicatesSpecialFileOSUtils(OSUtils):
  62. """OSUtil that always returns True for is_special_file"""
  63. def is_special_file(self, filename):
  64. return True
  65. class CancelledStreamWrapper:
  66. """A wrapper to trigger a cancellation while stream reading
  67. Forces the transfer coordinator to cancel after a certain amount of reads
  68. :param stream: The underlying stream to read from
  69. :param transfer_coordinator: The coordinator for the transfer
  70. :param num_reads: On which read to signal a cancellation. 0 is the first
  71. read.
  72. """
  73. def __init__(self, stream, transfer_coordinator, num_reads=0):
  74. self._stream = stream
  75. self._transfer_coordinator = transfer_coordinator
  76. self._num_reads = num_reads
  77. self._count = 0
  78. def read(self, *args, **kwargs):
  79. if self._num_reads == self._count:
  80. self._transfer_coordinator.cancel()
  81. self._stream.read(*args, **kwargs)
  82. self._count += 1
  83. class BaseDownloadOutputManagerTest(BaseTaskTest):
  84. def setUp(self):
  85. super().setUp()
  86. self.osutil = OSUtils()
  87. # Create a file to write to
  88. self.tempdir = tempfile.mkdtemp()
  89. self.filename = os.path.join(self.tempdir, 'myfile')
  90. self.call_args = CallArgs(fileobj=self.filename)
  91. self.future = self.get_transfer_future(self.call_args)
  92. self.io_executor = BoundedExecutor(1000, 1)
  93. def tearDown(self):
  94. super().tearDown()
  95. shutil.rmtree(self.tempdir)
  96. class TestDownloadFilenameOutputManager(BaseDownloadOutputManagerTest):
  97. def setUp(self):
  98. super().setUp()
  99. self.download_output_manager = DownloadFilenameOutputManager(
  100. self.osutil,
  101. self.transfer_coordinator,
  102. io_executor=self.io_executor,
  103. )
  104. def test_is_compatible(self):
  105. self.assertTrue(
  106. self.download_output_manager.is_compatible(
  107. self.filename, self.osutil
  108. )
  109. )
  110. def test_get_download_task_tag(self):
  111. self.assertIsNone(self.download_output_manager.get_download_task_tag())
  112. def test_get_fileobj_for_io_writes(self):
  113. with self.download_output_manager.get_fileobj_for_io_writes(
  114. self.future
  115. ) as f:
  116. # Ensure it is a file like object returned
  117. self.assertTrue(hasattr(f, 'read'))
  118. self.assertTrue(hasattr(f, 'seek'))
  119. # Make sure the name of the file returned is not the same as the
  120. # final filename as we should be writing to a temporary file.
  121. self.assertNotEqual(f.name, self.filename)
  122. def test_get_final_io_task(self):
  123. ref_contents = b'my_contents'
  124. with self.download_output_manager.get_fileobj_for_io_writes(
  125. self.future
  126. ) as f:
  127. temp_filename = f.name
  128. # Write some data to test that the data gets moved over to the
  129. # final location.
  130. f.write(ref_contents)
  131. final_task = self.download_output_manager.get_final_io_task()
  132. # Make sure it is the appropriate task.
  133. self.assertIsInstance(final_task, IORenameFileTask)
  134. final_task()
  135. # Make sure the temp_file gets removed
  136. self.assertFalse(os.path.exists(temp_filename))
  137. # Make sure what ever was written to the temp file got moved to
  138. # the final filename
  139. with open(self.filename, 'rb') as f:
  140. self.assertEqual(f.read(), ref_contents)
  141. def test_can_queue_file_io_task(self):
  142. fileobj = WriteCollector()
  143. self.download_output_manager.queue_file_io_task(
  144. fileobj=fileobj, data='foo', offset=0
  145. )
  146. self.download_output_manager.queue_file_io_task(
  147. fileobj=fileobj, data='bar', offset=3
  148. )
  149. self.io_executor.shutdown()
  150. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  151. def test_get_file_io_write_task(self):
  152. fileobj = WriteCollector()
  153. io_write_task = self.download_output_manager.get_io_write_task(
  154. fileobj=fileobj, data='foo', offset=3
  155. )
  156. self.assertIsInstance(io_write_task, IOWriteTask)
  157. io_write_task()
  158. self.assertEqual(fileobj.writes, [(3, 'foo')])
  159. class TestDownloadSpecialFilenameOutputManager(BaseDownloadOutputManagerTest):
  160. def setUp(self):
  161. super().setUp()
  162. self.osutil = AlwaysIndicatesSpecialFileOSUtils()
  163. self.download_output_manager = DownloadSpecialFilenameOutputManager(
  164. self.osutil,
  165. self.transfer_coordinator,
  166. io_executor=self.io_executor,
  167. )
  168. def test_is_compatible_for_special_file(self):
  169. self.assertTrue(
  170. self.download_output_manager.is_compatible(
  171. self.filename, AlwaysIndicatesSpecialFileOSUtils()
  172. )
  173. )
  174. def test_is_not_compatible_for_non_special_file(self):
  175. self.assertFalse(
  176. self.download_output_manager.is_compatible(
  177. self.filename, OSUtils()
  178. )
  179. )
  180. def test_get_fileobj_for_io_writes(self):
  181. with self.download_output_manager.get_fileobj_for_io_writes(
  182. self.future
  183. ) as f:
  184. # Ensure it is a file like object returned
  185. self.assertTrue(hasattr(f, 'read'))
  186. # Make sure the name of the file returned is the same as the
  187. # final filename as we should not be writing to a temporary file.
  188. self.assertEqual(f.name, self.filename)
  189. def test_get_final_io_task(self):
  190. self.assertIsInstance(
  191. self.download_output_manager.get_final_io_task(), IOCloseTask
  192. )
  193. def test_can_queue_file_io_task(self):
  194. fileobj = WriteCollector()
  195. self.download_output_manager.queue_file_io_task(
  196. fileobj=fileobj, data='foo', offset=0
  197. )
  198. self.download_output_manager.queue_file_io_task(
  199. fileobj=fileobj, data='bar', offset=3
  200. )
  201. self.io_executor.shutdown()
  202. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  203. class TestDownloadSeekableOutputManager(BaseDownloadOutputManagerTest):
  204. def setUp(self):
  205. super().setUp()
  206. self.download_output_manager = DownloadSeekableOutputManager(
  207. self.osutil,
  208. self.transfer_coordinator,
  209. io_executor=self.io_executor,
  210. )
  211. # Create a fileobj to write to
  212. self.fileobj = open(self.filename, 'wb')
  213. self.call_args = CallArgs(fileobj=self.fileobj)
  214. self.future = self.get_transfer_future(self.call_args)
  215. def tearDown(self):
  216. self.fileobj.close()
  217. super().tearDown()
  218. def test_is_compatible(self):
  219. self.assertTrue(
  220. self.download_output_manager.is_compatible(
  221. self.fileobj, self.osutil
  222. )
  223. )
  224. def test_is_compatible_bytes_io(self):
  225. self.assertTrue(
  226. self.download_output_manager.is_compatible(BytesIO(), self.osutil)
  227. )
  228. def test_not_compatible_for_non_filelike_obj(self):
  229. self.assertFalse(
  230. self.download_output_manager.is_compatible(object(), self.osutil)
  231. )
  232. def test_get_download_task_tag(self):
  233. self.assertIsNone(self.download_output_manager.get_download_task_tag())
  234. def test_get_fileobj_for_io_writes(self):
  235. self.assertIs(
  236. self.download_output_manager.get_fileobj_for_io_writes(
  237. self.future
  238. ),
  239. self.fileobj,
  240. )
  241. def test_get_final_io_task(self):
  242. self.assertIsInstance(
  243. self.download_output_manager.get_final_io_task(),
  244. CompleteDownloadNOOPTask,
  245. )
  246. def test_can_queue_file_io_task(self):
  247. fileobj = WriteCollector()
  248. self.download_output_manager.queue_file_io_task(
  249. fileobj=fileobj, data='foo', offset=0
  250. )
  251. self.download_output_manager.queue_file_io_task(
  252. fileobj=fileobj, data='bar', offset=3
  253. )
  254. self.io_executor.shutdown()
  255. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  256. def test_get_file_io_write_task(self):
  257. fileobj = WriteCollector()
  258. io_write_task = self.download_output_manager.get_io_write_task(
  259. fileobj=fileobj, data='foo', offset=3
  260. )
  261. self.assertIsInstance(io_write_task, IOWriteTask)
  262. io_write_task()
  263. self.assertEqual(fileobj.writes, [(3, 'foo')])
  264. class TestDownloadNonSeekableOutputManager(BaseDownloadOutputManagerTest):
  265. def setUp(self):
  266. super().setUp()
  267. self.download_output_manager = DownloadNonSeekableOutputManager(
  268. self.osutil, self.transfer_coordinator, io_executor=None
  269. )
  270. def test_is_compatible_with_seekable_stream(self):
  271. with open(self.filename, 'wb') as f:
  272. self.assertTrue(
  273. self.download_output_manager.is_compatible(f, self.osutil)
  274. )
  275. def test_not_compatible_with_filename(self):
  276. self.assertFalse(
  277. self.download_output_manager.is_compatible(
  278. self.filename, self.osutil
  279. )
  280. )
  281. def test_compatible_with_non_seekable_stream(self):
  282. class NonSeekable:
  283. def write(self, data):
  284. pass
  285. f = NonSeekable()
  286. self.assertTrue(
  287. self.download_output_manager.is_compatible(f, self.osutil)
  288. )
  289. def test_is_compatible_with_bytesio(self):
  290. self.assertTrue(
  291. self.download_output_manager.is_compatible(BytesIO(), self.osutil)
  292. )
  293. def test_get_download_task_tag(self):
  294. self.assertIs(
  295. self.download_output_manager.get_download_task_tag(),
  296. IN_MEMORY_DOWNLOAD_TAG,
  297. )
  298. def test_submit_writes_from_internal_queue(self):
  299. class FakeQueue:
  300. def request_writes(self, offset, data):
  301. return [
  302. {'offset': 0, 'data': 'foo'},
  303. {'offset': 3, 'data': 'bar'},
  304. ]
  305. q = FakeQueue()
  306. io_executor = BoundedExecutor(1000, 1)
  307. manager = DownloadNonSeekableOutputManager(
  308. self.osutil,
  309. self.transfer_coordinator,
  310. io_executor=io_executor,
  311. defer_queue=q,
  312. )
  313. fileobj = WriteCollector()
  314. manager.queue_file_io_task(fileobj=fileobj, data='foo', offset=1)
  315. io_executor.shutdown()
  316. self.assertEqual(fileobj.writes, [(0, 'foo'), (3, 'bar')])
  317. def test_get_file_io_write_task(self):
  318. fileobj = WriteCollector()
  319. io_write_task = self.download_output_manager.get_io_write_task(
  320. fileobj=fileobj, data='foo', offset=1
  321. )
  322. self.assertIsInstance(io_write_task, IOStreamingWriteTask)
  323. io_write_task()
  324. self.assertEqual(fileobj.writes, [(0, 'foo')])
  325. class TestDownloadSubmissionTask(BaseSubmissionTaskTest):
  326. def setUp(self):
  327. super().setUp()
  328. self.tempdir = tempfile.mkdtemp()
  329. self.filename = os.path.join(self.tempdir, 'myfile')
  330. self.bucket = 'mybucket'
  331. self.key = 'mykey'
  332. self.extra_args = {}
  333. self.subscribers = []
  334. # Create a stream to read from
  335. self.content = b'my content'
  336. self.stream = BytesIO(self.content)
  337. # A list to keep track of all of the bodies sent over the wire
  338. # and their order.
  339. self.call_args = self.get_call_args()
  340. self.transfer_future = self.get_transfer_future(self.call_args)
  341. self.io_executor = BoundedExecutor(1000, 1)
  342. self.submission_main_kwargs = {
  343. 'client': self.client,
  344. 'config': self.config,
  345. 'osutil': self.osutil,
  346. 'request_executor': self.executor,
  347. 'io_executor': self.io_executor,
  348. 'transfer_future': self.transfer_future,
  349. }
  350. self.submission_task = self.get_download_submission_task()
  351. def tearDown(self):
  352. super().tearDown()
  353. shutil.rmtree(self.tempdir)
  354. def get_call_args(self, **kwargs):
  355. default_call_args = {
  356. 'fileobj': self.filename,
  357. 'bucket': self.bucket,
  358. 'key': self.key,
  359. 'extra_args': self.extra_args,
  360. 'subscribers': self.subscribers,
  361. }
  362. default_call_args.update(kwargs)
  363. return CallArgs(**default_call_args)
  364. def wrap_executor_in_recorder(self):
  365. self.executor = RecordingExecutor(self.executor)
  366. self.submission_main_kwargs['request_executor'] = self.executor
  367. def use_fileobj_in_call_args(self, fileobj):
  368. self.call_args = self.get_call_args(fileobj=fileobj)
  369. self.transfer_future = self.get_transfer_future(self.call_args)
  370. self.submission_main_kwargs['transfer_future'] = self.transfer_future
  371. def assert_tag_for_get_object(self, tag_value):
  372. submissions_to_compare = self.executor.submissions
  373. if len(submissions_to_compare) > 1:
  374. # If it was ranged get, make sure we do not include the join task.
  375. submissions_to_compare = submissions_to_compare[:-1]
  376. for submission in submissions_to_compare:
  377. self.assertEqual(submission['tag'], tag_value)
  378. def add_head_object_response(self):
  379. self.stubber.add_response(
  380. 'head_object', {'ContentLength': len(self.content)}
  381. )
  382. def add_get_responses(self):
  383. chunksize = self.config.multipart_chunksize
  384. for i in range(0, len(self.content), chunksize):
  385. if i + chunksize > len(self.content):
  386. stream = BytesIO(self.content[i:])
  387. self.stubber.add_response('get_object', {'Body': stream})
  388. else:
  389. stream = BytesIO(self.content[i : i + chunksize])
  390. self.stubber.add_response('get_object', {'Body': stream})
  391. def configure_for_ranged_get(self):
  392. self.config.multipart_threshold = 1
  393. self.config.multipart_chunksize = 4
  394. def get_download_submission_task(self):
  395. return self.get_task(
  396. DownloadSubmissionTask, main_kwargs=self.submission_main_kwargs
  397. )
  398. def wait_and_assert_completed_successfully(self, submission_task):
  399. submission_task()
  400. self.transfer_future.result()
  401. self.stubber.assert_no_pending_responses()
  402. def test_submits_no_tag_for_get_object_filename(self):
  403. self.wrap_executor_in_recorder()
  404. self.add_head_object_response()
  405. self.add_get_responses()
  406. self.submission_task = self.get_download_submission_task()
  407. self.wait_and_assert_completed_successfully(self.submission_task)
  408. # Make sure no tag to limit that task specifically was not associated
  409. # to that task submission.
  410. self.assert_tag_for_get_object(None)
  411. def test_submits_no_tag_for_ranged_get_filename(self):
  412. self.wrap_executor_in_recorder()
  413. self.configure_for_ranged_get()
  414. self.add_head_object_response()
  415. self.add_get_responses()
  416. self.submission_task = self.get_download_submission_task()
  417. self.wait_and_assert_completed_successfully(self.submission_task)
  418. # Make sure no tag to limit that task specifically was not associated
  419. # to that task submission.
  420. self.assert_tag_for_get_object(None)
  421. def test_submits_no_tag_for_get_object_fileobj(self):
  422. self.wrap_executor_in_recorder()
  423. self.add_head_object_response()
  424. self.add_get_responses()
  425. with open(self.filename, 'wb') as f:
  426. self.use_fileobj_in_call_args(f)
  427. self.submission_task = self.get_download_submission_task()
  428. self.wait_and_assert_completed_successfully(self.submission_task)
  429. # Make sure no tag to limit that task specifically was not associated
  430. # to that task submission.
  431. self.assert_tag_for_get_object(None)
  432. def test_submits_no_tag_for_ranged_get_object_fileobj(self):
  433. self.wrap_executor_in_recorder()
  434. self.configure_for_ranged_get()
  435. self.add_head_object_response()
  436. self.add_get_responses()
  437. with open(self.filename, 'wb') as f:
  438. self.use_fileobj_in_call_args(f)
  439. self.submission_task = self.get_download_submission_task()
  440. self.wait_and_assert_completed_successfully(self.submission_task)
  441. # Make sure no tag to limit that task specifically was not associated
  442. # to that task submission.
  443. self.assert_tag_for_get_object(None)
  444. def tests_submits_tag_for_get_object_nonseekable_fileobj(self):
  445. self.wrap_executor_in_recorder()
  446. self.add_head_object_response()
  447. self.add_get_responses()
  448. with open(self.filename, 'wb') as f:
  449. self.use_fileobj_in_call_args(NonSeekableWriter(f))
  450. self.submission_task = self.get_download_submission_task()
  451. self.wait_and_assert_completed_successfully(self.submission_task)
  452. # Make sure no tag to limit that task specifically was not associated
  453. # to that task submission.
  454. self.assert_tag_for_get_object(IN_MEMORY_DOWNLOAD_TAG)
  455. def tests_submits_tag_for_ranged_get_object_nonseekable_fileobj(self):
  456. self.wrap_executor_in_recorder()
  457. self.configure_for_ranged_get()
  458. self.add_head_object_response()
  459. self.add_get_responses()
  460. with open(self.filename, 'wb') as f:
  461. self.use_fileobj_in_call_args(NonSeekableWriter(f))
  462. self.submission_task = self.get_download_submission_task()
  463. self.wait_and_assert_completed_successfully(self.submission_task)
  464. # Make sure no tag to limit that task specifically was not associated
  465. # to that task submission.
  466. self.assert_tag_for_get_object(IN_MEMORY_DOWNLOAD_TAG)
  467. class TestGetObjectTask(BaseTaskTest):
  468. def setUp(self):
  469. super().setUp()
  470. self.bucket = 'mybucket'
  471. self.key = 'mykey'
  472. self.extra_args = {}
  473. self.callbacks = []
  474. self.max_attempts = 5
  475. self.io_executor = BoundedExecutor(1000, 1)
  476. self.content = b'my content'
  477. self.stream = BytesIO(self.content)
  478. self.fileobj = WriteCollector()
  479. self.osutil = OSUtils()
  480. self.io_chunksize = 64 * (1024**2)
  481. self.task_cls = GetObjectTask
  482. self.download_output_manager = DownloadSeekableOutputManager(
  483. self.osutil, self.transfer_coordinator, self.io_executor
  484. )
  485. def get_download_task(self, **kwargs):
  486. default_kwargs = {
  487. 'client': self.client,
  488. 'bucket': self.bucket,
  489. 'key': self.key,
  490. 'fileobj': self.fileobj,
  491. 'extra_args': self.extra_args,
  492. 'callbacks': self.callbacks,
  493. 'max_attempts': self.max_attempts,
  494. 'download_output_manager': self.download_output_manager,
  495. 'io_chunksize': self.io_chunksize,
  496. }
  497. default_kwargs.update(kwargs)
  498. self.transfer_coordinator.set_status_to_queued()
  499. return self.get_task(self.task_cls, main_kwargs=default_kwargs)
  500. def assert_io_writes(self, expected_writes):
  501. # Let the io executor process all of the writes before checking
  502. # what writes were sent to it.
  503. self.io_executor.shutdown()
  504. self.assertEqual(self.fileobj.writes, expected_writes)
  505. def test_main(self):
  506. self.stubber.add_response(
  507. 'get_object',
  508. service_response={'Body': self.stream},
  509. expected_params={'Bucket': self.bucket, 'Key': self.key},
  510. )
  511. task = self.get_download_task()
  512. task()
  513. self.stubber.assert_no_pending_responses()
  514. self.assert_io_writes([(0, self.content)])
  515. def test_extra_args(self):
  516. self.stubber.add_response(
  517. 'get_object',
  518. service_response={'Body': self.stream},
  519. expected_params={
  520. 'Bucket': self.bucket,
  521. 'Key': self.key,
  522. 'Range': 'bytes=0-',
  523. },
  524. )
  525. self.extra_args['Range'] = 'bytes=0-'
  526. task = self.get_download_task()
  527. task()
  528. self.stubber.assert_no_pending_responses()
  529. self.assert_io_writes([(0, self.content)])
  530. def test_control_chunk_size(self):
  531. self.stubber.add_response(
  532. 'get_object',
  533. service_response={'Body': self.stream},
  534. expected_params={'Bucket': self.bucket, 'Key': self.key},
  535. )
  536. task = self.get_download_task(io_chunksize=1)
  537. task()
  538. self.stubber.assert_no_pending_responses()
  539. expected_contents = []
  540. for i in range(len(self.content)):
  541. expected_contents.append((i, bytes(self.content[i : i + 1])))
  542. self.assert_io_writes(expected_contents)
  543. def test_start_index(self):
  544. self.stubber.add_response(
  545. 'get_object',
  546. service_response={'Body': self.stream},
  547. expected_params={'Bucket': self.bucket, 'Key': self.key},
  548. )
  549. task = self.get_download_task(start_index=5)
  550. task()
  551. self.stubber.assert_no_pending_responses()
  552. self.assert_io_writes([(5, self.content)])
  553. def test_uses_bandwidth_limiter(self):
  554. bandwidth_limiter = mock.Mock(BandwidthLimiter)
  555. self.stubber.add_response(
  556. 'get_object',
  557. service_response={'Body': self.stream},
  558. expected_params={'Bucket': self.bucket, 'Key': self.key},
  559. )
  560. task = self.get_download_task(bandwidth_limiter=bandwidth_limiter)
  561. task()
  562. self.stubber.assert_no_pending_responses()
  563. self.assertEqual(
  564. bandwidth_limiter.get_bandwith_limited_stream.call_args_list,
  565. [mock.call(mock.ANY, self.transfer_coordinator)],
  566. )
  567. def test_retries_succeeds(self):
  568. self.stubber.add_response(
  569. 'get_object',
  570. service_response={
  571. 'Body': StreamWithError(self.stream, SOCKET_ERROR)
  572. },
  573. expected_params={'Bucket': self.bucket, 'Key': self.key},
  574. )
  575. self.stubber.add_response(
  576. 'get_object',
  577. service_response={'Body': self.stream},
  578. expected_params={'Bucket': self.bucket, 'Key': self.key},
  579. )
  580. task = self.get_download_task()
  581. task()
  582. # Retryable error should have not affected the bytes placed into
  583. # the io queue.
  584. self.stubber.assert_no_pending_responses()
  585. self.assert_io_writes([(0, self.content)])
  586. def test_retries_failure(self):
  587. for _ in range(self.max_attempts):
  588. self.stubber.add_response(
  589. 'get_object',
  590. service_response={
  591. 'Body': StreamWithError(self.stream, SOCKET_ERROR)
  592. },
  593. expected_params={'Bucket': self.bucket, 'Key': self.key},
  594. )
  595. task = self.get_download_task()
  596. task()
  597. self.transfer_coordinator.announce_done()
  598. # Should have failed out on a RetriesExceededError
  599. with self.assertRaises(RetriesExceededError):
  600. self.transfer_coordinator.result()
  601. self.stubber.assert_no_pending_responses()
  602. def test_retries_in_middle_of_streaming(self):
  603. # After the first read a retryable error will be thrown
  604. self.stubber.add_response(
  605. 'get_object',
  606. service_response={
  607. 'Body': StreamWithError(
  608. copy.deepcopy(self.stream), SOCKET_ERROR, 1
  609. )
  610. },
  611. expected_params={'Bucket': self.bucket, 'Key': self.key},
  612. )
  613. self.stubber.add_response(
  614. 'get_object',
  615. service_response={'Body': self.stream},
  616. expected_params={'Bucket': self.bucket, 'Key': self.key},
  617. )
  618. task = self.get_download_task(io_chunksize=1)
  619. task()
  620. self.stubber.assert_no_pending_responses()
  621. expected_contents = []
  622. # This is the content initially read in before the retry hit on the
  623. # second read()
  624. expected_contents.append((0, bytes(self.content[0:1])))
  625. # The rest of the content should be the entire set of data partitioned
  626. # out based on the one byte stream chunk size. Note the second
  627. # element in the list should be a copy of the first element since
  628. # a retryable exception happened in between.
  629. for i in range(len(self.content)):
  630. expected_contents.append((i, bytes(self.content[i : i + 1])))
  631. self.assert_io_writes(expected_contents)
  632. def test_cancels_out_of_queueing(self):
  633. self.stubber.add_response(
  634. 'get_object',
  635. service_response={
  636. 'Body': CancelledStreamWrapper(
  637. self.stream, self.transfer_coordinator
  638. )
  639. },
  640. expected_params={'Bucket': self.bucket, 'Key': self.key},
  641. )
  642. task = self.get_download_task()
  643. task()
  644. self.stubber.assert_no_pending_responses()
  645. # Make sure that no contents were added to the queue because the task
  646. # should have been canceled before trying to add the contents to the
  647. # io queue.
  648. self.assert_io_writes([])
  649. def test_handles_callback_on_initial_error(self):
  650. # We can't use the stubber for this because we need to raise
  651. # a S3_RETRYABLE_DOWNLOAD_ERRORS, and the stubber only allows
  652. # you to raise a ClientError.
  653. self.client.get_object = mock.Mock(side_effect=SOCKET_ERROR())
  654. task = self.get_download_task()
  655. task()
  656. self.transfer_coordinator.announce_done()
  657. # Should have failed out on a RetriesExceededError because
  658. # get_object keeps raising a socket error.
  659. with self.assertRaises(RetriesExceededError):
  660. self.transfer_coordinator.result()
  661. class TestImmediatelyWriteIOGetObjectTask(TestGetObjectTask):
  662. def setUp(self):
  663. super().setUp()
  664. self.task_cls = ImmediatelyWriteIOGetObjectTask
  665. # When data is written out, it should not use the io executor at all
  666. # if it does use the io executor that is a deviation from expected
  667. # behavior as the data should be written immediately to the file
  668. # object once downloaded.
  669. self.io_executor = None
  670. self.download_output_manager = DownloadSeekableOutputManager(
  671. self.osutil, self.transfer_coordinator, self.io_executor
  672. )
  673. def assert_io_writes(self, expected_writes):
  674. self.assertEqual(self.fileobj.writes, expected_writes)
  675. class BaseIOTaskTest(BaseTaskTest):
  676. def setUp(self):
  677. super().setUp()
  678. self.files = FileCreator()
  679. self.osutil = OSUtils()
  680. self.temp_filename = os.path.join(self.files.rootdir, 'mytempfile')
  681. self.final_filename = os.path.join(self.files.rootdir, 'myfile')
  682. def tearDown(self):
  683. super().tearDown()
  684. self.files.remove_all()
  685. class TestIOStreamingWriteTask(BaseIOTaskTest):
  686. def test_main(self):
  687. with open(self.temp_filename, 'wb') as f:
  688. task = self.get_task(
  689. IOStreamingWriteTask,
  690. main_kwargs={'fileobj': f, 'data': b'foobar'},
  691. )
  692. task()
  693. task2 = self.get_task(
  694. IOStreamingWriteTask,
  695. main_kwargs={'fileobj': f, 'data': b'baz'},
  696. )
  697. task2()
  698. with open(self.temp_filename, 'rb') as f:
  699. # We should just have written to the file in the order
  700. # the tasks were executed.
  701. self.assertEqual(f.read(), b'foobarbaz')
  702. class TestIOWriteTask(BaseIOTaskTest):
  703. def test_main(self):
  704. with open(self.temp_filename, 'wb') as f:
  705. # Write once to the file
  706. task = self.get_task(
  707. IOWriteTask,
  708. main_kwargs={'fileobj': f, 'data': b'foo', 'offset': 0},
  709. )
  710. task()
  711. # Write again to the file
  712. task = self.get_task(
  713. IOWriteTask,
  714. main_kwargs={'fileobj': f, 'data': b'bar', 'offset': 3},
  715. )
  716. task()
  717. with open(self.temp_filename, 'rb') as f:
  718. self.assertEqual(f.read(), b'foobar')
  719. class TestIORenameFileTask(BaseIOTaskTest):
  720. def test_main(self):
  721. with open(self.temp_filename, 'wb') as f:
  722. task = self.get_task(
  723. IORenameFileTask,
  724. main_kwargs={
  725. 'fileobj': f,
  726. 'final_filename': self.final_filename,
  727. 'osutil': self.osutil,
  728. },
  729. )
  730. task()
  731. self.assertTrue(os.path.exists(self.final_filename))
  732. self.assertFalse(os.path.exists(self.temp_filename))
  733. class TestIOCloseTask(BaseIOTaskTest):
  734. def test_main(self):
  735. with open(self.temp_filename, 'w') as f:
  736. task = self.get_task(IOCloseTask, main_kwargs={'fileobj': f})
  737. task()
  738. self.assertTrue(f.closed)
  739. class TestDownloadChunkIterator(unittest.TestCase):
  740. def test_iter(self):
  741. content = b'my content'
  742. body = BytesIO(content)
  743. ref_chunks = []
  744. for chunk in DownloadChunkIterator(body, len(content)):
  745. ref_chunks.append(chunk)
  746. self.assertEqual(ref_chunks, [b'my content'])
  747. def test_iter_chunksize(self):
  748. content = b'1234'
  749. body = BytesIO(content)
  750. ref_chunks = []
  751. for chunk in DownloadChunkIterator(body, 3):
  752. ref_chunks.append(chunk)
  753. self.assertEqual(ref_chunks, [b'123', b'4'])
  754. def test_empty_content(self):
  755. body = BytesIO(b'')
  756. ref_chunks = []
  757. for chunk in DownloadChunkIterator(body, 3):
  758. ref_chunks.append(chunk)
  759. self.assertEqual(ref_chunks, [b''])
  760. class TestDeferQueue(unittest.TestCase):
  761. def setUp(self):
  762. self.q = DeferQueue()
  763. def test_no_writes_when_not_lowest_block(self):
  764. writes = self.q.request_writes(offset=1, data='bar')
  765. self.assertEqual(writes, [])
  766. def test_writes_returned_in_order(self):
  767. self.assertEqual(self.q.request_writes(offset=3, data='d'), [])
  768. self.assertEqual(self.q.request_writes(offset=2, data='c'), [])
  769. self.assertEqual(self.q.request_writes(offset=1, data='b'), [])
  770. # Everything at this point has been deferred, but as soon as we
  771. # send offset=0, that will unlock offsets 0-3.
  772. writes = self.q.request_writes(offset=0, data='a')
  773. self.assertEqual(
  774. writes,
  775. [
  776. {'offset': 0, 'data': 'a'},
  777. {'offset': 1, 'data': 'b'},
  778. {'offset': 2, 'data': 'c'},
  779. {'offset': 3, 'data': 'd'},
  780. ],
  781. )
  782. def test_unlocks_partial_range(self):
  783. self.assertEqual(self.q.request_writes(offset=5, data='f'), [])
  784. self.assertEqual(self.q.request_writes(offset=1, data='b'), [])
  785. # offset=0 unlocks 0-1, but offset=5 still needs to see 2-4 first.
  786. writes = self.q.request_writes(offset=0, data='a')
  787. self.assertEqual(
  788. writes,
  789. [
  790. {'offset': 0, 'data': 'a'},
  791. {'offset': 1, 'data': 'b'},
  792. ],
  793. )
  794. def test_data_can_be_any_size(self):
  795. self.q.request_writes(offset=5, data='hello world')
  796. writes = self.q.request_writes(offset=0, data='abcde')
  797. self.assertEqual(
  798. writes,
  799. [
  800. {'offset': 0, 'data': 'abcde'},
  801. {'offset': 5, 'data': 'hello world'},
  802. ],
  803. )
  804. def test_data_queued_in_order(self):
  805. # This immediately gets returned because offset=0 is the
  806. # next range we're waiting on.
  807. writes = self.q.request_writes(offset=0, data='hello world')
  808. self.assertEqual(writes, [{'offset': 0, 'data': 'hello world'}])
  809. # Same thing here but with offset
  810. writes = self.q.request_writes(offset=11, data='hello again')
  811. self.assertEqual(writes, [{'offset': 11, 'data': 'hello again'}])
  812. def test_writes_below_min_offset_are_ignored(self):
  813. self.q.request_writes(offset=0, data='a')
  814. self.q.request_writes(offset=1, data='b')
  815. self.q.request_writes(offset=2, data='c')
  816. # At this point we're expecting offset=3, so if a write
  817. # comes in below 3, we ignore it.
  818. self.assertEqual(self.q.request_writes(offset=0, data='a'), [])
  819. self.assertEqual(self.q.request_writes(offset=1, data='b'), [])
  820. self.assertEqual(
  821. self.q.request_writes(offset=3, data='d'),
  822. [{'offset': 3, 'data': 'd'}],
  823. )
  824. def test_duplicate_writes_are_ignored(self):
  825. self.q.request_writes(offset=2, data='c')
  826. self.q.request_writes(offset=1, data='b')
  827. # We're still waiting for offset=0, but if
  828. # a duplicate write comes in for offset=2/offset=1
  829. # it's ignored. This gives "first one wins" behavior.
  830. self.assertEqual(self.q.request_writes(offset=2, data='X'), [])
  831. self.assertEqual(self.q.request_writes(offset=1, data='Y'), [])
  832. self.assertEqual(
  833. self.q.request_writes(offset=0, data='a'),
  834. [
  835. {'offset': 0, 'data': 'a'},
  836. # Note we're seeing 'b' 'c', and not 'X', 'Y'.
  837. {'offset': 1, 'data': 'b'},
  838. {'offset': 2, 'data': 'c'},
  839. ],
  840. )