12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802 |
- # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"). You
- # may not use this file except in compliance with the License. A copy of
- # the License is located at
- #
- # http://aws.amazon.com/apache2.0/
- #
- # or in the "license" file accompanying this file. This file is
- # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- # ANY KIND, either express or implied. See the License for the specific
- # language governing permissions and limitations under the License.
- import functools
- import logging
- import math
- import os
- import random
- import socket
- import stat
- import string
- import threading
- from collections import defaultdict
- from botocore.exceptions import IncompleteReadError, ReadTimeoutError
- from s3transfer.compat import SOCKET_ERROR, fallocate, rename_file
# The maximum number of parts allowed for an S3 multipart upload.
MAX_PARTS = 10000
# The maximum file size you can upload via S3 per request.
# See: http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
# and: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024**3)
# The minimum chunksize (5 MiB) usable for a multipart upload part.
MIN_UPLOAD_CHUNKSIZE = 5 * (1024**2)

logger = logging.getLogger(__name__)

# Errors raised mid-download that are treated as retryable; includes both
# socket-level timeouts and botocore's read/incomplete-read errors.
S3_RETRYABLE_DOWNLOAD_ERRORS = (
    socket.timeout,
    SOCKET_ERROR,
    ReadTimeoutError,
    IncompleteReadError,
)
def random_file_extension(num_digits=8):
    """Return a random string of hexadecimal digits.

    :param num_digits: The number of hex digits in the returned string.
    """
    digits = [random.choice(string.hexdigits) for _ in range(num_digits)]
    return ''.join(digits)
def signal_not_transferring(request, operation_name, **kwargs):
    """Tell an upload request body that its transfer has paused.

    Only applies to upload operations whose body supports the
    ``signal_not_transferring`` hook.
    """
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if hasattr(body, 'signal_not_transferring'):
        body.signal_not_transferring()
def signal_transferring(request, operation_name, **kwargs):
    """Tell an upload request body that its transfer has (re)started.

    Only applies to upload operations whose body supports the
    ``signal_transferring`` hook.
    """
    if operation_name not in ['PutObject', 'UploadPart']:
        return
    body = request.body
    if hasattr(body, 'signal_transferring'):
        body.signal_transferring()
def calculate_num_parts(size, part_size):
    """Return the number of parts needed to cover ``size`` bytes.

    :param size: The total size of the transfer.
    :param part_size: The size of each individual part.
    """
    num_parts = math.ceil(size / float(part_size))
    return int(num_parts)
def calculate_range_parameter(
    part_size, part_index, num_parts, total_size=None
):
    """Calculate the range parameter for multipart downloads/copies

    :type part_size: int
    :param part_size: The size of the part

    :type part_index: int
    :param part_index: The index for which this parts starts. This index starts
        at zero

    :type num_parts: int
    :param num_parts: The total number of parts in the transfer

    :returns: The value to use for Range parameter on downloads or
        the CopySourceRange parameter for copies
    """
    start_range = part_index * part_size
    is_last_part = part_index == num_parts - 1
    if is_last_part:
        # The last part is open-ended unless the total size is known,
        # in which case the range is pinned to the final byte.
        end_range = '' if total_size is None else str(total_size - 1)
    else:
        end_range = start_range + part_size - 1
    return f'bytes={start_range}-{end_range}'
def get_callbacks(transfer_future, callback_type):
    """Retrieves callbacks from a subscriber

    :type transfer_future: s3transfer.futures.TransferFuture
    :param transfer_future: The transfer future the subscriber is associated
        to.

    :type callback_type: str
    :param callback_type: The type of callback to retrieve from the subscriber.
        Valid types include:
            * 'queued'
            * 'progress'
            * 'done'

    :returns: A list of callbacks for the type specified. All callbacks are
        preinjected with the transfer future.
    """
    method_name = 'on_' + callback_type
    injected_callbacks = []
    for subscriber in transfer_future.meta.call_args.subscribers:
        if not hasattr(subscriber, method_name):
            continue
        bound_method = getattr(subscriber, method_name)
        # Pre-bind the future so callers only supply callback-specific args.
        injected_callbacks.append(
            functools.partial(bound_method, future=transfer_future)
        )
    return injected_callbacks
def invoke_progress_callbacks(callbacks, bytes_transferred):
    """Calls all progress callbacks

    :param callbacks: A list of progress callbacks to invoke
    :param bytes_transferred: The number of bytes transferred. This is passed
        to the callbacks. If no bytes were transferred the callbacks will not
        be invoked because no progress was achieved. It is also possible
        to receive a negative amount which comes from retrying a transfer
        request.
    """
    if not bytes_transferred:
        # Zero progress: nothing to report, skip the callbacks entirely.
        return
    for callback in callbacks:
        callback(bytes_transferred=bytes_transferred)
def get_filtered_dict(original_dict, whitelisted_keys):
    """Gets a dictionary filtered by whitelisted keys

    :param original_dict: The original dictionary of arguments to source keys
        and values.
    :param whitelisted_key: A list of keys to include in the filtered
        dictionary.

    :returns: A dictionary containing key/values from the original dictionary
        whose key was included in the whitelist
    """
    return {
        key: value
        for key, value in original_dict.items()
        if key in whitelisted_keys
    }
class CallArgs:
    """A class that records call arguments

    The call arguments must be passed as keyword arguments. It will set
    each keyword argument as an attribute of the object along with its
    associated value.
    """

    def __init__(self, **kwargs):
        # Expose every keyword argument as an instance attribute.
        self.__dict__.update(kwargs)
class FunctionContainer:
    """An object that contains a function and any args or kwargs to call it

    When called the provided function will be called with provided args
    and kwargs.
    """

    def __init__(self, func, *args, **kwargs):
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        return 'Function: {} with args {} and kwargs {}'.format(
            self._func, self._args, self._kwargs
        )

    def __call__(self):
        # Invoke the stored callable with its captured arguments.
        func, args, kwargs = self._func, self._args, self._kwargs
        return func(*args, **kwargs)
class CountCallbackInvoker:
    """An abstraction to invoke a callback when a shared count reaches zero

    :param callback: Callback invoke when finalized count reaches zero
    """

    def __init__(self, callback):
        self._lock = threading.Lock()
        self._callback = callback
        self._count = 0
        self._is_finalized = False

    @property
    def current_count(self):
        # Read under the lock so the value is consistent across threads.
        with self._lock:
            return self._count

    def increment(self):
        """Increment the count by one"""
        with self._lock:
            if self._is_finalized:
                raise RuntimeError(
                    'Counter has been finalized it can no longer be '
                    'incremented.'
                )
            self._count += 1

    def decrement(self):
        """Decrement the count by one"""
        with self._lock:
            if not self._count:
                raise RuntimeError(
                    'Counter is at zero. It cannot dip below zero'
                )
            self._count -= 1
            # Only fire the callback once finalize() has been called and
            # every outstanding increment has been matched by a decrement.
            if self._is_finalized and not self._count:
                self._callback()

    def finalize(self):
        """Finalize the counter

        Once finalized, the counter never be incremented and the callback
        can be invoked once the count reaches zero
        """
        with self._lock:
            self._is_finalized = True
            if not self._count:
                self._callback()
class OSUtils:
    """Thin wrapper around the filesystem operations used by transfers.

    Centralizing these calls in one object makes them easy to stub out
    in tests.
    """

    # Maximum filename length; used to keep generated temp names legal.
    _MAX_FILENAME_LEN = 255

    def get_file_size(self, filename):
        """Return the size of ``filename`` in bytes."""
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callbacks):
        """Open ``filename`` and return a ReadFileChunk over the given range.

        Callbacks start disabled; callers enable them when the transfer
        actually begins.
        """
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callbacks, enable_callbacks=False
        )

    def open_file_chunk_reader_from_fileobj(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks,
        close_callbacks=None,
    ):
        """Return a ReadFileChunk wrapping an already-open file object."""
        return ReadFileChunk(
            fileobj,
            chunk_size,
            full_file_size,
            callbacks=callbacks,
            enable_callbacks=False,
            close_callbacks=close_callbacks,
        )

    def open(self, filename, mode):
        """Open ``filename`` with ``mode``; exists so tests can stub it."""
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        """Rename ``current_filename`` to ``new_filename`` via the
        platform-appropriate helper from s3transfer.compat."""
        rename_file(current_filename, new_filename)

    # BUGFIX: restored the missing @classmethod decorator. The method
    # takes ``cls`` and is intended to be callable on the class itself
    # (``OSUtils.is_special_file(path)``); without the decorator that
    # call raised a TypeError.
    @classmethod
    def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False

    def get_temp_filename(self, filename):
        """Return a temp-file name next to ``filename`` with a random suffix.

        The base name is truncated so name + suffix stays within the
        filesystem's filename length limit.
        """
        suffix = os.extsep + random_file_extension()
        path = os.path.dirname(filename)
        name = os.path.basename(filename)
        temp_filename = name[: self._MAX_FILENAME_LEN - len(suffix)] + suffix
        return os.path.join(path, temp_filename)

    def allocate(self, filename, size):
        """Preallocate ``size`` bytes for ``filename``.

        On failure the partially-created file is removed before the
        error is re-raised.
        """
        try:
            with self.open(filename, 'wb') as f:
                fallocate(f, size)
        except OSError:
            self.remove_file(filename)
            raise
class DeferredOpenFile:
    """A file-like object that postpones opening the file until first use.

    This is useful for deferring opening of a file till it is needed
    in a separate thread, as there is a limit of how many open files
    there can be in a single thread for most operating systems. The
    file gets opened in the following methods: ``read()``, ``write()``,
    ``seek()``, and ``__enter__()``.
    """

    def __init__(self, filename, start_byte=0, mode='rb', open_function=open):
        """
        :type filename: str
        :param filename: The name of the file to open

        :type start_byte: int
        :param start_byte: The byte to seek to when the file is opened.

        :type mode: str
        :param mode: The mode to use to open the file

        :type open_function: function
        :param open_function: The function to use to open the file
        """
        self._filename = filename
        self._fileobj = None
        self._start_byte = start_byte
        self._mode = mode
        self._open_function = open_function

    def _open_if_needed(self):
        # Lazily open the underlying file, seeking to the configured offset.
        if self._fileobj is not None:
            return
        self._fileobj = self._open_function(self._filename, self._mode)
        if self._start_byte != 0:
            self._fileobj.seek(self._start_byte)

    @property
    def name(self):
        return self._filename

    def read(self, amount=None):
        self._open_if_needed()
        return self._fileobj.read(amount)

    def write(self, data):
        self._open_if_needed()
        self._fileobj.write(data)

    def seek(self, where, whence=0):
        self._open_if_needed()
        self._fileobj.seek(where, whence)

    def tell(self):
        # Before the file is opened, report the offset it would open at.
        fileobj = self._fileobj
        return self._start_byte if fileobj is None else fileobj.tell()

    def close(self):
        if self._fileobj:
            self._fileobj.close()

    def __enter__(self):
        self._open_if_needed()
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        chunk_size,
        full_file_size,
        callbacks=None,
        enable_callbacks=True,
        close_callbacks=None,
    ):
        """

        Given a file object shown below::

            |___________________________________________________|
            0          |                 |                       full_file_size
                       |----chunk_size---|
                    f.tell()

        :type fileobj: file
        :param fileobj: File like object

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.  Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: A list of function(amount_read)
        :param callbacks: Called whenever data is read from this object in the
            order provided.

        :type enable_callbacks: boolean
        :param enable_callbacks: True if to run callbacks. Otherwise, do not
            run callbacks

        :type close_callbacks: A list of function()
        :param close_callbacks: Called when close is called. The function
            should take no arguments.
        """
        self._fileobj = fileobj
        self._start_byte = self._fileobj.tell()
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=self._start_byte,
            actual_file_size=full_file_size,
        )
        # _amount_read represents the position in the chunk and may exceed
        # the chunk size, but won't allow reads out of bounds.
        self._amount_read = 0
        self._callbacks = callbacks
        if callbacks is None:
            self._callbacks = []
        self._callbacks_enabled = enable_callbacks
        self._close_callbacks = close_callbacks
        if close_callbacks is None:
            # BUGFIX: this previously re-assigned ``close_callbacks`` (a
            # no-op that left the attribute as None). Normalize to an empty
            # list to mirror how ``callbacks`` is handled; ``close()``
            # behaves identically since iterating an empty list is a no-op.
            self._close_callbacks = []

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callbacks=None,
        enable_callbacks=True,
    ):
        """Convenience factory function to create from a filename.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read.  Trying to read
            pass the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callbacks: function(amount_read)
        :param callbacks: Called whenever data is read from this object.

        :type enable_callbacks: bool
        :param enable_callbacks: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``
        """
        # The returned instance takes ownership of the opened file; its
        # close() closes the underlying file object.
        f = open(filename, 'rb')
        f.seek(start_byte)
        file_size = os.fstat(f.fileno()).st_size
        return cls(f, chunk_size, file_size, callbacks, enable_callbacks)

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        # Never report a chunk larger than what remains in the file.
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        """Read up to ``amount`` bytes, bounded by the chunk size."""
        amount_left = max(self._size - self._amount_read, 0)
        if amount is None:
            amount_to_read = amount_left
        else:
            amount_to_read = min(amount_left, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callbacks is not None and self._callbacks_enabled:
            invoke_progress_callbacks(self._callbacks, len(data))
        return data

    def signal_transferring(self):
        """Enable progress callbacks and forward the signal downstream."""
        self.enable_callback()
        if hasattr(self._fileobj, 'signal_transferring'):
            self._fileobj.signal_transferring()

    def signal_not_transferring(self):
        """Disable progress callbacks and forward the signal downstream."""
        self.disable_callback()
        if hasattr(self._fileobj, 'signal_not_transferring'):
            self._fileobj.signal_not_transferring()

    def enable_callback(self):
        self._callbacks_enabled = True

    def disable_callback(self):
        self._callbacks_enabled = False

    def seek(self, where, whence=0):
        """Seek within the chunk; offsets are relative to the chunk start."""
        if whence not in (0, 1, 2):
            # Mimic io's error for invalid whence values
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

        # Recalculate where based on chunk attributes so seek from file
        # start (whence=0) is always used
        where += self._start_byte
        if whence == 1:
            where += self._amount_read
        elif whence == 2:
            where += self._size

        self._fileobj.seek(max(where, self._start_byte))
        if self._callbacks is not None and self._callbacks_enabled:
            # To also rewind the callback() for an accurate progress report
            bounded_where = max(min(where - self._start_byte, self._size), 0)
            bounded_amount_read = min(self._amount_read, self._size)
            amount = bounded_where - bounded_amount_read
            invoke_progress_callbacks(
                self._callbacks, bytes_transferred=amount
            )
        self._amount_read = max(where - self._start_byte, 0)

    def close(self):
        """Run close callbacks (if enabled) and close the file object."""
        if self._close_callbacks is not None and self._callbacks_enabled:
            for callback in self._close_callbacks:
                callback()
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if its a file like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])
class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callbacks=None):
        self._stream = stream
        # Normalize a missing callback list to an empty one.
        self._callbacks = [] if callbacks is None else callbacks

    def read(self, *args, **kwargs):
        # Delegate the read, then report how many bytes came back.
        data = self._stream.read(*args, **kwargs)
        invoke_progress_callbacks(self._callbacks, len(data))
        return data
class NoResourcesAvailable(Exception):
    """Raised when a semaphore cannot be acquired without blocking."""

    pass
class TaskSemaphore:
    """A semaphore for the purpose of limiting the number of tasks."""

    def __init__(self, count):
        """
        :param count: The size of semaphore
        """
        self._semaphore = threading.Semaphore(count)

    def acquire(self, tag, blocking=True):
        """Acquire the semaphore

        :param tag: A tag identifying what is acquiring the semaphore. Note
            that this is not really needed to directly use this class but is
            needed for API compatibility with the SlidingWindowSemaphore
            implementation.
        :param block: If True, block until it can be acquired. If False,
            do not block and raise an exception if cannot be acquired.

        :returns: A token (can be None) to use when releasing the semaphore
        """
        logger.debug("Acquiring %s", tag)
        if not self._semaphore.acquire(blocking):
            raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)

    def release(self, tag, acquire_token):
        """Release the semaphore

        :param tag: A tag identifying what is releasing the semaphore
        :param acquire_token: The token returned from when the semaphore was
            acquired. Note that this is not really needed to directly use this
            class but is needed for API compatibility with the
            SlidingWindowSemaphore implementation.
        """
        # Use lazy %-style args (consistent with acquire() and the rest of
        # this module) so the message is only formatted when DEBUG is on.
        logger.debug("Releasing acquire %s/%s", tag, acquire_token)
        self._semaphore.release()
class SlidingWindowSemaphore(TaskSemaphore):
    """A semaphore used to coordinate sequential resource access.

    This class is similar to the stdlib BoundedSemaphore:

    * It's initialized with a count.
    * Each call to ``acquire()`` decrements the counter.
    * If the count is at zero, then ``acquire()`` will either block until the
      count increases, or if ``blocking=False``, then it will raise
      a NoResourcesAvailable exception indicating that it failed to acquire the
      semaphore.

    The main difference is that this semaphore is used to limit
    access to a resource that requires sequential access.  For example,
    if I want to access resource R that has 20 subresources R_0 - R_19,
    this semaphore can also enforce that you only have a max range of
    10 at any given point in time.  You must also specify a tag name
    when you acquire the semaphore.  The sliding window semantics apply
    on a per tag basis.  The internal count will only be incremented
    when the minimum sequence number for a tag is released.
    """

    def __init__(self, count):
        self._count = count
        # Dict[tag, next_sequence_number].
        self._tag_sequences = defaultdict(int)
        # Dict[tag, lowest sequence number not yet released for that tag].
        self._lowest_sequence = {}
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        # Dict[tag, List[sequence_number]]
        self._pending_release = {}

    def current_count(self):
        # Snapshot of the available count, read under the shared lock.
        with self._lock:
            return self._count

    def acquire(self, tag, blocking=True):
        """Acquire a slot for ``tag``; returns the sequence number token."""
        logger.debug("Acquiring %s", tag)
        self._condition.acquire()
        try:
            if self._count == 0:
                if not blocking:
                    raise NoResourcesAvailable("Cannot acquire tag '%s'" % tag)
                else:
                    # Block until a release() frees up a slot.
                    while self._count == 0:
                        self._condition.wait()
            # self._count is no longer zero.
            # First, check if this is the first time we're seeing this tag.
            sequence_number = self._tag_sequences[tag]
            if sequence_number == 0:
                # First time seeing the tag, so record we're at 0.
                self._lowest_sequence[tag] = sequence_number
            self._tag_sequences[tag] += 1
            self._count -= 1
            return sequence_number
        finally:
            self._condition.release()

    def release(self, tag, acquire_token):
        """Release the slot identified by ``acquire_token`` for ``tag``.

        The internal count only grows when the lowest outstanding sequence
        number for the tag is released; out-of-order releases are queued.
        """
        sequence_number = acquire_token
        logger.debug("Releasing acquire %s/%s", tag, sequence_number)
        self._condition.acquire()
        try:
            if tag not in self._tag_sequences:
                raise ValueError("Attempted to release unknown tag: %s" % tag)
            max_sequence = self._tag_sequences[tag]
            if self._lowest_sequence[tag] == sequence_number:
                # We can immediately process this request and free up
                # resources.
                self._lowest_sequence[tag] += 1
                self._count += 1
                self._condition.notify()
                # Drain any queued releases that are now contiguous with
                # the new lowest sequence number.  The queue is sorted in
                # descending order, so the smallest entry is at the end.
                queued = self._pending_release.get(tag, [])
                while queued:
                    if self._lowest_sequence[tag] == queued[-1]:
                        queued.pop()
                        self._lowest_sequence[tag] += 1
                        self._count += 1
                    else:
                        break
            elif self._lowest_sequence[tag] < sequence_number < max_sequence:
                # We can't do anything right now because we're still waiting
                # for the min sequence for the tag to be released.  We have
                # to queue this for pending release.
                self._pending_release.setdefault(tag, []).append(
                    sequence_number
                )
                self._pending_release[tag].sort(reverse=True)
            else:
                raise ValueError(
                    "Attempted to release unknown sequence number "
                    "%s for tag: %s" % (sequence_number, tag)
                )
        finally:
            self._condition.release()
class ChunksizeAdjuster:
    """Adjusts a proposed chunksize so a transfer fits within S3 limits."""

    def __init__(
        self,
        max_size=MAX_SINGLE_UPLOAD_SIZE,
        min_size=MIN_UPLOAD_CHUNKSIZE,
        max_parts=MAX_PARTS,
    ):
        self.max_size = max_size
        self.min_size = min_size
        self.max_parts = max_parts

    def adjust_chunksize(self, current_chunksize, file_size=None):
        """Get a chunksize close to current that fits within all S3 limits.

        :type current_chunksize: int
        :param current_chunksize: The currently configured chunksize.

        :type file_size: int or None
        :param file_size: The size of the file to upload. This might be None
            if the object being transferred has an unknown size.

        :returns: A valid chunksize that fits within configured limits.
        """
        chunksize = current_chunksize
        if file_size is not None:
            chunksize = self._adjust_for_max_parts(chunksize, file_size)
        return self._adjust_for_chunksize_limits(chunksize)

    def _adjust_for_chunksize_limits(self, current_chunksize):
        # Clamp the chunksize into [min_size, max_size].
        # NOTE: logging uses lazy %-style args (consistent with the rest of
        # this module) so messages are only formatted when DEBUG is enabled.
        if current_chunksize > self.max_size:
            logger.debug(
                "Chunksize greater than maximum chunksize. "
                "Setting to %s from %s.",
                self.max_size,
                current_chunksize,
            )
            return self.max_size
        elif current_chunksize < self.min_size:
            logger.debug(
                "Chunksize less than minimum chunksize. "
                "Setting to %s from %s.",
                self.min_size,
                current_chunksize,
            )
            return self.min_size
        else:
            return current_chunksize

    def _adjust_for_max_parts(self, current_chunksize, file_size):
        # Double the chunksize until the resulting part count fits under
        # the maximum number of parts allowed.
        chunksize = current_chunksize
        num_parts = int(math.ceil(file_size / float(chunksize)))

        while num_parts > self.max_parts:
            chunksize *= 2
            num_parts = int(math.ceil(file_size / float(chunksize)))

        if chunksize != current_chunksize:
            logger.debug(
                "Chunksize would result in the number of parts exceeding the "
                "maximum. Setting to %s from %s.",
                chunksize,
                current_chunksize,
            )

        return chunksize
|