This file is indexed.

/usr/lib/python3/dist-packages/celery/bin/worker.py is in python3-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# -*- coding: utf-8 -*-
"""Program used to start a Celery worker instance.

The :program:`celery worker` command (previously known as ``celeryd``)

.. program:: celery worker

.. seealso::

    See :ref:`preload-options`.

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue.  The default
    is the number of CPUs available on your system.

.. cmdoption:: -P, --pool

    Pool implementation:

    prefork (default), eventlet, gevent or solo.

.. cmdoption:: -n, --hostname

    Set custom hostname (e.g., 'w1@%%h').  Expands: %%h (hostname),
    %%n (name) and %%d, (domain).

.. cmdoption:: -B, --beat

    Also run the `celery beat` periodic task scheduler.  Please note that
    there must only be one instance of this service.

    .. note::

        ``-B`` is meant to be used for development purposes. For production
        environment, you need to start :program:`celery beat` separately.

.. cmdoption:: -Q, --queues

    List of queues to enable for this worker, separated by comma.
    By default all configured queues are enabled.
    Example: `-Q video,image`

.. cmdoption:: -X, --exclude-queues

    List of queues to disable for this worker, separated by comma.
    By default all configured queues are enabled.
    Example: `-X video,image`.

.. cmdoption:: -I, --include

    Comma separated list of additional modules to import.
    Example: -I foo.tasks,bar.tasks

.. cmdoption:: -s, --schedule

    Path to the schedule database if running with the `-B` option.
    Defaults to `celerybeat-schedule`.  The extension ".db" may be
    appended to the filename.

.. cmdoption:: -O

    Apply optimization profile.  Supported: default, fair

.. cmdoption:: --prefetch-multiplier

    Set custom prefetch multiplier value for this worker instance.

.. cmdoption:: --scheduler

    Scheduler class to use.  Default is
    :class:`celery.beat.PersistentScheduler`

.. cmdoption:: -S, --statedb

    Path to the state database.  The extension '.db' may
    be appended to the filename.  Default: {default}

.. cmdoption:: -E, --task-events

    Send task-related events that can be captured by monitors like
    :program:`celery events`, `celerymon`, and others.

.. cmdoption:: --without-gossip

    Don't subscribe to other workers events.

.. cmdoption:: --without-mingle

    Don't synchronize with other workers at start-up.

.. cmdoption:: --without-heartbeat

    Don't send event heartbeats.

.. cmdoption:: --heartbeat-interval

    Interval in seconds at which to send worker heartbeat

.. cmdoption:: --purge

    Purges all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: --time-limit

    Enables a hard time limit (in seconds int/float) for tasks.

.. cmdoption:: --soft-time-limit

    Enables a soft time limit (in seconds int/float) for tasks.

.. cmdoption:: --max-tasks-per-child

    Maximum number of tasks a pool worker can execute before it's
    terminated and replaced by a new worker.

.. cmdoption:: --max-memory-per-child

    Maximum amount of resident memory, in KiB, that may be consumed by a
    child process before it will be replaced by a new one.  If a single
    task causes a child process to exceed this limit, the task will be
    completed and the child process will be replaced afterwards.
    Default: no limit.

.. cmdoption:: --autoscale

    Enable autoscaling by providing
    max_concurrency, min_concurrency. Example::

        --autoscale=10,3

    (always keep 3 processes, but grow to 10 if necessary)

.. cmdoption:: --detach

    Start worker as a background process.

.. cmdoption:: -f, --logfile

    Path to log file.  If no logfile is specified, `stderr` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
    `ERROR`, `CRITICAL`, or `FATAL`.

.. cmdoption:: --pidfile

    Optional file used to store the process pid.

    The program won't start if this file already exists
    and the pid is still alive.

.. cmdoption:: --uid

    User id, or user name of the user to run as after detaching.

.. cmdoption:: --gid

    Group id, or group name of the main group to change to after
    detaching.

.. cmdoption:: --umask

    Effective :manpage:`umask(1)` (in octal) of the process after detaching.
    Inherits the :manpage:`umask(1)` of the parent process by default.

.. cmdoption:: --workdir

    Optional directory to change to after detaching.

.. cmdoption:: --executable

    Executable to use for the detached process.
"""
from __future__ import absolute_import, unicode_literals
import sys
from celery import concurrency
from celery.bin.base import Command, daemon_options
from celery.bin.celeryd_detach import detached_celeryd
from celery.five import string_t
from celery.platforms import maybe_drop_privileges
from celery.utils.log import LOG_LEVELS, mlevel
from celery.utils.nodenames import default_nodename

# Names exported by this module.
__all__ = ['worker', 'main']

# The module docstring doubles as the command's help text
# (consumed via ``worker.doc`` below).
HELP = __doc__


class worker(Command):
    """Start worker instance.

    Examples:
        .. code-block:: console

            $ celery worker --app=proj -l info
            $ celery worker -A proj -l info -Q hipri,lopri

            $ celery worker -A proj --concurrency=4
            $ celery worker -A proj --concurrency=1000 -P eventlet
            $ celery worker --autoscale=10,0
    """

    doc = HELP  # parse help from this too
    namespace = 'worker'
    enable_config_from_cmdline = True
    supports_args = False
    # Flags accepted by older releases; silently dropped from argv
    # for backward compatibility.
    removed_flags = {'--no-execv', '--force-execv'}

    def run_from_argv(self, prog_name, argv=None, command=None):
        """Parse command-line arguments and start the worker.

        ``command``/``argv`` default to :data:`sys.argv`; flags listed in
        :attr:`removed_flags` are stripped, and the process detaches first
        when ``-D``/``--detach`` was given.
        """
        # Default from sys.argv *before* filtering: filtering a None argv
        # raised TypeError and made the sys.argv fallback dead code.
        command = sys.argv[0] if command is None else command
        argv = sys.argv[1:] if argv is None else argv
        argv = [x for x in argv if x not in self.removed_flags]
        # parse options before detaching so errors can be handled.
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        self.maybe_detach([command] + argv)
        return self(*args, **options)

    def maybe_detach(self, argv, dopts=('-D', '--detach')):
        """Re-execute as a background (detached) process if requested.

        Does not return when a detach option is present in ``argv``.
        """
        # ``dopts`` is a tuple to avoid the mutable-default-argument
        # pitfall (it is only ever iterated, so this is interchangeable).
        if any(arg in argv for arg in dopts):
            argv = [v for v in argv if v not in dopts]
            # will never return
            detached_celeryd(self.app).execute_from_commandline(argv)
            raise SystemExit(0)

    def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
            loglevel=None, logfile=None, pidfile=None, statedb=None,
            **kwargs):
        """Start the worker instance and return its exit code."""
        maybe_drop_privileges(uid=uid, gid=gid)
        # Pools like eventlet/gevent need to patch libs as early
        # as possible.
        pool_cls = (concurrency.get_implementation(pool_cls) or
                    self.app.conf.worker_pool)
        if self.app.IS_WINDOWS and kwargs.get('beat'):
            self.die('-B option does not work on Windows.  '
                     'Please run celery beat as a separate service.')
        hostname = self.host_format(default_nodename(hostname))
        if loglevel:
            try:
                loglevel = mlevel(loglevel)
            except KeyError:  # pragma: no cover
                self.die('Unknown level {0!r}.  Please use one of {1}.'.format(
                    loglevel, '|'.join(
                        l for l in LOG_LEVELS if isinstance(l, string_t))))

        worker = self.app.Worker(
            hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
            logfile=logfile,  # node format handled by celery.app.log.setup
            pidfile=self.node_format(pidfile, hostname),
            statedb=self.node_format(statedb, hostname),
            **kwargs)
        worker.start()
        return worker.exitcode

    def with_pool_option(self, argv):
        # this command supports custom pools
        # that may have to be loaded as early as possible.
        return (['-P'], ['--pool'])

    def add_arguments(self, parser):
        """Register all ``celery worker`` command-line options on *parser*."""
        conf = self.app.conf

        wopts = parser.add_argument_group('Worker Options')
        wopts.add_argument('-n', '--hostname')
        wopts.add_argument(
            '-D', '--detach',
            action='store_true', default=False,
        )
        wopts.add_argument(
            '-S', '--statedb',
            default=conf.worker_state_db,
        )
        wopts.add_argument('-l', '--loglevel', default='WARN')
        wopts.add_argument('-O', dest='optimization')
        wopts.add_argument(
            '--prefetch-multiplier',
            type=int, default=conf.worker_prefetch_multiplier,
        )

        topts = parser.add_argument_group('Pool Options')
        topts.add_argument(
            '-c', '--concurrency',
            default=conf.worker_concurrency, type=int,
        )
        topts.add_argument(
            '-P', '--pool',
            default=conf.worker_pool,
        )
        topts.add_argument(
            '-E', '--task-events', '--events',
            action='store_true', default=conf.worker_send_task_events,
        )
        topts.add_argument(
            '--time-limit',
            type=float, default=conf.task_time_limit,
        )
        topts.add_argument(
            '--soft-time-limit',
            type=float, default=conf.task_soft_time_limit,
        )
        topts.add_argument(
            '--max-tasks-per-child', '--maxtasksperchild',
            type=int, default=conf.worker_max_tasks_per_child,
        )
        topts.add_argument(
            '--max-memory-per-child', '--maxmemperchild',
            type=int, default=conf.worker_max_memory_per_child,
        )

        qopts = parser.add_argument_group('Queue Options')
        qopts.add_argument(
            '--purge', '--discard',
            action='store_true', default=False,
        )
        qopts.add_argument('--queues', '-Q', default=[])
        qopts.add_argument('--exclude-queues', '-X', default=[])
        qopts.add_argument('--include', '-I', default=[])

        fopts = parser.add_argument_group('Features')
        fopts.add_argument(
            '--without-gossip', action='store_true', default=False,
        )
        fopts.add_argument(
            '--without-mingle', action='store_true', default=False,
        )
        fopts.add_argument(
            '--without-heartbeat', action='store_true', default=False,
        )
        fopts.add_argument('--heartbeat-interval', type=int)
        fopts.add_argument('--autoscale')

        daemon_options(parser)

        bopts = parser.add_argument_group('Embedded Beat Options')
        bopts.add_argument('-B', '--beat', action='store_true', default=False)
        bopts.add_argument(
            '-s', '--schedule-filename', '--schedule',
            default=conf.beat_schedule_filename,
        )
        bopts.add_argument('--scheduler')

        user_options = self.app.user_options['worker']
        if user_options:
            uopts = parser.add_argument_group('User Options')
            self.add_compat_options(uopts, user_options)


def main(app=None):
    """Start worker."""
    # Setuptools-generated console scripts run under a module name other
    # than '__main__', which breaks multiprocessing's fork emulation
    # (see multiprocessing.forking.get_preparation_data()) — alias this
    # module as '__main__' to compensate.
    if __name__ != '__main__':  # pragma: no cover
        sys.modules['__main__'] = sys.modules[__name__]
    from billiard import freeze_support
    freeze_support()
    cmd = worker(app=app)
    cmd.execute_from_commandline()


# Allow running this module directly (e.g. ``python -m celery.bin.worker``).
if __name__ == '__main__':          # pragma: no cover
    main()