tasks.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import copy
  14. import logging
  15. from s3transfer.utils import get_callbacks
# Module-level logger used by the task classes in this module for debug
# tracing of task execution, waiting, and raised exceptions.
logger = logging.getLogger(__name__)
  17. class Task:
  18. """A task associated to a TransferFuture request
  19. This is a base class for other classes to subclass from. All subclassed
  20. classes must implement the main() method.
  21. """
  22. def __init__(
  23. self,
  24. transfer_coordinator,
  25. main_kwargs=None,
  26. pending_main_kwargs=None,
  27. done_callbacks=None,
  28. is_final=False,
  29. ):
  30. """
  31. :type transfer_coordinator: s3transfer.futures.TransferCoordinator
  32. :param transfer_coordinator: The context associated to the
  33. TransferFuture for which this Task is associated with.
  34. :type main_kwargs: dict
  35. :param main_kwargs: The keyword args that can be immediately supplied
  36. to the _main() method of the task
  37. :type pending_main_kwargs: dict
  38. :param pending_main_kwargs: The keyword args that are depended upon
  39. by the result from a dependent future(s). The result returned by
  40. the future(s) will be used as the value for the keyword argument
  41. when _main() is called. The values for each key can be:
  42. * a single future - Once completed, its value will be the
  43. result of that single future
  44. * a list of futures - Once all of the futures complete, the
  45. value used will be a list of each completed future result
  46. value in order of when they were originally supplied.
  47. :type done_callbacks: list of callbacks
  48. :param done_callbacks: A list of callbacks to call once the task is
  49. done completing. Each callback will be called with no arguments
  50. and will be called no matter if the task succeeds or an exception
  51. is raised.
  52. :type is_final: boolean
  53. :param is_final: True, to indicate that this task is the final task
  54. for the TransferFuture request. By setting this value to True, it
  55. will set the result of the entire TransferFuture to the result
  56. returned by this task's main() method.
  57. """
  58. self._transfer_coordinator = transfer_coordinator
  59. self._main_kwargs = main_kwargs
  60. if self._main_kwargs is None:
  61. self._main_kwargs = {}
  62. self._pending_main_kwargs = pending_main_kwargs
  63. if pending_main_kwargs is None:
  64. self._pending_main_kwargs = {}
  65. self._done_callbacks = done_callbacks
  66. if self._done_callbacks is None:
  67. self._done_callbacks = []
  68. self._is_final = is_final
  69. def __repr__(self):
  70. # These are the general main_kwarg parameters that we want to
  71. # display in the repr.
  72. params_to_display = [
  73. 'bucket',
  74. 'key',
  75. 'part_number',
  76. 'final_filename',
  77. 'transfer_future',
  78. 'offset',
  79. 'extra_args',
  80. ]
  81. main_kwargs_to_display = self._get_kwargs_with_params_to_include(
  82. self._main_kwargs, params_to_display
  83. )
  84. return '{}(transfer_id={}, {})'.format(
  85. self.__class__.__name__,
  86. self._transfer_coordinator.transfer_id,
  87. main_kwargs_to_display,
  88. )
  89. @property
  90. def transfer_id(self):
  91. """The id for the transfer request that the task belongs to"""
  92. return self._transfer_coordinator.transfer_id
  93. def _get_kwargs_with_params_to_include(self, kwargs, include):
  94. filtered_kwargs = {}
  95. for param in include:
  96. if param in kwargs:
  97. filtered_kwargs[param] = kwargs[param]
  98. return filtered_kwargs
  99. def _get_kwargs_with_params_to_exclude(self, kwargs, exclude):
  100. filtered_kwargs = {}
  101. for param, value in kwargs.items():
  102. if param in exclude:
  103. continue
  104. filtered_kwargs[param] = value
  105. return filtered_kwargs
  106. def __call__(self):
  107. """The callable to use when submitting a Task to an executor"""
  108. try:
  109. # Wait for all of futures this task depends on.
  110. self._wait_on_dependent_futures()
  111. # Gather up all of the main keyword arguments for main().
  112. # This includes the immediately provided main_kwargs and
  113. # the values for pending_main_kwargs that source from the return
  114. # values from the task's dependent futures.
  115. kwargs = self._get_all_main_kwargs()
  116. # If the task is not done (really only if some other related
  117. # task to the TransferFuture had failed) then execute the task's
  118. # main() method.
  119. if not self._transfer_coordinator.done():
  120. return self._execute_main(kwargs)
  121. except Exception as e:
  122. self._log_and_set_exception(e)
  123. finally:
  124. # Run any done callbacks associated to the task no matter what.
  125. for done_callback in self._done_callbacks:
  126. done_callback()
  127. if self._is_final:
  128. # If this is the final task announce that it is done if results
  129. # are waiting on its completion.
  130. self._transfer_coordinator.announce_done()
  131. def _execute_main(self, kwargs):
  132. # Do not display keyword args that should not be printed, especially
  133. # if they are going to make the logs hard to follow.
  134. params_to_exclude = ['data']
  135. kwargs_to_display = self._get_kwargs_with_params_to_exclude(
  136. kwargs, params_to_exclude
  137. )
  138. # Log what is about to be executed.
  139. logger.debug(f"Executing task {self} with kwargs {kwargs_to_display}")
  140. return_value = self._main(**kwargs)
  141. # If the task is the final task, then set the TransferFuture's
  142. # value to the return value from main().
  143. if self._is_final:
  144. self._transfer_coordinator.set_result(return_value)
  145. return return_value
  146. def _log_and_set_exception(self, exception):
  147. # If an exception is ever thrown than set the exception for the
  148. # entire TransferFuture.
  149. logger.debug("Exception raised.", exc_info=True)
  150. self._transfer_coordinator.set_exception(exception)
  151. def _main(self, **kwargs):
  152. """The method that will be ran in the executor
  153. This method must be implemented by subclasses from Task. main() can
  154. be implemented with any arguments decided upon by the subclass.
  155. """
  156. raise NotImplementedError('_main() must be implemented')
  157. def _wait_on_dependent_futures(self):
  158. # Gather all of the futures into that main() depends on.
  159. futures_to_wait_on = []
  160. for _, future in self._pending_main_kwargs.items():
  161. # If the pending main keyword arg is a list then extend the list.
  162. if isinstance(future, list):
  163. futures_to_wait_on.extend(future)
  164. # If the pending main keyword arg is a future append it to the list.
  165. else:
  166. futures_to_wait_on.append(future)
  167. # Now wait for all of the futures to complete.
  168. self._wait_until_all_complete(futures_to_wait_on)
  169. def _wait_until_all_complete(self, futures):
  170. # This is a basic implementation of the concurrent.futures.wait()
  171. #
  172. # concurrent.futures.wait() is not used instead because of this
  173. # reported issue: https://bugs.python.org/issue20319.
  174. # The issue would occasionally cause multipart uploads to hang
  175. # when wait() was called. With this approach, it avoids the
  176. # concurrency bug by removing any association with concurrent.futures
  177. # implementation of waiters.
  178. logger.debug(
  179. '%s about to wait for the following futures %s', self, futures
  180. )
  181. for future in futures:
  182. try:
  183. logger.debug('%s about to wait for %s', self, future)
  184. future.result()
  185. except Exception:
  186. # result() can also produce exceptions. We want to ignore
  187. # these to be deferred to error handling down the road.
  188. pass
  189. logger.debug('%s done waiting for dependent futures', self)
  190. def _get_all_main_kwargs(self):
  191. # Copy over all of the kwargs that we know is available.
  192. kwargs = copy.copy(self._main_kwargs)
  193. # Iterate through the kwargs whose values are pending on the result
  194. # of a future.
  195. for key, pending_value in self._pending_main_kwargs.items():
  196. # If the value is a list of futures, iterate though the list
  197. # appending on the result from each future.
  198. if isinstance(pending_value, list):
  199. result = []
  200. for future in pending_value:
  201. result.append(future.result())
  202. # Otherwise if the pending_value is a future, just wait for it.
  203. else:
  204. result = pending_value.result()
  205. # Add the retrieved value to the kwargs to be sent to the
  206. # main() call.
  207. kwargs[key] = result
  208. return kwargs
  209. class SubmissionTask(Task):
  210. """A base class for any submission task
  211. Submission tasks are the top-level task used to submit a series of tasks
  212. to execute a particular transfer.
  213. """
  214. def _main(self, transfer_future, **kwargs):
  215. """
  216. :type transfer_future: s3transfer.futures.TransferFuture
  217. :param transfer_future: The transfer future associated with the
  218. transfer request that tasks are being submitted for
  219. :param kwargs: Any additional kwargs that you may want to pass
  220. to the _submit() method
  221. """
  222. try:
  223. self._transfer_coordinator.set_status_to_queued()
  224. # Before submitting any tasks, run all of the on_queued callbacks
  225. on_queued_callbacks = get_callbacks(transfer_future, 'queued')
  226. for on_queued_callback in on_queued_callbacks:
  227. on_queued_callback()
  228. # Once callbacks have been ran set the status to running.
  229. self._transfer_coordinator.set_status_to_running()
  230. # Call the submit method to start submitting tasks to execute the
  231. # transfer.
  232. self._submit(transfer_future=transfer_future, **kwargs)
  233. except BaseException as e:
  234. # If there was an exception raised during the submission of task
  235. # there is a chance that the final task that signals if a transfer
  236. # is done and too run the cleanup may never have been submitted in
  237. # the first place so we need to account accordingly.
  238. #
  239. # Note that BaseException is caught, instead of Exception, because
  240. # for some implementations of executors, specifically the serial
  241. # implementation, the SubmissionTask is directly exposed to
  242. # KeyboardInterupts and so needs to cleanup and signal done
  243. # for those as well.
  244. # Set the exception, that caused the process to fail.
  245. self._log_and_set_exception(e)
  246. # Wait for all possibly associated futures that may have spawned
  247. # from this submission task have finished before we announce the
  248. # transfer done.
  249. self._wait_for_all_submitted_futures_to_complete()
  250. # Announce the transfer as done, which will run any cleanups
  251. # and done callbacks as well.
  252. self._transfer_coordinator.announce_done()
  253. def _submit(self, transfer_future, **kwargs):
  254. """The submission method to be implemented
  255. :type transfer_future: s3transfer.futures.TransferFuture
  256. :param transfer_future: The transfer future associated with the
  257. transfer request that tasks are being submitted for
  258. :param kwargs: Any additional keyword arguments you want to be passed
  259. in
  260. """
  261. raise NotImplementedError('_submit() must be implemented')
  262. def _wait_for_all_submitted_futures_to_complete(self):
  263. # We want to wait for all futures that were submitted to
  264. # complete as we do not want the cleanup callbacks or done callbacks
  265. # to be called to early. The main problem is any task that was
  266. # submitted may have submitted even more during its process and so
  267. # we need to account accordingly.
  268. # First get all of the futures that were submitted up to this point.
  269. submitted_futures = self._transfer_coordinator.associated_futures
  270. while submitted_futures:
  271. # Wait for those futures to complete.
  272. self._wait_until_all_complete(submitted_futures)
  273. # However, more futures may have been submitted as we waited so
  274. # we need to check again for any more associated futures.
  275. possibly_more_submitted_futures = (
  276. self._transfer_coordinator.associated_futures
  277. )
  278. # If the current list of submitted futures is equal to the
  279. # the list of associated futures for when after the wait completes,
  280. # we can ensure no more futures were submitted in waiting on
  281. # the current list of futures to complete ultimately meaning all
  282. # futures that may have spawned from the original submission task
  283. # have completed.
  284. if submitted_futures == possibly_more_submitted_futures:
  285. break
  286. submitted_futures = possibly_more_submitted_futures
  287. class CreateMultipartUploadTask(Task):
  288. """Task to initiate a multipart upload"""
  289. def _main(self, client, bucket, key, extra_args):
  290. """
  291. :param client: The client to use when calling CreateMultipartUpload
  292. :param bucket: The name of the bucket to upload to
  293. :param key: The name of the key to upload to
  294. :param extra_args: A dictionary of any extra arguments that may be
  295. used in the initialization.
  296. :returns: The upload id of the multipart upload
  297. """
  298. # Create the multipart upload.
  299. response = client.create_multipart_upload(
  300. Bucket=bucket, Key=key, **extra_args
  301. )
  302. upload_id = response['UploadId']
  303. # Add a cleanup if the multipart upload fails at any point.
  304. self._transfer_coordinator.add_failure_cleanup(
  305. client.abort_multipart_upload,
  306. Bucket=bucket,
  307. Key=key,
  308. UploadId=upload_id,
  309. )
  310. return upload_id
  311. class CompleteMultipartUploadTask(Task):
  312. """Task to complete a multipart upload"""
  313. def _main(self, client, bucket, key, upload_id, parts, extra_args):
  314. """
  315. :param client: The client to use when calling CompleteMultipartUpload
  316. :param bucket: The name of the bucket to upload to
  317. :param key: The name of the key to upload to
  318. :param upload_id: The id of the upload
  319. :param parts: A list of parts to use to complete the multipart upload::
  320. [{'Etag': etag_value, 'PartNumber': part_number}, ...]
  321. Each element in the list consists of a return value from
  322. ``UploadPartTask.main()``.
  323. :param extra_args: A dictionary of any extra arguments that may be
  324. used in completing the multipart transfer.
  325. """
  326. client.complete_multipart_upload(
  327. Bucket=bucket,
  328. Key=key,
  329. UploadId=upload_id,
  330. MultipartUpload={'Parts': parts},
  331. **extra_args,
  332. )