This file is indexed.

/usr/lib/python3/dist-packages/txaio/_common.py is in python3-txaio 2.8.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import math
from txaio.interfaces import IBatchedTimer


class _BatchedCall(object):
    """
    Wraps IDelayedCall-implementing objects, implementing only the API
    which txaio promised in the first place: .cancel

    Do not create these yourself; use _BatchedTimer.call_later()
    """

    def __init__(self, timer, index, the_call):
        # XXX WeakRef?
        self._timer = timer
        self._index = index
        self._call = the_call

    def cancel(self):
        self._timer._remove_call(self._index, self)
        self._timer = None

    def __call__(self):
        return self._call()


class _BatchedTimer(IBatchedTimer):
    """
    Internal helper.

    Instances of this are returned from
    :meth:`txaio.make_batched_timer` and that is the only way they
    should be instantiated. You may depend on methods from the
    interface class only (:class:`txaio.IBatchedTimer`)

    **NOTE** that the times are in milliseconds in this class!
    """

    def __init__(self, bucket_milliseconds, chunk_size,
                 seconds_provider, delayed_call_creator, loop=None):
        if bucket_milliseconds <= 0.0:
            raise ValueError(
                "bucket_milliseconds must be > 0.0"
            )
        self._bucket_milliseconds = float(bucket_milliseconds)
        self._chunk_size = chunk_size
        self._get_seconds = seconds_provider
        self._create_delayed_call = delayed_call_creator
        self._buckets = dict()  # real seconds -> (IDelayedCall, list)
        self._loop = loop

    def call_later(self, delay, func, *args, **kwargs):
        """
        IBatchedTimer API
        """
        # "quantize" the delay to the nearest bucket
        now = self._get_seconds()
        real_time = int(now + delay) * 1000
        real_time -= int(real_time % self._bucket_milliseconds)
        call = _BatchedCall(self, real_time, lambda: func(*args, **kwargs))
        try:
            self._buckets[real_time][1].append(call)
        except KeyError:
            # new bucket; need to add "actual" underlying IDelayedCall
            diff = (real_time / 1000.0) - now
            # we need to clamp this because if we quantized to the
            # nearest second, but that second is actually (slightly)
            # less than the current time 'diff' will be negative.
            delayed_call = self._create_delayed_call(
                max(0.0, diff),
                self._notify_bucket, real_time,
            )
            self._buckets[real_time] = (delayed_call, [call])
        return call

    def _notify_bucket(self, real_time):
        """
        Internal helper. This 'does' the callbacks in a particular bucket.

        :param real_time: the bucket to do callbacks on
        """
        (delayed_call, calls) = self._buckets[real_time]
        del self._buckets[real_time]
        errors = []

        def notify_one_chunk(calls, chunk_size, chunk_delay_ms):
            for call in calls[:chunk_size]:
                try:
                    call()
                except Exception as e:
                    errors.append(e)
            calls = calls[chunk_size:]
            if calls:
                self._create_delayed_call(
                    chunk_delay_ms / 1000.0,
                    notify_one_chunk, calls, chunk_size, chunk_delay_ms,
                )
            else:
                # done all calls; make sure there were no errors
                if len(errors):
                    msg = u"Error(s) processing call_later bucket:\n"
                    for e in errors:
                        msg += u"{}\n".format(e)
                    raise RuntimeError(msg)
        # ceil()ing because we want the number of chunks, and a
        # partial chunk is still a chunk
        delay_ms = self._bucket_milliseconds / math.ceil(float(len(calls)) / self._chunk_size)
        # I can't imagine any scenario in which chunk_delay_ms is
        # actually less than zero, but just being safe here
        notify_one_chunk(calls, self._chunk_size, max(0.0, delay_ms))

    def _remove_call(self, real_time, call):
        """
        Internal helper. Removes a (possibly still pending) call from a
        bucket. It is *not* an error of the bucket is gone (e.g. the
        call has already happened).
        """
        try:
            (delayed_call, calls) = self._buckets[real_time]
        except KeyError:
            # no such bucket ... error? swallow?
            return
        # remove call; if we're empty, cancel underlying
        # bucket-timeout IDelayedCall
        calls.remove(call)
        if not calls:
            del self._buckets[real_time]
            delayed_call.cancel()