/usr/lib/python3/dist-packages/s3transfer/bandwidth.py is in python3-s3transfer 0.1.13-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import time
import threading
class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Signals that a requested amount is more than what is allowed

        Whoever triggered this error is expected to wait for the duration
        given by ``retry_time`` and then retry the request.

        :type requested_amt: int
        :param requested_amt: The originally requested byte amount

        :type retry_time: float
        :param retry_time: The length in time to wait to retry for the
            requested amount
        """
        template = (
            'Request amount %s exceeded the amount available. Retry in %s')
        super(RequestExceededException, self).__init__(
            template % (requested_amt, retry_time))
        # Expose both values so that callers can act on them without having
        # to parse the message.
        self.requested_amt = requested_amt
        self.retry_time = retry_time
class RequestToken(object):
    """An identifier to hand to the LeakyBucket when consuming from it

    The class defines no attributes or methods of its own; an instance only
    serves to tell one consumption request apart from another.
    """
class TimeUtils(object):
    def time(self):
        """Report what the current time is

        :rtype: float
        :returns: The current time in seconds
        """
        current = time.time()
        return current

    def sleep(self, value):
        """Pause execution for the given duration

        :type value: float
        :param value: The time to sleep for in seconds
        """
        outcome = time.sleep(value)
        return outcome
class BandwidthLimiter(object):
    def __init__(self, leaky_bucket, time_utils=None):
        """Hands out bandwidth limited streams for shared S3 transfers

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket used to limit bandwidth. The
            same bucket is passed to every stream this limiter creates.

        :type time_utils: TimeUtils
        :param time_utils: Time utility to use for interacting with time.
            A ``TimeUtils`` instance is created when this is ``None``.
        """
        self._leaky_bucket = leaky_bucket
        self._time_utils = TimeUtils() if time_utils is None else time_utils

    def get_bandwith_limited_stream(self, fileobj, transfer_coordinator,
                                    enabled=True):
        """Wraps a fileobj in a bandwidth limited stream wrapper

        :type fileobj: file-like obj
        :param fileobj: The file-like obj to wrap

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type enabled: boolean
        :param enabled: Whether bandwidth limiting should be enabled to start

        :rtype: BandwidthLimitedStream
        :returns: The stream wrapping ``fileobj``
        """
        limited_stream = BandwidthLimitedStream(
            fileobj, self._leaky_bucket, transfer_coordinator,
            self._time_utils)
        if enabled:
            # Streams start out with limiting switched on, nothing to do.
            return limited_stream
        limited_stream.disable_bandwidth_limiting()
        return limited_stream
class BandwidthLimitedStream(object):
    def __init__(self, fileobj, leaky_bucket, transfer_coordinator,
                 time_utils=None, bytes_threshold=256 * 1024):
        """Limits bandwidth for reads on a wrapped stream

        :type fileobj: file-like object
        :param fileobj: The file like object to wrap

        :type leaky_bucket: LeakyBucket
        :param leaky_bucket: The leaky bucket to use to throttle reads on
            the stream

        :type transfer_coordinator: s3transfer.futures.TransferCoordinator
        :param transfer_coordinator: The coordinator for the general transfer
            that the wrapped stream is a part of

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type bytes_threshold: int
        :param bytes_threshold: The number of requested bytes that must
            accumulate across reads before the leaky bucket is consulted
        """
        self._fileobj = fileobj
        self._leaky_bucket = leaky_bucket
        self._transfer_coordinator = transfer_coordinator
        self._time_utils = time_utils
        if time_utils is None:
            self._time_utils = TimeUtils()
        self._bandwidth_limiting_enabled = True
        # The same token is reused for every consume() call made by this
        # stream, including retries after a RequestExceededException.
        self._request_token = RequestToken()
        self._bytes_seen = 0
        self._bytes_threshold = bytes_threshold

    def enable_bandwidth_limiting(self):
        """Enable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = True

    def disable_bandwidth_limiting(self):
        """Disable bandwidth limiting on reads to the stream"""
        self._bandwidth_limiting_enabled = False

    def read(self, amount):
        """Read a specified amount

        Reads will only be throttled if bandwidth limiting is enabled.

        :type amount: int
        :param amount: The number of bytes to request from the wrapped
            stream

        :returns: Whatever the wrapped stream's ``read()`` returns

        :raises: The transfer coordinator's exception, if one is set while
            the read is being throttled
        """
        if not self._bandwidth_limiting_enabled:
            return self._fileobj.read(amount)

        # We do not want to be calling consume on every read as the read
        # amounts can be small causing the lock of the leaky bucket to
        # introduce noticeable overhead. So instead we keep track of
        # how many bytes we have seen and only call consume once we pass a
        # certain threshold.
        self._bytes_seen += amount
        if self._bytes_seen >= self._bytes_threshold:
            self._consume_through_leaky_bucket()
        return self._fileobj.read(amount)

    def _consume_through_leaky_bucket(self):
        """Consume the accumulated bytes, sleeping and retrying as needed

        Resets the accumulated byte count once the leaky bucket accepts the
        request. If the transfer coordinator has an exception set, that
        exception is raised instead.
        """
        # NOTE: If the read amount on the stream are high, it will result
        # in large bursty behavior as there is not an interface for partial
        # reads. However given the read's on this abstraction are at most 256KB
        # (via downloads), it reduces the burstiness to be small KB bursts at
        # worst.
        while not self._transfer_coordinator.exception:
            try:
                self._leaky_bucket.consume(
                    self._bytes_seen, self._request_token)
                self._bytes_seen = 0
                return
            except RequestExceededException as e:
                self._time_utils.sleep(e.retry_time)
        else:
            # Only reached when the loop condition fails, i.e. the transfer
            # has an exception set, so stop throttling and surface it.
            raise self._transfer_coordinator.exception

    def signal_transferring(self):
        """Signal that data being read is being transferred to S3"""
        self.enable_bandwidth_limiting()

    def signal_not_transferring(self):
        """Signal that data being read is not being transferred to S3"""
        self.disable_bandwidth_limiting()

    def seek(self, where, whence=0):
        """Change the position of the wrapped stream

        :type where: int
        :param where: The offset to seek to

        :type whence: int
        :param whence: How ``where`` is interpreted. It is handed to the
            wrapped stream's ``seek()`` unchanged. The default of ``0`` is
            the conventional "from the start of the stream" value.
        """
        if whence == 0:
            # Keep the single argument call for the default case so that
            # wrapped objects whose seek() accepts only an offset keep
            # working.
            self._fileobj.seek(where)
        else:
            self._fileobj.seek(where, whence)

    def tell(self):
        """Return the position reported by the wrapped stream"""
        return self._fileobj.tell()

    def close(self):
        """Account for any bytes not yet consumed and close the stream

        The wrapped stream is closed even if consuming from the leaky bucket
        raises (for example because the transfer has an exception set), so
        that the underlying file object is not leaked.
        """
        try:
            if self._bandwidth_limiting_enabled and self._bytes_seen:
                # This handles the case where the file is small enough to
                # never trigger the threshold and thus is never subjugated
                # to the leaky bucket on read(). This specifically happens
                # for small uploads. So instead to account for those bytes,
                # have it go through the leaky bucket when the file gets
                # closed.
                self._consume_through_leaky_bucket()
        finally:
            self._fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class LeakyBucket(object):
    def __init__(self, max_rate, time_utils=None, rate_tracker=None,
                 consumption_scheduler=None):
        """A leaky bucket abstraction to limit bandwidth consumption

        :type max_rate: int
        :param max_rate: The maximum rate to allow. This rate is in terms of
            bytes per second.

        :type time_utils: TimeUtils
        :param time_utils: The time utility to use for interacting with time

        :type rate_tracker: BandwidthRateTracker
        :param rate_tracker: Tracks bandwidth consumption

        :type consumption_scheduler: ConsumptionScheduler
        :param consumption_scheduler: Schedules consumption retries when
            necessary
        """
        self._max_rate = float(max_rate)
        self._time_utils = TimeUtils() if time_utils is None else time_utils
        # Held for the whole of consume().
        self._lock = threading.Lock()
        self._rate_tracker = (
            BandwidthRateTracker() if rate_tracker is None else rate_tracker)
        self._consumption_scheduler = (
            ConsumptionScheduler() if consumption_scheduler is None
            else consumption_scheduler)

    def consume(self, amt, request_token):
        """Consume a requested amount

        :type amt: int
        :param amt: The amount of bytes to request to consume

        :type request_token: RequestToken
        :param request_token: The token associated to the consumption
            request that is used to identify the request. So if a
            RequestExceededException is raised the token should be used
            in subsequent retry consume() request.

        :raises RequestExceededException: If the consumption amount would
            exceed the maximum allocated bandwidth

        :rtype: int
        :returns: The amount consumed
        """
        with self._lock:
            now = self._time_utils.time()
            # A request that was scheduled earlier is released on its retry
            # without checking the projected rate again.
            if self._consumption_scheduler.is_scheduled(request_token):
                return self._release_requested_amt_for_scheduled_request(
                    amt, request_token, now)
            if not self._projected_to_exceed_max_rate(amt, now):
                return self._release_requested_amt(amt, now)
            self._raise_request_exceeded_exception(amt, request_token, now)

    def _projected_to_exceed_max_rate(self, amt, time_now):
        """Tell whether consuming ``amt`` now would go over the max rate"""
        would_be_rate = self._rate_tracker.get_projected_rate(amt, time_now)
        return would_be_rate > self._max_rate

    def _release_requested_amt_for_scheduled_request(self, amt, request_token,
                                                     time_now):
        """Clear the scheduled entry for the token and release ``amt``"""
        scheduler = self._consumption_scheduler
        scheduler.process_scheduled_consumption(request_token)
        return self._release_requested_amt(amt, time_now)

    def _raise_request_exceeded_exception(self, amt, request_token, time_now):
        """Schedule the request and raise with the resulting retry time"""
        # The time this amount takes to go through at exactly the max rate.
        allocated_time = amt / float(self._max_rate)
        wait_for = self._consumption_scheduler.schedule_consumption(
            amt, request_token, allocated_time)
        raise RequestExceededException(
            requested_amt=amt, retry_time=wait_for)

    def _release_requested_amt(self, amt, time_now):
        """Record the consumption with the rate tracker and return ``amt``"""
        self._rate_tracker.record_consumption_rate(amt, time_now)
        return amt
class ConsumptionScheduler(object):
    def __init__(self):
        """Schedules when to consume a desired amount"""
        # Maps each scheduled token to the details of its scheduled request.
        self._tokens_to_scheduled_consumption = {}
        # Running sum of the consume times of all outstanding requests.
        self._total_wait = 0

    def is_scheduled(self, token):
        """Tells whether a consumption request is currently scheduled

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :rtype: bool
        :returns: True if the token has a scheduled request, False otherwise
        """
        return token in self._tokens_to_scheduled_consumption

    def schedule_consumption(self, amt, token, time_to_consume):
        """Schedules a wait time to be able to consume an amount

        :type amt: int
        :param amt: The amount of bytes scheduled to be consumed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :type time_to_consume: float
        :param time_to_consume: The desired time it should take for that
            specific request amount to be consumed in regardless of previously
            scheduled consumption requests

        :rtype: float
        :returns: The amount of time to wait for the specific request before
            actually consuming the specified amount.
        """
        # Each new request queues up behind everything already scheduled.
        wait_duration = self._total_wait + time_to_consume
        self._total_wait = wait_duration
        self._tokens_to_scheduled_consumption[token] = {
            'wait_duration': wait_duration,
            'time_to_consume': time_to_consume,
        }
        return wait_duration

    def process_scheduled_consumption(self, token):
        """Processes a scheduled consumption request that has completed

        :type token: RequestToken
        :param token: The token associated to the consumption
            request that is used to identify the request.

        :raises KeyError: If the token has no scheduled request
        """
        finished = self._tokens_to_scheduled_consumption.pop(token)
        remaining_wait = self._total_wait - finished['time_to_consume']
        # Never let the accumulated wait drop below zero.
        self._total_wait = max(remaining_wait, 0)
class BandwidthRateTracker(object):
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption

        :type alpha: float
        :param alpha: The constant to use in calculating the exponential
            moving average of the bandwidth rate. Specifically it is used in
            the following calculation:

            current_rate = alpha * new_rate + (1 - alpha) * current_rate

            The value of this constant should be between 0 and 1.
        """
        self._alpha = alpha
        # Both stay None until the first consumption is recorded.
        self._last_time = None
        self._current_rate = None

    @property
    def current_rate(self):
        """The current transfer rate

        :rtype: float
        :returns: The current tracked transfer rate
        """
        return 0.0 if self._last_time is None else self._current_rate

    def get_projected_rate(self, amt, time_at_consumption):
        """Get the projected rate using a provided amount and time

        :type amt: int
        :param amt: The proposed amount to consume

        :type time_at_consumption: float
        :param time_at_consumption: The proposed time to consume at

        :rtype: float
        :returns: The consumption rate if that amt and time were consumed
        """
        if self._last_time is not None:
            return self._calculate_exponential_moving_average_rate(
                amt, time_at_consumption)
        # Nothing has been recorded yet, so there is no rate to project from.
        return 0.0

    def record_consumption_rate(self, amt, time_at_consumption):
        """Record the consumption rate based off amount and time point

        :type amt: int
        :param amt: The amount that got consumed

        :type time_at_consumption: float
        :param time_at_consumption: The time at which the amount was consumed
        """
        if self._last_time is None:
            # The first record only establishes the starting time point.
            self._current_rate = 0.0
        else:
            self._current_rate = (
                self._calculate_exponential_moving_average_rate(
                    amt, time_at_consumption))
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        """Compute the rate of ``amt`` over the time since the last record"""
        elapsed = time_at_consumption - self._last_time
        if elapsed <= 0:
            # While it is really unlikely to see this in an actual transfer,
            # we do not want to be returning back a negative rate or try to
            # divide the amount by zero. So instead return back an infinite
            # rate as the time delta is infinitesimally small.
            return float('inf')
        return amt / elapsed

    def _calculate_exponential_moving_average_rate(self, amt,
                                                   time_at_consumption):
        """Blend the newly computed rate into the tracked rate"""
        fresh_rate = self._calculate_rate(amt, time_at_consumption)
        weighted_new = self._alpha * fresh_rate
        weighted_old = (1 - self._alpha) * self._current_rate
        return weighted_new + weighted_old
|