This file is indexed.

/usr/lib/python2.7/dist-packages/celery/task/base.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# -*- coding: utf-8 -*-
"""
    celery.task.base
    ~~~~~~~~~~~~~~~~

    The task implementation has been moved to :mod:`celery.app.task`.

    This contains the backward compatible Task class used in the old API,
    and shouldn't be used in new applications.

"""
from __future__ import absolute_import

from kombu import Exchange

from celery import current_app
from celery.app.task import Context, TaskType, Task as BaseTask  # noqa
from celery.five import class_property, reclassmethod
from celery.schedules import maybe_schedule
from celery.utils.log import get_task_logger

# Names exported by ``from celery.task.base import *``.
__all__ = ['Task', 'PeriodicTask', 'task']

#: Names of the :class:`BaseTask` methods that must be classmethods in the
#: old API.  The compat ``Task`` class body walks this tuple in order and
#: rebinds each name to ``reclassmethod(getattr(BaseTask, name))``.
_COMPAT_CLASSMETHODS = (
    'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
    'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
)


class Task(BaseTask):
    """Backward compatible Task base class (deprecated).

    New applications should build on :class:`celery.Task` instead.

    """
    abstract = True
    __bound__ = False
    __v2_compat__ = True

    # - Deprecated compat. attributes -:

    queue = None
    routing_key = None
    exchange = None
    exchange_type = None
    delivery_mode = None
    mandatory = False  # XXX deprecated
    immediate = False  # XXX deprecated
    priority = None
    type = 'regular'
    disable_error_emails = False
    accept_magic_kwargs = False

    # Extends the (attribute, setting) pairs inherited from BaseTask.
    from_config = BaseTask.from_config + (
        ('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
        ('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
    )

    # Before the @task decorator existed, tasks were written as classes and
    # used without instantiation (e.g. MyTask.apply_async()).  Turning these
    # methods into classmethods was the hack that made that possible; it has
    # caused nothing but trouble since (like all magic).
    #
    # NOTE: the loop variable ``name`` stays bound in the class namespace
    # once the loop finishes, so do not rename it casually.
    for name in _COMPAT_CLASSMETHODS:
        locals()[name] = reclassmethod(getattr(BaseTask, name))

    # Class-level accessor for the value returned by ``_get_request()``.
    @class_property
    def request(cls):
        return cls._get_request()

    # Class-level result backend: the explicitly assigned ``_backend`` if
    # there is one, otherwise the backend of the app.
    @class_property
    def backend(cls):
        assigned = cls._backend
        return cls.app.backend if assigned is None else assigned

    @backend.setter
    def backend(cls, value):  # noqa
        cls._backend = value

    @classmethod
    def get_logger(self, **kwargs):
        """Return the task logger for this task's name.

        Keyword arguments are accepted but ignored.

        """
        task_name = self.name
        return get_task_logger(task_name)

    @classmethod
    def establish_connection(self):
        """Deprecated way of obtaining a broker connection.

        Use :meth:`@Celery.connection` instead, or acquire a connection
        from the connection pool:

        .. code-block:: python

            # using the connection pool
            with celery.pool.acquire(block=True) as conn:
                ...

            # establish fresh connection
            with celery.connection() as conn:
                ...
        """
        app = self._get_app()
        return app.connection()

    def get_publisher(self, connection=None, exchange=None,
                      exchange_type=None, **options):
        """Deprecated way of getting the task publisher (now: producer).

        Use :class:`@amqp.TaskProducer` instead:

        .. code-block:: python

            with celery.connection() as conn:
                with celery.amqp.TaskProducer(conn) as prod:
                    my_task.apply_async(producer=prod)

        ``exchange`` and ``exchange_type`` fall back to the task's own
        attributes when they are :const:`None`, and a connection is
        established when none is given.  Note that, unlike
        :meth:`establish_connection` and :meth:`get_consumer`, this is
        defined as a regular method rather than a classmethod.

        """
        if exchange is None:
            exchange = self.exchange
        if exchange_type is None:
            exchange_type = self.exchange_type
        if not connection:
            connection = self.establish_connection()
        producer_cls = self._get_app().amqp.TaskProducer
        # A falsy exchange name is handed over untouched; only a real name
        # gets wrapped in an Exchange declaration.
        declared = Exchange(exchange, exchange_type) if exchange else exchange
        return producer_cls(
            connection,
            exchange=declared,
            routing_key=self.routing_key,
            **options
        )

    @classmethod
    def get_consumer(self, connection=None, queues=None, **kwargs):
        """Deprecated way of getting a consumer for the queue this task
        is sent to.

        Use :class:`@amqp.TaskConsumer` instead.

        When ``queues`` is :const:`None` the task's ``queue`` attribute is
        looked up in the app's queues, and the app's default queue is used
        if the task does not name one.

        """
        amqp = self._get_app().amqp
        if not connection:
            connection = self.establish_connection()
        if queues is None:
            if self.queue:
                queues = amqp.queues[self.queue]
            else:
                queues = amqp.default_queue
        return amqp.TaskConsumer(connection, queues, **kwargs)


class PeriodicTask(Task):
    """Task that registers itself in the :setting:`CELERYBEAT_SCHEDULE`
    setting when it is bound to an app."""
    abstract = True
    ignore_result = True
    relative = False
    options = None
    compat = True

    def __init__(self):
        """Normalize ``run_every`` into a schedule.

        :raises NotImplementedError: if the task defines no ``run_every``.

        """
        has_interval = hasattr(self, 'run_every')
        if not has_interval:
            raise NotImplementedError(
                'Periodic tasks must have a run_every attribute')
        schedule = maybe_schedule(self.run_every, self.relative)
        self.run_every = schedule
        super(PeriodicTask, self).__init__()

    @classmethod
    def on_bound(cls, app):
        """Store this task's schedule entry in the app configuration,
        keyed by the task name."""
        entry = {
            'task': cls.name,
            'schedule': cls.run_every,
            'args': (),
            'kwargs': {},
            'options': cls.options or {},
            'relative': cls.relative,
        }
        app.conf.CELERYBEAT_SCHEDULE[cls.name] = entry


def task(*args, **kwargs):
    """Deprecated decorator, please use :func:`celery.task`.

    Delegates to the current app's ``task`` with ``accept_magic_kwargs``
    defaulting to :const:`False` and ``base`` defaulting to the compat
    :class:`Task`; keyword arguments given by the caller take precedence.

    """
    settings = {'accept_magic_kwargs': False, 'base': Task}
    settings.update(kwargs)
    return current_app.task(*args, **settings)


def periodic_task(*args, **options):
    """Deprecated decorator, please use :setting:`CELERYBEAT_SCHEDULE`.

    Thin wrapper around :func:`task` that defaults the ``base`` option to
    :class:`PeriodicTask`; an explicit ``base`` in ``options`` wins.

    :param args: positional arguments, forwarded unchanged to :func:`task`.
    :param options: keyword options, forwarded to :func:`task`.
    :returns: whatever :func:`task` returns for the given arguments.

    """
    # Positional arguments used to be accepted but silently dropped;
    # forward them so they are handled (or rejected) exactly as in task().
    task_options = {'base': PeriodicTask}
    task_options.update(options)
    return task(*args, **task_options)