/usr/lib/python3/dist-packages/celery/bin/celeryd_detach.py is in python3-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.bin.celeryd_detach
~~~~~~~~~~~~~~~~~~~~~~~~~
Program used to daemonize the worker.

Uses :func:`os.execv` because forking and multiprocessing
lead to weird issues (it was a long time ago now, but it
could have something to do with the threading mutex bug).
"""
from __future__ import absolute_import
import celery
import os
import sys
from optparse import OptionParser, BadOptionError
from celery.platforms import EX_FAILURE, detached
from celery.utils.log import get_logger
from celery.bin.base import daemon_options, Option
__all__ = ['detached_celeryd', 'detach']

logger = get_logger(__name__)

# Read once at import time; any truthy value makes ``detach`` use fake=1.
C_FAKEFORK = os.environ.get('C_FAKEFORK')

# The options returned by ``daemon_options`` (with the pidfile default set to
# 'celeryd.pid'), followed by the two options specific to this command.
OPTION_LIST = daemon_options(default_pidfile='celeryd.pid') + (
    Option('--workdir', dest='working_directory', default=None),
    Option(
        '--fake',
        dest='fake', action='store_true', default=False,
        help="Don't fork (for debugging purposes)",
    ),
)
def detach(path, argv, logfile=None, pidfile=None, uid=None,
           gid=None, umask=None, working_directory=None, fake=False, app=None,
           executable=None):
    """Exec ``path`` with ``argv`` from inside a ``detached`` context.

    :param path: program to execute; also used as the first argument.
    :param argv: list of arguments appended after the program path.
    :param logfile, pidfile, uid, gid, umask, working_directory, fake:
        passed positionally, in that order, to ``detached``.
    :param app: app whose ``log`` is used to report an exec failure;
        ``celery.current_app`` is used when this is None.
    :param executable: when not None, replaces ``path``.

    :returns: ``EX_FAILURE``; the return statement is only reached when
        :func:`os.execv` did not replace the current process.
    """
    # The C_FAKEFORK environment setting takes precedence over the argument.
    if C_FAKEFORK:
        fake = 1

    context = detached(logfile, pidfile, uid, gid, umask, working_directory,
                       fake, after_forkers=False)
    with context:
        try:
            if executable is not None:
                path = executable
            os.execv(path, [path] + argv)
        except Exception:
            # Logging is configured only now, so the failure can be written
            # to the requested logfile.
            if app is None:
                from celery import current_app as app
            app.log.setup_logging_subsystem('ERROR', logfile)
            logger.critical(
                "Can't exec %r", ' '.join([path] + argv), exc_info=True,
            )
        return EX_FAILURE
class PartialOptionParser(OptionParser):
    """Option parser that records unrecognized options instead of failing.

    Options that this parser does not define are appended, in the order
    they are encountered, to :attr:`leftovers` so the caller can forward
    them to another program.
    """

    def __init__(self, *args, **kwargs):
        # Collected before delegating to the base class initializer.
        self.leftovers = []
        OptionParser.__init__(self, *args, **kwargs)

    def _process_long_opt(self, rargs, values):
        """Process one ``--long`` option taken from the front of ``rargs``.

        Known options are handled like the base parser does; unknown ones
        are appended verbatim to :attr:`leftovers`.
        """
        arg = rargs.pop(0)

        # ``--opt=value`` is split and the value is pushed back so a known
        # option can consume it like a separate argument.
        if '=' in arg:
            opt, next_arg = arg.split('=', 1)
            rargs.insert(0, next_arg)
            had_explicit_value = True
        else:
            opt = arg
            had_explicit_value = False

        try:
            opt = self._match_long_opt(opt)
            option = self._long_opt.get(opt)
        except BadOptionError:
            option = None

        if not option:
            if had_explicit_value:
                # ``arg`` still contains ``=value``; drop the copy pushed
                # back above so it is not later parsed as a positional
                # argument.
                rargs.pop(0)
            self.leftovers.append(arg)
            return

        if option.takes_value():
            nargs = option.nargs
            if len(rargs) < nargs:
                # ``error`` reports the problem and exits, so ``value``
                # is never read unbound below.
                if nargs == 1:
                    self.error('{0} requires an argument'.format(opt))
                else:
                    self.error('{0} requires {1} arguments'.format(
                        opt, nargs))
            elif nargs == 1:
                value = rargs.pop(0)
            else:
                value = tuple(rargs[0:nargs])
                del rargs[0:nargs]
        elif had_explicit_value:
            self.error('{0} option does not take a value'.format(opt))
        else:
            value = None
        option.process(opt, value, values, self)

    def _process_short_opts(self, rargs, values):
        """Process a ``-x`` style argument, keeping unknown ones as leftovers.

        When the base parser rejects the argument, it is appended to
        :attr:`leftovers` together with the following argument if that one
        does not start with ``-`` (it is treated as the option's value).
        """
        # Remember the raw argument before the base class consumes it.
        arg = rargs[0]
        try:
            OptionParser._process_short_opts(self, rargs, values)
        except BadOptionError:
            self.leftovers.append(arg)
            # ``startswith`` (rather than indexing) also copes with an
            # empty-string argument.
            if rargs and not rargs[0].startswith('-'):
                self.leftovers.append(rargs.pop(0))
class detached_celeryd(object):
    """Command that parses detach options and execs the worker via ``detach``.

    Options not defined in :attr:`option_list` are forwarded unchanged to
    the executed command line.
    """

    option_list = OPTION_LIST
    usage = '%prog [options] [celeryd options]'
    version = celery.VERSION_BANNER
    description = ('Detaches Celery worker nodes. See `celery worker --help` '
                   'for the list of supported worker arguments.')
    command = sys.executable
    execv_path = sys.executable
    # Python older than 2.7 does not support pkg/__main__.py
    execv_argv = [
        '-m',
        'celery.__main__' if sys.version_info < (2, 7) else 'celery',
        'worker',
    ]

    def __init__(self, app=None):
        self.app = app

    def Parser(self, prog_name):
        """Return the option parser used for this command."""
        return PartialOptionParser(
            prog=prog_name,
            usage=self.usage,
            version=self.version,
            description=self.description,
            option_list=self.option_list,
        )

    def parse_options(self, prog_name, argv):
        """Parse ``argv`` and return ``(options, values, leftovers)``.

        ``leftovers`` is the parser's list of unrecognized arguments, with
        ``--logfile``/``--pidfile`` re-added when those options are set.
        """
        parser = self.Parser(prog_name)
        options, values = parser.parse_args(argv)
        leftovers = parser.leftovers
        logfile, pidfile = options.logfile, options.pidfile
        if logfile:
            leftovers.append('--logfile={0}'.format(logfile))
        if pidfile:
            leftovers.append('--pidfile={0}'.format(pidfile))
        return options, values, leftovers

    def execute_from_commandline(self, argv=None):
        """Parse ``argv`` (default ``sys.argv``) and exit with detach's result."""
        argv = sys.argv if argv is None else argv

        # Everything from the first '--' onwards (inclusive) is passed along
        # verbatim at the end of the executed command line.
        if '--' in argv:
            config = list(argv[argv.index('--'):])
        else:
            config = []

        prog_name = os.path.basename(argv[0])
        options, values, leftovers = self.parse_options(prog_name, argv[1:])
        status = detach(
            app=self.app,
            path=self.execv_path,
            argv=self.execv_argv + leftovers + config,
            **vars(options)
        )
        sys.exit(status)
def main(app=None):
    """Build a ``detached_celeryd`` for ``app`` and run it."""
    command = detached_celeryd(app)
    command.execute_from_commandline()


if __name__ == '__main__':  # pragma: no cover
    main()
|