This file is indexed.

/usr/share/pyshared/tftp/util.py is in python-txtftp 0.1~bzr38-0ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
'''
@author: shylent
'''
from functools import wraps
from twisted.internet import reactor
from twisted.internet.defer import maybeDeferred


__all__ = ['SequentialCall', 'Spent', 'Cancelled', 'deferred']


class Spent(Exception):
    """Trying to iterate a L{SequentialCall}, that is exhausted"""

class Cancelled(Exception):
    """Trying to iterate a L{SequentialCall}, that's been cancelled"""

def no_op(*args, **kwargs):
    pass

class SequentialCall(object):
    """Calls a given callable at intervals, specified by the L{timeout} iterable.
    Optionally calls a timeout handler, if provided, when there are no more timeout values.

    @param timeout: an iterable, that yields valid _seconds arguments to
    L{callLater<twisted.internet.interfaces.IReactorTime.callLater>} (floats)
    @type timeout: any iterable

    @param run_now: whether or not the callable should be called immediately
    upon initialization. Relinquishes control to the reactor
    (calls callLater(0,...)). Default: C{False}.
    @type run_now: C{bool}

    @param callable: the callable, that will be called at the specified intervals
    @param callable_args: the arguments to call it with
    @param callable_kwargs: the keyword arguments to call it with

    @param on_timeout: the callable, that will be called when there are no more
    timeout values
    @param on_timeout_args: the arguments to call it with
    @param on_timeout_kwargs: the keyword arguments to call it with 
    
    """

    @classmethod
    def run(cls, timeout, callable, callable_args=None, callable_kwargs=None,
                 on_timeout=None, on_timeout_args=None, on_timeout_kwargs=None,
                 run_now=False, _clock=None):
        """Create a L{SequentialCall} object and start its scheduler cycle

        @see: L{SequentialCall}

        """
        inst = cls(timeout, callable, callable_args, callable_kwargs,
                   on_timeout, on_timeout_args, on_timeout_kwargs,
                   run_now, _clock)
        inst.reschedule()
        return inst

    def __init__(self, timeout,
                 callable, callable_args=None, callable_kwargs=None,
                 on_timeout=None, on_timeout_args=None, on_timeout_kwargs=None,
                 run_now=False, _clock=None):
        self._timeout = iter(timeout)
        self.callable = callable
        self.callable_args = callable_args or []
        self.callable_kwargs = callable_kwargs or {}
        self.on_timeout = on_timeout or no_op
        self.on_timeout_args = on_timeout_args or []
        self.on_timeout_kwargs = on_timeout_kwargs or {}
        self._wd = None
        self._spent = self._cancelled = False
        self._ran_first = not run_now
        if _clock is None:
            self._clock = reactor
        else:
            self._clock = _clock

    def _call_and_schedule(self):
        self.callable(*self.callable_args, **self.callable_kwargs)
        self._ran_first = True
        if not self._spent:
            self.reschedule()

    def reschedule(self):
        """Schedule the next L{callable} call

        @raise Spent: if the timeout iterator has been exhausted and on_timeout
        handler has been already called
        @raise Cancelled: if this L{SequentialCall} has already been cancelled

        """
        if not self._ran_first:
            self._wd = self._clock.callLater(0, self._call_and_schedule)
            return
        if self._cancelled:
            raise Cancelled("This SequentialCall has already been cancelled")
        if self._spent:
            raise Spent("This SequentialCall has already timed out")
        try:
            next_timeout = self._timeout.next()
            self._wd = self._clock.callLater(next_timeout, self._call_and_schedule)
        except StopIteration:
            self.on_timeout(*self.on_timeout_args, **self.on_timeout_kwargs)
            self._spent = True

    def cancel(self):
        """Cancel the next scheduled call

        @raise Cancelled: if this SequentialCall has already been cancelled
        @raise Spent: if this SequentialCall has expired

        """
        if self._cancelled:
            raise Cancelled("This SequentialCall has already been cancelled")
        if self._spent:
            raise Spent("This SequentialCall has already timed out")
        if self._wd is not None and self._wd.active():
            self._wd.cancel()
        self._spent = self._cancelled = True

    def active(self):
        """Whether or not this L{SequentialCall} object is considered active"""
        return not (self._spent or self._cancelled)


def deferred(func):
    """Decorates a function to ensure that it always returns a `Deferred`.

    This also serves a secondary documentation purpose; functions decorated
    with this are readily identifiable as asynchronous.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        return maybeDeferred(func, *args, **kwargs)
    return wrapper