# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import math
from io import BytesIO

from s3transfer.compat import readable, seekable
from s3transfer.futures import IN_MEMORY_UPLOAD_TAG
from s3transfer.tasks import (
    CompleteMultipartUploadTask,
    CreateMultipartUploadTask,
    SubmissionTask,
    Task,
)
from s3transfer.utils import (
    ChunksizeAdjuster,
    DeferredOpenFile,
    get_callbacks,
    get_filtered_dict,
)


class AggregatedProgressCallback:
    def __init__(self, callbacks, threshold=1024 * 256):
        """Aggregates progress updates for every provided progress callback

        :type callbacks: A list of functions that accept bytes_transferred
            as a single argument
        :param callbacks: The callbacks to invoke when the threshold is
            reached

        :type threshold: int
        :param threshold: The progress threshold at which to take the
            aggregated progress and invoke the progress callbacks with that
            aggregated progress total
        """
        self._callbacks = callbacks
        self._threshold = threshold
        self._bytes_seen = 0

    def __call__(self, bytes_transferred):
        self._bytes_seen += bytes_transferred
        if self._bytes_seen >= self._threshold:
            self._trigger_callbacks()

    def flush(self):
        """Flushes out any progress that has not been sent to its callbacks"""
        if self._bytes_seen > 0:
            self._trigger_callbacks()

    def _trigger_callbacks(self):
        for callback in self._callbacks:
            callback(bytes_transferred=self._bytes_seen)
        self._bytes_seen = 0
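
# Illustrative sketch (editorial, not part of the library): aggregating a
# chatty per-chunk callback so it fires only about once per 256 KB. The
# `on_progress` function is a hypothetical user callback.
#
#     def on_progress(bytes_transferred):
#         print(f'sent another {bytes_transferred} bytes')
#
#     aggregated = AggregatedProgressCallback([on_progress])
#     for _ in range(5):
#         aggregated(100 * 1024)  # fires once the 256 KB threshold is crossed
#     aggregated.flush()          # flushes any remainder below the threshold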


class InterruptReader:
    """Wrapper that can interrupt reading using an error

    It uses a transfer coordinator to propagate an error: if an exception
    has been recorded on the coordinator while the file is being read from,
    the next read raises that exception instead of returning data.

    :type fileobj: file-like obj
    :param fileobj: The file-like object to read from

    :type transfer_coordinator: s3transfer.futures.TransferCoordinator
    :param transfer_coordinator: The transfer coordinator to use if the
        reader needs to be interrupted.
    """

    def __init__(self, fileobj, transfer_coordinator):
        self._fileobj = fileobj
        self._transfer_coordinator = transfer_coordinator

    def read(self, amount=None):
        # If there is an exception, then raise the exception.
        # We raise an error instead of returning no bytes because for
        # requests where the content length and md5 were sent, it will
        # cause md5 mismatches and retries as there was no indication that
        # the stream being read from encountered any issues.
        if self._transfer_coordinator.exception:
            raise self._transfer_coordinator.exception
        return self._fileobj.read(amount)

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
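
# Illustrative sketch (editorial): once an exception is recorded on the
# coordinator, any in-flight reader raises it instead of streaming on.
# The BytesIO payload stands in for a real upload body.
#
#     from io import BytesIO
#     from s3transfer.futures import TransferCoordinator
#
#     coordinator = TransferCoordinator()
#     reader = InterruptReader(BytesIO(b'payload'), coordinator)
#     reader.read(3)  # b'pay'
#     coordinator.set_exception(ValueError('cancelled'))  # marks it failed
#     reader.read(3)  # raises ValueError instead of returning b'loa'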


class UploadInputManager:
    """Base manager class for handling various types of files for uploads

    This class is typically used for the UploadSubmissionTask class to help
    determine the following:

        * How to determine the size of the file
        * How to determine if a multipart upload is required
        * How to retrieve the body for a PutObject
        * How to retrieve the bodies for a set of UploadParts

    The answers/implementations differ for the various types of file inputs
    that may be accepted. All implementations must subclass and override
    public methods from this class.
    """

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        self._osutil = osutil
        self._transfer_coordinator = transfer_coordinator
        self._bandwidth_limiter = bandwidth_limiter

    @classmethod
    def is_compatible(cls, upload_source):
        """Determines if the source for the upload is compatible with manager

        :param upload_source: The source for which the upload will pull data
            from.

        :returns: True if the manager can handle the type of source specified
            otherwise returns False.
        """
        raise NotImplementedError('must implement is_compatible()')

    def stores_body_in_memory(self, operation_name):
        """Whether the body it provides is stored in memory

        :type operation_name: str
        :param operation_name: The name of the client operation that the body
            is being used for. Valid operation_names are ``put_object`` and
            ``upload_part``.

        :rtype: boolean
        :returns: True if the body returned by the manager will be stored in
            memory. False if the manager will not directly store the body in
            memory.
        """
        raise NotImplementedError('must implement stores_body_in_memory()')

    def provide_transfer_size(self, transfer_future):
        """Provides the transfer size of an upload

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request
        """
        raise NotImplementedError('must implement provide_transfer_size()')

    def requires_multipart_upload(self, transfer_future, config):
        """Determines whether a multipart upload is required

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type config: s3transfer.manager.TransferConfig
        :param config: The config associated to the transfer manager

        :rtype: boolean
        :returns: True, if the upload should be multipart based on
            configuration and size. False, otherwise.
        """
        raise NotImplementedError('must implement requires_multipart_upload()')

    def get_put_object_body(self, transfer_future):
        """Returns the body to use for PutObject

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :rtype: s3transfer.utils.ReadFileChunk
        :returns: A ReadFileChunk including all progress callbacks
            associated with the transfer future.
        """
        raise NotImplementedError('must implement get_put_object_body()')

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        """Yields the part number and body to use for each UploadPart

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The future associated with upload request

        :type chunksize: int
        :param chunksize: The chunksize to use for this upload.

        :rtype: int, s3transfer.utils.ReadFileChunk
        :returns: Yields the part number and the ReadFileChunk including all
            progress callbacks associated with the transfer future for that
            specific yielded part.
        """
        raise NotImplementedError('must implement yield_upload_part_bodies()')

    def _wrap_fileobj(self, fileobj):
        fileobj = InterruptReader(fileobj, self._transfer_coordinator)
        if self._bandwidth_limiter:
            fileobj = self._bandwidth_limiter.get_bandwith_limited_stream(
                fileobj, self._transfer_coordinator, enabled=False
            )
        return fileobj

    def _get_progress_callbacks(self, transfer_future):
        callbacks = get_callbacks(transfer_future, 'progress')
        # We only want to be wrapping the callbacks if there are callbacks to
        # invoke because we do not want to be doing any unnecessary work if
        # there are no callbacks to invoke.
        if callbacks:
            return [AggregatedProgressCallback(callbacks)]
        return []

    def _get_close_callbacks(self, aggregated_progress_callbacks):
        return [callback.flush for callback in aggregated_progress_callbacks]
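
# Editorial note: every body handed to the client ends up layered roughly
# as ReadFileChunk(BandwidthLimitedStream(InterruptReader(raw))), where
# the bandwidth-limiting layer is only present when a limiter was
# configured on the transfer manager.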


class UploadFilenameInputManager(UploadInputManager):
    """Upload utility for filenames"""

    @classmethod
    def is_compatible(cls, upload_source):
        return isinstance(upload_source, str)

    def stores_body_in_memory(self, operation_name):
        return False

    def provide_transfer_size(self, transfer_future):
        transfer_future.meta.provide_transfer_size(
            self._osutil.get_file_size(transfer_future.meta.call_args.fileobj)
        )

    def requires_multipart_upload(self, transfer_future, config):
        return transfer_future.meta.size >= config.multipart_threshold

    def get_put_object_body(self, transfer_future):
        # Get a file-like object for the given input
        fileobj, full_size = self._get_put_object_fileobj_with_full_size(
            transfer_future
        )

        # Wrap fileobj with interrupt reader that will quickly cancel
        # uploads if needed instead of having to wait for the socket
        # to completely read all of the data.
        fileobj = self._wrap_fileobj(fileobj)

        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        size = transfer_future.meta.size
        # Return the file-like object wrapped into a ReadFileChunk to get
        # progress.
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=size,
            full_file_size=full_size,
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        full_file_size = transfer_future.meta.size
        num_parts = self._get_num_parts(transfer_future, chunksize)
        for part_number in range(1, num_parts + 1):
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            start_byte = chunksize * (part_number - 1)
            # Get a file-like object for that part and the full size of the
            # file associated with that file-like object.
            fileobj, full_size = self._get_upload_part_fileobj_with_full_size(
                transfer_future.meta.call_args.fileobj,
                start_byte=start_byte,
                part_size=chunksize,
                full_file_size=full_file_size,
            )

            # Wrap fileobj with interrupt reader that will quickly cancel
            # uploads if needed instead of having to wait for the socket
            # to completely read all of the data.
            fileobj = self._wrap_fileobj(fileobj)

            # Wrap the file-like object into a ReadFileChunk to get progress.
            read_file_chunk = self._osutil.open_file_chunk_reader_from_fileobj(
                fileobj=fileobj,
                chunk_size=chunksize,
                full_file_size=full_size,
                callbacks=callbacks,
                close_callbacks=close_callbacks,
            )
            yield part_number, read_file_chunk

    def _get_deferred_open_file(self, fileobj, start_byte):
        fileobj = DeferredOpenFile(
            fileobj, start_byte, open_function=self._osutil.open
        )
        return fileobj

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        size = transfer_future.meta.size
        return self._get_deferred_open_file(fileobj, 0), size

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        start_byte = kwargs['start_byte']
        full_size = kwargs['full_file_size']
        return self._get_deferred_open_file(fileobj, start_byte), full_size

    def _get_num_parts(self, transfer_future, part_size):
        return int(math.ceil(transfer_future.meta.size / float(part_size)))
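
# Worked example (editorial): for a 100 MiB file and an 8 MiB chunksize,
# _get_num_parts gives ceil(100 / 8) = 13 parts; the final part starts at
# byte 96 MiB and simply hits EOF after 4 MiB.
#
#     import math
#     size, chunksize = 100 * 1024 * 1024, 8 * 1024 * 1024
#     num_parts = int(math.ceil(size / float(chunksize)))  # 13
#     start_bytes = [chunksize * (n - 1) for n in range(1, num_parts + 1)]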


class UploadSeekableInputManager(UploadFilenameInputManager):
    """Upload utility for an open file object"""

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source) and seekable(upload_source)

    def stores_body_in_memory(self, operation_name):
        if operation_name == 'put_object':
            return False
        else:
            return True

    def provide_transfer_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # To determine size, first determine the starting position.
        # Seek to the end and then find the difference in the length
        # between the end and start positions.
        start_position = fileobj.tell()
        fileobj.seek(0, 2)
        end_position = fileobj.tell()
        fileobj.seek(start_position)
        transfer_future.meta.provide_transfer_size(
            end_position - start_position
        )

    def _get_upload_part_fileobj_with_full_size(self, fileobj, **kwargs):
        # Note: It is unfortunate that in order to do a multithreaded
        # multipart upload we cannot simply copy the file-like object,
        # since there is not really a mechanism in Python for doing so
        # (i.e. os.dup points to the same OS file handle, which causes
        # concurrency issues). So instead we need to read from the fileobj
        # and chunk the data out into separate file-like objects in memory.
        data = fileobj.read(kwargs['part_size'])

        # We return the length of the data instead of the full_file_size
        # because we partitioned the data into separate BytesIO objects,
        # meaning the BytesIO object has no knowledge of its start position
        # relative to the input source nor access to the rest of the input
        # source. So we must treat it as its own standalone file.
        return BytesIO(data), len(data)

    def _get_put_object_fileobj_with_full_size(self, transfer_future):
        fileobj = transfer_future.meta.call_args.fileobj
        # The current position needs to be taken into account when retrieving
        # the full size of the file.
        size = fileobj.tell() + transfer_future.meta.size
        return fileobj, size
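
# Illustrative sketch (editorial): size is measured from the stream's
# *current* position, so a partially consumed file object uploads only
# the remaining bytes.
#
#     from io import BytesIO
#     f = BytesIO(b'0123456789')
#     f.read(4)                # consume '0123'
#     start = f.tell()         # 4
#     f.seek(0, 2)             # seek to the end
#     size = f.tell() - start  # 6 bytes would be uploaded
#     f.seek(start)            # restore the position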


class UploadNonSeekableInputManager(UploadInputManager):
    """Upload utility for a file-like object that cannot seek."""

    def __init__(self, osutil, transfer_coordinator, bandwidth_limiter=None):
        super().__init__(osutil, transfer_coordinator, bandwidth_limiter)
        self._initial_data = b''

    @classmethod
    def is_compatible(cls, upload_source):
        return readable(upload_source)

    def stores_body_in_memory(self, operation_name):
        return True

    def provide_transfer_size(self, transfer_future):
        # No-op because there is no way to do this short of reading the entire
        # body into memory.
        return

    def requires_multipart_upload(self, transfer_future, config):
        # If the user has set the size, we can use that.
        if transfer_future.meta.size is not None:
            return transfer_future.meta.size >= config.multipart_threshold

        # This is tricky to determine in this case because we can't know how
        # large the input is. So to figure it out, we read data into memory
        # up until the threshold and compare how much data was actually read
        # against the threshold.
        fileobj = transfer_future.meta.call_args.fileobj
        threshold = config.multipart_threshold
        self._initial_data = self._read(fileobj, threshold, False)
        if len(self._initial_data) < threshold:
            return False
        else:
            return True

    def get_put_object_body(self, transfer_future):
        callbacks = self._get_progress_callbacks(transfer_future)
        close_callbacks = self._get_close_callbacks(callbacks)
        fileobj = transfer_future.meta.call_args.fileobj

        body = self._wrap_data(
            self._initial_data + fileobj.read(), callbacks, close_callbacks
        )

        # Zero out the stored data so we don't have additional copies
        # hanging around in memory.
        self._initial_data = None
        return body

    def yield_upload_part_bodies(self, transfer_future, chunksize):
        file_object = transfer_future.meta.call_args.fileobj
        part_number = 0

        # Continue reading parts from the file-like object until it is empty.
        while True:
            callbacks = self._get_progress_callbacks(transfer_future)
            close_callbacks = self._get_close_callbacks(callbacks)
            part_number += 1
            part_content = self._read(file_object, chunksize)
            if not part_content:
                break
            part_object = self._wrap_data(
                part_content, callbacks, close_callbacks
            )

            # Zero out part_content to avoid hanging on to additional data.
            part_content = None
            yield part_number, part_object

    def _read(self, fileobj, amount, truncate=True):
        """
        Reads a specific amount of data from a stream and returns it. If there
        is any data in initial_data, that will be popped out first.

        :type fileobj: A file-like object that implements read
        :param fileobj: The stream to read from.

        :type amount: int
        :param amount: The number of bytes to read from the stream.

        :type truncate: bool
        :param truncate: Whether or not to truncate initial_data after
            reading from it.

        :return: The bytes read from the stream, drawing from initial_data
            first when it is non-empty.
        """
        # If the initial data is empty, we simply read from the fileobj.
        if len(self._initial_data) == 0:
            return fileobj.read(amount)

        # If the requested number of bytes is less than the amount of
        # initial data, pull entirely from initial data.
        if amount <= len(self._initial_data):
            data = self._initial_data[:amount]
            # Truncate initial data so we don't hang onto the data longer
            # than we need.
            if truncate:
                self._initial_data = self._initial_data[amount:]
            return data

        # At this point there is some initial data left, but not enough to
        # satisfy the number of bytes requested. Pull out the remaining
        # initial data and read the rest from the fileobj.
        amount_to_read = amount - len(self._initial_data)
        data = self._initial_data + fileobj.read(amount_to_read)

        # Zero out initial data so we don't hang onto the data any more.
        if truncate:
            self._initial_data = b''
        return data

    def _wrap_data(self, data, callbacks, close_callbacks):
        """
        Wraps data with the interrupt reader and the file chunk reader.

        :type data: bytes
        :param data: The data to wrap.

        :type callbacks: list
        :param callbacks: The callbacks associated with the transfer future.

        :type close_callbacks: list
        :param close_callbacks: The callbacks to be called when closing the
            wrapper for the data.

        :return: Fully wrapped data.
        """
        fileobj = self._wrap_fileobj(BytesIO(data))
        return self._osutil.open_file_chunk_reader_from_fileobj(
            fileobj=fileobj,
            chunk_size=len(data),
            full_file_size=len(data),
            callbacks=callbacks,
            close_callbacks=close_callbacks,
        )
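
# Illustrative sketch (editorial) of the buffering in _read: bytes that
# were consumed while probing the multipart threshold are replayed from
# self._initial_data before any new bytes are read from the stream.
# `manager` and `stream` below are hypothetical stand-ins.
#
#     manager._initial_data = b'abcdef'
#     manager._read(stream, 4)  # returns b'abcd'; b'ef' stays buffered
#     manager._read(stream, 4)  # returns b'ef' + stream.read(2)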


class UploadSubmissionTask(SubmissionTask):
    """Task for submitting tasks to execute an upload"""

    UPLOAD_PART_ARGS = [
        'ChecksumAlgorithm',
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    COMPLETE_MULTIPART_ARGS = ['RequestPayer', 'ExpectedBucketOwner']

    def _get_upload_input_manager_cls(self, transfer_future):
        """Retrieves a class for managing input for an upload based on file type

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future for the request

        :rtype: class of UploadInputManager
        :returns: The appropriate class to use for managing a specific type of
            input for uploads.
        """
        upload_manager_resolver_chain = [
            UploadFilenameInputManager,
            UploadSeekableInputManager,
            UploadNonSeekableInputManager,
        ]

        fileobj = transfer_future.meta.call_args.fileobj
        for upload_manager_cls in upload_manager_resolver_chain:
            if upload_manager_cls.is_compatible(fileobj):
                return upload_manager_cls
        raise RuntimeError(
            'Input {} of type: {} is not supported.'.format(
                fileobj, type(fileobj)
            )
        )
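
    # Editorial note: the resolver order matters. A string path matches
    # only UploadFilenameInputManager, while an open binary file is both
    # readable and seekable, so it must be claimed by
    # UploadSeekableInputManager before the readable-only fallback,
    # UploadNonSeekableInputManager, is consulted.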

    def _submit(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        bandwidth_limiter=None,
    ):
        """
        :param client: The client associated with the transfer manager

        :type config: s3transfer.manager.TransferConfig
        :param config: The transfer config associated with the transfer
            manager

        :type osutil: s3transfer.utils.OSUtil
        :param osutil: The os utility associated to the transfer manager

        :type request_executor: s3transfer.futures.BoundedExecutor
        :param request_executor: The request executor associated with the
            transfer manager

        :type transfer_future: s3transfer.futures.TransferFuture
        :param transfer_future: The transfer future associated with the
            transfer request that tasks are being submitted for
        """
        upload_input_manager = self._get_upload_input_manager_cls(
            transfer_future
        )(osutil, self._transfer_coordinator, bandwidth_limiter)

        # Determine the size if it was not provided.
        if transfer_future.meta.size is None:
            upload_input_manager.provide_transfer_size(transfer_future)

        # Do a multipart upload if needed, otherwise do a regular put object.
        if not upload_input_manager.requires_multipart_upload(
            transfer_future, config
        ):
            self._submit_upload_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )
        else:
            self._submit_multipart_request(
                client,
                config,
                osutil,
                request_executor,
                transfer_future,
                upload_input_manager,
            )

    def _submit_upload_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # Get any tags that need to be associated to the put object task.
        put_object_tag = self._get_upload_task_tag(
            upload_input_manager, 'put_object'
        )

        # Submit the request of a single upload.
        self._transfer_coordinator.submit(
            request_executor,
            PutObjectTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'fileobj': upload_input_manager.get_put_object_body(
                        transfer_future
                    ),
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                },
                is_final=True,
            ),
            tag=put_object_tag,
        )

    def _submit_multipart_request(
        self,
        client,
        config,
        osutil,
        request_executor,
        transfer_future,
        upload_input_manager,
    ):
        call_args = transfer_future.meta.call_args

        # Submit the request to create a multipart upload.
        create_multipart_future = self._transfer_coordinator.submit(
            request_executor,
            CreateMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': call_args.extra_args,
                },
            ),
        )

        # Submit requests to upload the parts of the file.
        part_futures = []
        extra_part_args = self._extra_upload_part_args(call_args.extra_args)

        # Get any tags that need to be associated to the submitted tasks
        # that upload the data.
        upload_part_tag = self._get_upload_task_tag(
            upload_input_manager, 'upload_part'
        )

        size = transfer_future.meta.size
        adjuster = ChunksizeAdjuster()
        chunksize = adjuster.adjust_chunksize(config.multipart_chunksize, size)
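        # Illustrative numbers (editorial, not computed by this code): with
        # an 8 MiB configured chunksize and a 100 GiB file, 8 MiB parts
        # would mean 12,800 parts, over S3's limit of 10,000 parts per
        # upload, so ChunksizeAdjuster grows the chunksize (e.g. to 16 MiB,
        # giving 6,400 parts) until the part count fits within the limit.
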
        part_iterator = upload_input_manager.yield_upload_part_bodies(
            transfer_future, chunksize
        )

        for part_number, fileobj in part_iterator:
            part_futures.append(
                self._transfer_coordinator.submit(
                    request_executor,
                    UploadPartTask(
                        transfer_coordinator=self._transfer_coordinator,
                        main_kwargs={
                            'client': client,
                            'fileobj': fileobj,
                            'bucket': call_args.bucket,
                            'key': call_args.key,
                            'part_number': part_number,
                            'extra_args': extra_part_args,
                        },
                        pending_main_kwargs={
                            'upload_id': create_multipart_future
                        },
                    ),
                    tag=upload_part_tag,
                )
            )

        complete_multipart_extra_args = self._extra_complete_multipart_args(
            call_args.extra_args
        )
        # Submit the request to complete the multipart upload.
        self._transfer_coordinator.submit(
            request_executor,
            CompleteMultipartUploadTask(
                transfer_coordinator=self._transfer_coordinator,
                main_kwargs={
                    'client': client,
                    'bucket': call_args.bucket,
                    'key': call_args.key,
                    'extra_args': complete_multipart_extra_args,
                },
                pending_main_kwargs={
                    'upload_id': create_multipart_future,
                    'parts': part_futures,
                },
                is_final=True,
            ),
        )
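
    # Task-graph note (editorial): CreateMultipartUploadTask resolves the
    # upload_id future that every UploadPartTask waits on via its
    # pending_main_kwargs; the part futures in turn feed the 'parts'
    # argument of CompleteMultipartUploadTask, whose is_final=True result
    # completes the overall TransferFuture.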

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        return get_filtered_dict(extra_args, self.UPLOAD_PART_ARGS)

    def _extra_complete_multipart_args(self, extra_args):
        return get_filtered_dict(extra_args, self.COMPLETE_MULTIPART_ARGS)

    def _get_upload_task_tag(self, upload_input_manager, operation_name):
        tag = None
        if upload_input_manager.stores_body_in_memory(operation_name):
            tag = IN_MEMORY_UPLOAD_TAG
        return tag


class PutObjectTask(Task):
    """Task to do a nonmultipart upload"""

    def _main(self, client, fileobj, bucket, key, extra_args):
        """
        :param client: The client to use when calling PutObject
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.
        """
        with fileobj as body:
            client.put_object(Bucket=bucket, Key=key, Body=body, **extra_args)


class UploadPartTask(Task):
    """Task to upload a part in a multipart upload"""

    def _main(
        self, client, fileobj, bucket, key, upload_id, part_number, extra_args
    ):
        """
        :param client: The client to use when calling UploadPart
        :param fileobj: The file to upload.
        :param bucket: The name of the bucket to upload to
        :param key: The name of the key to upload to
        :param upload_id: The id of the upload
        :param part_number: The number representing the part of the multipart
            upload
        :param extra_args: A dictionary of any extra arguments that may be
            used in the upload.

        :rtype: dict
        :returns: A dictionary representing a part::

            {'ETag': etag_value, 'PartNumber': part_number}

            This value can be appended to a list to be used to complete
            the multipart upload.
        """
        with fileobj as body:
            response = client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
        etag = response['ETag']
        part_metadata = {'ETag': etag, 'PartNumber': part_number}
        if 'ChecksumAlgorithm' in extra_args:
            algorithm_name = extra_args['ChecksumAlgorithm'].upper()
            checksum_member = f'Checksum{algorithm_name}'
            if checksum_member in response:
                part_metadata[checksum_member] = response[checksum_member]
        return part_metadata
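

# Illustrative sketch (editorial, not part of the library): the part
# metadata dicts returned by UploadPartTask are what S3's
# CompleteMultipartUpload expects under MultipartUpload['Parts']. With a
# hypothetical boto3 client, bucket, key, and upload id:
#
#     parts = [
#         {'ETag': '"9b2cf535f27731c974343645a3985328"', 'PartNumber': 1},
#         {'ETag': '"6f5902ac237024bdd0c176cb93063dc4"', 'PartNumber': 2},
#     ]
#     client.complete_multipart_upload(
#         Bucket='my-bucket',
#         Key='my-key',
#         UploadId=upload_id,
#         MultipartUpload={'Parts': parts},
#     )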