/usr/lib/python3/dist-packages/celery/bin/worker.py is in python3-celery 3.1.13-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
The :program:`celery worker` command (previously known as ``celeryd``)
.. program:: celery worker
.. seealso::
See :ref:`preload-options`.
.. cmdoption:: -c, --concurrency
Number of child processes processing the queue. The default
is the number of CPUs available on your system.
.. cmdoption:: -P, --pool
Pool implementation:
prefork (default), eventlet, gevent, solo or threads.
.. cmdoption:: -f, --logfile
Path to log file. If no logfile is specified, `stderr` is used.
.. cmdoption:: -l, --loglevel
Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
`ERROR`, `CRITICAL`, or `FATAL`.
.. cmdoption:: -n, --hostname
Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname),
%n (name) and %d (domain).
.. cmdoption:: -B, --beat
Also run the `celery beat` periodic task scheduler. Please note that
there must only be one instance of this service.
.. cmdoption:: -Q, --queues
List of queues to enable for this worker, separated by comma.
By default all configured queues are enabled.
Example: `-Q video,image`
.. cmdoption:: -I, --include
Comma separated list of additional modules to import.
Example: -I foo.tasks,bar.tasks
.. cmdoption:: -s, --schedule
Path to the schedule database if running with the `-B` option.
Defaults to `celerybeat-schedule`. The extension ".db" may be
appended to the filename.
.. cmdoption:: -O
Apply optimization profile. Supported: default, fair
.. cmdoption:: --scheduler
Scheduler class to use. Default is celery.beat.PersistentScheduler
.. cmdoption:: -S, --statedb
Path to the state database. The extension '.db' may
be appended to the filename. Default: {default}
.. cmdoption:: -E, --events
Send events that can be captured by monitors like :program:`celery events`,
`celerymon`, and others.
.. cmdoption:: --without-gossip
Do not subscribe to other workers' events.
.. cmdoption:: --without-mingle
Do not synchronize with other workers at startup.
.. cmdoption:: --without-heartbeat
Do not send event heartbeats.
.. cmdoption:: --heartbeat-interval
Interval in seconds at which to send worker heartbeat
.. cmdoption:: --purge
Purges all waiting tasks before the daemon is started.
**WARNING**: This is unrecoverable, and the tasks will be
deleted from the messaging server.
.. cmdoption:: --time-limit
Enables a hard time limit (in seconds int/float) for tasks.
.. cmdoption:: --soft-time-limit
Enables a soft time limit (in seconds int/float) for tasks.
.. cmdoption:: --maxtasksperchild
Maximum number of tasks a pool worker can execute before it's
terminated and replaced by a new worker.
.. cmdoption:: --pidfile
Optional file used to store the worker's pid.
The worker will not start if this file already exists
and the pid is still alive.
.. cmdoption:: --autoscale
Enable autoscaling by providing
max_concurrency, min_concurrency. Example::
--autoscale=10,3
(always keep 3 processes, but grow to 10 if necessary)
.. cmdoption:: --autoreload
Enable autoreloading.
.. cmdoption:: --no-execv
Don't do execv after multiprocessing child fork.
"""
from __future__ import absolute_import, unicode_literals
import sys
from celery import concurrency
from celery.bin.base import Command, Option, daemon_options
from celery.bin.celeryd_detach import detached_celeryd
from celery.five import string_t
from celery.platforms import maybe_drop_privileges
from celery.utils import default_nodename
from celery.utils.log import LOG_LEVELS, mlevel
# Names exported by a star-import of this module.
__all__ = ['worker', 'main']
# Keep a handle on the module docstring: the ``worker`` class below assigns
# it to its ``doc`` attribute ("parse help from this too").
__MODULE_DOC__ = __doc__
class worker(Command):
    """Start worker instance.

    Examples::

        celery worker --app=proj -l info
        celery worker -A proj -l info -Q hipri,lopri

        celery worker -A proj --concurrency=4
        celery worker -A proj --concurrency=1000 -P eventlet

        celery worker --autoscale=10,0
    """
    # NOTE(review): the class docstring above is kept verbatim on purpose;
    # presumably the base ``Command`` displays it as the command description
    # -- confirm in celery.bin.base before rewording it.

    doc = __MODULE_DOC__  # parse help from this too
    namespace = 'celeryd'
    enable_config_from_cmdline = True
    supports_args = False

    def run_from_argv(self, prog_name, argv=None, command=None):
        """Parse the command line, detach if requested, then run the command.

        :param prog_name: program name forwarded to ``parse_options``.
        :param argv: argument list; defaults to ``sys.argv[1:]``.
        :param command: command name; defaults to ``sys.argv[0]``.
        :returns: whatever calling this command instance with the prepared
            positional arguments and options returns.
        """
        command = sys.argv[0] if command is None else command
        argv = sys.argv[1:] if argv is None else argv
        # parse options before detaching so errors can be handled.
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        self.maybe_detach([command] + argv)
        return self(*args, **options)

    def maybe_detach(self, argv, dopts=('-D', '--detach')):
        """Hand over to the detached worker launcher if a detach flag is given.

        :param argv: full argument list (command name first).
        :param dopts: option strings that request detaching.  The default is
            an immutable tuple so it cannot be shared-and-mutated between
            calls; any container supporting ``in`` is accepted.
        :raises SystemExit: with status 0 after the detached launcher has
            been invoked, so the caller never continues in this process.
        """
        if any(arg in argv for arg in dopts):
            # Strip the detach flags before forwarding the remaining argv.
            argv = [v for v in argv if v not in dopts]
            # will never return
            detached_celeryd(self.app).execute_from_commandline(argv)
            raise SystemExit(0)

    def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
            loglevel=None, logfile=None, pidfile=None, state_db=None,
            **kwargs):
        """Create the application's ``Worker`` and start it.

        :param hostname: node name; passed through ``default_nodename`` and
            ``self.host_format`` before use.
        :param pool_cls: pool implementation name; falls back to
            ``self.app.conf.CELERYD_POOL`` when it resolves to a false value.
        :param app: accepted for signature compatibility; not used here.
        :param uid: user passed to ``maybe_drop_privileges``.
        :param gid: group passed to ``maybe_drop_privileges``.
        :param loglevel: level name or number, converted with ``mlevel``.
        :param logfile: passed to the worker unchanged.
        :param pidfile: expanded with ``self.node_format`` for this hostname.
        :param state_db: expanded with ``self.node_format`` for this hostname.
        :param kwargs: remaining options, forwarded to ``self.app.Worker``.
        :returns: the result of ``Worker(...).start()``.
        """
        maybe_drop_privileges(uid=uid, gid=gid)
        # Pools like eventlet/gevent needs to patch libs as early
        # as possible.
        pool_cls = (concurrency.get_implementation(pool_cls) or
                    self.app.conf.CELERYD_POOL)
        if self.app.IS_WINDOWS and kwargs.get('beat'):
            self.die('-B option does not work on Windows. '
                     'Please run celery beat as a separate service.')
        hostname = self.host_format(default_nodename(hostname))
        if loglevel:
            try:
                loglevel = mlevel(loglevel)
            except KeyError:  # pragma: no cover
                # Only the string keys of LOG_LEVELS are listed as choices.
                self.die('Unknown level {0!r}. Please use one of {1}.'.format(
                    loglevel, '|'.join(
                        level for level in LOG_LEVELS
                        if isinstance(level, string_t))))

        return self.app.Worker(
            hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
            logfile=logfile,  # node format handled by celery.app.log.setup
            pidfile=self.node_format(pidfile, hostname),
            state_db=self.node_format(state_db, hostname), **kwargs
        ).start()

    def with_pool_option(self, argv):
        """Return the short and long option names that select the pool.

        :param argv: unused by this implementation.
        """
        # this command support custom pools
        # that may have to be loaded as early as possible.
        return (['-P'], ['--pool'])

    def get_options(self):
        """Return the tuple of command-line options for this command.

        Defaults are read from ``self.app.conf``.  The worker options are
        followed by ``daemon_options()`` and any options registered under
        ``self.app.user_options['worker']``.
        """
        conf = self.app.conf
        return (
            Option('-c', '--concurrency',
                   default=conf.CELERYD_CONCURRENCY, type='int'),
            Option('-P', '--pool', default=conf.CELERYD_POOL, dest='pool_cls'),
            Option('--purge', '--discard', default=False, action='store_true'),
            Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL),
            Option('-n', '--hostname'),
            Option('-B', '--beat', action='store_true'),
            Option('-s', '--schedule', dest='schedule_filename',
                   default=conf.CELERYBEAT_SCHEDULE_FILENAME),
            Option('--scheduler', dest='scheduler_cls'),
            Option('-S', '--statedb',
                   default=conf.CELERYD_STATE_DB, dest='state_db'),
            Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
                   action='store_true', dest='send_events'),
            Option('--time-limit', type='float', dest='task_time_limit',
                   default=conf.CELERYD_TASK_TIME_LIMIT),
            Option('--soft-time-limit', dest='task_soft_time_limit',
                   default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'),
            Option('--maxtasksperchild', dest='max_tasks_per_child',
                   default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
            Option('--queues', '-Q', default=[]),
            Option('--exclude-queues', '-X', default=[]),
            Option('--include', '-I', default=[]),
            Option('--autoscale'),
            Option('--autoreload', action='store_true'),
            Option('--no-execv', action='store_true', default=False),
            Option('--without-gossip', action='store_true', default=False),
            Option('--without-mingle', action='store_true', default=False),
            Option('--without-heartbeat', action='store_true', default=False),
            Option('--heartbeat-interval', type='int'),
            Option('-O', dest='optimization'),
            Option('-D', '--detach', action='store_true'),
        ) + daemon_options() + tuple(self.app.user_options['worker'])
def main(app=None):
    """Entry point: run the ``worker`` command from the command line.

    :param app: optional application instance handed to ``worker``.
    """
    # Fix for setuptools generated scripts, so that it will
    # work with multiprocessing fork emulation.
    # (see multiprocessing.forking.get_preparation_data())
    module_name = __name__
    if module_name != '__main__':  # pragma: no cover
        sys.modules['__main__'] = sys.modules[module_name]

    # Imported only after the ``__main__`` alias above is in place.
    from billiard import freeze_support
    freeze_support()

    command = worker(app=app)
    command.execute_from_commandline()
# Run the worker command when this file is executed directly as a script.
if __name__ == '__main__':  # pragma: no cover
    main()
|