This file is indexed.

/usr/lib/python3/dist-packages/txaio/_common.py is in python3-txaio 2.5.1+2016.10.03.git.623ef68776-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import math
from txaio.interfaces import IBatchedTimer


class _BatchedCall(object):
    """
    Wraps IDelayedCall-implementing objects, implementing only the API
    which txaio promised in the first place: .cancel

    Do not create these yourself; use _BatchedTimer.call_later()
    """

    def __init__(self, timer, index, the_call):
        """
        :param timer: the owning timer; must provide
            ``_remove_call(index, call)``
        :param index: the bucket key this call was filed under (handed
            back to ``timer._remove_call`` on cancel)
        :param the_call: zero-argument callable to run when the bucket
            fires
        """
        # XXX WeakRef?
        self._timer = timer
        self._index = index
        self._call = the_call
        self._cancelled = False

    def cancel(self):
        """
        Cancel this call. Safe to invoke more than once; only the first
        invocation asks the timer to remove the call.
        """
        if self._cancelled:
            return
        # mark first, so that a call which is already queued in a
        # pending chunk (and thus can't be removed from its bucket any
        # longer) is still skipped when its turn comes
        self._cancelled = True
        self._timer._remove_call(self._index, self)
        # drop the reference to the timer; we no longer need it
        self._timer = None

    def __call__(self):
        """
        Run the wrapped callable and return its result, unless this call
        has been cancelled, in which case nothing is run and ``None`` is
        returned.
        """
        if self._cancelled:
            return None
        return self._call()


class _BatchedTimer(IBatchedTimer):
    """
    Internal helper.

    Instances of this are returned from
    :meth:`txaio.make_batched_timer` and that is the only way they
    should be instantiated. You may depend on methods from the
    interface class only (:class:`txaio.IBatchedTimer`)

    **NOTE** that the times are in milliseconds in this class!
    """

    def __init__(self, bucket_milliseconds, chunk_size,
                 seconds_provider, delayed_call_creator):
        """
        :param bucket_milliseconds: width of a single bucket, in
            milliseconds
        :param chunk_size: maximum number of calls run in one go when a
            bucket fires; the remainder is spread over further chunks
        :param seconds_provider: zero-argument callable returning the
            current time in seconds
        :param delayed_call_creator: callable invoked as
            ``(delay_in_seconds, func, *args)``; its return value must
            provide ``.cancel()``
        """
        self._bucket_milliseconds = float(bucket_milliseconds)
        self._chunk_size = chunk_size
        self._get_seconds = seconds_provider
        self._create_delayed_call = delayed_call_creator
        # bucket time in milliseconds -> (IDelayedCall, list of _BatchedCall)
        self._buckets = dict()

    def call_later(self, delay, func, *args, **kwargs):
        """
        IBatchedTimer API

        :param delay: how far in the future to run ``func``, in seconds
        :param func: the callable to run; it receives ``*args`` and
            ``**kwargs``
        :returns: a :class:`_BatchedCall` whose ``.cancel()`` removes the
            call from its bucket
        """
        # read the clock once so the bucket key and the delay handed to
        # the underlying timer are derived from the same instant
        now = self._get_seconds()
        # "quantize" the delay to the nearest bucket. NOTE: int()
        # truncates to whole seconds before converting to milliseconds.
        real_time = int(now + delay) * 1000
        real_time -= int(real_time % self._bucket_milliseconds)
        call = _BatchedCall(self, real_time, lambda: func(*args, **kwargs))
        try:
            self._buckets[real_time][1].append(call)
        except KeyError:
            # new bucket; need to add "actual" underlying IDelayedCall
            diff = (real_time / 1000.0) - now
            # because of the truncation above the bucket time can be
            # (slightly) earlier than "now", which would make 'diff'
            # negative; clamp it so we never request a negative delay
            delayed_call = self._create_delayed_call(
                max(0.0, diff),
                self._notify_bucket, real_time,
            )
            self._buckets[real_time] = (delayed_call, [call])
        return call

    def _notify_bucket(self, real_time):
        """
        Internal helper. This 'does' the callbacks in a particular bucket.

        The bucket is removed first, then its calls are run in chunks of
        at most ``chunk_size``; every chunk after the first is scheduled
        via the delayed-call creator. Exceptions raised by individual
        calls are collected, and once the last chunk has run a single
        RuntimeError listing them is raised.

        :param real_time: the bucket to do callbacks on
        """
        (delayed_call, calls) = self._buckets[real_time]
        del self._buckets[real_time]
        errors = []

        def notify_one_chunk(calls, chunk_size, chunk_delay_ms):
            for call in calls[:chunk_size]:
                try:
                    call()
                except Exception as e:
                    # keep going; one failing call must not prevent
                    # the remaining calls from running
                    errors.append(e)
            calls = calls[chunk_size:]
            if calls:
                self._create_delayed_call(
                    chunk_delay_ms / 1000.0,
                    notify_one_chunk, calls, chunk_size, chunk_delay_ms,
                )
            elif errors:
                # done all calls; report everything that went wrong
                raise RuntimeError(
                    u"Error(s) processing call_later bucket:\n" +
                    u"".join(u"{}\n".format(e) for e in errors)
                )
        # ceil()ing because we want the number of chunks, and a
        # partial chunk is still a chunk
        num_chunks = math.ceil(float(len(calls)) / self._chunk_size)
        delay_ms = self._bucket_milliseconds / num_chunks
        notify_one_chunk(calls, self._chunk_size, delay_ms)

    def _remove_call(self, real_time, call):
        """
        Internal helper. Removes a (possibly still pending) call from a
        bucket. It is *not* an error if the bucket is gone (e.g. the
        call has already happened).

        :param real_time: the bucket the call was filed under
        :param call: the :class:`_BatchedCall` to remove
        """
        try:
            (delayed_call, calls) = self._buckets[real_time]
        except KeyError:
            # no such bucket ... error? swallow?
            return
        # remove call; if we're empty, cancel underlying
        # bucket-timeout IDelayedCall
        calls.remove(call)
        if not calls:
            del self._buckets[real_time]
            delayed_call.cancel()