/usr/lib/python2.7/dist-packages/celery/backends/rpc.py is in python-celery 3.1.6-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.backends.rpc
~~~~~~~~~~~~~~~~~~~
RPC-style result backend, using reply-to and one queue per client.
"""
from __future__ import absolute_import
from kombu import Consumer, Exchange
from kombu.common import maybe_declare
from kombu.utils import cached_property
from celery import current_task
from celery.backends import amqp
__all__ = ['RPCBackend']
class RPCBackend(amqp.AMQPBackend):
    """RPC-style result backend, using reply-to and one queue per client.

    Every binding lookup on this backend resolves to the single queue
    described by :attr:`binding`, which is built from :attr:`oid`.
    """

    persistent = False

    class Consumer(Consumer):
        """Consumer variant with ``auto_declare`` switched off."""

        auto_declare = False

    def _create_exchange(self, name, type='direct', delivery_mode=2):
        """Return ``Exchange(None)``; all three arguments are ignored."""
        # uses direct to queue routing (anon exchange).
        anon_exchange = Exchange(None)
        return anon_exchange

    def on_task_call(self, producer, task_id):
        """Declare the reply queue on the producer's channel.

        The queue from :attr:`binding` is called with ``producer.channel``
        and the result is handed to ``maybe_declare`` with ``retry=True``.
        ``task_id`` is not used.
        """
        reply_queue = self.binding(producer.channel)
        maybe_declare(reply_queue, retry=True)

    def _create_binding(self, task_id):
        """Return :attr:`binding`, whatever ``task_id`` is."""
        return self.binding

    def _many_bindings(self, ids):
        """Return a one-element list holding :attr:`binding`.

        ``ids`` is not consulted.
        """
        return [self.binding]

    def rkey(self, task_id):
        """Return ``task_id`` unchanged."""
        return task_id

    def destination_for(self, task_id, request):
        """Return the ``(reply_to, correlation_id)`` pair for a result.

        When ``request`` is falsy, ``current_task.request`` is used
        instead.  The second element of the pair falls back to
        ``task_id`` when the request's ``correlation_id`` is falsy.

        Raises:
            RuntimeError: if resolving the request raises
                ``AttributeError``.
        """
        # Request is a new argument for backends, so must still support
        # old code that relies on current_task.
        try:
            if not request:
                request = current_task.request
        except AttributeError:
            message = 'RPC backend missing task request for {0!r}'.format(
                task_id,
            )
            raise RuntimeError(message)
        reply_to = request.reply_to
        correlation_id = request.correlation_id or task_id
        return reply_to, correlation_id

    def on_reply_declare(self, task_id):
        """Do nothing; always returns ``None``."""
        return None

    @property
    def binding(self):
        """Build the reply queue for this backend.

        A new ``self.Queue`` is constructed on every access, with
        :attr:`oid` as both the first and third positional arguments,
        ``self.exchange`` as the second, and ``durable`` and
        ``auto_delete`` both ``False``.
        """
        queue_cls = self.Queue
        client_id = self.oid
        return queue_cls(
            client_id, self.exchange, client_id,
            durable=False, auto_delete=False,
        )

    @cached_property
    def oid(self):
        """Return ``self.app.oid`` (cached via ``cached_property``)."""
        return self.app.oid
|