/usr/share/pyshared/scrapy/utils/testproc.py is in python-scrapy 0.14.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 | import sys
import os
from twisted.internet import reactor, defer, protocol
class ProcessTest(object):
command = None
prefix = [sys.executable, '-m', 'scrapy.cmdline']
cwd = os.getcwd() # trial chdirs to temp dir
def execute(self, args, check_code=True, settings='missing'):
env = os.environ.copy()
env['SCRAPY_SETTINGS_MODULE'] = settings
cmd = self.prefix + [self.command] + list(args)
pp = TestProcessProtocol()
pp.deferred.addBoth(self._process_finished, cmd, check_code)
reactor.spawnProcess(pp, cmd[0], cmd, env=env, path=self.cwd)
return pp.deferred
def _process_finished(self, pp, cmd, check_code):
if pp.exitcode and check_code:
msg = "process %s exit with code %d" % (cmd, pp.exitcode)
msg += "\n>>> stdout <<<\n%s" % pp.out
msg += "\n"
msg += "\n>>> stderr <<<\n%s" % pp.err
raise RuntimeError(msg)
return pp.exitcode, pp.out, pp.err
class TestProcessProtocol(protocol.ProcessProtocol):
def __init__(self):
self.deferred = defer.Deferred()
self.out = ''
self.err = ''
self.exitcode = None
def outReceived(self, data):
self.out += data
def errReceived(self, data):
self.err += data
def processEnded(self, status):
self.exitcode = status.value.exitCode
self.deferred.callback(self)
|