/usr/lib/python3/dist-packages/txaio/_common.py is in python3-txaio 2.5.1+2016.10.03.git.623ef68776-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | import math
from txaio.interfaces import IBatchedTimer
class _BatchedCall(object):
"""
Wraps IDelayedCall-implementing objects, implementing only the API
which txaio promised in the first place: .cancel
Do not create these yourself; use _BatchedTimer.call_later()
"""
def __init__(self, timer, index, the_call):
# XXX WeakRef?
self._timer = timer
self._index = index
self._call = the_call
def cancel(self):
self._timer._remove_call(self._index, self)
self._timer = None
def __call__(self):
return self._call()
class _BatchedTimer(IBatchedTimer):
"""
Internal helper.
Instances of this are returned from
:meth:`txaio.make_batched_timer` and that is the only way they
should be instantiated. You may depend on methods from the
interface class only (:class:`txaio.IBatchedTimer`)
**NOTE** that the times are in milliseconds in this class!
"""
def __init__(self, bucket_milliseconds, chunk_size,
seconds_provider, delayed_call_creator):
self._bucket_milliseconds = float(bucket_milliseconds)
self._chunk_size = chunk_size
self._get_seconds = seconds_provider
self._create_delayed_call = delayed_call_creator
self._buckets = dict() # real seconds -> (IDelayedCall, list)
def call_later(self, delay, func, *args, **kwargs):
"""
IBatchedTimer API
"""
# "quantize" the delay to the nearest bucket
real_time = int(self._get_seconds() + delay) * 1000
real_time -= int(real_time % self._bucket_milliseconds)
call = _BatchedCall(self, real_time, lambda: func(*args, **kwargs))
try:
self._buckets[real_time][1].append(call)
except KeyError:
# new bucket; need to add "actual" underlying IDelayedCall
delayed_call = self._create_delayed_call(
(real_time / 1000.0) - self._get_seconds(),
self._notify_bucket, real_time,
)
self._buckets[real_time] = (delayed_call, [call])
return call
def _notify_bucket(self, real_time):
"""
Internal helper. This 'does' the callbacks in a particular bucket.
:param real_time: the bucket to do callbacks on
"""
(delayed_call, calls) = self._buckets[real_time]
del self._buckets[real_time]
errors = []
def notify_one_chunk(calls, chunk_size, chunk_delay_ms):
for call in calls[:chunk_size]:
try:
call()
except Exception as e:
errors.append(e)
calls = calls[chunk_size:]
if calls:
self._create_delayed_call(
chunk_delay_ms / 1000.0,
notify_one_chunk, calls, chunk_size, chunk_delay_ms,
)
else:
# done all calls; make sure there were no errors
if len(errors):
msg = u"Error(s) processing call_later bucket:\n"
for e in errors:
msg += u"{}\n".format(e)
raise RuntimeError(msg)
# ceil()ing because we want the number of chunks, and a
# partial chunk is still a chunk
delay_ms = self._bucket_milliseconds / math.ceil(float(len(calls)) / self._chunk_size)
notify_one_chunk(calls, self._chunk_size, delay_ms)
def _remove_call(self, real_time, call):
"""
Internal helper. Removes a (possibly still pending) call from a
bucket. It is *not* an error of the bucket is gone (e.g. the
call has already happened).
"""
try:
(delayed_call, calls) = self._buckets[real_time]
except KeyError:
# no such bucket ... error? swallow?
return
# remove call; if we're empty, cancel underlying
# bucket-timeout IDelayedCall
calls.remove(call)
if not calls:
del self._buckets[real_time]
delayed_call.cancel()
|