123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731 |
- # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"). You
- # may not use this file except in compliance with the License. A copy of
- # the License is located at
- #
- # http://aws.amazon.com/apache2.0/
- #
- # or in the "license" file accompanying this file. This file is
- # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- # ANY KIND, either express or implied. See the License for the specific
- # language governing permissions and limitations under the License.
- import copy
- import logging
- import re
- import threading
- from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket
- from s3transfer.constants import ALLOWED_DOWNLOAD_ARGS, KB, MB
- from s3transfer.copies import CopySubmissionTask
- from s3transfer.delete import DeleteSubmissionTask
- from s3transfer.download import DownloadSubmissionTask
- from s3transfer.exceptions import CancelledError, FatalError
- from s3transfer.futures import (
- IN_MEMORY_DOWNLOAD_TAG,
- IN_MEMORY_UPLOAD_TAG,
- BoundedExecutor,
- TransferCoordinator,
- TransferFuture,
- TransferMeta,
- )
- from s3transfer.upload import UploadSubmissionTask
- from s3transfer.utils import (
- CallArgs,
- OSUtils,
- SlidingWindowSemaphore,
- TaskSemaphore,
- get_callbacks,
- signal_not_transferring,
- signal_transferring,
- )
- logger = logging.getLogger(__name__)
- class TransferConfig:
- def __init__(
- self,
- multipart_threshold=8 * MB,
- multipart_chunksize=8 * MB,
- max_request_concurrency=10,
- max_submission_concurrency=5,
- max_request_queue_size=1000,
- max_submission_queue_size=1000,
- max_io_queue_size=1000,
- io_chunksize=256 * KB,
- num_download_attempts=5,
- max_in_memory_upload_chunks=10,
- max_in_memory_download_chunks=10,
- max_bandwidth=None,
- ):
- """Configurations for the transfer manager
- :param multipart_threshold: The threshold for which multipart
- transfers occur.
- :param max_request_concurrency: The maximum number of S3 API
- transfer-related requests that can happen at a time.
- :param max_submission_concurrency: The maximum number of threads
- processing a call to a TransferManager method. Processing a
- call usually entails determining which S3 API requests that need
- to be enqueued, but does **not** entail making any of the
- S3 API data transferring requests needed to perform the transfer.
- The threads controlled by ``max_request_concurrency`` is
- responsible for that.
- :param multipart_chunksize: The size of each transfer if a request
- becomes a multipart transfer.
- :param max_request_queue_size: The maximum amount of S3 API requests
- that can be queued at a time.
- :param max_submission_queue_size: The maximum amount of
- TransferManager method calls that can be queued at a time.
- :param max_io_queue_size: The maximum amount of read parts that
- can be queued to be written to disk per download. The default
- size for each elementin this queue is 8 KB.
- :param io_chunksize: The max size of each chunk in the io queue.
- Currently, this is size used when reading from the downloaded
- stream as well.
- :param num_download_attempts: The number of download attempts that
- will be tried upon errors with downloading an object in S3. Note
- that these retries account for errors that occur when streaming
- down the data from s3 (i.e. socket errors and read timeouts that
- occur after receiving an OK response from s3).
- Other retryable exceptions such as throttling errors and 5xx errors
- are already retried by botocore (this default is 5). The
- ``num_download_attempts`` does not take into account the
- number of exceptions retried by botocore.
- :param max_in_memory_upload_chunks: The number of chunks that can
- be stored in memory at a time for all ongoing upload requests.
- This pertains to chunks of data that need to be stored in memory
- during an upload if the data is sourced from a file-like object.
- The total maximum memory footprint due to a in-memory upload
- chunks is roughly equal to:
- max_in_memory_upload_chunks * multipart_chunksize
- + max_submission_concurrency * multipart_chunksize
- ``max_submission_concurrency`` has an affect on this value because
- for each thread pulling data off of a file-like object, they may
- be waiting with a single read chunk to be submitted for upload
- because the ``max_in_memory_upload_chunks`` value has been reached
- by the threads making the upload request.
- :param max_in_memory_download_chunks: The number of chunks that can
- be buffered in memory and **not** in the io queue at a time for all
- ongoing download requests. This pertains specifically to file-like
- objects that cannot be seeked. The total maximum memory footprint
- due to a in-memory download chunks is roughly equal to:
- max_in_memory_download_chunks * multipart_chunksize
- :param max_bandwidth: The maximum bandwidth that will be consumed
- in uploading and downloading file content. The value is in terms of
- bytes per second.
- """
- self.multipart_threshold = multipart_threshold
- self.multipart_chunksize = multipart_chunksize
- self.max_request_concurrency = max_request_concurrency
- self.max_submission_concurrency = max_submission_concurrency
- self.max_request_queue_size = max_request_queue_size
- self.max_submission_queue_size = max_submission_queue_size
- self.max_io_queue_size = max_io_queue_size
- self.io_chunksize = io_chunksize
- self.num_download_attempts = num_download_attempts
- self.max_in_memory_upload_chunks = max_in_memory_upload_chunks
- self.max_in_memory_download_chunks = max_in_memory_download_chunks
- self.max_bandwidth = max_bandwidth
- self._validate_attrs_are_nonzero()
- def _validate_attrs_are_nonzero(self):
- for attr, attr_val in self.__dict__.items():
- if attr_val is not None and attr_val <= 0:
- raise ValueError(
- 'Provided parameter %s of value %s must be greater than '
- '0.' % (attr, attr_val)
- )
- class TransferManager:
- ALLOWED_DOWNLOAD_ARGS = ALLOWED_DOWNLOAD_ARGS
- ALLOWED_UPLOAD_ARGS = [
- 'ACL',
- 'CacheControl',
- 'ChecksumAlgorithm',
- 'ContentDisposition',
- 'ContentEncoding',
- 'ContentLanguage',
- 'ContentType',
- 'ExpectedBucketOwner',
- 'Expires',
- 'GrantFullControl',
- 'GrantRead',
- 'GrantReadACP',
- 'GrantWriteACP',
- 'Metadata',
- 'ObjectLockLegalHoldStatus',
- 'ObjectLockMode',
- 'ObjectLockRetainUntilDate',
- 'RequestPayer',
- 'ServerSideEncryption',
- 'StorageClass',
- 'SSECustomerAlgorithm',
- 'SSECustomerKey',
- 'SSECustomerKeyMD5',
- 'SSEKMSKeyId',
- 'SSEKMSEncryptionContext',
- 'Tagging',
- 'WebsiteRedirectLocation',
- ]
- ALLOWED_COPY_ARGS = ALLOWED_UPLOAD_ARGS + [
- 'CopySourceIfMatch',
- 'CopySourceIfModifiedSince',
- 'CopySourceIfNoneMatch',
- 'CopySourceIfUnmodifiedSince',
- 'CopySourceSSECustomerAlgorithm',
- 'CopySourceSSECustomerKey',
- 'CopySourceSSECustomerKeyMD5',
- 'MetadataDirective',
- 'TaggingDirective',
- ]
- ALLOWED_DELETE_ARGS = [
- 'MFA',
- 'VersionId',
- 'RequestPayer',
- 'ExpectedBucketOwner',
- ]
- VALIDATE_SUPPORTED_BUCKET_VALUES = True
- _UNSUPPORTED_BUCKET_PATTERNS = {
- 'S3 Object Lambda': re.compile(
- r'^arn:(aws).*:s3-object-lambda:[a-z\-0-9]+:[0-9]{12}:'
- r'accesspoint[/:][a-zA-Z0-9\-]{1,63}'
- ),
- }
- def __init__(self, client, config=None, osutil=None, executor_cls=None):
- """A transfer manager interface for Amazon S3
- :param client: Client to be used by the manager
- :param config: TransferConfig to associate specific configurations
- :param osutil: OSUtils object to use for os-related behavior when
- using with transfer manager.
- :type executor_cls: s3transfer.futures.BaseExecutor
- :param executor_cls: The class of executor to use with the transfer
- manager. By default, concurrent.futures.ThreadPoolExecutor is used.
- """
- self._client = client
- self._config = config
- if config is None:
- self._config = TransferConfig()
- self._osutil = osutil
- if osutil is None:
- self._osutil = OSUtils()
- self._coordinator_controller = TransferCoordinatorController()
- # A counter to create unique id's for each transfer submitted.
- self._id_counter = 0
- # The executor responsible for making S3 API transfer requests
- self._request_executor = BoundedExecutor(
- max_size=self._config.max_request_queue_size,
- max_num_threads=self._config.max_request_concurrency,
- tag_semaphores={
- IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
- self._config.max_in_memory_upload_chunks
- ),
- IN_MEMORY_DOWNLOAD_TAG: SlidingWindowSemaphore(
- self._config.max_in_memory_download_chunks
- ),
- },
- executor_cls=executor_cls,
- )
- # The executor responsible for submitting the necessary tasks to
- # perform the desired transfer
- self._submission_executor = BoundedExecutor(
- max_size=self._config.max_submission_queue_size,
- max_num_threads=self._config.max_submission_concurrency,
- executor_cls=executor_cls,
- )
- # There is one thread available for writing to disk. It will handle
- # downloads for all files.
- self._io_executor = BoundedExecutor(
- max_size=self._config.max_io_queue_size,
- max_num_threads=1,
- executor_cls=executor_cls,
- )
- # The component responsible for limiting bandwidth usage if it
- # is configured.
- self._bandwidth_limiter = None
- if self._config.max_bandwidth is not None:
- logger.debug(
- 'Setting max_bandwidth to %s', self._config.max_bandwidth
- )
- leaky_bucket = LeakyBucket(self._config.max_bandwidth)
- self._bandwidth_limiter = BandwidthLimiter(leaky_bucket)
- self._register_handlers()
- @property
- def client(self):
- return self._client
- @property
- def config(self):
- return self._config
- def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
- """Uploads a file to S3
- :type fileobj: str or seekable file-like object
- :param fileobj: The name of a file to upload or a seekable file-like
- object to upload. It is recommended to use a filename because
- file-like objects may result in higher memory usage.
- :type bucket: str
- :param bucket: The name of the bucket to upload to
- :type key: str
- :param key: The name of the key to upload to
- :type extra_args: dict
- :param extra_args: Extra arguments that may be passed to the
- client operation
- :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
- :param subscribers: The list of subscribers to be invoked in the
- order provided based on the event emit during the process of
- the transfer request.
- :rtype: s3transfer.futures.TransferFuture
- :returns: Transfer future representing the upload
- """
- if extra_args is None:
- extra_args = {}
- if subscribers is None:
- subscribers = []
- self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
- self._validate_if_bucket_supported(bucket)
- call_args = CallArgs(
- fileobj=fileobj,
- bucket=bucket,
- key=key,
- extra_args=extra_args,
- subscribers=subscribers,
- )
- extra_main_kwargs = {}
- if self._bandwidth_limiter:
- extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
- return self._submit_transfer(
- call_args, UploadSubmissionTask, extra_main_kwargs
- )
- def download(
- self, bucket, key, fileobj, extra_args=None, subscribers=None
- ):
- """Downloads a file from S3
- :type bucket: str
- :param bucket: The name of the bucket to download from
- :type key: str
- :param key: The name of the key to download from
- :type fileobj: str or seekable file-like object
- :param fileobj: The name of a file to download or a seekable file-like
- object to download. It is recommended to use a filename because
- file-like objects may result in higher memory usage.
- :type extra_args: dict
- :param extra_args: Extra arguments that may be passed to the
- client operation
- :type subscribers: list(s3transfer.subscribers.BaseSubscriber)
- :param subscribers: The list of subscribers to be invoked in the
- order provided based on the event emit during the process of
- the transfer request.
- :rtype: s3transfer.futures.TransferFuture
- :returns: Transfer future representing the download
- """
- if extra_args is None:
- extra_args = {}
- if subscribers is None:
- subscribers = []
- self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
- self._validate_if_bucket_supported(bucket)
- call_args = CallArgs(
- bucket=bucket,
- key=key,
- fileobj=fileobj,
- extra_args=extra_args,
- subscribers=subscribers,
- )
- extra_main_kwargs = {'io_executor': self._io_executor}
- if self._bandwidth_limiter:
- extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
- return self._submit_transfer(
- call_args, DownloadSubmissionTask, extra_main_kwargs
- )
- def copy(
- self,
- copy_source,
- bucket,
- key,
- extra_args=None,
- subscribers=None,
- source_client=None,
- ):
- """Copies a file in S3
- :type copy_source: dict
- :param copy_source: The name of the source bucket, key name of the
- source object, and optional version ID of the source object. The
- dictionary format is:
- ``{'Bucket': 'bucket', 'Key': 'key', 'VersionId': 'id'}``. Note
- that the ``VersionId`` key is optional and may be omitted.
- :type bucket: str
- :param bucket: The name of the bucket to copy to
- :type key: str
- :param key: The name of the key to copy to
- :type extra_args: dict
- :param extra_args: Extra arguments that may be passed to the
- client operation
- :type subscribers: a list of subscribers
- :param subscribers: The list of subscribers to be invoked in the
- order provided based on the event emit during the process of
- the transfer request.
- :type source_client: botocore or boto3 Client
- :param source_client: The client to be used for operation that
- may happen at the source object. For example, this client is
- used for the head_object that determines the size of the copy.
- If no client is provided, the transfer manager's client is used
- as the client for the source object.
- :rtype: s3transfer.futures.TransferFuture
- :returns: Transfer future representing the copy
- """
- if extra_args is None:
- extra_args = {}
- if subscribers is None:
- subscribers = []
- if source_client is None:
- source_client = self._client
- self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
- if isinstance(copy_source, dict):
- self._validate_if_bucket_supported(copy_source.get('Bucket'))
- self._validate_if_bucket_supported(bucket)
- call_args = CallArgs(
- copy_source=copy_source,
- bucket=bucket,
- key=key,
- extra_args=extra_args,
- subscribers=subscribers,
- source_client=source_client,
- )
- return self._submit_transfer(call_args, CopySubmissionTask)
- def delete(self, bucket, key, extra_args=None, subscribers=None):
- """Delete an S3 object.
- :type bucket: str
- :param bucket: The name of the bucket.
- :type key: str
- :param key: The name of the S3 object to delete.
- :type extra_args: dict
- :param extra_args: Extra arguments that may be passed to the
- DeleteObject call.
- :type subscribers: list
- :param subscribers: A list of subscribers to be invoked during the
- process of the transfer request. Note that the ``on_progress``
- callback is not invoked during object deletion.
- :rtype: s3transfer.futures.TransferFuture
- :return: Transfer future representing the deletion.
- """
- if extra_args is None:
- extra_args = {}
- if subscribers is None:
- subscribers = []
- self._validate_all_known_args(extra_args, self.ALLOWED_DELETE_ARGS)
- self._validate_if_bucket_supported(bucket)
- call_args = CallArgs(
- bucket=bucket,
- key=key,
- extra_args=extra_args,
- subscribers=subscribers,
- )
- return self._submit_transfer(call_args, DeleteSubmissionTask)
- def _validate_if_bucket_supported(self, bucket):
- # s3 high level operations don't support some resources
- # (eg. S3 Object Lambda) only direct API calls are available
- # for such resources
- if self.VALIDATE_SUPPORTED_BUCKET_VALUES:
- for resource, pattern in self._UNSUPPORTED_BUCKET_PATTERNS.items():
- match = pattern.match(bucket)
- if match:
- raise ValueError(
- 'TransferManager methods do not support %s '
- 'resource. Use direct client calls instead.' % resource
- )
- def _validate_all_known_args(self, actual, allowed):
- for kwarg in actual:
- if kwarg not in allowed:
- raise ValueError(
- "Invalid extra_args key '%s', "
- "must be one of: %s" % (kwarg, ', '.join(allowed))
- )
- def _submit_transfer(
- self, call_args, submission_task_cls, extra_main_kwargs=None
- ):
- if not extra_main_kwargs:
- extra_main_kwargs = {}
- # Create a TransferFuture to return back to the user
- transfer_future, components = self._get_future_with_components(
- call_args
- )
- # Add any provided done callbacks to the created transfer future
- # to be invoked on the transfer future being complete.
- for callback in get_callbacks(transfer_future, 'done'):
- components['coordinator'].add_done_callback(callback)
- # Get the main kwargs needed to instantiate the submission task
- main_kwargs = self._get_submission_task_main_kwargs(
- transfer_future, extra_main_kwargs
- )
- # Submit a SubmissionTask that will submit all of the necessary
- # tasks needed to complete the S3 transfer.
- self._submission_executor.submit(
- submission_task_cls(
- transfer_coordinator=components['coordinator'],
- main_kwargs=main_kwargs,
- )
- )
- # Increment the unique id counter for future transfer requests
- self._id_counter += 1
- return transfer_future
- def _get_future_with_components(self, call_args):
- transfer_id = self._id_counter
- # Creates a new transfer future along with its components
- transfer_coordinator = TransferCoordinator(transfer_id=transfer_id)
- # Track the transfer coordinator for transfers to manage.
- self._coordinator_controller.add_transfer_coordinator(
- transfer_coordinator
- )
- # Also make sure that the transfer coordinator is removed once
- # the transfer completes so it does not stick around in memory.
- transfer_coordinator.add_done_callback(
- self._coordinator_controller.remove_transfer_coordinator,
- transfer_coordinator,
- )
- components = {
- 'meta': TransferMeta(call_args, transfer_id=transfer_id),
- 'coordinator': transfer_coordinator,
- }
- transfer_future = TransferFuture(**components)
- return transfer_future, components
- def _get_submission_task_main_kwargs(
- self, transfer_future, extra_main_kwargs
- ):
- main_kwargs = {
- 'client': self._client,
- 'config': self._config,
- 'osutil': self._osutil,
- 'request_executor': self._request_executor,
- 'transfer_future': transfer_future,
- }
- main_kwargs.update(extra_main_kwargs)
- return main_kwargs
- def _register_handlers(self):
- # Register handlers to enable/disable callbacks on uploads.
- event_name = 'request-created.s3'
- self._client.meta.events.register_first(
- event_name,
- signal_not_transferring,
- unique_id='s3upload-not-transferring',
- )
- self._client.meta.events.register_last(
- event_name, signal_transferring, unique_id='s3upload-transferring'
- )
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_value, *args):
- cancel = False
- cancel_msg = ''
- cancel_exc_type = FatalError
- # If a exception was raised in the context handler, signal to cancel
- # all of the inprogress futures in the shutdown.
- if exc_type:
- cancel = True
- cancel_msg = str(exc_value)
- if not cancel_msg:
- cancel_msg = repr(exc_value)
- # If it was a KeyboardInterrupt, the cancellation was initiated
- # by the user.
- if isinstance(exc_value, KeyboardInterrupt):
- cancel_exc_type = CancelledError
- self._shutdown(cancel, cancel_msg, cancel_exc_type)
- def shutdown(self, cancel=False, cancel_msg=''):
- """Shutdown the TransferManager
- It will wait till all transfers complete before it completely shuts
- down.
- :type cancel: boolean
- :param cancel: If True, calls TransferFuture.cancel() for
- all in-progress in transfers. This is useful if you want the
- shutdown to happen quicker.
- :type cancel_msg: str
- :param cancel_msg: The message to specify if canceling all in-progress
- transfers.
- """
- self._shutdown(cancel, cancel, cancel_msg)
- def _shutdown(self, cancel, cancel_msg, exc_type=CancelledError):
- if cancel:
- # Cancel all in-flight transfers if requested, before waiting
- # for them to complete.
- self._coordinator_controller.cancel(cancel_msg, exc_type)
- try:
- # Wait until there are no more in-progress transfers. This is
- # wrapped in a try statement because this can be interrupted
- # with a KeyboardInterrupt that needs to be caught.
- self._coordinator_controller.wait()
- except KeyboardInterrupt:
- # If not errors were raised in the try block, the cancel should
- # have no coordinators it needs to run cancel on. If there was
- # an error raised in the try statement we want to cancel all of
- # the inflight transfers before shutting down to speed that
- # process up.
- self._coordinator_controller.cancel('KeyboardInterrupt()')
- raise
- finally:
- # Shutdown all of the executors.
- self._submission_executor.shutdown()
- self._request_executor.shutdown()
- self._io_executor.shutdown()
- class TransferCoordinatorController:
- def __init__(self):
- """Abstraction to control all transfer coordinators
- This abstraction allows the manager to wait for inprogress transfers
- to complete and cancel all inprogress transfers.
- """
- self._lock = threading.Lock()
- self._tracked_transfer_coordinators = set()
- @property
- def tracked_transfer_coordinators(self):
- """The set of transfer coordinators being tracked"""
- with self._lock:
- # We return a copy because the set is mutable and if you were to
- # iterate over the set, it may be changing in length due to
- # additions and removals of transfer coordinators.
- return copy.copy(self._tracked_transfer_coordinators)
- def add_transfer_coordinator(self, transfer_coordinator):
- """Adds a transfer coordinator of a transfer to be canceled if needed
- :type transfer_coordinator: s3transfer.futures.TransferCoordinator
- :param transfer_coordinator: The transfer coordinator for the
- particular transfer
- """
- with self._lock:
- self._tracked_transfer_coordinators.add(transfer_coordinator)
- def remove_transfer_coordinator(self, transfer_coordinator):
- """Remove a transfer coordinator from cancellation consideration
- Typically, this method is invoked by the transfer coordinator itself
- to remove its self when it completes its transfer.
- :type transfer_coordinator: s3transfer.futures.TransferCoordinator
- :param transfer_coordinator: The transfer coordinator for the
- particular transfer
- """
- with self._lock:
- self._tracked_transfer_coordinators.remove(transfer_coordinator)
- def cancel(self, msg='', exc_type=CancelledError):
- """Cancels all inprogress transfers
- This cancels the inprogress transfers by calling cancel() on all
- tracked transfer coordinators.
- :param msg: The message to pass on to each transfer coordinator that
- gets cancelled.
- :param exc_type: The type of exception to set for the cancellation
- """
- for transfer_coordinator in self.tracked_transfer_coordinators:
- transfer_coordinator.cancel(msg, exc_type)
- def wait(self):
- """Wait until there are no more inprogress transfers
- This will not stop when failures are encountered and not propagate any
- of these errors from failed transfers, but it can be interrupted with
- a KeyboardInterrupt.
- """
- try:
- transfer_coordinator = None
- for transfer_coordinator in self.tracked_transfer_coordinators:
- transfer_coordinator.result()
- except KeyboardInterrupt:
- logger.debug('Received KeyboardInterrupt in wait()')
- # If Keyboard interrupt is raised while waiting for
- # the result, then exit out of the wait and raise the
- # exception
- if transfer_coordinator:
- logger.debug(
- 'On KeyboardInterrupt was waiting for %s',
- transfer_coordinator,
- )
- raise
- except Exception:
- # A general exception could have been thrown because
- # of result(). We just want to ignore this and continue
- # because we at least know that the transfer coordinator
- # has completed.
- pass
|