123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- # Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"). You
- # may not use this file except in compliance with the License. A copy of
- # the License is located at
- #
- # http://aws.amazon.com/apache2.0/
- #
- # or in the "license" file accompanying this file. This file is
- # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- # ANY KIND, either express or implied. See the License for the specific
- # language governing permissions and limitations under the License.
- import threading
- import time
class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Signal that a consumption request was larger than what is allowed

        Whoever receives this error is expected to wait ``retry_time``
        and then retry the same request.

        :type requested_amt: int
        :param requested_amt: The byte amount that was originally requested

        :type retry_time: float
        :param retry_time: How long to wait before retrying the request
            for the requested amount
        """
        template = (
            'Request amount {} exceeded the amount available. Retry in {}'
        )
        super().__init__(template.format(requested_amt, retry_time))
        # Kept as attributes so handlers can read them without parsing the
        # message text.
        self.requested_amt = requested_amt
        self.retry_time = retry_time
class RequestToken:
    """An identifier handed to the LeakyBucket when consuming

    The class carries no state of its own; each instance is simply a
    distinct object that identifies one consumption request.
    """
class TimeUtils:
    """Thin indirection over the ``time`` module

    Other classes in this file accept an instance of this class as an
    optional ``time_utils`` argument instead of calling ``time`` directly.
    """

    def time(self):
        """Return the current time

        :rtype: float
        :returns: The current time in seconds, as given by ``time.time()``
        """
        current = time.time()
        return current

    def sleep(self, value):
        """Block for the given duration

        :type value: float
        :param value: How long to sleep, in seconds

        :returns: Whatever ``time.sleep()`` returns
        """
        outcome = time.sleep(value)
        return outcome
class BandwidthLimiter:
    def __init__(self, leaky_bucket, time_utils=None):
        """Hands out bandwidth limited streams that share one leaky bucket

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket passed to every stream this
            limiter creates

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
            A new ``TimeUtils`` is created when this is ``None``.
        """
        self._leaky_bucket = leaky_bucket
        self._time_utils = TimeUtils() if time_utils is None else time_utils

    def get_bandwith_limited_stream(
        self, fileobj, transfer_coordinator, enabled=True
    ):
        """Wrap a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general
            transfer that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to
            start

        :rtype: BandwidthLimitedStream
        :returns: The stream wrapping ``fileobj``
        """
        limited = BandwidthLimitedStream(
            fileobj,
            self._leaky_bucket,
            transfer_coordinator,
            self._time_utils,
        )
        if enabled:
            return limited
        # Streams come out of their constructor with limiting switched on,
        # so it only needs to be turned off on request.
        limited.disable_bandwidth_limiting()
        return limited
class BandwidthLimitedStream:
    def __init__(
        self,
        fileobj,
        leaky_bucket,
        transfer_coordinator,
        time_utils=None,
        bytes_threshold=256 * 1024,
    ):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general
            transfer that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with
            time. A new ``TimeUtils`` is created when this is ``None``.

        :type bytes_threshold: int
        :param bytes_threshold: The number of requested bytes to accumulate
            across ``read()`` calls before consuming from the leaky bucket
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()
        self._bandwidth_limiting_enabled = True
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.

        :type amount: int
        :param amount: The number of bytes to request from the wrapped
            stream

        :returns: Whatever the wrapped stream's ``read(amount)`` returns
        :raises: The transfer coordinator's exception, if one is set while
            waiting on the leaky bucket
        """
        if not self._bandwidth_limiting_enabled:
            return self._fileobj.read(amount)

        # We do not want to be calling consume on every read as the read
        # amounts can be small causing the lock of the leaky bucket to
        # introduce noticeable overhead. So instead we keep track of
        # how many bytes we have seen and only call consume once we pass a
        # certain threshold.
        self._bytes_seen += amount
        if self._bytes_seen >= self._bytes_threshold:
            self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        """Consume the accumulated byte count, retrying until it succeeds

        Sleeps for the retry time carried by each RequestExceededException
        and tries again. Once the transfer coordinator reports an
        exception, stops retrying and raises that exception instead.
        """
        # NOTE: If the read amount on the stream are high, it will result
        # in large bursty behavior as there is not an interface for partial
        # reads. However given the read's on this abstraction are at most 256KB
        # (via downloads), it reduces the burstiness to be small KB bursts at
        # worst.
        while not self._transfer_coordinator.exception:
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token
                )
                self._bytes_seen = 0
                return
            except RequestExceededException as e:
                self._time_utils.sleep(e.retry_time)
        else:
            # Only reached when the loop condition fails, i.e. the
            # coordinator has an exception set.
            raise self._transfer_coordinator.exception

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        """Seek the wrapped stream

        :returns: Whatever the wrapped stream's ``seek()`` returns
        """
        return self._fileobj.seek(where, whence)

    def tell(self):
        """Return the wrapped stream's current position"""
        return self._fileobj.tell()

    def close(self):
        """Account for any uncounted bytes, then close the wrapped stream

        The wrapped stream is closed even when consuming from the leaky
        bucket raises, so a failed transfer does not leave it open.

        :raises: The transfer coordinator's exception, if one is set while
            the remaining bytes are being consumed
        """
        try:
            if self._bandwidth_limiting_enabled and self._bytes_seen:
                # This handles the case where the file is small enough to
                # never trigger the threshold and thus is never subjugated
                # to the leaky bucket on read(). This specifically happens
                # for small uploads. So instead to account for those bytes,
                # have it go through the leaky bucket when the file gets
                # closed.
                self._consume_through_leaky_bucket()
        finally:
            self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class LeakyBucket:
    def __init__(
        self,
        max_rate,
        time_utils=None,
        rate_tracker=None,
        consumption_scheduler=None,
    ):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow. This rate is in terms
            of bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with
            time. Defaults to a new ``TimeUtils``.

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption. Defaults to a
            new ``BandwidthRateTracker``.

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary. Defaults to a new ``ConsumptionScheduler``.
        """
        self._max_rate = float(max_rate)
        self._time_utils = TimeUtils() if time_utils is None else time_utils
        # Held for the whole of consume().
        self._lock = threading.Lock()
        self._rate_tracker = (
            BandwidthRateTracker() if rate_tracker is None else rate_tracker
        )
        self._consumption_scheduler = (
            ConsumptionScheduler()
            if consumption_scheduler is None
            else consumption_scheduler
        )

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised the token should be used
            in subsequent retry consume() request.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            now = self._time_utils.time()
            # A token that was already scheduled is released without
            # re-checking the projected rate.
            if self._consumption_scheduler.is_scheduled(request_token):
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, now
                )
            if not self._projected_to_exceed_max_rate(amt, now):
                return self._release_requested_amt(amt, now)
            self._raise_request_exceeded_exception(amt, request_token, now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        # True when consuming ``amt`` at ``time_now`` would put the tracked
        # rate above the configured maximum.
        projected = self._rate_tracker.get_projected_rate(amt, time_now)
        return projected > self._max_rate

    def _release_requested_amt_for_scheduled_request(
        self, amt, request_token, time_now
    ):
        # Clear the token from the scheduler, then release as normal.
        scheduler = self._consumption_scheduler
        scheduler.process_scheduled_consumption(request_token)
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        # The time the request is allotted is how long ``amt`` bytes take
        # at exactly the maximum rate; the scheduler turns that into the
        # wait reported to the caller.
        time_for_amt = amt / float(self._max_rate)
        wait = self._consumption_scheduler.schedule_consumption(
            amt, request_token, time_for_amt
        )
        raise RequestExceededException(requested_amt=amt, retry_time=wait)

    def _release_requested_amt(self, amt, time_now):
        # Record the consumption and hand the amount back to the caller.
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt
class ConsumptionScheduler:
    def __init__(self):
        """Schedules when to consume a desired amount"""
        # Maps each scheduled token to the details of its scheduled wait.
        self._tokens_to_scheduled_consumption = {}
        # Running sum of the time allotted to all outstanding requests.
        self._total_wait = 0

    def is_scheduled(self, token):
        """Tell whether a consumption request is currently scheduled

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :rtype: bool
        :returns: True if the token has a scheduled consumption
        """
        scheduled = self._tokens_to_scheduled_consumption
        return token in scheduled

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedule a wait time to be able to consume an amount

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed in regardless of
            previously scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request before
            actually consuming the specified amount.
        """
        # Each new request queues behind everything already scheduled.
        cumulative_wait = self._total_wait + time_to_consume
        self._total_wait = cumulative_wait
        self._tokens_to_scheduled_consumption[token] = {
            'wait_duration': cumulative_wait,
            'time_to_consume': time_to_consume,
        }
        return cumulative_wait

    def process_scheduled_consumption(self, token):
        """Process a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :raises KeyError: If the token was never scheduled
        """
        finished = self._tokens_to_scheduled_consumption.pop(token)
        remaining = self._total_wait - finished['time_to_consume']
        # Never let the outstanding wait drop below zero.
        self._total_wait = max(remaining, 0)
class BandwidthRateTracker:
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: The constant to use in calculating the exponential
            moving average of the bandwidth rate. Specifically it is used
            in the following calculation:

            current_rate = alpha * new_rate + (1 - alpha) * current_rate

            This value of this constant should be between 0 and 1.
        """
        self._alpha = alpha
        # Both stay None until the first consumption is recorded.
        self._last_time = None
        self._current_rate = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate, or 0.0 if nothing has
            been recorded yet
        """
        return 0.0 if self._last_time is None else self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        Nothing is recorded by this call.

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        if self._last_time is not None:
            return self._calculate_exponential_moving_average_rate(
                amt, time_at_consumption
            )
        # With no earlier time point there is no interval to measure.
        return 0.0

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was
            consumed
        """
        if self._last_time is None:
            # The first record only establishes the starting time point.
            self._current_rate = 0.0
        else:
            self._current_rate = (
                self._calculate_exponential_moving_average_rate(
                    amt, time_at_consumption
                )
            )
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        elapsed = time_at_consumption - self._last_time
        if elapsed <= 0:
            # While it is really unlikely to see this in an actual transfer,
            # we do not want to be returning back a negative rate or try to
            # divide the amount by zero. So instead return back an infinite
            # rate as the time delta is infinitesimally small.
            return float('inf')
        return amt / elapsed

    def _calculate_exponential_moving_average_rate(
        self, amt, time_at_consumption
    ):
        # Blend the newest sample with the running rate, weighted by alpha.
        sample_rate = self._calculate_rate(amt, time_at_consumption)
        weighted_sample = self._alpha * sample_rate
        weighted_history = (1 - self._alpha) * self._current_rate
        return weighted_sample + weighted_history
|