This file is indexed.

/usr/lib/python2.7/dist-packages/celery/_state.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: utf-8 -*-
"""
    celery._state
    ~~~~~~~~~~~~~~~

    This is an internal module containing thread state
    like the ``current_app``, and ``current_task``.

    This module shouldn't be used directly.

"""
from __future__ import absolute_import, print_function

import os
import sys
import threading
import weakref

from celery.local import Proxy
from celery.utils.threads import LocalStack

try:
    from weakref import WeakSet as AppSet
except ImportError:  # XXX Py2.6

    class AppSet(object):  # noqa

        def __init__(self):
            self._refs = set()

        def add(self, app):
            self._refs.add(weakref.ref(app))

        def __iter__(self):
            dirty = []
            try:
                for appref in self._refs:
                    app = appref()
                    if app is None:
                        dirty.append(appref)
                    else:
                        yield app
            finally:
                while dirty:
                    self._refs.discard(dirty.pop())

__all__ = ['set_default_app', 'get_current_app', 'get_current_task',
           'get_current_worker_task', 'current_app', 'current_task',
           'connect_on_app_finalize']

#: Global default app used when no current app.
default_app = None

#: List of all app instances (weakrefs), must not be used directly.
_apps = AppSet()

#: global set of functions to call whenever a new app is finalized
#: E.g. Shared tasks, and builtin tasks are created
#: by adding callbacks here.
_on_app_finalizers = set()

_task_join_will_block = False


def connect_on_app_finalize(callback):
    _on_app_finalizers.add(callback)
    return callback


def _announce_app_finalized(app):
    callbacks = set(_on_app_finalizers)
    for callback in callbacks:
        callback(app)


def _set_task_join_will_block(blocks):
    global _task_join_will_block
    _task_join_will_block = blocks


def task_join_will_block():
    return _task_join_will_block


class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None
_tls = _TLS()

_task_stack = LocalStack()


def set_default_app(app):
    global default_app
    default_app = app


def _get_current_app():
    if default_app is None:
        #: creates the global fallback app instance.
        from celery.app import Celery
        set_default_app(Celery(
            'default',
            loader=os.environ.get('CELERY_LOADER') or 'default',
            fixups=[],
            set_as_current=False, accept_magic_kwargs=True,
        ))
    return _tls.current_app or default_app


def _set_current_app(app):
    _tls.current_app = app


C_STRICT_APP = os.environ.get('C_STRICT_APP')
if os.environ.get('C_STRICT_APP'):  # pragma: no cover
    def get_current_app():
        raise Exception('USES CURRENT APP')
        import traceback
        print('-- USES CURRENT_APP', file=sys.stderr)  # noqa+
        traceback.print_stack(file=sys.stderr)
        return _get_current_app()
else:
    get_current_app = _get_current_app


def get_current_task():
    """Currently executing task."""
    return _task_stack.top


def get_current_worker_task():
    """Currently executing task, that was applied by the worker.

    This is used to differentiate between the actual task
    executed by the worker and any task that was called within
    a task (using ``task.__call__`` or ``task.apply``)

    """
    for task in reversed(_task_stack.stack):
        if not task.request.called_directly:
            return task


#: Proxy to current app.
current_app = Proxy(get_current_app)

#: Proxy to current task.
current_task = Proxy(get_current_task)


def _register_app(app):
    _apps.add(app)


def _get_active_apps():
    return _apps