/usr/lib/python2.7/dist-packages/maasserver/utils/async.py is in python-django-maas 1.5+bzr2252-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 | # Copyright 2014 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Utilities for working with asynchronous operations."""
from __future__ import (
absolute_import,
print_function,
unicode_literals,
)
str = None
__metaclass__ = type
__all__ = [
"gather",
]
from itertools import count
from Queue import Queue
from crochet import wait_for_reactor
from maasserver.exceptions import IteratorReusedError
from twisted.internet import reactor
from twisted.internet.defer import maybeDeferred
from twisted.python import log
class UseOnceIterator:
"""An iterator that is usable only once."""
def __init__(self, *args):
"""Create a new :class:`UseOnceIterator`.
Takes the same arguments as iter().
"""
self.iterable = iter(*args)
self.has_run_once = False
def __iter__(self):
return self
def next(self):
if self.has_run_once:
raise IteratorReusedError(
"It is not possible to reuse a UseOnceIterator.")
try:
return self.iterable.next()
except StopIteration:
self.has_run_once = True
raise
@wait_for_reactor
def gather(calls, timeout=10.0):
"""gather(calls, timeout=10.0)
Issue calls into the reactor, passing results back to another thread.
:param calls: An iterable of no-argument callables to be called in
the reactor thread. Each will be called via
:py:func:`~twisted.internet.defer.maybeDeferred`.
:param timeout: The number of seconds before further results are
ignored. Outstanding results will be cancelled.
:return: A :class:`UseOnceIterator` of results. A result might be a
failure, i.e. an instance of
:py:class:`twisted.python.failure.Failure`, or a valid result;
it's up to the caller to check.
"""
# Prepare of a list of Deferreds that we're going to wait for.
deferreds = [maybeDeferred(call) for call in calls]
# We'll use this queue (thread-safe) to pass results back.
queue = Queue()
# A sentinel to mark the end of the results.
done = object()
# This function will get called if not all results are in before
# `timeout` seconds have passed. It puts `done` into the queue to
# indicate the end of results, and cancels all outstanding deferred
# calls.
def cancel():
queue.put(done)
for deferred in deferreds:
try:
deferred.cancel()
except:
log.err()
if timeout is None:
canceller = None
else:
canceller = reactor.callLater(timeout, cancel)
countdown = count(len(deferreds), -1)
# Callback to report the result back to the queue. If it's the last
# result to be reported, `done` is put into the queue, and the
# delayed call to `cancel` is itself cancelled.
def report(result):
queue.put(result)
if next(countdown) == 1:
queue.put(done)
if canceller is not None:
if canceller.active():
canceller.cancel()
for deferred in deferreds:
deferred.addBoth(report)
# If there are no calls then there will be no results, so we put
# `done` into the queue, and cancel the nascent delayed call to
# `cancel`, if it exists.
if len(deferreds) == 0:
queue.put(done)
if canceller is not None:
canceller.cancel()
# Return an iterator to the invoking thread that will stop at the
# first sign of the `done` sentinel.
return UseOnceIterator(queue.get, done)
|