/usr/lib/python2.7/dist-packages/foolscap/promise.py is in python-foolscap 0.10.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- test-case-name: foolscap.test.test_promise -*-
from twisted.python.failure import Failure
from twisted.internet import defer
from foolscap.eventual import eventually
# Promise life-cycle states. The numeric values are only used as tags.
EVENTUAL = 0  # initial state: not yet resolved
CHAINED = 1   # resolved to another Promise, waiting on that one
NEAR = 2      # resolved to an immediate (non-Promise, non-Failure) value
BROKEN = 3    # resolved with a Failure
class UsageError(Exception):
    """Signals misuse of a Promise.

    Raised, for example, when a Promise is resolved more than once or is
    broken with something that is not a Failure.
    """
    pass
def _ignore(results):
    """Resolver stand-in that throws away whatever it is given.

    Used in place of a real resolver when the outcome of a queued call is
    not wanted. Always returns None.
    """
    return None
class Promise(object):
    """I am a promise of a future result. I am a lot like a Deferred, except
    that my promised result is usually an instance. I make it possible to
    schedule method invocations on this future instance, returning Promises
    for the results.

    Promises are always in one of three states: Eventual, Fulfilled, and
    Broken. (see http://www.erights.org/elib/concurrency/refmech.html for a
    pretty picture). They start as Eventual, meaning we do not yet know
    whether they will resolve or not. In this state, method invocations are
    queued. Eventually the Promise will be 'resolved' into either the
    Fulfilled or the Broken state. Fulfilled means that the promise contains
    a live object to which methods can be dispatched synchronously. Broken
    promises are incapable of invoking methods: they all result in Failure.

    Method invocation is always asynchronous: it always returns a Promise.

    The only thing you can do with a promise 'p1' is to perform an
    eventual-send on it, like so::

        sendOnly(p1).foo(args)  # ignores the result
        p2 = send(p1).bar(args) # creates a Promise for the result
        p2 = p1.bar(args)       # same as send(p1).bar(args)

    Or wait for it to resolve, using one of the following::

        d = when(p); d.addCallback(cb)  # provides a Deferred
        p._then(cb, *args, **kwargs)    # like when(p).addCallback(cb,*a,**kw)
        p._except(cb, *args, **kwargs)  # like when(p).addErrback(cb,*a,**kw)

    The _then and _except forms return the same Promise. You can set up
    chains of calls that will be invoked in the future, using a dataflow
    style, like this::

        p = getPromiseForServer()
        d = p.getDatabase('db1')
        r = d.getRecord(name)
        def _print(record):
            print 'the record says', record
        def _oops(failure):
            print 'something failed:', failure
        r._then(_print)
        r._except(_oops)

    Or all collapsed in one sequence like::

        getPromiseForServer().getDatabase('db1').getRecord(name)._then(_print)

    The eventual-send will eventually invoke the method foo(args) on the
    promise's resolution. This will return a new Promise for the results of
    that method call.
    """

    # all our internal methods are private, to avoid a confusing lack of an
    # error message if someone tries to make a synchronous method call on us
    # with a name that happens to match an internal one.

    # Internally the state is one of the module-level constants EVENTUAL,
    # CHAINED, NEAR or BROKEN.
    _state = EVENTUAL
    _useDataflowStyle = True # enables p.foo(args)

    def __init__(self):
        # Deferreds handed out by _wait_for_resolution() before resolution
        self._watchers = []
        # list of (methname, args, kwargs, resolver) queued before resolution
        self._pendingMethods = []

    # _then and _except are our only public methods. All other access is
    # through normal (not underscore-prefixed) attribute names, which
    # indicate names of methods on the target object that should be called
    # later.

    def _then(self, cb, *args, **kwargs):
        """Run cb(resolution, *args, **kwargs) once this Promise resolves.

        Any failure (from the Promise itself or raised by cb) is swallowed
        by a trailing errback. Returns this same Promise, not a new one.
        """
        d = self._wait_for_resolution()
        d.addCallback(cb, *args, **kwargs)
        d.addErrback(lambda ignore: None)
        return self

    def _except(self, cb, *args, **kwargs):
        """Run cb(failure, *args, **kwargs) if this Promise ends up broken.

        Returns this same Promise, not a new one.
        """
        d = self._wait_for_resolution()
        d.addErrback(cb, *args, **kwargs)
        return self

    # everything beyond here is private to this module

    def __repr__(self):
        return "<Promise %#x>" % id(self)

    def __getattr__(self, name):
        """Turn any attribute that is not found normally into an
        eventual-send of a method with that name (dataflow style).

        Raises AttributeError instead when _useDataflowStyle is false.
        """
        if not self._useDataflowStyle:
            raise AttributeError("no such attribute %s" % name)
        def newmethod(*args, **kwargs):
            return self._send(name, args, kwargs)
        return newmethod

    # _send and _sendOnly are used by send() and sendOnly(). _send is also
    # used by regular attribute access.

    def _send(self, methname, args, kwargs):
        """Return a Promise (for the result of the call) when the call is
        eventually made. The call is guaranteed to not fire in this turn."""
        # this is called by send()
        p, resolver = makePromise()
        if self._state in (EVENTUAL, CHAINED):
            # not resolved yet: queue until _deliver_queued_messages() runs
            self._pendingMethods.append((methname, args, kwargs, resolver))
        else:
            eventually(self._deliver, methname, args, kwargs, resolver)
        return p

    def _sendOnly(self, methname, args, kwargs):
        """Send a message like _send, but discard the result."""
        # this is called by sendOnly()
        if self._state in (EVENTUAL, CHAINED):
            self._pendingMethods.append((methname, args, kwargs, _ignore))
        else:
            eventually(self._deliver, methname, args, kwargs, _ignore)

    # _wait_for_resolution is used by when(), as well as _then and _except

    def _wait_for_resolution(self):
        """Return a Deferred that will fire (with whatever was passed to
        _resolve) when this Promise moves to a RESOLVED state (either NEAR or
        BROKEN)."""
        # this is called by when()
        if self._state in (EVENTUAL, CHAINED):
            d = defer.Deferred()
            self._watchers.append(d)
            return d
        if self._state == NEAR:
            return defer.succeed(self._target)
        # self._state == BROKEN
        return defer.fail(self._target)

    # _resolve is our resolver method, and is handed out by makePromise()

    def _resolve(self, target_or_failure):
        """Resolve this Promise to refer to the given target. If called with
        a Failure, the Promise is now BROKEN. _resolve may only be called
        once.

        Raises UsageError if the Promise is no longer in the EVENTUAL state.
        """
        # E splits this method into two pieces resolve(result) and
        # smash(problem). It is easier for us to keep them in one piece,
        # because d.addBoth(p._resolve) is convenient.
        if self._state != EVENTUAL:
            raise UsageError("Promises may not be resolved multiple times")
        self._resolve2(target_or_failure)

    # the remaining methods are internal, for use by this class only

    def _resolve2(self, target_or_failure):
        """Do the actual resolution, without the resolve-only-once check.

        Also used as the callback that finishes a CHAINED Promise once the
        Promise it was chained to has resolved.
        """
        # we may be called with a Promise, an immediate value, or a Failure
        if isinstance(target_or_failure, Promise):
            # wait for the other Promise, then resolve with its outcome
            self._state = CHAINED
            when(target_or_failure).addBoth(self._resolve2)
            return
        if isinstance(target_or_failure, Failure):
            self._break(target_or_failure)
            return
        self._target = target_or_failure
        self._deliver_queued_messages()
        self._state = NEAR

    def _break(self, failure):
        """Put this Promise in the BROKEN state.

        Raises UsageError if 'failure' is not a Failure, or if this Promise
        is already BROKEN.
        """
        # TODO: think about what you do to break a resolved promise. Once the
        # Promise is in the NEAR state, it can't be broken, but eventually
        # we're going to have a FAR state, which *can* be broken.
        if not isinstance(failure, Failure):
            raise UsageError("Promises must be broken with a Failure")
        if self._state == BROKEN:
            raise UsageError("Broken Promises may not be re-broken")
        self._target = failure
        if self._state in (EVENTUAL, CHAINED):
            self._deliver_queued_messages()
        # This used to be the no-op comparison 'self._state == BROKEN', which
        # left the Promise looking unresolved even though its queues had
        # already been flushed and deleted.
        self._state = BROKEN

    def _invoke_method(self, name, args, kwargs):
        """Call the named method on the target synchronously and return its
        result; if the target is a Failure, return that Failure instead."""
        if isinstance(self._target, Failure):
            return self._target
        method = getattr(self._target, name)
        res = method(*args, **kwargs)
        return res

    def _deliverOneMethod(self, methname, args, kwargs):
        """Call the named method on the target and return its result."""
        method = getattr(self._target, methname)
        return method(*args, **kwargs)

    def _deliver(self, methname, args, kwargs, resolver):
        """Deliver one message to the target and pass the outcome to
        'resolver'."""
        # the resolver will be fired with both success and Failure
        t = self._target
        if isinstance(t, Promise):
            # forward to the other Promise; its result Promise resolves ours
            resolver(t._send(methname, args, kwargs))
        elif isinstance(t, Failure):
            resolver(t)
        else:
            # maybeDeferred turns exceptions raised by the method into a
            # Failure, so the resolver hears about errors as well
            d = defer.maybeDeferred(self._deliverOneMethod,
                                    methname, args, kwargs)
            d.addBoth(resolver)

    def _deliver_queued_messages(self):
        """Schedule delivery of every queued message and every watcher
        notification, then discard both queues."""
        for (methname, args, kwargs, resolver) in self._pendingMethods:
            eventually(self._deliver, methname, args, kwargs, resolver)
        # NOTE: after these 'del's the attribute names are no longer found
        # by normal lookup, so any later access falls through to __getattr__
        del self._pendingMethods
        # Q: what are the partial-ordering semantics between queued messages
        # and when() clauses that are waiting on this Promise to be resolved?
        for d in self._watchers:
            eventually(d.callback, self._target)
        del self._watchers
def resolvedPromise(resolution):
    """Build a fresh Promise and immediately resolve it with 'resolution'.

    'resolution' is handed straight to the new Promise's resolver. The
    Promise is returned.
    """
    promise, resolver = makePromise()
    resolver(resolution)
    return promise
def makePromise():
    """Create an unresolved Promise.

    Returns a (promise, resolver) pair, where resolver is the new Promise's
    bound _resolve method.
    """
    promise = Promise()
    resolver = promise._resolve
    return promise, resolver
class _MethodGetterWrapper(object):
    """Proxy whose public attributes are callables that forward to a single
    callback as callback(name, args, kwargs)."""

    def __init__(self, callback):
        # kept inside a one-element list; the original author's note says
        # this is to avoid bound-methodizing the callback
        self.cb = [callback]

    def __getattr__(self, name):
        # underscore-prefixed names are refused rather than forwarded
        if name[:1] == "_":
            raise AttributeError("method %s is probably private" % name)
        forward = self.cb[0]

        def newmethod(*args, **kwargs):
            return forward(name, args, kwargs)

        return newmethod
def send(o):
    """Make an eventual-send call on object C{o}. Use this as follows::

        p = send(o).foo(args)

    C{o} can either be a Promise or an immediate value. The arguments can
    either be promises or immediate values.

    send() always returns a Promise, and the o.foo(args) method invocation
    always takes place in a later reactor turn.

    Many thanks to Mark Miller for suggesting this syntax to me.
    """
    # an immediate value is first wrapped in an already-resolved Promise
    target = o if isinstance(o, Promise) else resolvedPromise(o)
    return _MethodGetterWrapper(target._send)
def sendOnly(o):
    """Make an eventual-send call on object C{o}, and ignore the results.
    """
    # Wrapping an immediate value in a resolved Promise is heavier than a
    # bare eventually() call, but it keeps a single code path.
    target = o if isinstance(o, Promise) else resolvedPromise(o)
    return _MethodGetterWrapper(target._sendOnly)
def when(p):
    """Return a Deferred that fires with the object inside Promise C{p}
    once it is ready.

    Use this when something really has to be scheduled synchronously
    against the resolved object. Most of the time, invoking methods on the
    Promise itself, as if it were already available, is enough.
    """
    assert isinstance(p, Promise)
    resolution = p._wait_for_resolution()
    return resolution
|