manager.py
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import logging
import re
import threading

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, KB, MB
from s3transfer.copies import CopySubmissionTask
from s3transfer.delete import DeleteSubmissionTask
from s3transfer.download import DownloadSubmissionTask
from s3transfer.exceptions import CancelledError, FatalError
from s3transfer.futures import (
    IN_MEMORY_DOWNLOAD_TAG,
    IN_MEMORY_UPLOAD_TAG,
    BoundedExecutor,
    TransferCoordinator,
    TransferFuture,
    TransferMeta,
)
from s3transfer.upload import UploadSubmissionTask
from s3transfer.utils import (
    CallArgs,
    OSUtils,
    SlidingWindowSemaphore,
    TaskSemaphore,
    get_callbacks,
    signal_not_transferring,
    signal_transferring,
)

logger = logging.getLogger(__name__)
class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        multipart_chunksize=8 * MB,
        max_request_concurrency=10,
        max_submission_concurrency=5,
        max_request_queue_size=1000,
        max_submission_queue_size=1000,
        max_io_queue_size=1000,
        io_chunksize=256 * KB,
        num_download_attempts=5,
        max_in_memory_upload_chunks=10,
        max_in_memory_download_chunks=10,
        max_bandwidth=None,
    ):
        """Configuration for the transfer manager

        :param multipart_threshold: The size threshold at which transfers
            are performed as multipart transfers.

        :param max_request_concurrency: The maximum number of S3 API
            transfer-related requests that can happen at a time.

        :param max_submission_concurrency: The maximum number of threads
            processing a call to a TransferManager method. Processing a
            call usually entails determining which S3 API requests need
            to be enqueued, but does **not** entail making any of the
            S3 API data transferring requests needed to perform the transfer.
            The threads controlled by ``max_request_concurrency`` are
            responsible for that.

        :param multipart_chunksize: The size of each part if a request
            becomes a multipart transfer.

        :param max_request_queue_size: The maximum number of S3 API requests
            that can be queued at a time.

        :param max_submission_queue_size: The maximum number of
            TransferManager method calls that can be queued at a time.

        :param max_io_queue_size: The maximum number of read parts that
            can be queued to be written to disk per download. The default
            size for each element in this queue is 8 KB.

        :param io_chunksize: The max size of each chunk in the io queue.
            Currently, this is the size used when reading from the
            downloaded stream as well.

        :param num_download_attempts: The number of download attempts that
            will be tried upon errors with downloading an object in S3. Note
            that these retries account for errors that occur when streaming
            down the data from S3 (i.e. socket errors and read timeouts that
            occur after receiving an OK response from S3).
            Other retryable exceptions such as throttling errors and 5xx
            errors are already retried by botocore (this default is 5). The
            ``num_download_attempts`` does not take into account the
            number of exceptions retried by botocore.

        :param max_in_memory_upload_chunks: The number of chunks that can
            be stored in memory at a time for all ongoing upload requests.
            This pertains to chunks of data that need to be stored in memory
            during an upload if the data is sourced from a file-like object.
            The total maximum memory footprint due to in-memory upload
            chunks is roughly equal to:

                max_in_memory_upload_chunks * multipart_chunksize
                + max_submission_concurrency * multipart_chunksize

            ``max_submission_concurrency`` has an effect on this value because
            each thread pulling data off of a file-like object may be
            waiting with a single read chunk to be submitted for upload
            because the ``max_in_memory_upload_chunks`` value has been reached
            by the threads making the upload request.

        :param max_in_memory_download_chunks: The number of chunks that can
            be buffered in memory and **not** in the io queue at a time for
            all ongoing download requests. This pertains specifically to
            file-like objects that cannot be seeked. The total maximum memory
            footprint due to in-memory download chunks is roughly equal to:

                max_in_memory_download_chunks * multipart_chunksize

        :param max_bandwidth: The maximum bandwidth that will be consumed
            in uploading and downloading file content. The value is in terms
            of bytes per second.
        """
        self.multipart_threshold = multipart_threshold
        self.multipart_chunksize = multipart_chunksize
        self.max_request_concurrency = max_request_concurrency
        self.max_submission_concurrency = max_submission_concurrency
        self.max_request_queue_size = max_request_queue_size
        self.max_submission_queue_size = max_submission_queue_size
        self.max_io_queue_size = max_io_queue_size
        self.io_chunksize = io_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
        self.max_in_memory_download_chunks = max_in_memory_download_chunks
        self.max_bandwidth = max_bandwidth
        self._validate_attrs_are_nonzero()

    def _validate_attrs_are_nonzero(self):
        for attr, attr_val in self.__dict__.items():
            if attr_val is not None and attr_val <= 0:
                raise ValueError(
                    'Provided parameter %s of value %s must be greater than '
                    '0.' % (attr, attr_val)
                )
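
# Example (illustrative sketch, not part of the library source): a
# TransferConfig tuned for large uploads over a constrained link. The
# specific values below are hypothetical, not recommendations.
#
#     config = TransferConfig(
#         multipart_threshold=16 * MB,
#         multipart_chunksize=16 * MB,
#         max_request_concurrency=20,
#         max_bandwidth=10 * MB,  # roughly 10 MB/s, enforced via LeakyBucket
#     )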
class TransferManager:
    ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ChecksumAlgorithm',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'ExpectedBucketOwner',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'ObjectLockLegalHoldStatus',
        'ObjectLockMode',
        'ObjectLockRetainUntilDate',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
        'WebsiteRedirectLocation',
    ]

    ALLOWED_COPY_ARGS = ALLOWED_UPLOAD_ARGS + [
        'CopySourceIfMatch',
        'CopySourceIfModifiedSince',
        'CopySourceIfNoneMatch',
        'CopySourceIfUnmodifiedSince',
        'CopySourceSSECustomerAlgorithm',
        'CopySourceSSECustomerKey',
        'CopySourceSSECustomerKeyMD5',
        'MetadataDirective',
        'TaggingDirective',
    ]

    ALLOWED_DELETE_ARGS = [
        'MFA',
        'VersionId',
        'RequestPayer',
        'ExpectedBucketOwner',
    ]

    VALIDATE_SUPPORTED_BUCKET_VALUES = True

    _UNSUPPORTED_BUCKET_PATTERNS = {
        'S3 Object Lambda': re.compile(
            r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
            r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
        ),
    }
    def __init__(self, client, config=None, osutil=None, executor_cls=None):
        """A transfer manager interface for Amazon S3

        :param client: Client to be used by the manager

        :param config: TransferConfig to associate specific configurations

        :param osutil: OSUtils object to use for os-related behavior when
            using the transfer manager.

        :type executor_cls: s3transfer.futures.BaseExecutor
        :param executor_cls: The class of executor to use with the transfer
            manager. By default, concurrent.futures.ThreadPoolExecutor is
            used.
        """
        self._client = client
        self._config = config
        if config is None:
            self._config = TransferConfig()
        self._osutil = osutil
        if osutil is None:
            self._osutil = OSUtils()
        self._coordinator_controller = TransferCoordinatorController()
        # A counter to create unique ids for each transfer submitted.
        self._id_counter = 0

        # The executor responsible for making S3 API transfer requests
        self._request_executor = BoundedExecutor(
            max_size=self._config.max_request_queue_size,
            max_num_threads=self._config.max_request_concurrency,
            tag_semaphores={
                IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                    self._config.max_in_memory_upload_chunks
                ),
                IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
                    self._config.max_in_memory_download_chunks
                ),
            },
            executor_cls=executor_cls,
        )

        # The executor responsible for submitting the necessary tasks to
        # perform the desired transfer
        self._submission_executor = BoundedExecutor(
            max_size=self._config.max_submission_queue_size,
            max_num_threads=self._config.max_submission_concurrency,
            executor_cls=executor_cls,
        )

        # There is one thread available for writing to disk. It will handle
        # downloads for all files.
        self._io_executor = BoundedExecutor(
            max_size=self._config.max_io_queue_size,
            max_num_threads=1,
            executor_cls=executor_cls,
        )

        # The component responsible for limiting bandwidth usage if it
        # is configured.
        self._bandwidth_limiter = None
        if self._config.max_bandwidth is not None:
            logger.debug(
                'Setting max_bandwidth to %s', self._config.max_bandwidth
            )
            leaky_bucket = LeakyBucket(self._config.max_bandwidth)
            self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)

        self._register_handlers()
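
# Example (illustrative sketch): constructing a manager around a boto3 S3
# client. Assumes boto3 is installed; the variable names are hypothetical.
#
#     import boto3
#     s3 = boto3.client('s3')
#     manager = TransferManager(s3, config=TransferConfig())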
    @property
    def client(self):
        return self._client

    @property
    def config(self):
        return self._config

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
        """Uploads a file to S3

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to upload or a seekable file-like
            object to upload. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type bucket: str
        :param bucket: The name of the bucket to upload to

        :type key: str
        :param key: The name of the key to upload to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the upload
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            fileobj=fileobj,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, UploadSubmissionTask, extra_main_kwargs
        )
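
# Example (illustrative sketch): a blocking upload. The path, bucket, and
# key names are hypothetical; result() waits for completion and re-raises
# any transfer error.
#
#     future = manager.upload(
#         '/tmp/data.bin', 'my-bucket', 'data.bin',
#         extra_args={'ContentType': 'application/octet-stream'},
#     )
#     future.result()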
    def download(
        self, bucket, key, fileobj, extra_args=None, subscribers=None
    ):
        """Downloads a file from S3

        :type bucket: str
        :param bucket: The name of the bucket to download from

        :type key: str
        :param key: The name of the key to download from

        :type fileobj: str or seekable file-like object
        :param fileobj: The name of a file to download or a seekable file-like
            object to download. It is recommended to use a filename because
            file-like objects may result in higher memory usage.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the download
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            fileobj=fileobj,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        extra_main_kwargs = {'io_executor': self._io_executor}
        if self._bandwidth_limiter:
            extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
        return self._submit_transfer(
            call_args, DownloadSubmissionTask, extra_main_kwargs
        )
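
# Example (illustrative sketch): downloading to a local path. The names are
# hypothetical.
#
#     future = manager.download('my-bucket', 'data.bin', '/tmp/data.bin')
#     future.result()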
    def copy(
        self,
        copy_source,
        bucket,
        key,
        extra_args=None,
        subscribers=None,
        source_client=None,
    ):
        """Copies a file in S3

        :type copy_source: dict
        :param copy_source: The name of the source bucket, key name of the
            source object, and optional version ID of the source object. The
            dictionary format is:
            ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
            that the ``VersionId`` key is optional and may be omitted.

        :type bucket: str
        :param bucket: The name of the bucket to copy to

        :type key: str
        :param key: The name of the key to copy to

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            client operation

        :type subscribers: a list of subscribers
        :param subscribers: The list of subscribers to be invoked in the
            order provided based on the events emitted during the process
            of the transfer request.

        :type source_client: botocore or boto3 Client
        :param source_client: The client to be used for operations that
            may happen at the source object. For example, this client is
            used for the head_object that determines the size of the copy.
            If no client is provided, the transfer manager's client is used
            as the client for the source object.

        :rtype: s3transfer.futures.TransferFuture
        :returns: Transfer future representing the copy
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        if source_client is None:
            source_client = self._client
        self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
        if isinstance(copy_source, dict):
            self._validate_if_bucket_supported(copy_source.get('Bucket'))
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            copy_source=copy_source,
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
            source_client=source_client,
        )
        return self._submit_transfer(call_args, CopySubmissionTask)
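
# Example (illustrative sketch): a same-account copy. Bucket and key names
# are hypothetical.
#
#     future = manager.copy(
#         {'Bucket': 'src-bucket', 'Key': 'data.bin'},
#         'dst-bucket', 'data.bin',
#     )
#     future.result()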
    def delete(self, bucket, key, extra_args=None, subscribers=None):
        """Delete an S3 object.

        :type bucket: str
        :param bucket: The name of the bucket.

        :type key: str
        :param key: The name of the S3 object to delete.

        :type extra_args: dict
        :param extra_args: Extra arguments that may be passed to the
            DeleteObject call.

        :type subscribers: list
        :param subscribers: A list of subscribers to be invoked during the
            process of the transfer request. Note that the ``on_progress``
            callback is not invoked during object deletion.

        :rtype: s3transfer.futures.TransferFuture
        :return: Transfer future representing the deletion.
        """
        if extra_args is None:
            extra_args = {}
        if subscribers is None:
            subscribers = []
        self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
        self._validate_if_bucket_supported(bucket)
        call_args = CallArgs(
            bucket=bucket,
            key=key,
            extra_args=extra_args,
            subscribers=subscribers,
        )
        return self._submit_transfer(call_args, DeleteSubmissionTask)
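
# Example (illustrative sketch): deleting a specific object version. The
# version id shown is made up.
#
#     future = manager.delete(
#         'my-bucket', 'data.bin', extra_args={'VersionId': 'abc123'}
#     )
#     future.result()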
    def _validate_if_bucket_supported(self, bucket):
        # S3 high-level operations don't support some resources
        # (e.g. S3 Object Lambda); only direct API calls are available
        # for such resources.
        if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
            for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
                match = pattern.match(bucket)
                if match:
                    raise ValueError(
                        'TransferManager methods do not support %s '
                        'resource. Use direct client calls instead.' % resource
                    )
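
# Example (illustrative sketch): passing an S3 Object Lambda access point ARN
# as the bucket raises ValueError, since the high-level methods do not
# support that resource. The ARN below is made up.
#
#     manager.upload(
#         '/tmp/data.bin',
#         'arn:aws:s3-object-lambda:us-east-1:123456789012:accesspoint/my-ap',
#         'data.bin',
#     )  # -> ValueError: TransferManager methods do not support ...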
    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )
    def _submit_transfer(
        self, call_args, submission_task_cls, extra_main_kwargs=None
    ):
        if not extra_main_kwargs:
            extra_main_kwargs = {}

        # Create a TransferFuture to return back to the user
        transfer_future, components = self._get_future_with_components(
            call_args
        )

        # Add any provided done callbacks to the created transfer future
        # to be invoked on the transfer future being complete.
        for callback in get_callbacks(transfer_future, 'done'):
            components['coordinator'].add_done_callback(callback)

        # Get the main kwargs needed to instantiate the submission task
        main_kwargs = self._get_submission_task_main_kwargs(
            transfer_future, extra_main_kwargs
        )

        # Submit a SubmissionTask that will submit all of the necessary
        # tasks needed to complete the S3 transfer.
        self._submission_executor.submit(
            submission_task_cls(
                transfer_coordinator=components['coordinator'],
                main_kwargs=main_kwargs,
            )
        )

        # Increment the unique id counter for future transfer requests
        self._id_counter += 1

        return transfer_future
    def _get_future_with_components(self, call_args):
        transfer_id = self._id_counter
        # Creates a new transfer future along with its components
        transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
        # Track the transfer coordinator for transfers to manage.
        self._coordinator_controller.add_transfer_coordinator(
            transfer_coordinator
        )
        # Also make sure that the transfer coordinator is removed once
        # the transfer completes so it does not stick around in memory.
        transfer_coordinator.add_done_callback(
            self._coordinator_controller.remove_transfer_coordinator,
            transfer_coordinator,
        )
        components = {
            'meta': TransferMeta(call_args, transfer_id=transfer_id),
            'coordinator': transfer_coordinator,
        }
        transfer_future = TransferFuture(**components)
        return transfer_future, components

    def _get_submission_task_main_kwargs(
        self, transfer_future, extra_main_kwargs
    ):
        main_kwargs = {
            'client': self._client,
            'config': self._config,
            'osutil': self._osutil,
            'request_executor': self._request_executor,
            'transfer_future': transfer_future,
        }
        main_kwargs.update(extra_main_kwargs)
        return main_kwargs
    def _register_handlers(self):
        # Register handlers to enable/disable callbacks on uploads.
        event_name = 'request-created.s3'
        self._client.meta.events.register_first(
            event_name,
            signal_not_transferring,
            unique_id='s3upload-not-transferring',
        )
        self._client.meta.events.register_last(
            event_name, signal_transferring, unique_id='s3upload-transferring'
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, *args):
        cancel = False
        cancel_msg = ''
        cancel_exc_type = FatalError
        # If an exception was raised in the context handler, signal to cancel
        # all of the in-progress futures in the shutdown.
        if exc_type:
            cancel = True
            cancel_msg = str(exc_value)
            if not cancel_msg:
                cancel_msg = repr(exc_value)
            # If it was a KeyboardInterrupt, the cancellation was initiated
            # by the user.
            if isinstance(exc_value, KeyboardInterrupt):
                cancel_exc_type = CancelledError
        self._shutdown(cancel, cancel_msg, cancel_exc_type)
    def shutdown(self, cancel=False, cancel_msg=''):
        """Shutdown the TransferManager

        It will wait until all transfers complete before it completely shuts
        down.

        :type cancel: boolean
        :param cancel: If True, calls TransferFuture.cancel() for
            all in-progress transfers. This is useful if you want the
            shutdown to happen faster.

        :type cancel_msg: str
        :param cancel_msg: The message to specify if canceling all in-progress
            transfers.
        """
        # Pass the arguments through positionally; _shutdown takes
        # (cancel, cancel_msg, exc_type). The original call passed ``cancel``
        # twice, which shifted ``cancel_msg`` into the ``exc_type`` slot.
        self._shutdown(cancel, cancel_msg)
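
# Example (illustrative sketch): using the manager as a context manager, so
# shutdown (and cancellation on error) happens automatically on exit. Names
# are hypothetical.
#
#     with TransferManager(s3) as manager:
#         manager.upload('/tmp/data.bin', 'my-bucket', 'data.bin')
#     # __exit__ waits for the upload; an exception inside the block cancels
#     # all in-progress transfers before shutting down.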
    def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
        if cancel:
            # Cancel all in-flight transfers if requested, before waiting
            # for them to complete.
            self._coordinator_controller.cancel(cancel_msg, exc_type)
        try:
            # Wait until there are no more in-progress transfers. This is
            # wrapped in a try statement because this can be interrupted
            # with a KeyboardInterrupt that needs to be caught.
            self._coordinator_controller.wait()
        except KeyboardInterrupt:
            # If no errors were raised in the try block, the cancel should
            # have no coordinators it needs to run cancel on. If there was
            # an error raised in the try statement, we want to cancel all of
            # the in-flight transfers before shutting down to speed that
            # process up.
            self._coordinator_controller.cancel('KeyboardInterrupt()')
            raise
        finally:
            # Shutdown all of the executors.
            self._submission_executor.shutdown()
            self._request_executor.shutdown()
            self._io_executor.shutdown()
class TransferCoordinatorController:
    def __init__(self):
        """Abstraction to control all transfer coordinators

        This abstraction allows the manager to wait for in-progress transfers
        to complete and cancel all in-progress transfers.
        """
        self._lock = threading.Lock()
        self._tracked_transfer_coordinators = set()

    @property
    def tracked_transfer_coordinators(self):
        """The set of transfer coordinators being tracked"""
        with self._lock:
            # We return a copy because the set is mutable, and if you were to
            # iterate over the set, it may be changing in length due to
            # additions and removals of transfer coordinators.
            return copy.copy(self._tracked_transfer_coordinators)

    def add_transfer_coordinator(self, transfer_coordinator):
        """Adds a transfer coordinator of a transfer to be canceled if needed

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.add(transfer_coordinator)

    def remove_transfer_coordinator(self, transfer_coordinator):
        """Remove a transfer coordinator from cancellation consideration

        Typically, this method is invoked by the transfer coordinator itself
        to remove itself when it completes its transfer.

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The transfer coordinator for the
            particular transfer
        """
        with self._lock:
            self._tracked_transfer_coordinators.remove(transfer_coordinator)
    def cancel(self, msg='', exc_type=CancelledError):
        """Cancels all in-progress transfers

        This cancels the in-progress transfers by calling cancel() on all
        tracked transfer coordinators.

        :param msg: The message to pass on to each transfer coordinator that
            gets cancelled.

        :param exc_type: The type of exception to set for the cancellation
        """
        for transfer_coordinator in self.tracked_transfer_coordinators:
            transfer_coordinator.cancel(msg, exc_type)

    def wait(self):
        """Wait until there are no more in-progress transfers

        This will not stop when failures are encountered, and it does not
        propagate any errors from failed transfers, but it can be interrupted
        with a KeyboardInterrupt.
        """
        try:
            transfer_coordinator = None
            for transfer_coordinator in self.tracked_transfer_coordinators:
                transfer_coordinator.result()
        except KeyboardInterrupt:
            logger.debug('Received KeyboardInterrupt in wait()')
            # If a KeyboardInterrupt is raised while waiting for
            # the result, then exit out of the wait and raise the
            # exception.
            if transfer_coordinator:
                logger.debug(
                    'On KeyboardInterrupt was waiting for %s',
                    transfer_coordinator,
                )
            raise
        except Exception:
            # A general exception could have been thrown because
            # of result(). We just want to ignore this and continue
            # because we at least know that the transfer coordinator
            # has completed.
            pass
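
# Example (illustrative sketch): the cancel-then-wait flow the manager uses
# during shutdown. The ``coordinator`` variable is a hypothetical
# TransferCoordinator instance.
#
#     controller = TransferCoordinatorController()
#     controller.add_transfer_coordinator(coordinator)
#     controller.cancel('shutting down', exc_type=CancelledError)
#     controller.wait()  # returns once all tracked transfers settle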