test_copy.py 20 KB


# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
  13. from botocore.exceptions import ClientError
  14. from botocore.stub import Stubber
  15. from s3transfer.manager import TransferConfig, TransferManager
  16. from s3transfer.utils import MIN_UPLOAD_CHUNKSIZE
  17. from tests import BaseGeneralInterfaceTest, FileSizeProvider
  18. class BaseCopyTest(BaseGeneralInterfaceTest):
  19. def setUp(self):
  20. super().setUp()
  21. self.config = TransferConfig(
  22. max_request_concurrency=1,
  23. multipart_chunksize=MIN_UPLOAD_CHUNKSIZE,
  24. multipart_threshold=MIN_UPLOAD_CHUNKSIZE * 4,
  25. )
  26. self._manager = TransferManager(self.client, self.config)
  27. # Initialize some default arguments
  28. self.bucket = 'mybucket'
  29. self.key = 'mykey'
  30. self.copy_source = {'Bucket': 'mysourcebucket', 'Key': 'mysourcekey'}
  31. self.extra_args = {}
  32. self.subscribers = []
  33. self.half_chunksize = int(MIN_UPLOAD_CHUNKSIZE / 2)
  34. self.content = b'0' * (2 * MIN_UPLOAD_CHUNKSIZE + self.half_chunksize)
  35. @property
  36. def manager(self):
  37. return self._manager
  38. @property
  39. def method(self):
  40. return self.manager.copy
  41. def create_call_kwargs(self):
  42. return {
  43. 'copy_source': self.copy_source,
  44. 'bucket': self.bucket,
  45. 'key': self.key,
  46. }
  47. def create_invalid_extra_args(self):
  48. return {'Foo': 'bar'}
  49. def create_stubbed_responses(self):
  50. return [
  51. {
  52. 'method': 'head_object',
  53. 'service_response': {'ContentLength': len(self.content)},
  54. },
  55. {'method': 'copy_object', 'service_response': {}},
  56. ]
  57. def create_expected_progress_callback_info(self):
  58. return [
  59. {'bytes_transferred': len(self.content)},
  60. ]
  61. def add_head_object_response(self, expected_params=None, stubber=None):
  62. if not stubber:
  63. stubber = self.stubber
  64. head_response = self.create_stubbed_responses()[0]
  65. if expected_params:
  66. head_response['expected_params'] = expected_params
  67. stubber.add_response(**head_response)
  68. def add_successful_copy_responses(
  69. self,
  70. expected_copy_params=None,
  71. expected_create_mpu_params=None,
  72. expected_complete_mpu_params=None,
  73. ):
  74. # Add all responses needed to do the copy of the object.
  75. # Should account for both ranged and nonranged downloads.
  76. stubbed_responses = self.create_stubbed_responses()[1:]
  77. # If the length of copy responses is greater than one then it is
  78. # a multipart copy.
  79. copy_responses = stubbed_responses[0:1]
  80. if len(stubbed_responses) > 1:
  81. copy_responses = stubbed_responses[1:-1]
  82. # Add the expected create multipart upload params.
  83. if expected_create_mpu_params:
  84. stubbed_responses[0][
  85. 'expected_params'
  86. ] = expected_create_mpu_params
  87. # Add any expected copy parameters.
  88. if expected_copy_params:
  89. for i, copy_response in enumerate(copy_responses):
  90. if isinstance(expected_copy_params, list):
  91. copy_response['expected_params'] = expected_copy_params[i]
  92. else:
  93. copy_response['expected_params'] = expected_copy_params
  94. # Add the expected complete multipart upload params.
  95. if expected_complete_mpu_params:
  96. stubbed_responses[-1][
  97. 'expected_params'
  98. ] = expected_complete_mpu_params
  99. # Add the responses to the stubber.
  100. for stubbed_response in stubbed_responses:
  101. self.stubber.add_response(**stubbed_response)
  102. def test_can_provide_file_size(self):
  103. self.add_successful_copy_responses()
  104. call_kwargs = self.create_call_kwargs()
  105. call_kwargs['subscribers'] = [FileSizeProvider(len(self.content))]
  106. future = self.manager.copy(**call_kwargs)
  107. future.result()
  108. # The HeadObject should have not happened and should have been able
  109. # to successfully copy the file.
  110. self.stubber.assert_no_pending_responses()
  111. def test_provide_copy_source_as_dict(self):
  112. self.copy_source['VersionId'] = 'mysourceversionid'
  113. expected_params = {
  114. 'Bucket': 'mysourcebucket',
  115. 'Key': 'mysourcekey',
  116. 'VersionId': 'mysourceversionid',
  117. }
  118. self.add_head_object_response(expected_params=expected_params)
  119. self.add_successful_copy_responses()
  120. future = self.manager.copy(**self.create_call_kwargs())
  121. future.result()
  122. self.stubber.assert_no_pending_responses()
  123. def test_invalid_copy_source(self):
  124. self.copy_source = ['bucket', 'key']
  125. future = self.manager.copy(**self.create_call_kwargs())
  126. with self.assertRaises(TypeError):
  127. future.result()
  128. def test_provide_copy_source_client(self):
  129. source_client = self.session.create_client(
  130. 's3',
  131. 'eu-central-1',
  132. aws_access_key_id='foo',
  133. aws_secret_access_key='bar',
  134. )
  135. source_stubber = Stubber(source_client)
  136. source_stubber.activate()
  137. self.addCleanup(source_stubber.deactivate)
  138. self.add_head_object_response(stubber=source_stubber)
  139. self.add_successful_copy_responses()
  140. call_kwargs = self.create_call_kwargs()
  141. call_kwargs['source_client'] = source_client
  142. future = self.manager.copy(**call_kwargs)
  143. future.result()
  144. # Make sure that all of the responses were properly
  145. # used for both clients.
  146. source_stubber.assert_no_pending_responses()
  147. self.stubber.assert_no_pending_responses()
  148. class TestNonMultipartCopy(BaseCopyTest):
  149. __test__ = True
  150. def test_copy(self):
  151. expected_head_params = {
  152. 'Bucket': 'mysourcebucket',
  153. 'Key': 'mysourcekey',
  154. }
  155. expected_copy_object = {
  156. 'Bucket': self.bucket,
  157. 'Key': self.key,
  158. 'CopySource': self.copy_source,
  159. }
  160. self.add_head_object_response(expected_params=expected_head_params)
  161. self.add_successful_copy_responses(
  162. expected_copy_params=expected_copy_object
  163. )
  164. future = self.manager.copy(**self.create_call_kwargs())
  165. future.result()
  166. self.stubber.assert_no_pending_responses()
  167. def test_copy_with_extra_args(self):
  168. self.extra_args['MetadataDirective'] = 'REPLACE'
  169. expected_head_params = {
  170. 'Bucket': 'mysourcebucket',
  171. 'Key': 'mysourcekey',
  172. }
  173. expected_copy_object = {
  174. 'Bucket': self.bucket,
  175. 'Key': self.key,
  176. 'CopySource': self.copy_source,
  177. 'MetadataDirective': 'REPLACE',
  178. }
  179. self.add_head_object_response(expected_params=expected_head_params)
  180. self.add_successful_copy_responses(
  181. expected_copy_params=expected_copy_object
  182. )
  183. call_kwargs = self.create_call_kwargs()
  184. call_kwargs['extra_args'] = self.extra_args
  185. future = self.manager.copy(**call_kwargs)
  186. future.result()
  187. self.stubber.assert_no_pending_responses()
  188. def test_copy_maps_extra_args_to_head_object(self):
  189. self.extra_args['CopySourceSSECustomerAlgorithm'] = 'AES256'
  190. expected_head_params = {
  191. 'Bucket': 'mysourcebucket',
  192. 'Key': 'mysourcekey',
  193. 'SSECustomerAlgorithm': 'AES256',
  194. }
  195. expected_copy_object = {
  196. 'Bucket': self.bucket,
  197. 'Key': self.key,
  198. 'CopySource': self.copy_source,
  199. 'CopySourceSSECustomerAlgorithm': 'AES256',
  200. }
  201. self.add_head_object_response(expected_params=expected_head_params)
  202. self.add_successful_copy_responses(
  203. expected_copy_params=expected_copy_object
  204. )
  205. call_kwargs = self.create_call_kwargs()
  206. call_kwargs['extra_args'] = self.extra_args
  207. future = self.manager.copy(**call_kwargs)
  208. future.result()
  209. self.stubber.assert_no_pending_responses()
  210. def test_allowed_copy_params_are_valid(self):
  211. op_model = self.client.meta.service_model.operation_model('CopyObject')
  212. for allowed_upload_arg in self._manager.ALLOWED_COPY_ARGS:
  213. self.assertIn(allowed_upload_arg, op_model.input_shape.members)
  214. def test_copy_with_tagging(self):
  215. extra_args = {'Tagging': 'tag1=val1', 'TaggingDirective': 'REPLACE'}
  216. self.add_head_object_response()
  217. self.add_successful_copy_responses(
  218. expected_copy_params={
  219. 'Bucket': self.bucket,
  220. 'Key': self.key,
  221. 'CopySource': self.copy_source,
  222. 'Tagging': 'tag1=val1',
  223. 'TaggingDirective': 'REPLACE',
  224. }
  225. )
  226. future = self.manager.copy(
  227. self.copy_source, self.bucket, self.key, extra_args
  228. )
  229. future.result()
  230. self.stubber.assert_no_pending_responses()
  231. def test_raise_exception_on_s3_object_lambda_resource(self):
  232. s3_object_lambda_arn = (
  233. 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  234. 'accesspoint:my-accesspoint'
  235. )
  236. with self.assertRaisesRegex(ValueError, 'methods do not support'):
  237. self.manager.copy(self.copy_source, s3_object_lambda_arn, self.key)
  238. def test_raise_exception_on_s3_object_lambda_resource_as_source(self):
  239. source = {
  240. 'Bucket': 'arn:aws:s3-object-lambda:us-west-2:123456789012:'
  241. 'accesspoint:my-accesspoint'
  242. }
  243. with self.assertRaisesRegex(ValueError, 'methods do not support'):
  244. self.manager.copy(source, self.bucket, self.key)
  245. class TestMultipartCopy(BaseCopyTest):
  246. __test__ = True
  247. def setUp(self):
  248. super().setUp()
  249. self.config = TransferConfig(
  250. max_request_concurrency=1,
  251. multipart_threshold=1,
  252. multipart_chunksize=4,
  253. )
  254. self._manager = TransferManager(self.client, self.config)
  255. def create_stubbed_responses(self):
  256. return [
  257. {
  258. 'method': 'head_object',
  259. 'service_response': {'ContentLength': len(self.content)},
  260. },
  261. {
  262. 'method': 'create_multipart_upload',
  263. 'service_response': {'UploadId': 'my-upload-id'},
  264. },
  265. {
  266. 'method': 'upload_part_copy',
  267. 'service_response': {'CopyPartResult': {'ETag': 'etag-1'}},
  268. },
  269. {
  270. 'method': 'upload_part_copy',
  271. 'service_response': {'CopyPartResult': {'ETag': 'etag-2'}},
  272. },
  273. {
  274. 'method': 'upload_part_copy',
  275. 'service_response': {'CopyPartResult': {'ETag': 'etag-3'}},
  276. },
  277. {'method': 'complete_multipart_upload', 'service_response': {}},
  278. ]
  279. def create_expected_progress_callback_info(self):
  280. # Note that last read is from the empty sentinel indicating
  281. # that the stream is done.
  282. return [
  283. {'bytes_transferred': MIN_UPLOAD_CHUNKSIZE},
  284. {'bytes_transferred': MIN_UPLOAD_CHUNKSIZE},
  285. {'bytes_transferred': self.half_chunksize},
  286. ]
  287. def add_create_multipart_upload_response(self):
  288. self.stubber.add_response(**self.create_stubbed_responses()[1])
  289. def _get_expected_params(self):
  290. upload_id = 'my-upload-id'
  291. # Add expected parameters to the head object
  292. expected_head_params = {
  293. 'Bucket': 'mysourcebucket',
  294. 'Key': 'mysourcekey',
  295. }
  296. # Add expected parameters for the create multipart
  297. expected_create_mpu_params = {
  298. 'Bucket': self.bucket,
  299. 'Key': self.key,
  300. }
  301. expected_copy_params = []
  302. # Add expected parameters to the copy part
  303. ranges = [
  304. 'bytes=0-5242879',
  305. 'bytes=5242880-10485759',
  306. 'bytes=10485760-13107199',
  307. ]
  308. for i, range_val in enumerate(ranges):
  309. expected_copy_params.append(
  310. {
  311. 'Bucket': self.bucket,
  312. 'Key': self.key,
  313. 'CopySource': self.copy_source,
  314. 'UploadId': upload_id,
  315. 'PartNumber': i + 1,
  316. 'CopySourceRange': range_val,
  317. }
  318. )
  319. # Add expected parameters for the complete multipart
  320. expected_complete_mpu_params = {
  321. 'Bucket': self.bucket,
  322. 'Key': self.key,
  323. 'UploadId': upload_id,
  324. 'MultipartUpload': {
  325. 'Parts': [
  326. {'ETag': 'etag-1', 'PartNumber': 1},
  327. {'ETag': 'etag-2', 'PartNumber': 2},
  328. {'ETag': 'etag-3', 'PartNumber': 3},
  329. ]
  330. },
  331. }
  332. return expected_head_params, {
  333. 'expected_create_mpu_params': expected_create_mpu_params,
  334. 'expected_copy_params': expected_copy_params,
  335. 'expected_complete_mpu_params': expected_complete_mpu_params,
  336. }
  337. def _add_params_to_expected_params(
  338. self, add_copy_kwargs, operation_types, new_params
  339. ):
  340. expected_params_to_update = []
  341. for operation_type in operation_types:
  342. add_copy_kwargs_key = 'expected_' + operation_type + '_params'
  343. expected_params = add_copy_kwargs[add_copy_kwargs_key]
  344. if isinstance(expected_params, list):
  345. expected_params_to_update.extend(expected_params)
  346. else:
  347. expected_params_to_update.append(expected_params)
  348. for expected_params in expected_params_to_update:
  349. expected_params.update(new_params)
  350. def test_copy(self):
  351. head_params, add_copy_kwargs = self._get_expected_params()
  352. self.add_head_object_response(expected_params=head_params)
  353. self.add_successful_copy_responses(**add_copy_kwargs)
  354. future = self.manager.copy(**self.create_call_kwargs())
  355. future.result()
  356. self.stubber.assert_no_pending_responses()
  357. def test_copy_with_extra_args(self):
  358. # This extra argument should be added to the head object,
  359. # the create multipart upload, and upload part copy.
  360. self.extra_args['RequestPayer'] = 'requester'
  361. head_params, add_copy_kwargs = self._get_expected_params()
  362. head_params.update(self.extra_args)
  363. self.add_head_object_response(expected_params=head_params)
  364. self._add_params_to_expected_params(
  365. add_copy_kwargs,
  366. ['create_mpu', 'copy', 'complete_mpu'],
  367. self.extra_args,
  368. )
  369. self.add_successful_copy_responses(**add_copy_kwargs)
  370. call_kwargs = self.create_call_kwargs()
  371. call_kwargs['extra_args'] = self.extra_args
  372. future = self.manager.copy(**call_kwargs)
  373. future.result()
  374. self.stubber.assert_no_pending_responses()
  375. def test_copy_blacklists_args_to_create_multipart(self):
  376. # This argument can never be used for multipart uploads
  377. self.extra_args['MetadataDirective'] = 'COPY'
  378. head_params, add_copy_kwargs = self._get_expected_params()
  379. self.add_head_object_response(expected_params=head_params)
  380. self.add_successful_copy_responses(**add_copy_kwargs)
  381. call_kwargs = self.create_call_kwargs()
  382. call_kwargs['extra_args'] = self.extra_args
  383. future = self.manager.copy(**call_kwargs)
  384. future.result()
  385. self.stubber.assert_no_pending_responses()
  386. def test_copy_args_to_only_create_multipart(self):
  387. self.extra_args['ACL'] = 'private'
  388. head_params, add_copy_kwargs = self._get_expected_params()
  389. self.add_head_object_response(expected_params=head_params)
  390. self._add_params_to_expected_params(
  391. add_copy_kwargs, ['create_mpu'], self.extra_args
  392. )
  393. self.add_successful_copy_responses(**add_copy_kwargs)
  394. call_kwargs = self.create_call_kwargs()
  395. call_kwargs['extra_args'] = self.extra_args
  396. future = self.manager.copy(**call_kwargs)
  397. future.result()
  398. self.stubber.assert_no_pending_responses()
  399. def test_copy_passes_args_to_create_multipart_and_upload_part(self):
  400. # This will only be used for the complete multipart upload
  401. # and upload part.
  402. self.extra_args['SSECustomerAlgorithm'] = 'AES256'
  403. head_params, add_copy_kwargs = self._get_expected_params()
  404. self.add_head_object_response(expected_params=head_params)
  405. self._add_params_to_expected_params(
  406. add_copy_kwargs, ['create_mpu', 'copy'], self.extra_args
  407. )
  408. self.add_successful_copy_responses(**add_copy_kwargs)
  409. call_kwargs = self.create_call_kwargs()
  410. call_kwargs['extra_args'] = self.extra_args
  411. future = self.manager.copy(**call_kwargs)
  412. future.result()
  413. self.stubber.assert_no_pending_responses()
  414. def test_copy_maps_extra_args_to_head_object(self):
  415. self.extra_args['CopySourceSSECustomerAlgorithm'] = 'AES256'
  416. head_params, add_copy_kwargs = self._get_expected_params()
  417. # The CopySourceSSECustomerAlgorithm needs to get mapped to
  418. # SSECustomerAlgorithm for HeadObject
  419. head_params['SSECustomerAlgorithm'] = 'AES256'
  420. self.add_head_object_response(expected_params=head_params)
  421. # However, it needs to remain the same for UploadPartCopy.
  422. self._add_params_to_expected_params(
  423. add_copy_kwargs, ['copy'], self.extra_args
  424. )
  425. self.add_successful_copy_responses(**add_copy_kwargs)
  426. call_kwargs = self.create_call_kwargs()
  427. call_kwargs['extra_args'] = self.extra_args
  428. future = self.manager.copy(**call_kwargs)
  429. future.result()
  430. self.stubber.assert_no_pending_responses()
  431. def test_abort_on_failure(self):
  432. # First add the head object and create multipart upload
  433. self.add_head_object_response()
  434. self.add_create_multipart_upload_response()
  435. # Cause an error on upload_part_copy
  436. self.stubber.add_client_error('upload_part_copy', 'ArbitraryFailure')
  437. # Add the abort multipart to ensure it gets cleaned up on failure
  438. self.stubber.add_response(
  439. 'abort_multipart_upload',
  440. service_response={},
  441. expected_params={
  442. 'Bucket': self.bucket,
  443. 'Key': self.key,
  444. 'UploadId': 'my-upload-id',
  445. },
  446. )
  447. future = self.manager.copy(**self.create_call_kwargs())
  448. with self.assertRaisesRegex(ClientError, 'ArbitraryFailure'):
  449. future.result()
  450. self.stubber.assert_no_pending_responses()
  451. def test_mp_copy_with_tagging_directive(self):
  452. extra_args = {'Tagging': 'tag1=val1', 'TaggingDirective': 'REPLACE'}
  453. self.add_head_object_response()
  454. self.add_successful_copy_responses(
  455. expected_create_mpu_params={
  456. 'Bucket': self.bucket,
  457. 'Key': self.key,
  458. 'Tagging': 'tag1=val1',
  459. }
  460. )
  461. future = self.manager.copy(
  462. self.copy_source, self.bucket, self.key, extra_args
  463. )
  464. future.result()
  465. self.stubber.assert_no_pending_responses()