test_upload.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import os
  14. import shutil
  15. import tempfile
  16. import time
  17. from io import BytesIO
  18. from botocore.awsrequest import AWSRequest
  19. from botocore.client import Config
  20. from botocore.exceptions import ClientError
  21. from botocore.stub import ANY
  22. from s3transfer.manager import TransferConfig, TransferManager
  23. from s3transfer.utils import ChunksizeAdjuster
  24. from tests import (
  25. BaseGeneralInterfaceTest,
  26. NonSeekableReader,
  27. RecordingOSUtils,
  28. RecordingSubscriber,
  29. mock,
  30. )
  31. class BaseUploadTest(BaseGeneralInterfaceTest):
  32. def setUp(self):
  33. super().setUp()
  34. # TODO: We do not want to use the real MIN_UPLOAD_CHUNKSIZE
  35. # when we're adjusting parts.
  36. # This is really wasteful and fails CI builds because self.contents
  37. # would normally use 10MB+ of memory.
  38. # Until there's an API to configure this, we're patching this with
  39. # a min size of 1. We can't patch MIN_UPLOAD_CHUNKSIZE directly
  40. # because it's already bound to a default value in the
  41. # chunksize adjuster. Instead we need to patch out the
  42. # chunksize adjuster class.
  43. self.adjuster_patch = mock.patch(
  44. 's3transfer.upload.ChunksizeAdjuster',
  45. lambda: ChunksizeAdjuster(min_size=1),
  46. )
  47. self.adjuster_patch.start()
  48. self.config = TransferConfig(max_request_concurrency=1)
  49. self._manager = TransferManager(self.client, self.config)
  50. # Create a temporary directory with files to read from
  51. self.tempdir = tempfile.mkdtemp()
  52. self.filename = os.path.join(self.tempdir, 'myfile')
  53. self.content = b'my content'
  54. with open(self.filename, 'wb') as f:
  55. f.write(self.content)
  56. # Initialize some default arguments
  57. self.bucket = 'mybucket'
  58. self.key = 'mykey'
  59. self.extra_args = {}
  60. self.subscribers = []
  61. # A list to keep track of all of the bodies sent over the wire
  62. # and their order.
  63. self.sent_bodies = []
  64. self.client.meta.events.register(
  65. 'before-parameter-build.s3.*', self.collect_body
  66. )
  67. def tearDown(self):
  68. super().tearDown()
  69. shutil.rmtree(self.tempdir)
  70. self.adjuster_patch.stop()
  71. def collect_body(self, params, model, **kwargs):
  72. # A handler to simulate the reading of the body including the
  73. # request-created event that signals to simulate the progress
  74. # callbacks
  75. if 'Body' in params:
  76. # TODO: This is not ideal. Need to figure out a better idea of
  77. # simulating reading of the request across the wire to trigger
  78. # progress callbacks
  79. request = AWSRequest(
  80. method='PUT',
  81. url='https://s3.amazonaws.com',
  82. data=params['Body'],
  83. )
  84. self.client.meta.events.emit(
  85. 'request-created.s3.%s' % model.name,
  86. request=request,
  87. operation_name=model.name,
  88. )
  89. self.sent_bodies.append(self._stream_body(params['Body']))
  90. def _stream_body(self, body):
  91. read_amt = 8 * 1024
  92. data = body.read(read_amt)
  93. collected_body = data
  94. while data:
  95. data = body.read(read_amt)
  96. collected_body += data
  97. return collected_body
  98. @property
  99. def manager(self):
  100. return self._manager
  101. @property
  102. def method(self):
  103. return self.manager.upload
  104. def create_call_kwargs(self):
  105. return {
  106. 'fileobj': self.filename,
  107. 'bucket': self.bucket,
  108. 'key': self.key,
  109. }
  110. def create_invalid_extra_args(self):
  111. return {'Foo': 'bar'}
  112. def create_stubbed_responses(self):
  113. return [{'method': 'put_object', 'service_response': {}}]
  114. def create_expected_progress_callback_info(self):
  115. return [{'bytes_transferred': 10}]
  116. def assert_expected_client_calls_were_correct(self):
  117. # We assert that expected client calls were made by ensuring that
  118. # there are no more pending responses. If there are no more pending
  119. # responses, then all stubbed responses were consumed.
  120. self.stubber.assert_no_pending_responses()
  121. class TestNonMultipartUpload(BaseUploadTest):
  122. __test__ = True
  123. def add_put_object_response_with_default_expected_params(
  124. self, extra_expected_params=None
  125. ):
  126. expected_params = {'Body': ANY, 'Bucket': self.bucket, 'Key': self.key}
  127. if extra_expected_params:
  128. expected_params.update(extra_expected_params)
  129. upload_response = self.create_stubbed_responses()[0]
  130. upload_response['expected_params'] = expected_params
  131. self.stubber.add_response(**upload_response)
  132. def assert_put_object_body_was_correct(self):
  133. self.assertEqual(self.sent_bodies, [self.content])
  134. def test_upload(self):
  135. self.extra_args['RequestPayer'] = 'requester'
  136. self.add_put_object_response_with_default_expected_params(
  137. extra_expected_params={'RequestPayer': 'requester'}
  138. )
  139. future = self.manager.upload(
  140. self.filename, self.bucket, self.key, self.extra_args
  141. )
  142. future.result()
  143. self.assert_expected_client_calls_were_correct()
  144. self.assert_put_object_body_was_correct()
  145. def test_upload_with_checksum(self):
  146. self.extra_args['ChecksumAlgorithm'] = 'crc32'
  147. self.add_put_object_response_with_default_expected_params(
  148. extra_expected_params={'ChecksumAlgorithm': 'crc32'}
  149. )
  150. future = self.manager.upload(
  151. self.filename, self.bucket, self.key, self.extra_args
  152. )
  153. future.result()
  154. self.assert_expected_client_calls_were_correct()
  155. self.assert_put_object_body_was_correct()
  156. def test_upload_for_fileobj(self):
  157. self.add_put_object_response_with_default_expected_params()
  158. with open(self.filename, 'rb') as f:
  159. future = self.manager.upload(
  160. f, self.bucket, self.key, self.extra_args
  161. )
  162. future.result()
  163. self.assert_expected_client_calls_were_correct()
  164. self.assert_put_object_body_was_correct()
  165. def test_upload_for_seekable_filelike_obj(self):
  166. self.add_put_object_response_with_default_expected_params()
  167. bytes_io = BytesIO(self.content)
  168. future = self.manager.upload(
  169. bytes_io, self.bucket, self.key, self.extra_args
  170. )
  171. future.result()
  172. self.assert_expected_client_calls_were_correct()
  173. self.assert_put_object_body_was_correct()
  174. def test_upload_for_seekable_filelike_obj_that_has_been_seeked(self):
  175. self.add_put_object_response_with_default_expected_params()
  176. bytes_io = BytesIO(self.content)
  177. seek_pos = 5
  178. bytes_io.seek(seek_pos)
  179. future = self.manager.upload(
  180. bytes_io, self.bucket, self.key, self.extra_args
  181. )
  182. future.result()
  183. self.assert_expected_client_calls_were_correct()
  184. self.assertEqual(b''.join(self.sent_bodies), self.content[seek_pos:])
  185. def test_upload_for_non_seekable_filelike_obj(self):
  186. self.add_put_object_response_with_default_expected_params()
  187. body = NonSeekableReader(self.content)
  188. future = self.manager.upload(
  189. body, self.bucket, self.key, self.extra_args
  190. )
  191. future.result()
  192. self.assert_expected_client_calls_were_correct()
  193. self.assert_put_object_body_was_correct()
  194. def test_sigv4_progress_callbacks_invoked_once(self):
  195. # Reset the client and manager to use sigv4
  196. self.reset_stubber_with_new_client(
  197. {'config': Config(signature_version='s3v4')}
  198. )
  199. self.client.meta.events.register(
  200. 'before-parameter-build.s3.*', self.collect_body
  201. )
  202. self._manager = TransferManager(self.client, self.config)
  203. # Add the stubbed response.
  204. self.add_put_object_response_with_default_expected_params()
  205. subscriber = RecordingSubscriber()
  206. future = self.manager.upload(
  207. self.filename, self.bucket, self.key, subscribers=[subscriber]
  208. )
  209. future.result()
  210. self.assert_expected_client_calls_were_correct()
  211. # The amount of bytes seen should be the same as the file size
  212. self.assertEqual(subscriber.calculate_bytes_seen(), len(self.content))
  213. def test_uses_provided_osutil(self):
  214. osutil = RecordingOSUtils()
  215. # Use the recording os utility for the transfer manager
  216. self._manager = TransferManager(self.client, self.config, osutil)
  217. self.add_put_object_response_with_default_expected_params()
  218. future = self.manager.upload(self.filename, self.bucket, self.key)
  219. future.result()
  220. # The upload should have used the os utility. We check this by making
  221. # sure that the recorded opens are as expected.
  222. expected_opens = [(self.filename, 'rb')]
  223. self.assertEqual(osutil.open_records, expected_opens)
  224. def test_allowed_upload_params_are_valid(self):
  225. op_model = self.client.meta.service_model.operation_model('PutObject')
  226. for allowed_upload_arg in self._manager.ALLOWED_UPLOAD_ARGS:
  227. self.assertIn(allowed_upload_arg, op_model.input_shape.members)
  228. def test_upload_with_bandwidth_limiter(self):
  229. self.content = b'a' * 1024 * 1024
  230. with open(self.filename, 'wb') as f:
  231. f.write(self.content)
  232. self.config = TransferConfig(
  233. max_request_concurrency=1, max_bandwidth=len(self.content) / 2
  234. )
  235. self._manager = TransferManager(self.client, self.config)
  236. self.add_put_object_response_with_default_expected_params()
  237. start = time.time()
  238. future = self.manager.upload(self.filename, self.bucket, self.key)
  239. future.result()
  240. # This is just a smoke test to make sure that the limiter is
  241. # being used and not necessary its exactness. So we set the maximum
  242. # bandwidth to len(content)/2 per sec and make sure that it is
  243. # noticeably slower. Ideally it will take more than two seconds, but
  244. # given tracking at the beginning of transfers are not entirely
  245. # accurate setting at the initial start of a transfer, we give us
  246. # some flexibility by setting the expected time to half of the
  247. # theoretical time to take.
  248. self.assertGreaterEqual(time.time() - start, 1)
  249. self.assert_expected_client_calls_were_correct()
  250. self.assert_put_object_body_was_correct()
  251. def test_raise_exception_on_s3_object_lambda_resource(self):
  252. s3_object_lambda_arn = (
  253. 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  254. 'accesspoint:my-accesspoint'
  255. )
  256. with self.assertRaisesRegex(ValueError, 'methods do not support'):
  257. self.manager.upload(self.filename, s3_object_lambda_arn, self.key)
  258. class TestMultipartUpload(BaseUploadTest):
  259. __test__ = True
  260. def setUp(self):
  261. super().setUp()
  262. self.chunksize = 4
  263. self.config = TransferConfig(
  264. max_request_concurrency=1,
  265. multipart_threshold=1,
  266. multipart_chunksize=self.chunksize,
  267. )
  268. self._manager = TransferManager(self.client, self.config)
  269. self.multipart_id = 'my-upload-id'
  270. def create_stubbed_responses(self):
  271. return [
  272. {
  273. 'method': 'create_multipart_upload',
  274. 'service_response': {'UploadId': self.multipart_id},
  275. },
  276. {'method': 'upload_part', 'service_response': {'ETag': 'etag-1'}},
  277. {'method': 'upload_part', 'service_response': {'ETag': 'etag-2'}},
  278. {'method': 'upload_part', 'service_response': {'ETag': 'etag-3'}},
  279. {'method': 'complete_multipart_upload', 'service_response': {}},
  280. ]
  281. def create_expected_progress_callback_info(self):
  282. return [
  283. {'bytes_transferred': 4},
  284. {'bytes_transferred': 4},
  285. {'bytes_transferred': 2},
  286. ]
  287. def assert_upload_part_bodies_were_correct(self):
  288. expected_contents = []
  289. for i in range(0, len(self.content), self.chunksize):
  290. end_i = i + self.chunksize
  291. if end_i > len(self.content):
  292. expected_contents.append(self.content[i:])
  293. else:
  294. expected_contents.append(self.content[i:end_i])
  295. self.assertEqual(self.sent_bodies, expected_contents)
  296. def add_create_multipart_response_with_default_expected_params(
  297. self, extra_expected_params=None
  298. ):
  299. expected_params = {'Bucket': self.bucket, 'Key': self.key}
  300. if extra_expected_params:
  301. expected_params.update(extra_expected_params)
  302. response = self.create_stubbed_responses()[0]
  303. response['expected_params'] = expected_params
  304. self.stubber.add_response(**response)
  305. def add_upload_part_responses_with_default_expected_params(
  306. self, extra_expected_params=None
  307. ):
  308. num_parts = 3
  309. upload_part_responses = self.create_stubbed_responses()[1:-1]
  310. for i in range(num_parts):
  311. upload_part_response = upload_part_responses[i]
  312. expected_params = {
  313. 'Bucket': self.bucket,
  314. 'Key': self.key,
  315. 'UploadId': self.multipart_id,
  316. 'Body': ANY,
  317. 'PartNumber': i + 1,
  318. }
  319. if extra_expected_params:
  320. expected_params.update(extra_expected_params)
  321. # If ChecksumAlgorithm is present stub the response checksums
  322. if 'ChecksumAlgorithm' in extra_expected_params:
  323. name = extra_expected_params['ChecksumAlgorithm']
  324. checksum_member = 'Checksum%s' % name.upper()
  325. response = upload_part_response['service_response']
  326. response[checksum_member] = 'sum%s==' % (i + 1)
  327. upload_part_response['expected_params'] = expected_params
  328. self.stubber.add_response(**upload_part_response)
  329. def add_complete_multipart_response_with_default_expected_params(
  330. self, extra_expected_params=None
  331. ):
  332. expected_params = {
  333. 'Bucket': self.bucket,
  334. 'Key': self.key,
  335. 'UploadId': self.multipart_id,
  336. 'MultipartUpload': {
  337. 'Parts': [
  338. {'ETag': 'etag-1', 'PartNumber': 1},
  339. {'ETag': 'etag-2', 'PartNumber': 2},
  340. {'ETag': 'etag-3', 'PartNumber': 3},
  341. ]
  342. },
  343. }
  344. if extra_expected_params:
  345. expected_params.update(extra_expected_params)
  346. response = self.create_stubbed_responses()[-1]
  347. response['expected_params'] = expected_params
  348. self.stubber.add_response(**response)
  349. def test_upload(self):
  350. self.extra_args['RequestPayer'] = 'requester'
  351. # Add requester pays to the create multipart upload and upload parts.
  352. self.add_create_multipart_response_with_default_expected_params(
  353. extra_expected_params={'RequestPayer': 'requester'}
  354. )
  355. self.add_upload_part_responses_with_default_expected_params(
  356. extra_expected_params={'RequestPayer': 'requester'}
  357. )
  358. self.add_complete_multipart_response_with_default_expected_params(
  359. extra_expected_params={'RequestPayer': 'requester'}
  360. )
  361. future = self.manager.upload(
  362. self.filename, self.bucket, self.key, self.extra_args
  363. )
  364. future.result()
  365. self.assert_expected_client_calls_were_correct()
  366. def test_upload_for_fileobj(self):
  367. self.add_create_multipart_response_with_default_expected_params()
  368. self.add_upload_part_responses_with_default_expected_params()
  369. self.add_complete_multipart_response_with_default_expected_params()
  370. with open(self.filename, 'rb') as f:
  371. future = self.manager.upload(
  372. f, self.bucket, self.key, self.extra_args
  373. )
  374. future.result()
  375. self.assert_expected_client_calls_were_correct()
  376. self.assert_upload_part_bodies_were_correct()
  377. def test_upload_for_seekable_filelike_obj(self):
  378. self.add_create_multipart_response_with_default_expected_params()
  379. self.add_upload_part_responses_with_default_expected_params()
  380. self.add_complete_multipart_response_with_default_expected_params()
  381. bytes_io = BytesIO(self.content)
  382. future = self.manager.upload(
  383. bytes_io, self.bucket, self.key, self.extra_args
  384. )
  385. future.result()
  386. self.assert_expected_client_calls_were_correct()
  387. self.assert_upload_part_bodies_were_correct()
  388. def test_upload_for_seekable_filelike_obj_that_has_been_seeked(self):
  389. self.add_create_multipart_response_with_default_expected_params()
  390. self.add_upload_part_responses_with_default_expected_params()
  391. self.add_complete_multipart_response_with_default_expected_params()
  392. bytes_io = BytesIO(self.content)
  393. seek_pos = 1
  394. bytes_io.seek(seek_pos)
  395. future = self.manager.upload(
  396. bytes_io, self.bucket, self.key, self.extra_args
  397. )
  398. future.result()
  399. self.assert_expected_client_calls_were_correct()
  400. self.assertEqual(b''.join(self.sent_bodies), self.content[seek_pos:])
  401. def test_upload_for_non_seekable_filelike_obj(self):
  402. self.add_create_multipart_response_with_default_expected_params()
  403. self.add_upload_part_responses_with_default_expected_params()
  404. self.add_complete_multipart_response_with_default_expected_params()
  405. stream = NonSeekableReader(self.content)
  406. future = self.manager.upload(
  407. stream, self.bucket, self.key, self.extra_args
  408. )
  409. future.result()
  410. self.assert_expected_client_calls_were_correct()
  411. self.assert_upload_part_bodies_were_correct()
  412. def test_limits_in_memory_chunks_for_fileobj(self):
  413. # Limit the maximum in memory chunks to one but make number of
  414. # threads more than one. This means that the upload will have to
  415. # happen sequentially despite having many threads available because
  416. # data is sequentially partitioned into chunks in memory and since
  417. # there can only every be one in memory chunk, each upload part will
  418. # have to happen one at a time.
  419. self.config.max_request_concurrency = 10
  420. self.config.max_in_memory_upload_chunks = 1
  421. self._manager = TransferManager(self.client, self.config)
  422. # Add some default stubbed responses.
  423. # These responses are added in order of part number so if the
  424. # multipart upload is not done sequentially, which it should because
  425. # we limit the in memory upload chunks to one, the stubber will
  426. # raise exceptions for mismatching parameters for partNumber when
  427. # once the upload() method is called on the transfer manager.
  428. # If there is a mismatch, the stubber error will propagate on
  429. # the future.result()
  430. self.add_create_multipart_response_with_default_expected_params()
  431. self.add_upload_part_responses_with_default_expected_params()
  432. self.add_complete_multipart_response_with_default_expected_params()
  433. with open(self.filename, 'rb') as f:
  434. future = self.manager.upload(
  435. f, self.bucket, self.key, self.extra_args
  436. )
  437. future.result()
  438. # Make sure that the stubber had all of its stubbed responses consumed.
  439. self.assert_expected_client_calls_were_correct()
  440. # Ensure the contents were uploaded in sequentially order by checking
  441. # the sent contents were in order.
  442. self.assert_upload_part_bodies_were_correct()
  443. def test_upload_failure_invokes_abort(self):
  444. self.stubber.add_response(
  445. method='create_multipart_upload',
  446. service_response={'UploadId': self.multipart_id},
  447. expected_params={'Bucket': self.bucket, 'Key': self.key},
  448. )
  449. self.stubber.add_response(
  450. method='upload_part',
  451. service_response={'ETag': 'etag-1'},
  452. expected_params={
  453. 'Bucket': self.bucket,
  454. 'Body': ANY,
  455. 'Key': self.key,
  456. 'UploadId': self.multipart_id,
  457. 'PartNumber': 1,
  458. },
  459. )
  460. # With the upload part failing this should immediately initiate
  461. # an abort multipart with no more upload parts called.
  462. self.stubber.add_client_error(method='upload_part')
  463. self.stubber.add_response(
  464. method='abort_multipart_upload',
  465. service_response={},
  466. expected_params={
  467. 'Bucket': self.bucket,
  468. 'Key': self.key,
  469. 'UploadId': self.multipart_id,
  470. },
  471. )
  472. future = self.manager.upload(self.filename, self.bucket, self.key)
  473. # The exception should get propagated to the future and not be
  474. # a cancelled error or something.
  475. with self.assertRaises(ClientError):
  476. future.result()
  477. self.assert_expected_client_calls_were_correct()
  478. def test_upload_passes_select_extra_args(self):
  479. self.extra_args['Metadata'] = {'foo': 'bar'}
  480. # Add metadata to expected create multipart upload call
  481. self.add_create_multipart_response_with_default_expected_params(
  482. extra_expected_params={'Metadata': {'foo': 'bar'}}
  483. )
  484. self.add_upload_part_responses_with_default_expected_params()
  485. self.add_complete_multipart_response_with_default_expected_params()
  486. future = self.manager.upload(
  487. self.filename, self.bucket, self.key, self.extra_args
  488. )
  489. future.result()
  490. self.assert_expected_client_calls_were_correct()
  491. def test_multipart_upload_passes_checksums(self):
  492. self.extra_args['ChecksumAlgorithm'] = 'sha1'
  493. # ChecksumAlgorithm should be passed on the create_multipart call
  494. self.add_create_multipart_response_with_default_expected_params(
  495. extra_expected_params={'ChecksumAlgorithm': 'sha1'},
  496. )
  497. # ChecksumAlgorithm should be forwarded and a SHA1 will come back
  498. self.add_upload_part_responses_with_default_expected_params(
  499. extra_expected_params={'ChecksumAlgorithm': 'sha1'},
  500. )
  501. # The checksums should be used in the complete call like etags
  502. self.add_complete_multipart_response_with_default_expected_params(
  503. extra_expected_params={
  504. 'MultipartUpload': {
  505. 'Parts': [
  506. {
  507. 'ETag': 'etag-1',
  508. 'PartNumber': 1,
  509. 'ChecksumSHA1': 'sum1==',
  510. },
  511. {
  512. 'ETag': 'etag-2',
  513. 'PartNumber': 2,
  514. 'ChecksumSHA1': 'sum2==',
  515. },
  516. {
  517. 'ETag': 'etag-3',
  518. 'PartNumber': 3,
  519. 'ChecksumSHA1': 'sum3==',
  520. },
  521. ]
  522. }
  523. },
  524. )
  525. future = self.manager.upload(
  526. self.filename, self.bucket, self.key, self.extra_args
  527. )
  528. future.result()
  529. self.assert_expected_client_calls_were_correct()