futures.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import copy
  14. import logging
  15. import sys
  16. import threading
  17. from collections import namedtuple
  18. from concurrent import futures
  19. from s3transfer.compat import MAXINT
  20. from s3transfer.exceptions import CancelledError, TransferNotDoneError
  21. from s3transfer.utils import FunctionContainer, TaskSemaphore
  22. logger = logging.getLogger(__name__)
  23. class BaseTransferFuture:
  24. @property
  25. def meta(self):
  26. """The metadata associated to the TransferFuture"""
  27. raise NotImplementedError('meta')
  28. def done(self):
  29. """Determines if a TransferFuture has completed
  30. :returns: True if completed. False, otherwise.
  31. """
  32. raise NotImplementedError('done()')
  33. def result(self):
  34. """Waits until TransferFuture is done and returns the result
  35. If the TransferFuture succeeded, it will return the result. If the
  36. TransferFuture failed, it will raise the exception associated to the
  37. failure.
  38. """
  39. raise NotImplementedError('result()')
  40. def cancel(self):
  41. """Cancels the request associated with the TransferFuture"""
  42. raise NotImplementedError('cancel()')
  43. class BaseTransferMeta:
  44. @property
  45. def call_args(self):
  46. """The call args used in the transfer request"""
  47. raise NotImplementedError('call_args')
  48. @property
  49. def transfer_id(self):
  50. """The unique id of the transfer"""
  51. raise NotImplementedError('transfer_id')
  52. @property
  53. def user_context(self):
  54. """A dictionary that requesters can store data in"""
  55. raise NotImplementedError('user_context')
  56. class TransferFuture(BaseTransferFuture):
  57. def __init__(self, meta=None, coordinator=None):
  58. """The future associated to a submitted transfer request
  59. :type meta: TransferMeta
  60. :param meta: The metadata associated to the request. This object
  61. is visible to the requester.
  62. :type coordinator: TransferCoordinator
  63. :param coordinator: The coordinator associated to the request. This
  64. object is not visible to the requester.
  65. """
  66. self._meta = meta
  67. if meta is None:
  68. self._meta = TransferMeta()
  69. self._coordinator = coordinator
  70. if coordinator is None:
  71. self._coordinator = TransferCoordinator()
  72. @property
  73. def meta(self):
  74. return self._meta
  75. def done(self):
  76. return self._coordinator.done()
  77. def result(self):
  78. try:
  79. # Usually the result() method blocks until the transfer is done,
  80. # however if a KeyboardInterrupt is raised we want want to exit
  81. # out of this and propagate the exception.
  82. return self._coordinator.result()
  83. except KeyboardInterrupt as e:
  84. self.cancel()
  85. raise e
  86. def cancel(self):
  87. self._coordinator.cancel()
  88. def set_exception(self, exception):
  89. """Sets the exception on the future."""
  90. if not self.done():
  91. raise TransferNotDoneError(
  92. 'set_exception can only be called once the transfer is '
  93. 'complete.'
  94. )
  95. self._coordinator.set_exception(exception, override=True)
  96. class TransferMeta(BaseTransferMeta):
  97. """Holds metadata about the TransferFuture"""
  98. def __init__(self, call_args=None, transfer_id=None):
  99. self._call_args = call_args
  100. self._transfer_id = transfer_id
  101. self._size = None
  102. self._user_context = {}
  103. @property
  104. def call_args(self):
  105. """The call args used in the transfer request"""
  106. return self._call_args
  107. @property
  108. def transfer_id(self):
  109. """The unique id of the transfer"""
  110. return self._transfer_id
  111. @property
  112. def size(self):
  113. """The size of the transfer request if known"""
  114. return self._size
  115. @property
  116. def user_context(self):
  117. """A dictionary that requesters can store data in"""
  118. return self._user_context
  119. def provide_transfer_size(self, size):
  120. """A method to provide the size of a transfer request
  121. By providing this value, the TransferManager will not try to
  122. call HeadObject or use the use OS to determine the size of the
  123. transfer.
  124. """
  125. self._size = size
  126. class TransferCoordinator:
  127. """A helper class for managing TransferFuture"""
  128. def __init__(self, transfer_id=None):
  129. self.transfer_id = transfer_id
  130. self._status = 'not-started'
  131. self._result = None
  132. self._exception = None
  133. self._associated_futures = set()
  134. self._failure_cleanups = []
  135. self._done_callbacks = []
  136. self._done_event = threading.Event()
  137. self._lock = threading.Lock()
  138. self._associated_futures_lock = threading.Lock()
  139. self._done_callbacks_lock = threading.Lock()
  140. self._failure_cleanups_lock = threading.Lock()
  141. def __repr__(self):
  142. return '{}(transfer_id={})'.format(
  143. self.__class__.__name__, self.transfer_id
  144. )
  145. @property
  146. def exception(self):
  147. return self._exception
  148. @property
  149. def associated_futures(self):
  150. """The list of futures associated to the inprogress TransferFuture
  151. Once the transfer finishes this list becomes empty as the transfer
  152. is considered done and there should be no running futures left.
  153. """
  154. with self._associated_futures_lock:
  155. # We return a copy of the list because we do not want to
  156. # processing the returned list while another thread is adding
  157. # more futures to the actual list.
  158. return copy.copy(self._associated_futures)
  159. @property
  160. def failure_cleanups(self):
  161. """The list of callbacks to call when the TransferFuture fails"""
  162. return self._failure_cleanups
  163. @property
  164. def status(self):
  165. """The status of the TransferFuture
  166. The currently supported states are:
  167. * not-started - Has yet to start. If in this state, a transfer
  168. can be canceled immediately and nothing will happen.
  169. * queued - SubmissionTask is about to submit tasks
  170. * running - Is inprogress. In-progress as of now means that
  171. the SubmissionTask that runs the transfer is being executed. So
  172. there is no guarantee any transfer requests had been made to
  173. S3 if this state is reached.
  174. * cancelled - Was cancelled
  175. * failed - An exception other than CancelledError was thrown
  176. * success - No exceptions were thrown and is done.
  177. """
  178. return self._status
  179. def set_result(self, result):
  180. """Set a result for the TransferFuture
  181. Implies that the TransferFuture succeeded. This will always set a
  182. result because it is invoked on the final task where there is only
  183. ever one final task and it is ran at the very end of a transfer
  184. process. So if a result is being set for this final task, the transfer
  185. succeeded even if something came a long and canceled the transfer
  186. on the final task.
  187. """
  188. with self._lock:
  189. self._exception = None
  190. self._result = result
  191. self._status = 'success'
  192. def set_exception(self, exception, override=False):
  193. """Set an exception for the TransferFuture
  194. Implies the TransferFuture failed.
  195. :param exception: The exception that cause the transfer to fail.
  196. :param override: If True, override any existing state.
  197. """
  198. with self._lock:
  199. if not self.done() or override:
  200. self._exception = exception
  201. self._status = 'failed'
  202. def result(self):
  203. """Waits until TransferFuture is done and returns the result
  204. If the TransferFuture succeeded, it will return the result. If the
  205. TransferFuture failed, it will raise the exception associated to the
  206. failure.
  207. """
  208. # Doing a wait() with no timeout cannot be interrupted in python2 but
  209. # can be interrupted in python3 so we just wait with the largest
  210. # possible value integer value, which is on the scale of billions of
  211. # years...
  212. self._done_event.wait(MAXINT)
  213. # Once done waiting, raise an exception if present or return the
  214. # final result.
  215. if self._exception:
  216. raise self._exception
  217. return self._result
  218. def cancel(self, msg='', exc_type=CancelledError):
  219. """Cancels the TransferFuture
  220. :param msg: The message to attach to the cancellation
  221. :param exc_type: The type of exception to set for the cancellation
  222. """
  223. with self._lock:
  224. if not self.done():
  225. should_announce_done = False
  226. logger.debug('%s cancel(%s) called', self, msg)
  227. self._exception = exc_type(msg)
  228. if self._status == 'not-started':
  229. should_announce_done = True
  230. self._status = 'cancelled'
  231. if should_announce_done:
  232. self.announce_done()
  233. def set_status_to_queued(self):
  234. """Sets the TransferFutrue's status to running"""
  235. self._transition_to_non_done_state('queued')
  236. def set_status_to_running(self):
  237. """Sets the TransferFuture's status to running"""
  238. self._transition_to_non_done_state('running')
  239. def _transition_to_non_done_state(self, desired_state):
  240. with self._lock:
  241. if self.done():
  242. raise RuntimeError(
  243. 'Unable to transition from done state %s to non-done '
  244. 'state %s.' % (self.status, desired_state)
  245. )
  246. self._status = desired_state
  247. def submit(self, executor, task, tag=None):
  248. """Submits a task to a provided executor
  249. :type executor: s3transfer.futures.BoundedExecutor
  250. :param executor: The executor to submit the callable to
  251. :type task: s3transfer.tasks.Task
  252. :param task: The task to submit to the executor
  253. :type tag: s3transfer.futures.TaskTag
  254. :param tag: A tag to associate to the submitted task
  255. :rtype: concurrent.futures.Future
  256. :returns: A future representing the submitted task
  257. """
  258. logger.debug(
  259. "Submitting task {} to executor {} for transfer request: {}.".format(
  260. task, executor, self.transfer_id
  261. )
  262. )
  263. future = executor.submit(task, tag=tag)
  264. # Add this created future to the list of associated future just
  265. # in case it is needed during cleanups.
  266. self.add_associated_future(future)
  267. future.add_done_callback(
  268. FunctionContainer(self.remove_associated_future, future)
  269. )
  270. return future
  271. def done(self):
  272. """Determines if a TransferFuture has completed
  273. :returns: False if status is equal to 'failed', 'cancelled', or
  274. 'success'. True, otherwise
  275. """
  276. return self.status in ['failed', 'cancelled', 'success']
  277. def add_associated_future(self, future):
  278. """Adds a future to be associated with the TransferFuture"""
  279. with self._associated_futures_lock:
  280. self._associated_futures.add(future)
  281. def remove_associated_future(self, future):
  282. """Removes a future's association to the TransferFuture"""
  283. with self._associated_futures_lock:
  284. self._associated_futures.remove(future)
  285. def add_done_callback(self, function, *args, **kwargs):
  286. """Add a done callback to be invoked when transfer is done"""
  287. with self._done_callbacks_lock:
  288. self._done_callbacks.append(
  289. FunctionContainer(function, *args, **kwargs)
  290. )
  291. def add_failure_cleanup(self, function, *args, **kwargs):
  292. """Adds a callback to call upon failure"""
  293. with self._failure_cleanups_lock:
  294. self._failure_cleanups.append(
  295. FunctionContainer(function, *args, **kwargs)
  296. )
  297. def announce_done(self):
  298. """Announce that future is done running and run associated callbacks
  299. This will run any failure cleanups if the transfer failed if not
  300. they have not been run, allows the result() to be unblocked, and will
  301. run any done callbacks associated to the TransferFuture if they have
  302. not already been ran.
  303. """
  304. if self.status != 'success':
  305. self._run_failure_cleanups()
  306. self._done_event.set()
  307. self._run_done_callbacks()
  308. def _run_done_callbacks(self):
  309. # Run the callbacks and remove the callbacks from the internal
  310. # list so they do not get ran again if done is announced more than
  311. # once.
  312. with self._done_callbacks_lock:
  313. self._run_callbacks(self._done_callbacks)
  314. self._done_callbacks = []
  315. def _run_failure_cleanups(self):
  316. # Run the cleanup callbacks and remove the callbacks from the internal
  317. # list so they do not get ran again if done is announced more than
  318. # once.
  319. with self._failure_cleanups_lock:
  320. self._run_callbacks(self.failure_cleanups)
  321. self._failure_cleanups = []
  322. def _run_callbacks(self, callbacks):
  323. for callback in callbacks:
  324. self._run_callback(callback)
  325. def _run_callback(self, callback):
  326. try:
  327. callback()
  328. # We do not want a callback interrupting the process, especially
  329. # in the failure cleanups. So log and catch, the exception.
  330. except Exception:
  331. logger.debug("Exception raised in %s." % callback, exc_info=True)
  332. class BoundedExecutor:
  333. EXECUTOR_CLS = futures.ThreadPoolExecutor
  334. def __init__(
  335. self, max_size, max_num_threads, tag_semaphores=None, executor_cls=None
  336. ):
  337. """An executor implementation that has a maximum queued up tasks
  338. The executor will block if the number of tasks that have been
  339. submitted and is currently working on is past its maximum.
  340. :params max_size: The maximum number of inflight futures. An inflight
  341. future means that the task is either queued up or is currently
  342. being executed. A size of None or 0 means that the executor will
  343. have no bound in terms of the number of inflight futures.
  344. :params max_num_threads: The maximum number of threads the executor
  345. uses.
  346. :type tag_semaphores: dict
  347. :params tag_semaphores: A dictionary where the key is the name of the
  348. tag and the value is the semaphore to use when limiting the
  349. number of tasks the executor is processing at a time.
  350. :type executor_cls: BaseExecutor
  351. :param underlying_executor_cls: The executor class that
  352. get bounded by this executor. If None is provided, the
  353. concurrent.futures.ThreadPoolExecutor class is used.
  354. """
  355. self._max_num_threads = max_num_threads
  356. if executor_cls is None:
  357. executor_cls = self.EXECUTOR_CLS
  358. self._executor = executor_cls(max_workers=self._max_num_threads)
  359. self._semaphore = TaskSemaphore(max_size)
  360. self._tag_semaphores = tag_semaphores
  361. def submit(self, task, tag=None, block=True):
  362. """Submit a task to complete
  363. :type task: s3transfer.tasks.Task
  364. :param task: The task to run __call__ on
  365. :type tag: s3transfer.futures.TaskTag
  366. :param tag: An optional tag to associate to the task. This
  367. is used to override which semaphore to use.
  368. :type block: boolean
  369. :param block: True if to wait till it is possible to submit a task.
  370. False, if not to wait and raise an error if not able to submit
  371. a task.
  372. :returns: The future associated to the submitted task
  373. """
  374. semaphore = self._semaphore
  375. # If a tag was provided, use the semaphore associated to that
  376. # tag.
  377. if tag:
  378. semaphore = self._tag_semaphores[tag]
  379. # Call acquire on the semaphore.
  380. acquire_token = semaphore.acquire(task.transfer_id, block)
  381. # Create a callback to invoke when task is done in order to call
  382. # release on the semaphore.
  383. release_callback = FunctionContainer(
  384. semaphore.release, task.transfer_id, acquire_token
  385. )
  386. # Submit the task to the underlying executor.
  387. future = ExecutorFuture(self._executor.submit(task))
  388. # Add the Semaphore.release() callback to the future such that
  389. # it is invoked once the future completes.
  390. future.add_done_callback(release_callback)
  391. return future
  392. def shutdown(self, wait=True):
  393. self._executor.shutdown(wait)
  394. class ExecutorFuture:
  395. def __init__(self, future):
  396. """A future returned from the executor
  397. Currently, it is just a wrapper around a concurrent.futures.Future.
  398. However, this can eventually grow to implement the needed functionality
  399. of concurrent.futures.Future if we move off of the library and not
  400. affect the rest of the codebase.
  401. :type future: concurrent.futures.Future
  402. :param future: The underlying future
  403. """
  404. self._future = future
  405. def result(self):
  406. return self._future.result()
  407. def add_done_callback(self, fn):
  408. """Adds a callback to be completed once future is done
  409. :param fn: A callable that takes no arguments. Note that is different
  410. than concurrent.futures.Future.add_done_callback that requires
  411. a single argument for the future.
  412. """
  413. # The done callback for concurrent.futures.Future will always pass a
  414. # the future in as the only argument. So we need to create the
  415. # proper signature wrapper that will invoke the callback provided.
  416. def done_callback(future_passed_to_callback):
  417. return fn()
  418. self._future.add_done_callback(done_callback)
  419. def done(self):
  420. return self._future.done()
  421. class BaseExecutor:
  422. """Base Executor class implementation needed to work with s3transfer"""
  423. def __init__(self, max_workers=None):
  424. pass
  425. def submit(self, fn, *args, **kwargs):
  426. raise NotImplementedError('submit()')
  427. def shutdown(self, wait=True):
  428. raise NotImplementedError('shutdown()')
  429. class NonThreadedExecutor(BaseExecutor):
  430. """A drop-in replacement non-threaded version of ThreadPoolExecutor"""
  431. def submit(self, fn, *args, **kwargs):
  432. future = NonThreadedExecutorFuture()
  433. try:
  434. result = fn(*args, **kwargs)
  435. future.set_result(result)
  436. except Exception:
  437. e, tb = sys.exc_info()[1:]
  438. logger.debug(
  439. 'Setting exception for %s to %s with traceback %s',
  440. future,
  441. e,
  442. tb,
  443. )
  444. future.set_exception_info(e, tb)
  445. return future
  446. def shutdown(self, wait=True):
  447. pass
  448. class NonThreadedExecutorFuture:
  449. """The Future returned from NonThreadedExecutor
  450. Note that this future is **not** thread-safe as it is being used
  451. from the context of a non-threaded environment.
  452. """
  453. def __init__(self):
  454. self._result = None
  455. self._exception = None
  456. self._traceback = None
  457. self._done = False
  458. self._done_callbacks = []
  459. def set_result(self, result):
  460. self._result = result
  461. self._set_done()
  462. def set_exception_info(self, exception, traceback):
  463. self._exception = exception
  464. self._traceback = traceback
  465. self._set_done()
  466. def result(self, timeout=None):
  467. if self._exception:
  468. raise self._exception.with_traceback(self._traceback)
  469. return self._result
  470. def _set_done(self):
  471. self._done = True
  472. for done_callback in self._done_callbacks:
  473. self._invoke_done_callback(done_callback)
  474. self._done_callbacks = []
  475. def _invoke_done_callback(self, done_callback):
  476. return done_callback(self)
  477. def done(self):
  478. return self._done
  479. def add_done_callback(self, fn):
  480. if self._done:
  481. self._invoke_done_callback(fn)
  482. else:
  483. self._done_callbacks.append(fn)
  484. TaskTag = namedtuple('TaskTag', ['name'])
  485. IN_MEMORY_UPLOAD_TAG = TaskTag('in_memory_upload')
  486. IN_MEMORY_DOWNLOAD_TAG = TaskTag('in_memory_download')