This file is indexed.

/usr/lib/python2.7/dist-packages/maasserver/utils/async.py is in python-django-maas 1.5.4+bzr2294-0ubuntu1.2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# Copyright 2014 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Utilities for working with asynchronous operations."""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

str = None

__metaclass__ = type
__all__ = [
    "gather",
]

from itertools import count
from Queue import Queue

from crochet import wait_for_reactor
from maasserver.exceptions import IteratorReusedError
from twisted.internet import reactor
from twisted.internet.defer import maybeDeferred
from twisted.python import log


class UseOnceIterator:
    """An iterator that is usable only once.

    Wraps the iterator obtained from :func:`iter` and hands its items
    through unchanged. After the wrapped iterator has signalled that it
    is exhausted, any further attempt to advance this object raises
    :class:`IteratorReusedError` rather than another ``StopIteration``.
    """

    def __init__(self, *args):
        """Create a new :class:`UseOnceIterator`.

        Takes the same arguments as iter().
        """
        self.iterable = iter(*args)
        # Becomes True once the wrapped iterator raises StopIteration.
        self.has_run_once = False

    def __iter__(self):
        """Return `self`; this object is its own iterator."""
        return self

    def next(self):
        """Return the next item from the wrapped iterator.

        :raises StopIteration: the first time the wrapped iterator is
            found to be exhausted.
        :raises IteratorReusedError: on every call made after that.
        """
        if self.has_run_once:
            raise IteratorReusedError(
                "It is not possible to reuse a UseOnceIterator.")
        try:
            # Use the builtin rather than calling `.next()` on the
            # wrapped iterator: it works whether that iterator spells
            # its advance method `next` or `__next__`.
            return next(self.iterable)
        except StopIteration:
            self.has_run_once = True
            raise

    # Alias so the class also satisfies the `__next__` spelling of the
    # iterator protocol; `next` is kept for existing callers.
    __next__ = next


@wait_for_reactor
def gather(calls, timeout=10.0):
    """gather(calls, timeout=10.0)

    Run each of `calls` in the reactor and stream the outcomes back to
    the invoking thread.

    :param calls: An iterable of callables that take no arguments. Every
        one is invoked in the reactor thread through
        :py:func:`~twisted.internet.defer.maybeDeferred`.

    :param timeout: How many seconds to wait for results. Once that time
        has elapsed no further results are delivered and the calls still
        outstanding are cancelled. Pass ``None`` for no time limit.

    :return: A :class:`UseOnceIterator` that yields results as they
        arrive. Each item is either a successful result or an instance
        of :py:class:`twisted.python.failure.Failure`; telling the two
        apart is the caller's job.

    """

    # Thread-safe channel through which results travel back to the
    # invoking thread.
    results = Queue()

    # Unique marker placed on the channel after the last result.
    finished = object()

    # Kick off every call; each becomes a Deferred we wait on.
    pending = [maybeDeferred(call) for call in calls]

    # Fired by the reactor when `timeout` seconds pass before all the
    # results are in: close the result stream first, then cancel every
    # Deferred, logging (not propagating) any error from cancellation.
    def on_timeout():
        results.put(finished)
        for pending_call in pending:
            try:
                pending_call.cancel()
            except:
                log.err()

    timer = (
        None if timeout is None
        else reactor.callLater(timeout, on_timeout))

    # Counts down from the number of calls; yields 1 for the last one.
    remaining = count(len(pending), -1)

    # Attached to every Deferred for both success and failure. Forwards
    # the outcome and, once the final outcome has been seen, closes the
    # stream and disarms the timeout if it has not already fired.
    def deliver(outcome):
        results.put(outcome)
        if next(remaining) != 1:
            return
        results.put(finished)
        if timer is not None and timer.active():
            timer.cancel()

    # The timer and countdown must exist before this point: a Deferred
    # that has already fired runs `deliver` immediately.
    for pending_call in pending:
        pending_call.addBoth(deliver)

    # With nothing to call, `deliver` never runs, so close the stream
    # and disarm the freshly scheduled timeout right here.
    if not pending:
        results.put(finished)
        if timer is not None:
            timer.cancel()

    # The iterator pulls from the channel until it meets the marker.
    return UseOnceIterator(results.get, finished)