/usr/lib/python2.7/dist-packages/celery/task/base.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | # -*- coding: utf-8 -*-
"""
celery.task.base
~~~~~~~~~~~~~~~~
The task implementation has been moved to :mod:`celery.app.task`.
This contains the backward compatible Task class used in the old API,
and shouldn't be used in new applications.
"""
from __future__ import absolute_import
from kombu import Exchange
from celery import current_app
from celery.app.task import Context, TaskType, Task as BaseTask # noqa
from celery.five import class_property, reclassmethod
from celery.schedules import maybe_schedule
from celery.utils.log import get_task_logger
__all__ = ['Task', 'PeriodicTask', 'task']
#: list of methods that must be classmethods in the old API.
_COMPAT_CLASSMETHODS = (
'delay', 'apply_async', 'retry', 'apply', 'subtask_from_request',
'AsyncResult', 'subtask', '_get_request', '_get_exec_options',
)
class Task(BaseTask):
"""Deprecated Task base class.
Modern applications should use :class:`celery.Task` instead.
"""
abstract = True
__bound__ = False
__v2_compat__ = True
# - Deprecated compat. attributes -:
queue = None
routing_key = None
exchange = None
exchange_type = None
delivery_mode = None
mandatory = False # XXX deprecated
immediate = False # XXX deprecated
priority = None
type = 'regular'
disable_error_emails = False
accept_magic_kwargs = False
from_config = BaseTask.from_config + (
('exchange_type', 'CELERY_DEFAULT_EXCHANGE_TYPE'),
('delivery_mode', 'CELERY_DEFAULT_DELIVERY_MODE'),
)
# In old Celery the @task decorator didn't exist, so one would create
# classes instead and use them directly (e.g. MyTask.apply_async()).
# the use of classmethods was a hack so that it was not necessary
# to instantiate the class before using it, but it has only
# given us pain (like all magic).
for name in _COMPAT_CLASSMETHODS:
locals()[name] = reclassmethod(getattr(BaseTask, name))
@class_property
def request(cls):
return cls._get_request()
@class_property
def backend(cls):
if cls._backend is None:
return cls.app.backend
return cls._backend
@backend.setter
def backend(cls, value): # noqa
cls._backend = value
@classmethod
def get_logger(self, **kwargs):
return get_task_logger(self.name)
@classmethod
def establish_connection(self):
"""Deprecated method used to get a broker connection.
Should be replaced with :meth:`@Celery.connection`
instead, or by acquiring connections from the connection pool:
.. code-block:: python
# using the connection pool
with celery.pool.acquire(block=True) as conn:
...
# establish fresh connection
with celery.connection() as conn:
...
"""
return self._get_app().connection()
def get_publisher(self, connection=None, exchange=None,
exchange_type=None, **options):
"""Deprecated method to get the task publisher (now called producer).
Should be replaced with :class:`@amqp.TaskProducer`:
.. code-block:: python
with celery.connection() as conn:
with celery.amqp.TaskProducer(conn) as prod:
my_task.apply_async(producer=prod)
"""
exchange = self.exchange if exchange is None else exchange
if exchange_type is None:
exchange_type = self.exchange_type
connection = connection or self.establish_connection()
return self._get_app().amqp.TaskProducer(
connection,
exchange=exchange and Exchange(exchange, exchange_type),
routing_key=self.routing_key, **options
)
@classmethod
def get_consumer(self, connection=None, queues=None, **kwargs):
"""Deprecated method used to get consumer for the queue
this task is sent to.
Should be replaced with :class:`@amqp.TaskConsumer` instead:
"""
Q = self._get_app().amqp
connection = connection or self.establish_connection()
if queues is None:
queues = Q.queues[self.queue] if self.queue else Q.default_queue
return Q.TaskConsumer(connection, queues, **kwargs)
class PeriodicTask(Task):
"""A periodic task is a task that adds itself to the
:setting:`CELERYBEAT_SCHEDULE` setting."""
abstract = True
ignore_result = True
relative = False
options = None
compat = True
def __init__(self):
if not hasattr(self, 'run_every'):
raise NotImplementedError(
'Periodic tasks must have a run_every attribute')
self.run_every = maybe_schedule(self.run_every, self.relative)
super(PeriodicTask, self).__init__()
@classmethod
def on_bound(cls, app):
app.conf.CELERYBEAT_SCHEDULE[cls.name] = {
'task': cls.name,
'schedule': cls.run_every,
'args': (),
'kwargs': {},
'options': cls.options or {},
'relative': cls.relative,
}
def task(*args, **kwargs):
"""Deprecated decorator, please use :func:`celery.task`."""
return current_app.task(*args, **dict({'accept_magic_kwargs': False,
'base': Task}, **kwargs))
def periodic_task(*args, **options):
"""Deprecated decorator, please use :setting:`CELERYBEAT_SCHEDULE`."""
return task(**dict({'base': PeriodicTask}, **options))
|