This file is indexed.

/usr/lib/python2.7/dist-packages/celery/worker/state.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

# -*- coding: utf-8 -*-
"""
    celery.worker.state
    ~~~~~~~~~~~~~~~~~~~

    Internal worker state (global)

    This includes the currently active and reserved tasks,
    statistics, and revoked tasks.

"""
from __future__ import absolute_import

import os
import sys
import platform
import shelve
import zlib

from kombu.serialization import pickle, pickle_protocol
from kombu.utils import cached_property

from celery import __version__
from celery.datastructures import LimitedSet
from celery.exceptions import WorkerShutdown, WorkerTerminate
from celery.five import Counter

__all__ = ['SOFTWARE_INFO', 'reserved_requests', 'active_requests',
           'total_count', 'revoked', 'task_reserved', 'maybe_shutdown',
           'task_accepted', 'task_ready', 'Persistent']

#: Worker software/platform information.
SOFTWARE_INFO = {'sw_ident': 'py-celery',
                 'sw_ver': __version__,
                 'sw_sys': platform.system()}

#: maximum number of revokes to keep in memory.
REVOKES_MAX = 50000

#: how many seconds a revoke will be active before
#: being expired when the max limit has been exceeded.
REVOKE_EXPIRES = 10800

#: set of all reserved :class:`~celery.worker.job.Request`'s.
reserved_requests = set()

#: set of currently active :class:`~celery.worker.job.Request`'s.
active_requests = set()

#: count of tasks accepted by the worker, keyed by task type (name).
total_count = Counter()

#: count of all tasks accepted by the worker
all_total_count = [0]

#: the set of currently revoked task ids.  Persistent if statedb set.
revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)

#: Update global state when a task has been reserved.
task_reserved = reserved_requests.add

should_stop = False
should_terminate = False


def maybe_shutdown():
    if should_stop:
        raise WorkerShutdown()
    elif should_terminate:
        raise WorkerTerminate()


def task_accepted(request, _all_total_count=all_total_count):
    """Updates global state when a task has been accepted."""
    active_requests.add(request)
    total_count[request.name] += 1
    all_total_count[0] += 1


def task_ready(request):
    """Updates global state when a task is ready."""
    active_requests.discard(request)
    reserved_requests.discard(request)


C_BENCH = os.environ.get('C_BENCH') or os.environ.get('CELERY_BENCH')
C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
                    os.environ.get('CELERY_BENCH_EVERY') or 1000)
if C_BENCH:  # pragma: no cover
    import atexit

    from billiard import current_process
    from celery.five import monotonic
    from celery.utils.debug import memdump, sample_mem

    all_count = 0
    bench_first = None
    bench_start = None
    bench_last = None
    bench_every = C_BENCH_EVERY
    bench_sample = []
    __reserved = task_reserved
    __ready = task_ready

    if current_process()._name == 'MainProcess':
        @atexit.register
        def on_shutdown():
            if bench_first is not None and bench_last is not None:
                print('- Time spent in benchmark: {0!r}'.format(
                      bench_last - bench_first))
                print('- Avg: {0}'.format(
                      sum(bench_sample) / len(bench_sample)))
                memdump()

    def task_reserved(request):  # noqa
        global bench_start
        global bench_first
        now = None
        if bench_start is None:
            bench_start = now = monotonic()
        if bench_first is None:
            bench_first = now

        return __reserved(request)

    def task_ready(request):  # noqa
        global all_count
        global bench_start
        global bench_last
        all_count += 1
        if not all_count % bench_every:
            now = monotonic()
            diff = now - bench_start
            print('- Time spent processing {0} tasks (since first '
                  'task received): ~{1:.4f}s\n'.format(bench_every, diff))
            sys.stdout.flush()
            bench_start = bench_last = now
            bench_sample.append(diff)
            sample_mem()
        return __ready(request)


class Persistent(object):
    """This is the persistent data stored by the worker when
    :option:`--statedb` is enabled.

    It currently stores revoked task ids and the worker's logical clock value.

    """
    storage = shelve
    protocol = pickle_protocol
    compress = zlib.compress
    decompress = zlib.decompress
    _is_open = False

    def __init__(self, state, filename, clock=None):
        self.state = state
        self.filename = filename
        self.clock = clock
        self.merge()

    def open(self):
        return self.storage.open(
            self.filename, protocol=self.protocol, writeback=True,
        )

    def merge(self):
        self._merge_with(self.db)

    def sync(self):
        self._sync_with(self.db)
        self.db.sync()

    def close(self):
        if self._is_open:
            self.db.close()
            self._is_open = False

    def save(self):
        self.sync()
        self.close()

    def _merge_with(self, d):
        self._merge_revoked(d)
        self._merge_clock(d)
        return d

    def _sync_with(self, d):
        self._revoked_tasks.purge()
        d.update(
            __proto__=3,
            zrevoked=self.compress(self._dumps(self._revoked_tasks)),
            clock=self.clock.forward() if self.clock else 0,
        )
        return d

    def _merge_clock(self, d):
        if self.clock:
            d['clock'] = self.clock.adjust(d.get('clock') or 0)

    def _merge_revoked(self, d):
        try:
            self._merge_revoked_v3(d['zrevoked'])
        except KeyError:
            try:
                self._merge_revoked_v2(d.pop('revoked'))
            except KeyError:
                pass
        # purge expired items at boot
        self._revoked_tasks.purge()

    def _merge_revoked_v3(self, zrevoked):
        if zrevoked:
            self._revoked_tasks.update(pickle.loads(self.decompress(zrevoked)))

    def _merge_revoked_v2(self, saved):
        if not isinstance(saved, LimitedSet):
            # (pre 3.0.18) used to be stored as a dict
            return self._merge_revoked_v1(saved)
        self._revoked_tasks.update(saved)

    def _merge_revoked_v1(self, saved):
        add = self._revoked_tasks.add
        for item in saved:
            add(item)

    def _dumps(self, obj):
        return pickle.dumps(obj, protocol=self.protocol)

    @property
    def _revoked_tasks(self):
        return self.state.revoked

    @cached_property
    def db(self):
        self._is_open = True
        return self.open()
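
For orientation, here is a minimal usage sketch of the module above. It is not part of state.py: the Req stand-in class, the worker.state filename and the LamportClock instance are illustrative assumptions; the real worker passes celery.worker.job.Request objects, the path given with --statedb, and the app's clock.

    # Hypothetical usage sketch, not part of the file above.
    from kombu.clocks import LamportClock

    from celery.worker import state


    class Req(object):
        """Minimal stand-in for celery.worker.job.Request (illustration only)."""

        def __init__(self, name):
            self.name = name


    request = Req('tasks.add')
    state.task_reserved(request)    # added to reserved_requests
    state.task_accepted(request)    # added to active_requests, counters bumped
    state.task_ready(request)       # discarded from both sets again

    # Called periodically by the worker loop; raises WorkerShutdown or
    # WorkerTerminate once the corresponding module flags have been set.
    state.maybe_shutdown()

    # Persist the revoked-task set (and clock) across restarts, roughly as the
    # worker does when --statedb is given.
    persistent = state.Persistent(state, 'worker.state', clock=LamportClock())
    state.revoked.add('some-revoked-task-id')
    persistent.save()               # sync to the shelve db, then close it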