This file is indexed.

/usr/lib/python2.7/dist-packages/celery/bin/worker.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# -*- coding: utf-8 -*-
"""

The :program:`celery worker` command (previously known as ``celeryd``)

.. program:: celery worker

.. seealso::

    See :ref:`preload-options`.

.. cmdoption:: -c, --concurrency

    Number of child processes processing the queue. The default
    is the number of CPUs available on your system.

.. cmdoption:: -P, --pool

    Pool implementation:

    prefork (default), eventlet, gevent, solo or threads.

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, `stderr` is used.

.. cmdoption:: -l, --loglevel

    Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
    `ERROR`, `CRITICAL`, or `FATAL`.

.. cmdoption:: -n, --hostname

    Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname),
    %n (name) and %d, (domain).

.. cmdoption:: -B, --beat

    Also run the `celery beat` periodic task scheduler. Please note that
    there must only be one instance of this service.

.. cmdoption:: -Q, --queues

    List of queues to enable for this worker, separated by comma.
    By default all configured queues are enabled.
    Example: `-Q video,image`

.. cmdoption:: -I, --include

    Comma separated list of additional modules to import.
    Example: -I foo.tasks,bar.tasks

.. cmdoption:: -s, --schedule

    Path to the schedule database if running with the `-B` option.
    Defaults to `celerybeat-schedule`. The extension ".db" may be
    appended to the filename.

.. cmdoption:: -O

    Apply optimization profile.  Supported: default, fair

.. cmdoption:: --scheduler

    Scheduler class to use. Default is celery.beat.PersistentScheduler

.. cmdoption:: -S, --statedb

    Path to the state database. The extension '.db' may
    be appended to the filename. Default: {default}

.. cmdoption:: -E, --events

    Send events that can be captured by monitors like :program:`celery events`,
    `celerymon`, and others.

.. cmdoption:: --without-gossip

    Do not subscribe to other workers events.

.. cmdoption:: --without-mingle

    Do not synchronize with other workers at startup.

.. cmdoption:: --without-heartbeat

    Do not send event heartbeats.

.. cmdoption:: --heartbeat-interval

    Interval in seconds at which to send worker heartbeat

.. cmdoption:: --purge

    Purges all waiting tasks before the daemon is started.
    **WARNING**: This is unrecoverable, and the tasks will be
    deleted from the messaging server.

.. cmdoption:: --time-limit

    Enables a hard time limit (in seconds int/float) for tasks.

.. cmdoption:: --soft-time-limit

    Enables a soft time limit (in seconds int/float) for tasks.

.. cmdoption:: --maxtasksperchild

    Maximum number of tasks a pool worker can execute before it's
    terminated and replaced by a new worker.

.. cmdoption:: --pidfile

    Optional file used to store the workers pid.

    The worker will not start if this file already exists
    and the pid is still alive.

.. cmdoption:: --autoscale

    Enable autoscaling by providing
    max_concurrency, min_concurrency. Example::

        --autoscale=10,3

    (always keep 3 processes, but grow to 10 if necessary)

.. cmdoption:: --autoreload

    Enable autoreloading.

.. cmdoption:: --no-execv

    Don't do execv after multiprocessing child fork.

"""
from __future__ import absolute_import, unicode_literals

import sys

from celery import concurrency
from celery.bin.base import Command, Option, daemon_options
from celery.bin.celeryd_detach import detached_celeryd
from celery.five import string_t
from celery.platforms import maybe_drop_privileges
from celery.utils import default_nodename
from celery.utils.log import LOG_LEVELS, mlevel

# Public names of this module.
__all__ = ['worker', 'main']

# Keep the module docstring reachable under its own name; the ``worker``
# class below assigns it to its ``doc`` attribute.
__MODULE_DOC__ = __doc__


class worker(Command):
    """Start worker instance.

    Examples::

        celery worker --app=proj -l info
        celery worker -A proj -l info -Q hipri,lopri

        celery worker -A proj --concurrency=4
        celery worker -A proj --concurrency=1000 -P eventlet

        celery worker --autoscale=10,0
    """
    doc = __MODULE_DOC__  # parse help from this too
    namespace = 'celeryd'
    enable_config_from_cmdline = True
    supports_args = False

    def run_from_argv(self, prog_name, argv=None, command=None):
        """Parse ``argv``, detach if requested, then invoke the command.

        ``command`` and ``argv`` fall back to ``sys.argv[0]`` and
        ``sys.argv[1:]`` respectively when they are ``None``.
        """
        if command is None:
            command = sys.argv[0]
        if argv is None:
            argv = sys.argv[1:]
        # Options are parsed ahead of detaching so that errors can be
        # handled first.
        parsed = self.parse_options(prog_name, argv, command)
        options, args = self.prepare_args(*parsed)
        self.maybe_detach([command] + argv)
        return self(*args, **options)

    def maybe_detach(self, argv, dopts=['-D', '--detach']):
        """Hand over to ``detached_celeryd`` when a detach flag is in ``argv``.

        Does nothing when none of ``dopts`` occurs in ``argv``.  Otherwise
        the detach flags are filtered out, the remaining arguments are
        passed to ``detached_celeryd`` and ``SystemExit(0)`` is raised.
        """
        if not any(flag in argv for flag in dopts):
            return
        remaining = [item for item in argv if item not in dopts]
        # will never return
        detached_celeryd(self.app).execute_from_commandline(remaining)
        raise SystemExit(0)

    def run(self, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
            loglevel=None, logfile=None, pidfile=None, state_db=None,
            **kwargs):
        """Build the app's ``Worker`` from the given options and start it.

        Returns whatever ``Worker(...).start()`` returns.  Remaining keyword
        arguments are forwarded to ``self.app.Worker`` unchanged.
        """
        maybe_drop_privileges(uid=uid, gid=gid)

        # Resolve the pool right away: pools such as eventlet/gevent need
        # to patch libraries as early as possible.
        pool_cls = concurrency.get_implementation(pool_cls)
        if not pool_cls:
            pool_cls = self.app.conf.CELERYD_POOL

        if self.app.IS_WINDOWS and kwargs.get('beat'):
            self.die('-B option does not work on Windows.  '
                     'Please run celery beat as a separate service.')

        hostname = self.host_format(default_nodename(hostname))

        if loglevel:
            try:
                loglevel = mlevel(loglevel)
            except KeyError:  # pragma: no cover
                known_levels = '|'.join(
                    name for name in LOG_LEVELS
                    if isinstance(name, string_t))
                self.die('Unknown level {0!r}. Please use one of {1}.'.format(
                    loglevel, known_levels))

        # logfile is passed through untouched; its node format is handled
        # by celery.app.log.setup.
        node_pidfile = self.node_format(pidfile, hostname)
        node_state_db = self.node_format(state_db, hostname)
        instance = self.app.Worker(
            hostname=hostname,
            pool_cls=pool_cls,
            loglevel=loglevel,
            logfile=logfile,
            pidfile=node_pidfile,
            state_db=node_state_db,
            **kwargs
        )
        return instance.start()

    def with_pool_option(self, argv):
        """Return the short and long option names that select the pool."""
        # This command supports custom pools, which may have to be
        # loaded as early as possible.
        short_names = ['-P']
        long_names = ['--pool']
        return (short_names, long_names)

    def get_options(self):
        """Return the option tuple for this command.

        The worker's own options come first, followed by
        ``daemon_options()`` and the app's user options registered
        under ``'worker'``.
        """
        conf = self.app.conf
        own_options = (
            Option(
                '-c', '--concurrency',
                default=conf.CELERYD_CONCURRENCY, type='int',
            ),
            Option(
                '-P', '--pool',
                default=conf.CELERYD_POOL, dest='pool_cls',
            ),
            Option(
                '--purge', '--discard',
                default=False, action='store_true',
            ),
            Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL),
            Option('-n', '--hostname'),
            Option('-B', '--beat', action='store_true'),
            Option(
                '-s', '--schedule',
                dest='schedule_filename',
                default=conf.CELERYBEAT_SCHEDULE_FILENAME,
            ),
            Option('--scheduler', dest='scheduler_cls'),
            Option(
                '-S', '--statedb',
                default=conf.CELERYD_STATE_DB, dest='state_db',
            ),
            Option(
                '-E', '--events',
                default=conf.CELERY_SEND_EVENTS,
                action='store_true', dest='send_events',
            ),
            Option(
                '--time-limit',
                type='float', dest='task_time_limit',
                default=conf.CELERYD_TASK_TIME_LIMIT,
            ),
            Option(
                '--soft-time-limit',
                dest='task_soft_time_limit',
                default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float',
            ),
            Option(
                '--maxtasksperchild',
                dest='max_tasks_per_child',
                default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int',
            ),
            Option('--queues', '-Q', default=[]),
            Option('--exclude-queues', '-X', default=[]),
            Option('--include', '-I', default=[]),
            Option('--autoscale'),
            Option('--autoreload', action='store_true'),
            Option('--no-execv', action='store_true', default=False),
            Option('--without-gossip', action='store_true', default=False),
            Option('--without-mingle', action='store_true', default=False),
            Option('--without-heartbeat', action='store_true', default=False),
            Option('--heartbeat-interval', type='int'),
            Option('-O', dest='optimization'),
            Option('-D', '--detach', action='store_true'),
        )
        daemon = daemon_options()
        user_defined = tuple(self.app.user_options['worker'])
        return own_options + daemon + user_defined


def main(app=None):
    """Run the ``worker`` command from the command line.

    ``app`` is passed straight through to the ``worker`` constructor.
    """
    # Setuptools-generated scripts import this module under its real name.
    # Registering it as ``__main__`` keeps multiprocessing fork emulation
    # working (see multiprocessing.forking.get_preparation_data()).
    module_name = __name__
    if module_name != '__main__':  # pragma: no cover
        sys.modules['__main__'] = sys.modules[module_name]
    from billiard import freeze_support
    freeze_support()
    command = worker(app=app)
    command.execute_from_commandline()


# Run the worker command when this module is executed as a script.
if __name__ == '__main__':          # pragma: no cover
    main()