bandwidth.py

# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import threading
import time


class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Error when requested amount exceeds what is allowed

        The request that raised this error should be retried after waiting
        the time specified by ``retry_time``.

        :type requested_amt: int
        :param requested_amt: The originally requested byte amount

        :type retry_time: float
        :param retry_time: The length in time to wait to retry for the
            requested amount
        """
        self.requested_amt = requested_amt
        self.retry_time = retry_time
        msg = 'Request amount {} exceeded the amount available. Retry in {}'.format(
            requested_amt, retry_time
        )
        super().__init__(msg)


class RequestToken:
    """A token to pass as an identifier when consuming from the LeakyBucket"""

    pass


class TimeUtils:
    def time(self):
        """Get the current time back

        :rtype: float
        :returns: The current time in seconds
        """
        return time.time()

    def sleep(self, value):
        """Sleep for a designated time

        :type value: float
        :param value: The time to sleep for in seconds
        """
        return time.sleep(value)


class BandwidthLimiter:
    def __init__(self, leaky_bucket, time_utils=None):
        """Limits bandwidth for shared S3 transfers

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to limit bandwidth

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
        """
        self._leaky_bucket = leaky_bucket
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()

    def get_bandwith_limited_stream(
        self, fileobj, transfer_coordinator, enabled=True
    ):
        """Wraps a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to start
        """
        stream = BandwidthLimitedStream(
            fileobj, self._leaky_bucket, transfer_coordinator, self._time_utils
        )
        if not enabled:
            stream.disable_bandwidth_limiting()
        return stream


class BandwidthLimitedStream:
    def __init__(
        self,
        fileobj,
        leaky_bucket,
        transfer_coordinator,
        time_utils=None,
        bytes_threshold=256 * 1024,
    ):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()
        self._bandwidth_limiting_enabled = True
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.
        """
        if not self._bandwidth_limiting_enabled:
            return self._fileobj.read(amount)

        # We do not want to be calling consume on every read as the read
        # amounts can be small, causing the lock of the leaky bucket to
        # introduce noticeable overhead. So instead we keep track of
        # how many bytes we have seen and only call consume once we pass a
        # certain threshold.
        self._bytes_seen += amount
        if self._bytes_seen < self._bytes_threshold:
            return self._fileobj.read(amount)

        self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        # NOTE: If the read amounts on the stream are high, it will result
        # in large bursty behavior as there is not an interface for partial
        # reads. However, given the reads on this abstraction are at most
        # 256KB (via downloads), it reduces the burstiness to be small KB
        # bursts at worst.
        while not self._transfer_coordinator.exception:
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token
                )
                self._bytes_seen = 0
                return
            except RequestExceededException as e:
                self._time_utils.sleep(e.retry_time)
        else:
            raise self._transfer_coordinator.exception

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        self._fileobj.seek(where, whence)

    def tell(self):
        return self._fileobj.tell()

    def close(self):
        if self._bandwidth_limiting_enabled and self._bytes_seen:
            # This handles the case where the file is small enough to never
            # trigger the threshold and thus is never subjected to the
            # leaky bucket on read(). This specifically happens for small
            # uploads. So instead, to account for those bytes, have
            # them go through the leaky bucket when the file gets closed.
            self._consume_through_leaky_bucket()
        self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
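

# Illustrative note (added for explanation; not in the original module): with
# the default bytes_threshold of 256 KB, a stream read in 16 KB chunks only
# takes the leaky bucket lock once every 16 reads:
#
#     16 * 16 * 1024 == 256 * 1024  ->  threshold reached, consume() is called
#
# Streams that never accumulate 256 KB are accounted for in close() instead.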


class LeakyBucket:
    def __init__(
        self,
        max_rate,
        time_utils=None,
        rate_tracker=None,
        consumption_scheduler=None,
    ):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow. This rate is in terms of
            bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary
        """
        self._max_rate = float(max_rate)
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()
        self._lock = threading.Lock()
        self._rate_tracker = rate_tracker
        if rate_tracker is None:
            self._rate_tracker = BandwidthRateTracker()
        self._consumption_scheduler = consumption_scheduler
        if consumption_scheduler is None:
            self._consumption_scheduler = ConsumptionScheduler()

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised, the token should be used
            in subsequent retry consume() requests.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            time_now = self._time_utils.time()
            if self._consumption_scheduler.is_scheduled(request_token):
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, time_now
                )
            elif self._projected_to_exceed_max_rate(amt, time_now):
                self._raise_request_exceeded_exception(
                    amt, request_token, time_now
                )
            else:
                return self._release_requested_amt(amt, time_now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
        return projected_rate > self._max_rate

    def _release_requested_amt_for_scheduled_request(
        self, amt, request_token, time_now
    ):
        self._consumption_scheduler.process_scheduled_consumption(
            request_token
        )
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        allocated_time = amt / float(self._max_rate)
        retry_time = self._consumption_scheduler.schedule_consumption(
            amt, request_token, allocated_time
        )
        raise RequestExceededException(
            requested_amt=amt, retry_time=retry_time
        )

    def _release_requested_amt(self, amt, time_now):
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt


class ConsumptionScheduler:
    def __init__(self):
        """Schedules when to consume a desired amount"""
        self._tokens_to_scheduled_consumption = {}
        self._total_wait = 0

    def is_scheduled(self, token):
        """Indicates if a consumption request has been scheduled

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        return token in self._tokens_to_scheduled_consumption

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedules a wait time to be able to consume an amount

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed, regardless of previously
            scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request before
            actually consuming the specified amount.
        """
        self._total_wait += time_to_consume
        self._tokens_to_scheduled_consumption[token] = {
            'wait_duration': self._total_wait,
            'time_to_consume': time_to_consume,
        }
        return self._total_wait

    def process_scheduled_consumption(self, token):
        """Processes a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.
        """
        scheduled_retry = self._tokens_to_scheduled_consumption.pop(token)
        self._total_wait = max(
            self._total_wait - scheduled_retry['time_to_consume'], 0
        )
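

# Worked example (added for explanation; values are hypothetical): because
# schedule_consumption() adds each request's time_to_consume to a shared
# running total, throttled requests queue behind one another. With a
# max_rate of 128 KB/s, a 256 KB request is allotted 2.0 seconds, so the
# first throttled caller is told to retry in 2.0 seconds and a second
# concurrent caller in 4.0 seconds. process_scheduled_consumption() then
# subtracts each request's time_to_consume from the total once that
# request finally goes through.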


class BandwidthRateTracker:
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: The constant to use in calculating the exponential
            moving average of the bandwidth rate. Specifically it is used
            in the following calculation:

                current_rate = alpha * new_rate + (1 - alpha) * current_rate

            The value of this constant should be between 0 and 1.
        """
        self._alpha = alpha
        self._last_time = None
        self._current_rate = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate
        """
        if self._last_time is None:
            return 0.0
        return self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        if self._last_time is None:
            return 0.0
        return self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was consumed
        """
        if self._last_time is None:
            self._last_time = time_at_consumption
            self._current_rate = 0.0
            return
        self._current_rate = self._calculate_exponential_moving_average_rate(
            amt, time_at_consumption
        )
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        time_delta = time_at_consumption - self._last_time
        if time_delta <= 0:
            # While it is really unlikely to see this in an actual transfer,
            # we do not want to be returning back a negative rate or try to
            # divide the amount by zero. So instead return back an infinite
            # rate as the time delta is infinitesimally small.
            return float('inf')
        return amt / time_delta

    def _calculate_exponential_moving_average_rate(
        self, amt, time_at_consumption
    ):
        new_rate = self._calculate_rate(amt, time_at_consumption)
        return self._alpha * new_rate + (1 - self._alpha) * self._current_rate
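

# Minimal usage sketch (not part of the original module): throttles reads
# from an in-memory buffer to roughly 512 KB/s. The _StubCoordinator class
# below is a hypothetical stand-in for s3transfer.futures.TransferCoordinator;
# BandwidthLimitedStream only consults its ``exception`` attribute here.
if __name__ == '__main__':
    import io

    class _StubCoordinator:
        exception = None

    bucket = LeakyBucket(max_rate=512 * 1024)
    raw = io.BytesIO(b'x' * (1024 * 1024))
    with BandwidthLimitedStream(raw, bucket, _StubCoordinator()) as stream:
        while True:
            chunk = stream.read(64 * 1024)
            if not chunk:
                break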