/usr/lib/python3/dist-packages/celery/bin/worker.py is in python3-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 | # -*- coding: utf-8 -*-
"""Program used to start a Celery worker instance.
The :program:`celery worker` command (previously known as ``celeryd``)
.. program:: celery worker
.. seealso::
See :ref:`preload-options`.
.. cmdoption:: -c, --concurrency
Number of child processes processing the queue. The default
is the number of CPUs available on your system.
.. cmdoption:: -P, --pool
Pool implementation:
prefork (default), eventlet, gevent or solo.
.. cmdoption:: -n, --hostname
Set custom hostname (e.g., 'w1@%%h'). Expands: %%h (hostname),
%%n (name) and %%d, (domain).
.. cmdoption:: -B, --beat
Also run the `celery beat` periodic task scheduler. Please note that
there must only be one instance of this service.
.. note::
``-B`` is meant to be used for development purposes. For production
environment, you need to start :program:`celery beat` separately.
.. cmdoption:: -Q, --queues
List of queues to enable for this worker, separated by comma.
By default all configured queues are enabled.
Example: `-Q video,image`
.. cmdoption:: -X, --exclude-queues
List of queues to disable for this worker, separated by comma.
By default all configured queues are enabled.
Example: `-X video,image`.
.. cmdoption:: -I, --include
Comma separated list of additional modules to import.
Example: -I foo.tasks,bar.tasks
.. cmdoption:: -s, --schedule
Path to the schedule database if running with the `-B` option.
Defaults to `celerybeat-schedule`. The extension ".db" may be
appended to the filename.
.. cmdoption:: -O
Apply optimization profile. Supported: default, fair
.. cmdoption:: --prefetch-multiplier
Set custom prefetch multiplier value for this worker instance.
.. cmdoption:: --scheduler
Scheduler class to use. Default is
:class:`celery.beat.PersistentScheduler`
.. cmdoption:: -S, --statedb
Path to the state database. The extension '.db' may
be appended to the filename. Default: {default}
.. cmdoption:: -E, --task-events
Send task-related events that can be captured by monitors like
:program:`celery events`, `celerymon`, and others.
.. cmdoption:: --without-gossip
Don't subscribe to other workers events.
.. cmdoption:: --without-mingle
Don't synchronize with other workers at start-up.
.. cmdoption:: --without-heartbeat
Don't send event heartbeats.
.. cmdoption:: --heartbeat-interval
Interval in seconds at which to send worker heartbeat
.. cmdoption:: --purge
Purges all waiting tasks before the daemon is started.
**WARNING**: This is unrecoverable, and the tasks will be
deleted from the messaging server.
.. cmdoption:: --time-limit
Enables a hard time limit (in seconds int/float) for tasks.
.. cmdoption:: --soft-time-limit
Enables a soft time limit (in seconds int/float) for tasks.
.. cmdoption:: --max-tasks-per-child
Maximum number of tasks a pool worker can execute before it's
terminated and replaced by a new worker.
.. cmdoption:: --max-memory-per-child
Maximum amount of resident memory, in KiB, that may be consumed by a
child process before it will be replaced by a new one. If a single
task causes a child process to exceed this limit, the task will be
completed and the child process will be replaced afterwards.
Default: no limit.
.. cmdoption:: --autoscale
Enable autoscaling by providing
max_concurrency, min_concurrency. Example::
--autoscale=10,3
(always keep 3 processes, but grow to 10 if necessary)
.. cmdoption:: --detach
Start worker as a background process.
.. cmdoption:: -f, --logfile
Path to log file. If no logfile is specified, `stderr` is used.
.. cmdoption:: -l, --loglevel
Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
`ERROR`, `CRITICAL`, or `FATAL`.
.. cmdoption:: --pidfile
Optional file used to store the process pid.
The program won't start if this file already exists
and the pid is still alive.
.. cmdoption:: --uid
User id, or user name of the user to run as after detaching.
.. cmdoption:: --gid
Group id, or group name of the main group to change to after
detaching.
.. cmdoption:: --umask
Effective :manpage:`umask(1)` (in octal) of the process after detaching.
Inherits the :manpage:`umask(1)` of the parent process by default.
.. cmdoption:: --workdir
Optional directory to change to after detaching.
.. cmdoption:: --executable
Executable to use for the detached process.
"""
from __future__ import absolute_import, unicode_literals
import sys
from celery import concurrency
from celery.bin.base import Command, daemon_options
from celery.bin.celeryd_detach import detached_celeryd
from celery.five import string_t
from celery.platforms import maybe_drop_privileges
from celery.utils.log import LOG_LEVELS, mlevel
from celery.utils.nodenames import default_nodename
__all__ = ['worker', 'main']
HELP = __doc__
class worker(Command):
"""Start worker instance.
Examples:
.. code-block:: console
$ celery worker --app=proj -l info
$ celery worker -A proj -l info -Q hipri,lopri
$ celery worker -A proj --concurrency=4
$ celery worker -A proj --concurrency=1000 -P eventlet
$ celery worker --autoscale=10,0
"""
doc = HELP # parse help from this too
namespace = 'worker'
enable_config_from_cmdline = True
supports_args = False
removed_flags = {'--no-execv', '--force-execv'}
def run_from_argv(self, prog_name, argv=None, command=None):
argv = [x for x in argv if x not in self.removed_flags]
command = sys.argv[0] if command is None else command
argv = sys.argv[1:] if argv is None else argv
# parse options before detaching so errors can be handled.
options, args = self.prepare_args(
*self.parse_options(prog_name, argv, command))
self.maybe_detach([command] + argv)
return self(*args, **options)
def maybe_detach(self, argv, dopts=['-D', '--detach']):
if any(arg in argv for arg in dopts):
argv = [v for v in argv if v not in dopts]
# will never return
detached_celeryd(self.app).execute_from_commandline(argv)
raise SystemExit(0)
def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
loglevel=None, logfile=None, pidfile=None, statedb=None,
**kwargs):
maybe_drop_privileges(uid=uid, gid=gid)
# Pools like eventlet/gevent needs to patch libs as early
# as possible.
pool_cls = (concurrency.get_implementation(pool_cls) or
self.app.conf.worker_pool)
if self.app.IS_WINDOWS and kwargs.get('beat'):
self.die('-B option does not work on Windows. '
'Please run celery beat as a separate service.')
hostname = self.host_format(default_nodename(hostname))
if loglevel:
try:
loglevel = mlevel(loglevel)
except KeyError: # pragma: no cover
self.die('Unknown level {0!r}. Please use one of {1}.'.format(
loglevel, '|'.join(
l for l in LOG_LEVELS if isinstance(l, string_t))))
worker = self.app.Worker(
hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
logfile=logfile, # node format handled by celery.app.log.setup
pidfile=self.node_format(pidfile, hostname),
statedb=self.node_format(statedb, hostname),
**kwargs)
worker.start()
return worker.exitcode
def with_pool_option(self, argv):
# this command support custom pools
# that may have to be loaded as early as possible.
return (['-P'], ['--pool'])
def add_arguments(self, parser):
conf = self.app.conf
wopts = parser.add_argument_group('Worker Options')
wopts.add_argument('-n', '--hostname')
wopts.add_argument(
'-D', '--detach',
action='store_true', default=False,
)
wopts.add_argument(
'-S', '--statedb',
default=conf.worker_state_db,
)
wopts.add_argument('-l', '--loglevel', default='WARN')
wopts.add_argument('-O', dest='optimization')
wopts.add_argument(
'--prefetch-multiplier',
type=int, default=conf.worker_prefetch_multiplier,
)
topts = parser.add_argument_group('Pool Options')
topts.add_argument(
'-c', '--concurrency',
default=conf.worker_concurrency, type=int,
)
topts.add_argument(
'-P', '--pool',
default=conf.worker_pool,
)
topts.add_argument(
'-E', '--task-events', '--events',
action='store_true', default=conf.worker_send_task_events,
)
topts.add_argument(
'--time-limit',
type=float, default=conf.task_time_limit,
)
topts.add_argument(
'--soft-time-limit',
type=float, default=conf.task_soft_time_limit,
)
topts.add_argument(
'--max-tasks-per-child', '--maxtasksperchild',
type=int, default=conf.worker_max_tasks_per_child,
)
topts.add_argument(
'--max-memory-per-child', '--maxmemperchild',
type=int, default=conf.worker_max_memory_per_child,
)
qopts = parser.add_argument_group('Queue Options')
qopts.add_argument(
'--purge', '--discard',
action='store_true', default=False,
)
qopts.add_argument('--queues', '-Q', default=[])
qopts.add_argument('--exclude-queues', '-X', default=[])
qopts.add_argument('--include', '-I', default=[])
fopts = parser.add_argument_group('Features')
fopts.add_argument(
'--without-gossip', action='store_true', default=False,
)
fopts.add_argument(
'--without-mingle', action='store_true', default=False,
)
fopts.add_argument(
'--without-heartbeat', action='store_true', default=False,
)
fopts.add_argument('--heartbeat-interval', type=int)
fopts.add_argument('--autoscale')
daemon_options(parser)
bopts = parser.add_argument_group('Embedded Beat Options')
bopts.add_argument('-B', '--beat', action='store_true', default=False)
bopts.add_argument(
'-s', '--schedule-filename', '--schedule',
default=conf.beat_schedule_filename,
)
bopts.add_argument('--scheduler')
user_options = self.app.user_options['worker']
if user_options:
uopts = parser.add_argument_group('User Options')
self.add_compat_options(uopts, user_options)
def main(app=None):
"""Start worker."""
# Fix for setuptools generated scripts, so that it will
# work with multiprocessing fork emulation.
# (see multiprocessing.forking.get_preparation_data())
if __name__ != '__main__': # pragma: no cover
sys.modules['__main__'] = sys.modules[__name__]
from billiard import freeze_support
freeze_support()
worker(app=app).execute_from_commandline()
if __name__ == '__main__': # pragma: no cover
main()
|