/usr/share/pyshared/scrapy/utils/defer.py is in python-scrapy 0.14.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Helper functions for dealing with Twisted deferreds
"""
from twisted.internet import defer, reactor, task
from twisted.python import failure
from scrapy.exceptions import IgnoreRequest
def defer_fail(_failure):
    """Return a Deferred that errbacks with ``_failure`` on the next reactor
    loop.

    Counterpart of twisted.internet.defer.fail, except that the Deferred is
    handed back unfired: its errback is scheduled with the reactor instead of
    being called before this function returns.
    """
    deferred = defer.Deferred()
    # Zero delay: hand the errback call to the reactor rather than firing now.
    reactor.callLater(0, deferred.errback, _failure)
    return deferred
def defer_succeed(result):
    """Return a Deferred that callbacks with ``result`` on the next reactor
    loop.

    Counterpart of twisted.internet.defer.succeed, except that the Deferred
    is handed back unfired: its callback is scheduled with the reactor
    instead of being called before this function returns.
    """
    deferred = defer.Deferred()
    # Zero delay: hand the callback call to the reactor rather than firing now.
    reactor.callLater(0, deferred.callback, result)
    return deferred
def defer_result(result):
    """Coerce ``result`` into a Deferred.

    * a Deferred is returned as-is;
    * a Failure is wrapped with defer_fail;
    * any other value is wrapped with defer_succeed.
    """
    if isinstance(result, defer.Deferred):
        return result
    if isinstance(result, failure.Failure):
        return defer_fail(result)
    return defer_succeed(result)
def mustbe_deferred(f, *args, **kw):
    """Same as twisted.internet.defer.maybeDeferred, but delay calling
    callback/errback to next reactor loop.

    Calls ``f(*args, **kw)`` and always returns a Deferred:

    * if ``f`` raises, the exception is wrapped in a Failure and passed to
      defer_fail;
    * otherwise the return value is passed to defer_result.

    Only ``Exception`` subclasses are converted into failures;
    ``KeyboardInterrupt`` and ``SystemExit`` propagate to the caller.
    """
    try:
        result = f(*args, **kw)
    # FIXME: Hack to avoid introspecting tracebacks. This to speed up
    # processing of IgnoreRequest errors which are, by far, the most common
    # exception in Scrapy - see #125
    except IgnoreRequest as e:
        # Build the Failure from the exception instance directly instead of
        # letting Failure() pick up the current exception state.
        return defer_fail(failure.Failure(e))
    except Exception:
        return defer_fail(failure.Failure())
    else:
        return defer_result(result)
def parallel(iterable, count, callable, *args, **named):
    """Execute a callable over the objects in the given iterable, in parallel,
    using no more than ``count`` concurrent calls.

    ``callable`` is invoked as ``callable(elem, *args, **named)`` for each
    element. Returns a DeferredList over the ``count`` coiterate() Deferreds.

    Taken from: http://jcalderone.livejournal.com/24285.html
    """
    coop = task.Cooperator()
    # A single lazy generator is shared by every coiterate() call below, so
    # each element is consumed (and its call made) only once.
    work = (callable(elem, *args, **named) for elem in iterable)
    # ``range`` rather than ``xrange`` so this runs on Python 2 and Python 3.
    return defer.DeferredList([coop.coiterate(work) for _ in range(count)])
def process_chain(callbacks, input, *a, **kw):
    """Build a Deferred whose callback chain is ``callbacks``, fire it with
    ``input`` and return it.

    Each callback is registered with the extra positional arguments ``a`` and
    keyword arguments ``kw``.
    """
    chain = defer.Deferred()
    for callback in callbacks:
        chain.addCallback(callback, *a, **kw)
    # Fired before returning, so the chain starts running here.
    chain.callback(input)
    return chain
def process_chain_both(callbacks, errbacks, input, *a, **kw):
    """Build a Deferred from paired callbacks and errbacks, fire it with
    ``input`` and return it.

    ``callbacks`` and ``errbacks`` are paired positionally with ``zip``, and
    every pair receives the extra arguments ``a`` / ``kw``. The chain is
    started through ``errback`` when ``input`` is a Failure and through
    ``callback`` otherwise.
    """
    chain = defer.Deferred()
    for callback, errback in zip(callbacks, errbacks):
        chain.addCallbacks(
            callback, errback,
            callbackArgs=a, callbackKeywords=kw,
            errbackArgs=a, errbackKeywords=kw)
    # Pick the entry point of the chain according to the kind of input.
    fire = chain.errback if isinstance(input, failure.Failure) else chain.callback
    fire(input)
    return chain
def process_parallel(callbacks, input, *a, **kw):
    """Feed ``input`` to every callback independently and return a Deferred
    gathering all of their outputs.

    Each callback is attached to its own already-fired Deferred and receives
    the extra arguments ``a`` / ``kw``. If the gathered Deferred fails, the
    failure is replaced by the ``subFailure`` attribute of its value.
    """
    def _unwrap_sub_failure(gather_failure):
        # Presumably the value is Twisted's FirstError wrapper; expose the
        # failure it wraps instead.
        return gather_failure.value.subFailure

    dfds = []
    for callback in callbacks:
        dfds.append(defer.succeed(input).addCallback(callback, *a, **kw))
    gathered = defer.gatherResults(dfds)
    gathered.addErrback(_unwrap_sub_failure)
    return gathered
def iter_errback(iterable, errback, *a, **kw):
    """Wraps an iterable calling an errback if an error is caught while
    iterating it.

    Yields the items of ``iterable``. When fetching the next item raises an
    ``Exception``, ``errback(Failure(), *a, **kw)`` is called and iteration
    is attempted again; ``StopIteration`` ends the generator.
    """
    it = iter(iterable)
    while True:
        # Only the fetch is guarded. Keeping ``yield`` out of the ``try``
        # means closing this generator (GeneratorExit raised at the yield)
        # is never mistaken for an iteration error.
        try:
            item = next(it)
        except StopIteration:
            break
        except Exception:
            errback(failure.Failure(), *a, **kw)
        else:
            yield item
|