This file is indexed.

/usr/share/pyshared/allmydata/util/pipeline.py is in tahoe-lafs 1.9.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.python import log
from allmydata.util.assertutil import precondition

class PipelineError(Exception):
    """One of the pipelined messages returned an error. The received Failure
    object is stored in my .error attribute."""
    def __init__(self, error):
        self.error = error

    def __repr__(self):
        return "<PipelineError error=(%r)>" % (self.error,)
    def __str__(self):
        return "<PipelineError error=(%s)>" % (self.error,)

class SingleFileError(Exception):
    """You are not permitted to add a job to a full pipeline."""


class ExpandableDeferredList(defer.Deferred):
    # like DeferredList(fireOnOneErrback=True) with a built-in
    # gatherResults(), but you can add new Deferreds until you close it. This
    # gives you a chance to add don't-complain-about-unhandled-error errbacks
    # immediately after attachment, regardless of whether you actually end up
    # wanting the list or not.
    def __init__(self):
        defer.Deferred.__init__(self)
        self.resultsReceived = 0
        self.resultList = []
        self.failure = None
        self.closed = False

    def addDeferred(self, d):
        precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
        index = len(self.resultList)
        self.resultList.append(None)
        d.addCallbacks(self._cbDeferred, self._ebDeferred,
                       callbackArgs=(index,))
        return d

    def close(self):
        self.closed = True
        self.checkForFinished()

    def checkForFinished(self):
        if not self.closed:
            return
        if self.called:
            return
        if self.failure:
            self.errback(self.failure)
        elif self.resultsReceived == len(self.resultList):
            self.callback(self.resultList)

    def _cbDeferred(self, res, index):
        self.resultList[index] = res
        self.resultsReceived += 1
        self.checkForFinished()
        return res

    def _ebDeferred(self, f):
        self.failure = f
        self.checkForFinished()
        return f


class Pipeline:
    """I manage a size-limited pipeline of Deferred operations, usually
    callRemote() messages."""

    def __init__(self, capacity):
        self.capacity = capacity # how full we can be
        self.gauge = 0 # how full we are
        self.failure = None
        self.waiting = [] # callers of add() who are blocked
        self.unflushed = ExpandableDeferredList()

    def add(self, _size, _func, *args, **kwargs):
        # We promise that all the Deferreds we return will fire in the order
        # they were returned. To make it easier to keep this promise, we
        # prohibit multiple outstanding calls to add() .
        if self.waiting:
            raise SingleFileError
        if self.failure:
            return defer.fail(self.failure)
        self.gauge += _size
        fd = defer.maybeDeferred(_func, *args, **kwargs)
        fd.addBoth(self._call_finished, _size)
        self.unflushed.addDeferred(fd)
        fd.addErrback(self._eat_pipeline_errors)
        fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
        if self.gauge < self.capacity:
            return defer.succeed(None)
        d = defer.Deferred()
        self.waiting.append(d)
        return d

    def flush(self):
        if self.failure:
            return defer.fail(self.failure)
        d, self.unflushed = self.unflushed, ExpandableDeferredList()
        d.close()
        d.addErrback(self._flushed_error)
        return d

    def _flushed_error(self, f):
        precondition(self.failure) # should have been set by _call_finished
        return self.failure

    def _call_finished(self, res, size):
        self.gauge -= size
        if isinstance(res, Failure):
            res = Failure(PipelineError(res))
            if not self.failure:
                self.failure = res
        if self.failure:
            while self.waiting:
                d = self.waiting.pop(0)
                d.errback(self.failure)
        else:
            while self.waiting and (self.gauge < self.capacity):
                d = self.waiting.pop(0)
                d.callback(None)
                # the d.callback() might trigger a new call to add(), which
                # will raise our gauge and might cause the pipeline to be
                # filled. So the while() loop gets a chance to tell the
                # caller to stop.
        return res

    def _eat_pipeline_errors(self, f):
        f.trap(PipelineError)
        return None