test_upload.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License'). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the 'license' file accompanying this file. This file is
  10. # distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import math
  14. import os
  15. import shutil
  16. import tempfile
  17. from io import BytesIO
  18. from botocore.stub import ANY
  19. from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
  20. from s3transfer.manager import TransferConfig
  21. from s3transfer.upload import (
  22. AggregatedProgressCallback,
  23. InterruptReader,
  24. PutObjectTask,
  25. UploadFilenameInputManager,
  26. UploadNonSeekableInputManager,
  27. UploadPartTask,
  28. UploadSeekableInputManager,
  29. UploadSubmissionTask,
  30. )
  31. from s3transfer.utils import MIN_UPLOAD_CHUNKSIZE, CallArgs, OSUtils
  32. from tests import (
  33. BaseSubmissionTaskTest,
  34. BaseTaskTest,
  35. FileSizeProvider,
  36. NonSeekableReader,
  37. RecordingExecutor,
  38. RecordingSubscriber,
  39. unittest,
  40. )
  41. class InterruptionError(Exception):
  42. pass
  43. class OSUtilsExceptionOnFileSize(OSUtils):
  44. def get_file_size(self, filename):
  45. raise AssertionError(
  46. "The file %s should not have been stated" % filename
  47. )
  48. class BaseUploadTest(BaseTaskTest):
  49. def setUp(self):
  50. super().setUp()
  51. self.bucket = 'mybucket'
  52. self.key = 'foo'
  53. self.osutil = OSUtils()
  54. self.tempdir = tempfile.mkdtemp()
  55. self.filename = os.path.join(self.tempdir, 'myfile')
  56. self.content = b'my content'
  57. self.subscribers = []
  58. with open(self.filename, 'wb') as f:
  59. f.write(self.content)
  60. # A list to keep track of all of the bodies sent over the wire
  61. # and their order.
  62. self.sent_bodies = []
  63. self.client.meta.events.register(
  64. 'before-parameter-build.s3.*', self.collect_body
  65. )
  66. def tearDown(self):
  67. super().tearDown()
  68. shutil.rmtree(self.tempdir)
  69. def collect_body(self, params, **kwargs):
  70. if 'Body' in params:
  71. self.sent_bodies.append(params['Body'].read())
  72. class TestAggregatedProgressCallback(unittest.TestCase):
  73. def setUp(self):
  74. self.aggregated_amounts = []
  75. self.threshold = 3
  76. self.aggregated_progress_callback = AggregatedProgressCallback(
  77. [self.callback], self.threshold
  78. )
  79. def callback(self, bytes_transferred):
  80. self.aggregated_amounts.append(bytes_transferred)
  81. def test_under_threshold(self):
  82. one_under_threshold_amount = self.threshold - 1
  83. self.aggregated_progress_callback(one_under_threshold_amount)
  84. self.assertEqual(self.aggregated_amounts, [])
  85. self.aggregated_progress_callback(1)
  86. self.assertEqual(self.aggregated_amounts, [self.threshold])
  87. def test_at_threshold(self):
  88. self.aggregated_progress_callback(self.threshold)
  89. self.assertEqual(self.aggregated_amounts, [self.threshold])
  90. def test_over_threshold(self):
  91. over_threshold_amount = self.threshold + 1
  92. self.aggregated_progress_callback(over_threshold_amount)
  93. self.assertEqual(self.aggregated_amounts, [over_threshold_amount])
  94. def test_flush(self):
  95. under_threshold_amount = self.threshold - 1
  96. self.aggregated_progress_callback(under_threshold_amount)
  97. self.assertEqual(self.aggregated_amounts, [])
  98. self.aggregated_progress_callback.flush()
  99. self.assertEqual(self.aggregated_amounts, [under_threshold_amount])
  100. def test_flush_with_nothing_to_flush(self):
  101. under_threshold_amount = self.threshold - 1
  102. self.aggregated_progress_callback(under_threshold_amount)
  103. self.assertEqual(self.aggregated_amounts, [])
  104. self.aggregated_progress_callback.flush()
  105. self.assertEqual(self.aggregated_amounts, [under_threshold_amount])
  106. # Flushing again should do nothing as it was just flushed
  107. self.aggregated_progress_callback.flush()
  108. self.assertEqual(self.aggregated_amounts, [under_threshold_amount])
  109. class TestInterruptReader(BaseUploadTest):
  110. def test_read_raises_exception(self):
  111. with open(self.filename, 'rb') as f:
  112. reader = InterruptReader(f, self.transfer_coordinator)
  113. # Read some bytes to show it can be read.
  114. self.assertEqual(reader.read(1), self.content[0:1])
  115. # Then set an exception in the transfer coordinator
  116. self.transfer_coordinator.set_exception(InterruptionError())
  117. # The next read should have the exception propograte
  118. with self.assertRaises(InterruptionError):
  119. reader.read()
  120. def test_seek(self):
  121. with open(self.filename, 'rb') as f:
  122. reader = InterruptReader(f, self.transfer_coordinator)
  123. # Ensure it can seek correctly
  124. reader.seek(1)
  125. self.assertEqual(reader.read(1), self.content[1:2])
  126. def test_tell(self):
  127. with open(self.filename, 'rb') as f:
  128. reader = InterruptReader(f, self.transfer_coordinator)
  129. # Ensure it can tell correctly
  130. reader.seek(1)
  131. self.assertEqual(reader.tell(), 1)
  132. class BaseUploadInputManagerTest(BaseUploadTest):
  133. def setUp(self):
  134. super().setUp()
  135. self.osutil = OSUtils()
  136. self.config = TransferConfig()
  137. self.recording_subscriber = RecordingSubscriber()
  138. self.subscribers.append(self.recording_subscriber)
  139. def _get_expected_body_for_part(self, part_number):
  140. # A helper method for retrieving the expected body for a specific
  141. # part number of the data
  142. total_size = len(self.content)
  143. chunk_size = self.config.multipart_chunksize
  144. start_index = (part_number - 1) * chunk_size
  145. end_index = part_number * chunk_size
  146. if end_index >= total_size:
  147. return self.content[start_index:]
  148. return self.content[start_index:end_index]
  149. class TestUploadFilenameInputManager(BaseUploadInputManagerTest):
  150. def setUp(self):
  151. super().setUp()
  152. self.upload_input_manager = UploadFilenameInputManager(
  153. self.osutil, self.transfer_coordinator
  154. )
  155. self.call_args = CallArgs(
  156. fileobj=self.filename, subscribers=self.subscribers
  157. )
  158. self.future = self.get_transfer_future(self.call_args)
  159. def test_is_compatible(self):
  160. self.assertTrue(
  161. self.upload_input_manager.is_compatible(
  162. self.future.meta.call_args.fileobj
  163. )
  164. )
  165. def test_stores_bodies_in_memory_put_object(self):
  166. self.assertFalse(
  167. self.upload_input_manager.stores_body_in_memory('put_object')
  168. )
  169. def test_stores_bodies_in_memory_upload_part(self):
  170. self.assertFalse(
  171. self.upload_input_manager.stores_body_in_memory('upload_part')
  172. )
  173. def test_provide_transfer_size(self):
  174. self.upload_input_manager.provide_transfer_size(self.future)
  175. # The provided file size should be equal to size of the contents of
  176. # the file.
  177. self.assertEqual(self.future.meta.size, len(self.content))
  178. def test_requires_multipart_upload(self):
  179. self.future.meta.provide_transfer_size(len(self.content))
  180. # With the default multipart threshold, the length of the content
  181. # should be smaller than the threshold thus not requiring a multipart
  182. # transfer.
  183. self.assertFalse(
  184. self.upload_input_manager.requires_multipart_upload(
  185. self.future, self.config
  186. )
  187. )
  188. # Decreasing the threshold to that of the length of the content of
  189. # the file should trigger the need for a multipart upload.
  190. self.config.multipart_threshold = len(self.content)
  191. self.assertTrue(
  192. self.upload_input_manager.requires_multipart_upload(
  193. self.future, self.config
  194. )
  195. )
  196. def test_get_put_object_body(self):
  197. self.future.meta.provide_transfer_size(len(self.content))
  198. read_file_chunk = self.upload_input_manager.get_put_object_body(
  199. self.future
  200. )
  201. read_file_chunk.enable_callback()
  202. # The file-like object provided back should be the same as the content
  203. # of the file.
  204. with read_file_chunk:
  205. self.assertEqual(read_file_chunk.read(), self.content)
  206. # The file-like object should also have been wrapped with the
  207. # on_queued callbacks to track the amount of bytes being transferred.
  208. self.assertEqual(
  209. self.recording_subscriber.calculate_bytes_seen(), len(self.content)
  210. )
  211. def test_get_put_object_body_is_interruptable(self):
  212. self.future.meta.provide_transfer_size(len(self.content))
  213. read_file_chunk = self.upload_input_manager.get_put_object_body(
  214. self.future
  215. )
  216. # Set an exception in the transfer coordinator
  217. self.transfer_coordinator.set_exception(InterruptionError)
  218. # Ensure the returned read file chunk can be interrupted with that
  219. # error.
  220. with self.assertRaises(InterruptionError):
  221. read_file_chunk.read()
  222. def test_yield_upload_part_bodies(self):
  223. # Adjust the chunk size to something more grainular for testing.
  224. self.config.multipart_chunksize = 4
  225. self.future.meta.provide_transfer_size(len(self.content))
  226. # Get an iterator that will yield all of the bodies and their
  227. # respective part number.
  228. part_iterator = self.upload_input_manager.yield_upload_part_bodies(
  229. self.future, self.config.multipart_chunksize
  230. )
  231. expected_part_number = 1
  232. for part_number, read_file_chunk in part_iterator:
  233. # Ensure that the part number is as expected
  234. self.assertEqual(part_number, expected_part_number)
  235. read_file_chunk.enable_callback()
  236. # Ensure that the body is correct for that part.
  237. with read_file_chunk:
  238. self.assertEqual(
  239. read_file_chunk.read(),
  240. self._get_expected_body_for_part(part_number),
  241. )
  242. expected_part_number += 1
  243. # All of the file-like object should also have been wrapped with the
  244. # on_queued callbacks to track the amount of bytes being transferred.
  245. self.assertEqual(
  246. self.recording_subscriber.calculate_bytes_seen(), len(self.content)
  247. )
  248. def test_yield_upload_part_bodies_are_interruptable(self):
  249. # Adjust the chunk size to something more grainular for testing.
  250. self.config.multipart_chunksize = 4
  251. self.future.meta.provide_transfer_size(len(self.content))
  252. # Get an iterator that will yield all of the bodies and their
  253. # respective part number.
  254. part_iterator = self.upload_input_manager.yield_upload_part_bodies(
  255. self.future, self.config.multipart_chunksize
  256. )
  257. # Set an exception in the transfer coordinator
  258. self.transfer_coordinator.set_exception(InterruptionError)
  259. for _, read_file_chunk in part_iterator:
  260. # Ensure that each read file chunk yielded can be interrupted
  261. # with that error.
  262. with self.assertRaises(InterruptionError):
  263. read_file_chunk.read()
  264. class TestUploadSeekableInputManager(TestUploadFilenameInputManager):
  265. def setUp(self):
  266. super().setUp()
  267. self.upload_input_manager = UploadSeekableInputManager(
  268. self.osutil, self.transfer_coordinator
  269. )
  270. self.fileobj = open(self.filename, 'rb')
  271. self.call_args = CallArgs(
  272. fileobj=self.fileobj, subscribers=self.subscribers
  273. )
  274. self.future = self.get_transfer_future(self.call_args)
  275. def tearDown(self):
  276. self.fileobj.close()
  277. super().tearDown()
  278. def test_is_compatible_bytes_io(self):
  279. self.assertTrue(self.upload_input_manager.is_compatible(BytesIO()))
  280. def test_not_compatible_for_non_filelike_obj(self):
  281. self.assertFalse(self.upload_input_manager.is_compatible(object()))
  282. def test_stores_bodies_in_memory_upload_part(self):
  283. self.assertTrue(
  284. self.upload_input_manager.stores_body_in_memory('upload_part')
  285. )
  286. def test_get_put_object_body(self):
  287. start_pos = 3
  288. self.fileobj.seek(start_pos)
  289. adjusted_size = len(self.content) - start_pos
  290. self.future.meta.provide_transfer_size(adjusted_size)
  291. read_file_chunk = self.upload_input_manager.get_put_object_body(
  292. self.future
  293. )
  294. read_file_chunk.enable_callback()
  295. # The fact that the file was seeked to start should be taken into
  296. # account in length and content for the read file chunk.
  297. with read_file_chunk:
  298. self.assertEqual(len(read_file_chunk), adjusted_size)
  299. self.assertEqual(read_file_chunk.read(), self.content[start_pos:])
  300. self.assertEqual(
  301. self.recording_subscriber.calculate_bytes_seen(), adjusted_size
  302. )
  303. class TestUploadNonSeekableInputManager(TestUploadFilenameInputManager):
  304. def setUp(self):
  305. super().setUp()
  306. self.upload_input_manager = UploadNonSeekableInputManager(
  307. self.osutil, self.transfer_coordinator
  308. )
  309. self.fileobj = NonSeekableReader(self.content)
  310. self.call_args = CallArgs(
  311. fileobj=self.fileobj, subscribers=self.subscribers
  312. )
  313. self.future = self.get_transfer_future(self.call_args)
  314. def assert_multipart_parts(self):
  315. """
  316. Asserts that the input manager will generate a multipart upload
  317. and that each part is in order and the correct size.
  318. """
  319. # Assert that a multipart upload is required.
  320. self.assertTrue(
  321. self.upload_input_manager.requires_multipart_upload(
  322. self.future, self.config
  323. )
  324. )
  325. # Get a list of all the parts that would be sent.
  326. parts = list(
  327. self.upload_input_manager.yield_upload_part_bodies(
  328. self.future, self.config.multipart_chunksize
  329. )
  330. )
  331. # Assert that the actual number of parts is what we would expect
  332. # based on the configuration.
  333. size = self.config.multipart_chunksize
  334. num_parts = math.ceil(len(self.content) / size)
  335. self.assertEqual(len(parts), num_parts)
  336. # Run for every part but the last part.
  337. for i, part in enumerate(parts[:-1]):
  338. # Assert the part number is correct.
  339. self.assertEqual(part[0], i + 1)
  340. # Assert the part contains the right amount of data.
  341. data = part[1].read()
  342. self.assertEqual(len(data), size)
  343. # Assert that the last part is the correct size.
  344. expected_final_size = len(self.content) - ((num_parts - 1) * size)
  345. final_part = parts[-1]
  346. self.assertEqual(len(final_part[1].read()), expected_final_size)
  347. # Assert that the last part has the correct part number.
  348. self.assertEqual(final_part[0], len(parts))
  349. def test_provide_transfer_size(self):
  350. self.upload_input_manager.provide_transfer_size(self.future)
  351. # There is no way to get the size without reading the entire body.
  352. self.assertEqual(self.future.meta.size, None)
  353. def test_stores_bodies_in_memory_upload_part(self):
  354. self.assertTrue(
  355. self.upload_input_manager.stores_body_in_memory('upload_part')
  356. )
  357. def test_stores_bodies_in_memory_put_object(self):
  358. self.assertTrue(
  359. self.upload_input_manager.stores_body_in_memory('put_object')
  360. )
  361. def test_initial_data_parts_threshold_lesser(self):
  362. # threshold < size
  363. self.config.multipart_chunksize = 4
  364. self.config.multipart_threshold = 2
  365. self.assert_multipart_parts()
  366. def test_initial_data_parts_threshold_equal(self):
  367. # threshold == size
  368. self.config.multipart_chunksize = 4
  369. self.config.multipart_threshold = 4
  370. self.assert_multipart_parts()
  371. def test_initial_data_parts_threshold_greater(self):
  372. # threshold > size
  373. self.config.multipart_chunksize = 4
  374. self.config.multipart_threshold = 8
  375. self.assert_multipart_parts()
  376. class TestUploadSubmissionTask(BaseSubmissionTaskTest):
  377. def setUp(self):
  378. super().setUp()
  379. self.tempdir = tempfile.mkdtemp()
  380. self.filename = os.path.join(self.tempdir, 'myfile')
  381. self.content = b'0' * (MIN_UPLOAD_CHUNKSIZE * 3)
  382. self.config.multipart_chunksize = MIN_UPLOAD_CHUNKSIZE
  383. self.config.multipart_threshold = MIN_UPLOAD_CHUNKSIZE * 5
  384. with open(self.filename, 'wb') as f:
  385. f.write(self.content)
  386. self.bucket = 'mybucket'
  387. self.key = 'mykey'
  388. self.extra_args = {}
  389. self.subscribers = []
  390. # A list to keep track of all of the bodies sent over the wire
  391. # and their order.
  392. self.sent_bodies = []
  393. self.client.meta.events.register(
  394. 'before-parameter-build.s3.*', self.collect_body
  395. )
  396. self.call_args = self.get_call_args()
  397. self.transfer_future = self.get_transfer_future(self.call_args)
  398. self.submission_main_kwargs = {
  399. 'client': self.client,
  400. 'config': self.config,
  401. 'osutil': self.osutil,
  402. 'request_executor': self.executor,
  403. 'transfer_future': self.transfer_future,
  404. }
  405. self.submission_task = self.get_task(
  406. UploadSubmissionTask, main_kwargs=self.submission_main_kwargs
  407. )
  408. def tearDown(self):
  409. super().tearDown()
  410. shutil.rmtree(self.tempdir)
  411. def collect_body(self, params, **kwargs):
  412. if 'Body' in params:
  413. self.sent_bodies.append(params['Body'].read())
  414. def get_call_args(self, **kwargs):
  415. default_call_args = {
  416. 'fileobj': self.filename,
  417. 'bucket': self.bucket,
  418. 'key': self.key,
  419. 'extra_args': self.extra_args,
  420. 'subscribers': self.subscribers,
  421. }
  422. default_call_args.update(kwargs)
  423. return CallArgs(**default_call_args)
  424. def add_multipart_upload_stubbed_responses(self):
  425. self.stubber.add_response(
  426. method='create_multipart_upload',
  427. service_response={'UploadId': 'my-id'},
  428. )
  429. self.stubber.add_response(
  430. method='upload_part', service_response={'ETag': 'etag-1'}
  431. )
  432. self.stubber.add_response(
  433. method='upload_part', service_response={'ETag': 'etag-2'}
  434. )
  435. self.stubber.add_response(
  436. method='upload_part', service_response={'ETag': 'etag-3'}
  437. )
  438. self.stubber.add_response(
  439. method='complete_multipart_upload', service_response={}
  440. )
  441. def wrap_executor_in_recorder(self):
  442. self.executor = RecordingExecutor(self.executor)
  443. self.submission_main_kwargs['request_executor'] = self.executor
  444. def use_fileobj_in_call_args(self, fileobj):
  445. self.call_args = self.get_call_args(fileobj=fileobj)
  446. self.transfer_future = self.get_transfer_future(self.call_args)
  447. self.submission_main_kwargs['transfer_future'] = self.transfer_future
  448. def assert_tag_value_for_put_object(self, tag_value):
  449. self.assertEqual(self.executor.submissions[0]['tag'], tag_value)
  450. def assert_tag_value_for_upload_parts(self, tag_value):
  451. for submission in self.executor.submissions[1:-1]:
  452. self.assertEqual(submission['tag'], tag_value)
  453. def test_provide_file_size_on_put(self):
  454. self.call_args.subscribers.append(FileSizeProvider(len(self.content)))
  455. self.stubber.add_response(
  456. method='put_object',
  457. service_response={},
  458. expected_params={
  459. 'Body': ANY,
  460. 'Bucket': self.bucket,
  461. 'Key': self.key,
  462. },
  463. )
  464. # With this submitter, it will fail to stat the file if a transfer
  465. # size is not provided.
  466. self.submission_main_kwargs['osutil'] = OSUtilsExceptionOnFileSize()
  467. self.submission_task = self.get_task(
  468. UploadSubmissionTask, main_kwargs=self.submission_main_kwargs
  469. )
  470. self.submission_task()
  471. self.transfer_future.result()
  472. self.stubber.assert_no_pending_responses()
  473. self.assertEqual(self.sent_bodies, [self.content])
  474. def test_submits_no_tag_for_put_object_filename(self):
  475. self.wrap_executor_in_recorder()
  476. self.stubber.add_response('put_object', {})
  477. self.submission_task = self.get_task(
  478. UploadSubmissionTask, main_kwargs=self.submission_main_kwargs
  479. )
  480. self.submission_task()
  481. self.transfer_future.result()
  482. self.stubber.assert_no_pending_responses()
  483. # Make sure no tag to limit that task specifically was not associated
  484. # to that task submission.
  485. self.assert_tag_value_for_put_object(None)
  486. def test_submits_no_tag_for_multipart_filename(self):
  487. self.wrap_executor_in_recorder()
  488. # Set up for a multipart upload.
  489. self.add_multipart_upload_stubbed_responses()
  490. self.config.multipart_threshold = 1
  491. self.submission_task = self.get_task(
  492. UploadSubmissionTask, main_kwargs=self.submission_main_kwargs
  493. )
  494. self.submission_task()
  495. self.transfer_future.result()
  496. self.stubber.assert_no_pending_responses()
  497. # Make sure no tag to limit any of the upload part tasks were
  498. # were associated when submitted to the executor
  499. self.assert_tag_value_for_upload_parts(None)
  500. def test_submits_no_tag_for_put_object_fileobj(self):
  501. self.wrap_executor_in_recorder()
  502. self.stubber.add_response('put_object', {})
  503. with open(self.filename, 'rb') as f:
  504. self.use_fileobj_in_call_args(f)
  505. self.submission_task = self.get_task(
  506. UploadSubmissionTask, main_kwargs=self.submission_main_kwargs
  507. )
  508. self.submission_task()
  509. self.transfer_future.result()
  510. self.stubber.assert_no_pending_responses()
  511. # Make sure no tag to limit that task specifically was not associated
  512. # to that task submission.
  513. self.assert_tag_value_for_put_object(None)
  514. def test_submits_tag_for_multipart_fileobj(self):
  515. self.wrap_executor_in_recorder()
  516. # Set up for a multipart upload.
  517. self.add_multipart_upload_stubbed_responses()
  518. self.config.multipart_threshold = 1
  519. with open(self.filename, 'rb') as f:
  520. self.use_fileobj_in_call_args(f)
  521. self.submission_task = self.get_task(
  522. UploadSubmissionTask, main_kwargs=self.submission_main_kwargs
  523. )
  524. self.submission_task()
  525. self.transfer_future.result()
  526. self.stubber.assert_no_pending_responses()
  527. # Make sure tags to limit all of the upload part tasks were
  528. # were associated when submitted to the executor as these tasks will
  529. # have chunks of data stored with them in memory.
  530. self.assert_tag_value_for_upload_parts(IN_MEMORY_UPLOAD_TAG)
  531. class TestPutObjectTask(BaseUploadTest):
  532. def test_main(self):
  533. extra_args = {'Metadata': {'foo': 'bar'}}
  534. with open(self.filename, 'rb') as fileobj:
  535. task = self.get_task(
  536. PutObjectTask,
  537. main_kwargs={
  538. 'client': self.client,
  539. 'fileobj': fileobj,
  540. 'bucket': self.bucket,
  541. 'key': self.key,
  542. 'extra_args': extra_args,
  543. },
  544. )
  545. self.stubber.add_response(
  546. method='put_object',
  547. service_response={},
  548. expected_params={
  549. 'Body': ANY,
  550. 'Bucket': self.bucket,
  551. 'Key': self.key,
  552. 'Metadata': {'foo': 'bar'},
  553. },
  554. )
  555. task()
  556. self.stubber.assert_no_pending_responses()
  557. self.assertEqual(self.sent_bodies, [self.content])
  558. class TestUploadPartTask(BaseUploadTest):
  559. def test_main(self):
  560. extra_args = {'RequestPayer': 'requester'}
  561. upload_id = 'my-id'
  562. part_number = 1
  563. etag = 'foo'
  564. with open(self.filename, 'rb') as fileobj:
  565. task = self.get_task(
  566. UploadPartTask,
  567. main_kwargs={
  568. 'client': self.client,
  569. 'fileobj': fileobj,
  570. 'bucket': self.bucket,
  571. 'key': self.key,
  572. 'upload_id': upload_id,
  573. 'part_number': part_number,
  574. 'extra_args': extra_args,
  575. },
  576. )
  577. self.stubber.add_response(
  578. method='upload_part',
  579. service_response={'ETag': etag},
  580. expected_params={
  581. 'Body': ANY,
  582. 'Bucket': self.bucket,
  583. 'Key': self.key,
  584. 'UploadId': upload_id,
  585. 'PartNumber': part_number,
  586. 'RequestPayer': 'requester',
  587. },
  588. )
  589. rval = task()
  590. self.stubber.assert_no_pending_responses()
  591. self.assertEqual(rval, {'ETag': etag, 'PartNumber': part_number})
  592. self.assertEqual(self.sent_bodies, [self.content])