copies.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import copy
  14. import math
  15. from s3transfer.tasks import (
  16. CompleteMultipartUploadTask,
  17. CreateMultipartUploadTask,
  18. SubmissionTask,
  19. Task,
  20. )
  21. from s3transfer.utils import (
  22. ChunksizeAdjuster,
  23. calculate_range_parameter,
  24. get_callbacks,
  25. get_filtered_dict,
  26. )
  class CopySubmissionTask(SubmissionTask):
      """Task for submitting tasks to execute a copy.

      Copies smaller than ``config.multipart_threshold`` are submitted as a
      single ``CopyObjectTask``. Larger copies are submitted as a multipart
      upload: one ``CreateMultipartUploadTask``, one ``CopyPartTask`` per
      part, and a final ``CompleteMultipartUploadTask``.
      """

      # Copy-related extra args and the HeadObject parameter each maps to.
      # Used only when the source size has to be looked up before copying.
      EXTRA_ARGS_TO_HEAD_ARGS_MAPPING = {
          'CopySourceIfMatch': 'IfMatch',
          'CopySourceIfModifiedSince': 'IfModifiedSince',
          'CopySourceIfNoneMatch': 'IfNoneMatch',
          'CopySourceIfUnmodifiedSince': 'IfUnmodifiedSince',
          'CopySourceSSECustomerKey': 'SSECustomerKey',
          'CopySourceSSECustomerAlgorithm': 'SSECustomerAlgorithm',
          'CopySourceSSECustomerKeyMD5': 'SSECustomerKeyMD5',
          'RequestPayer': 'RequestPayer',
          'ExpectedBucketOwner': 'ExpectedBucketOwner',
      }

      # Extra args forwarded to every UploadPartCopy request.
      UPLOAD_PART_COPY_ARGS = [
          'CopySourceIfMatch',
          'CopySourceIfModifiedSince',
          'CopySourceIfNoneMatch',
          'CopySourceIfUnmodifiedSince',
          'CopySourceSSECustomerKey',
          'CopySourceSSECustomerAlgorithm',
          'CopySourceSSECustomerKeyMD5',
          'SSECustomerKey',
          'SSECustomerAlgorithm',
          'SSECustomerKeyMD5',
          'RequestPayer',
          'ExpectedBucketOwner',
      ]

      # Extra args stripped before the CreateMultipartUpload request; every
      # other extra arg is passed through to it unchanged.
      CREATE_MULTIPART_ARGS_BLACKLIST = [
          'CopySourceIfMatch',
          'CopySourceIfModifiedSince',
          'CopySourceIfNoneMatch',
          'CopySourceIfUnmodifiedSince',
          'CopySourceSSECustomerKey',
          'CopySourceSSECustomerAlgorithm',
          'CopySourceSSECustomerKeyMD5',
          'MetadataDirective',
          'TaggingDirective',
      ]

      # Extra args forwarded to the CompleteMultipartUpload request.
      COMPLETE_MULTIPART_ARGS = ['RequestPayer', 'ExpectedBucketOwner']

      def _submit(
          self, client, config, osutil, request_executor, transfer_future
      ):
          """Determine the copy size if needed, then submit the copy tasks.

          :param client: The client associated with the transfer manager

          :type config: s3transfer.manager.TransferConfig
          :param config: The transfer config associated with the transfer
              manager

          :type osutil: s3transfer.utils.OSUtil
          :param osutil: The os utility associated to the transfer manager

          :type request_executor: s3transfer.futures.BoundedExecutor
          :param request_executor: The request executor associated with the
              transfer manager

          :type transfer_future: s3transfer.futures.TransferFuture
          :param transfer_future: The transfer future associated with the
              transfer request that tasks are being submitted for

          :raises TypeError: If the size has to be looked up and
              ``call_args.copy_source`` is not a dict.
          """
          # Determine the size if it was not provided
          if transfer_future.meta.size is None:
              # Look the size up with a HeadObject request on the copy
              # source. The request is sent with ``call_args.source_client``
              # (presumably the manager's client unless the caller supplied
              # another one -- confirm in the caller). If that client cannot
              # reach the source object, e.g. because it is in another
              # region, the caller has to provide the size or a suitable
              # source client.
              call_args = transfer_future.meta.call_args
              head_object_request = (
                  self._get_head_object_request_from_copy_source(
                      call_args.copy_source
                  )
              )
              # Map copy-source conditions / SSE-C parameters onto their
              # HeadObject equivalents so the size lookup is made under the
              # same conditions as the copy itself.
              for param, value in call_args.extra_args.items():
                  if param in self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING:
                      head_object_request[
                          self.EXTRA_ARGS_TO_HEAD_ARGS_MAPPING[param]
                      ] = value
              response = call_args.source_client.head_object(
                  **head_object_request
              )
              transfer_future.meta.provide_transfer_size(
                  response['ContentLength']
              )

          # Below the threshold do a single CopyObject; at or above it do a
          # multipart copy.
          if transfer_future.meta.size < config.multipart_threshold:
              self._submit_copy_request(
                  client, config, osutil, request_executor, transfer_future
              )
          else:
              self._submit_multipart_request(
                  client, config, osutil, request_executor, transfer_future
              )

      def _submit_copy_request(
          self, client, config, osutil, request_executor, transfer_future
      ):
          """Submit a single, final CopyObjectTask for the whole object."""
          call_args = transfer_future.meta.call_args
          # Get the needed progress callbacks for the task
          progress_callbacks = get_callbacks(transfer_future, 'progress')
          # Submit the request of a single copy.
          self._transfer_coordinator.submit(
              request_executor,
              CopyObjectTask(
                  transfer_coordinator=self._transfer_coordinator,
                  main_kwargs={
                      'client': client,
                      'copy_source': call_args.copy_source,
                      'bucket': call_args.bucket,
                      'key': call_args.key,
                      'extra_args': call_args.extra_args,
                      'callbacks': progress_callbacks,
                      'size': transfer_future.meta.size,
                  },
                  is_final=True,
              ),
          )

      def _submit_multipart_request(
          self, client, config, osutil, request_executor, transfer_future
      ):
          """Submit the create, part-copy and complete tasks of a multipart
          copy.

          The part tasks and the complete task receive the upload id via
          ``pending_main_kwargs`` from the create-multipart future; the
          complete task additionally receives the futures of all parts.
          """
          call_args = transfer_future.meta.call_args
          # Submit the request to create a multipart upload without any of
          # the copy-only arguments.
          create_multipart_future = self._transfer_coordinator.submit(
              request_executor,
              CreateMultipartUploadTask(
                  transfer_coordinator=self._transfer_coordinator,
                  main_kwargs={
                      'client': client,
                      'bucket': call_args.bucket,
                      'key': call_args.key,
                      'extra_args': self._extra_create_multipart_args(
                          call_args.extra_args
                      ),
                  },
              ),
          )

          # Determine how many parts are needed based on the object size and
          # the (adjusted) desired chunksize.
          part_size = ChunksizeAdjuster().adjust_chunksize(
              config.multipart_chunksize, transfer_future.meta.size
          )
          num_parts = int(
              math.ceil(transfer_future.meta.size / float(part_size))
          )

          # Submit requests to copy the parts of the object.
          part_futures = []
          progress_callbacks = get_callbacks(transfer_future, 'progress')
          for part_number in range(1, num_parts + 1):
              # Rebuilt for every part because CopySourceRange is set per
              # part below.
              extra_part_args = self._extra_upload_part_args(
                  call_args.extra_args
              )
              # Part numbers start at 1 while the range index starts at 0.
              part_index = part_number - 1
              extra_part_args['CopySourceRange'] = calculate_range_parameter(
                  part_size,
                  part_index,
                  num_parts,
                  transfer_future.meta.size,
              )
              # Size of this part, reported to the progress callbacks.
              size = self._get_transfer_size(
                  part_size,
                  part_index,
                  num_parts,
                  transfer_future.meta.size,
              )
              part_futures.append(
                  self._transfer_coordinator.submit(
                      request_executor,
                      CopyPartTask(
                          transfer_coordinator=self._transfer_coordinator,
                          main_kwargs={
                              'client': client,
                              'copy_source': call_args.copy_source,
                              'bucket': call_args.bucket,
                              'key': call_args.key,
                              'part_number': part_number,
                              'extra_args': extra_part_args,
                              'callbacks': progress_callbacks,
                              'size': size,
                          },
                          pending_main_kwargs={
                              'upload_id': create_multipart_future
                          },
                      ),
                  )
              )

          complete_multipart_extra_args = self._extra_complete_multipart_args(
              call_args.extra_args
          )
          # Submit the request to complete the multipart upload.
          self._transfer_coordinator.submit(
              request_executor,
              CompleteMultipartUploadTask(
                  transfer_coordinator=self._transfer_coordinator,
                  main_kwargs={
                      'client': client,
                      'bucket': call_args.bucket,
                      'key': call_args.key,
                      'extra_args': complete_multipart_extra_args,
                  },
                  pending_main_kwargs={
                      'upload_id': create_multipart_future,
                      'parts': part_futures,
                  },
                  is_final=True,
              ),
          )

      def _get_head_object_request_from_copy_source(self, copy_source):
          """Return a shallow copy of ``copy_source`` to use as HeadObject
          kwargs.

          :raises TypeError: If ``copy_source`` is not a dict.
          """
          if isinstance(copy_source, dict):
              # Copy so the caller's copy_source is not mutated when head
              # parameters are added to the request.
              return copy.copy(copy_source)
          raise TypeError(
              'Expecting dictionary formatted: '
              '{"Bucket": bucket_name, "Key": key} '
              'but got %s of type %s.' % (copy_source, type(copy_source))
          )

      def _extra_create_multipart_args(self, extra_args):
          """Return ``extra_args`` without CREATE_MULTIPART_ARGS_BLACKLIST."""
          return {
              param: value
              for param, value in extra_args.items()
              if param not in self.CREATE_MULTIPART_ARGS_BLACKLIST
          }

      def _extra_upload_part_args(self, extra_args):
          # Only the args in UPLOAD_PART_COPY_ARGS actually need to be passed
          # onto the upload_part_copy calls.
          return get_filtered_dict(extra_args, self.UPLOAD_PART_COPY_ARGS)

      def _extra_complete_multipart_args(self, extra_args):
          # Only the args in COMPLETE_MULTIPART_ARGS are passed onto the
          # complete_multipart_upload call.
          return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

      def _get_transfer_size(
          self, part_size, part_index, num_parts, total_transfer_size
      ):
          """Return the byte size of the part at 0-based ``part_index``."""
          if part_index == num_parts - 1:
              # The last part may differ in size from the rest of the parts:
              # it gets whatever remains after the full-size parts.
              return total_transfer_size - (part_index * part_size)
          return part_size
  class CopyObjectTask(Task):
      """Task to do a non-multipart copy with a single CopyObject call."""

      def _main(
          self, client, copy_source, bucket, key, extra_args, callbacks, size
      ):
          """
          :param client: The client to use when calling CopyObject
          :param copy_source: The CopySource parameter to use
          :param bucket: The name of the bucket to copy to
          :param key: The name of the key to copy to
          :param extra_args: A dictionary of any extra arguments that are
              passed through to the copy_object call.
          :param callbacks: List of progress callbacks; each is called once
              with ``bytes_transferred=size`` after copy_object returns
          :param size: The size of the transfer. This value is passed into
              the callbacks
          """
          client.copy_object(
              CopySource=copy_source, Bucket=bucket, Key=key, **extra_args
          )
          # Report the whole object as transferred only after the copy
          # request has returned without raising.
          for callback in callbacks:
              callback(bytes_transferred=size)
  class CopyPartTask(Task):
      """Task to copy one part of a multipart copy via UploadPartCopy."""

      def _main(
          self,
          client,
          copy_source,
          bucket,
          key,
          upload_id,
          part_number,
          extra_args,
          callbacks,
          size,
      ):
          """
          :param client: The client to use when calling UploadPartCopy
          :param copy_source: The CopySource parameter to use
          :param bucket: The name of the bucket to upload to
          :param key: The name of the key to upload to
          :param upload_id: The id of the upload
          :param part_number: The number representing the part of the multipart
              upload
          :param extra_args: A dictionary of any extra arguments that are
              passed through to the upload_part_copy call.
          :param callbacks: List of progress callbacks; each is called once
              with ``bytes_transferred=size`` after upload_part_copy returns
          :param size: The size of the transfer. This value is passed into
              the callbacks

          :rtype: dict
          :returns: A dictionary representing a part::

                  {'ETag': etag_value, 'PartNumber': part_number}

              This value can be appended to a list to be used to complete
              the multipart upload.
          """
          response = client.upload_part_copy(
              CopySource=copy_source,
              Bucket=bucket,
              Key=key,
              UploadId=upload_id,
              PartNumber=part_number,
              **extra_args
          )
          for callback in callbacks:
              callback(bytes_transferred=size)
          # The part's ETag is nested under CopyPartResult in the response.
          etag = response['CopyPartResult']['ETag']
          return {'ETag': etag, 'PartNumber': part_number}