# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
- """Abstractions over S3's upload/download operations.
- This module provides high level abstractions for efficient
- uploads/downloads. It handles several things for the user:
- * Automatically switching to multipart transfers when
- a file is over a specific size threshold
- * Uploading/downloading a file in parallel
- * Throttling based on max bandwidth
- * Progress callbacks to monitor transfers
- * Retries. While botocore handles retries for streaming uploads,
- it is not possible for it to handle retries for streaming
- downloads. This module handles retries for both cases so
- you don't need to implement any retry logic yourself.
- This module has a reasonable set of defaults. It also allows you
- to configure many aspects of the transfer process including:
- * Multipart threshold size
- * Max parallel downloads
- * Max bandwidth
- * Socket timeouts
- * Retry amounts
- There is no support for s3->s3 multipart copies at this
- time.
- .. _ref_s3transfer_usage:
- Usage
- =====
- The simplest way to use this module is:
- .. code-block:: python
- client = boto3.client('s3', 'us-west-2')
- transfer = S3Transfer(client)
- # Upload /tmp/myfile to s3://bucket/key
- transfer.upload_file('/tmp/myfile', 'bucket', 'key')
- # Download s3://bucket/key to /tmp/myfile
- transfer.download_file('bucket', 'key', '/tmp/myfile')
- The ``upload_file`` and ``download_file`` methods also accept
- ``**kwargs``, which will be forwarded through to the corresponding
- client operation. Here are a few examples using ``upload_file``::
- # Making the object public
- transfer.upload_file('/tmp/myfile', 'bucket', 'key',
- extra_args={'ACL': 'public-read'})
- # Setting metadata
- transfer.upload_file('/tmp/myfile', 'bucket', 'key',
- extra_args={'Metadata': {'a': 'b', 'c': 'd'}})
- # Setting content type
- transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
- extra_args={'ContentType': "application/json"})
- The ``S3Transfer`` class also supports progress callbacks so you can
- provide transfer progress to users. Both the ``upload_file`` and
- ``download_file`` methods take an optional ``callback`` parameter.
- Here's an example of how to print a simple progress percentage
- to the user:
- .. code-block:: python
- class ProgressPercentage(object):
- def __init__(self, filename):
- self._filename = filename
- self._size = float(os.path.getsize(filename))
- self._seen_so_far = 0
- self._lock = threading.Lock()
- def __call__(self, bytes_amount):
- # To simplify we'll assume this is hooked up
- # to a single filename.
- with self._lock:
- self._seen_so_far += bytes_amount
- percentage = (self._seen_so_far / self._size) * 100
- sys.stdout.write(
- "\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far,
- self._size, percentage))
- sys.stdout.flush()
- transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
- # Upload /tmp/myfile to s3://bucket/key and print upload progress.
- transfer.upload_file('/tmp/myfile', 'bucket', 'key',
- callback=ProgressPercentage('/tmp/myfile'))
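
    # ``download_file`` accepts the same ``callback`` parameter; each call
    # receives the number of bytes transferred since the previous call.
    # (Illustrative sketch: ``ProgressPercentage`` above sizes itself from
    # the local file, so a download callback would normally size itself
    # from the object instead.)
    transfer.download_file('bucket', 'key', '/tmp/otherfile',
                           callback=lambda bytes_transferred: print(
                               bytes_transferred, 'bytes received'))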

You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine-grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
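
For downloads, ``extra_args`` is limited to the keys listed in
``S3Transfer.ALLOWED_DOWNLOAD_ARGS``. For example, to download a specific
object version (the version id below is only a placeholder)::

    transfer.download_file('bucket', 'key', '/tmp/myfile',
                           extra_args={'VersionId': 'my-version-id'})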
- """
import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.6.0'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
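# Placed on the download IO queue to tell the writer thread to shut down.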
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))


def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass


class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """
        Given a file object shown below::

            |___________________________________________________|
            0        |                 |                full_file_size
                     |----chunk_size---|
                  start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        The full file size is determined automatically by stat'ing the
        opened file.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if it's a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])


class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value


class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)


class MultipartUploader:
    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}


class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shutdown.

    This class adds a ``trigger_shutdown`` method; once it has been
    called, all subsequent calls to ``put()`` fail with a
    ``QueueShutdownError``.

    It purposefully deviates from queue.Queue, and is *not* meant
    to be a drop-in replacement for ``queue.Queue``.

    """

    def _init(self, maxsize):
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # Delegate the rest of the setup to queue.Queue's own _init.
        return queue.Queue._init(self, maxsize)

    def trigger_shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when the queue has been "
                    "shut down."
                )
        return queue.Queue.put(self, item)


class MultipartDownloader:
    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading all the parts,
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)

            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
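        # HTTP Range headers are inclusive on both ends, so with an 8 MB
        # part size, part 0 maps to "bytes=0-8388607". The last part leaves
        # the end empty ("bytes=N-") so the request reads through to the
        # end of the object regardless of its exact size.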
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
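                    # Queue each chunk together with its absolute offset in
                    # the destination file; the IO writer thread seeks to
                    # that offset before writing, so parts may arrive in
                    # any order.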
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise


class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
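        """Configuration knobs for ``S3Transfer``.

        :param multipart_threshold: Files at or above this size (in bytes)
            are transferred as multipart uploads / ranged downloads.
        :param max_concurrency: Maximum number of threads used to transfer
            parts in parallel.
        :param multipart_chunksize: Size (in bytes) of each part.
        :param num_download_attempts: Number of attempts made for a download
            before giving up and raising ``RetriesExceededError``.
        :param max_io_queue: Maximum number of downloaded chunks buffered in
            memory between the download threads and the IO writer thread.
        """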
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACL',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
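        # Disable the progress callback while the request is being created
        # and re-enable it afterwards, so body reads performed during
        # request preparation are not reported as upload progress.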
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )

        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
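        # Download into a temporary file next to the destination and only
        # rename it into place once the download succeeds, so a failed
        # transfer never leaves a partial file at ``filename``.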
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)