/usr/share/pyshared/scrapyd/launcher.py is in python-scrapy 0.14.4-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | import sys, os
from datetime import datetime
from twisted.internet import reactor, defer, protocol, error
from twisted.application.service import Service
from twisted.python import log
from scrapy.utils.py26 import cpu_count
from scrapy.utils.python import stringify_dict
from scrapyd.utils import get_crawl_args
from .interfaces import IPoller, IEnvironment
class Launcher(Service):
name = 'launcher'
def __init__(self, config, app):
self.processes = {}
self.max_proc = config.getint('max_proc', 0)
if not self.max_proc:
self.max_proc = cpu_count() * config.getint('max_proc_per_cpu', 4)
self.runner = config.get('runner', 'scrapyd.runner')
self.app = app
def startService(self):
for slot in range(self.max_proc):
self._wait_for_project(slot)
log.msg("%s started: max_proc=%r, runner=%r" % (self.parent.name, \
self.max_proc, self.runner), system="Launcher")
def _wait_for_project(self, slot):
poller = self.app.getComponent(IPoller)
poller.next().addCallback(self._spawn_process, slot)
def _spawn_process(self, message, slot):
msg = stringify_dict(message, keys_only=False)
project = msg['_project']
args = [sys.executable, '-m', self.runner, 'crawl']
args += get_crawl_args(msg)
e = self.app.getComponent(IEnvironment)
env = e.get_environment(msg, slot)
env = stringify_dict(env, keys_only=False)
pp = ScrapyProcessProtocol(slot, project, msg['_spider'], \
msg['_job'], env)
pp.deferred.addBoth(self._process_finished, slot)
reactor.spawnProcess(pp, sys.executable, args=args, env=env)
self.processes[slot] = pp
def _process_finished(self, _, slot):
self.processes.pop(slot)
self._wait_for_project(slot)
class ScrapyProcessProtocol(protocol.ProcessProtocol):
def __init__(self, slot, project, spider, job, env):
self.slot = slot
self.pid = None
self.project = project
self.spider = spider
self.job = job
self.start_time = datetime.now()
self.env = env
self.logfile = env['SCRAPY_LOG_FILE']
self.deferred = defer.Deferred()
def outReceived(self, data):
log.msg(data.rstrip(), system="Launcher,%d/stdout" % self.pid)
def errReceived(self, data):
log.msg(data.rstrip(), system="Launcher,%d/stderr" % self.pid)
def connectionMade(self):
self.pid = self.transport.pid
self.log("Process started: ")
def processEnded(self, status):
if isinstance(status.value, error.ProcessDone):
self.log("Process finished: ")
else:
self.log("Process died: exitstatus=%r " % status.value.exitCode)
self.deferred.callback(self)
def log(self, msg):
msg += "project=%r spider=%r job=%r pid=%r log=%r" % (self.project, \
self.spider, self.job, self.pid, self.logfile)
log.msg(msg, system="Launcher")
|