/usr/lib/python2.7/dist-packages/celery/events/receiver.py is in python-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Event receiver implementation."""
from __future__ import absolute_import, unicode_literals
import time
from operator import itemgetter
from kombu import Queue
from kombu.connection import maybe_channel
from kombu.mixins import ConsumerMixin
from celery import uuid
from celery.app import app_or_default
from celery.utils.time import adjust_timestamp
from .event import get_exchange
# Public names exported by this module.
__all__ = ['EventReceiver']

# Offset added to the receiver's own clock value when stamping 'task-sent'
# events (see EventReceiver.event_from_message).
CLIENT_CLOCK_SKEW = -1

# Fetches the ('utcoffset', 'timestamp') pair from an event body in one call;
# a body missing either key makes the lookup raise KeyError.
_TZGETTER = itemgetter('utcoffset', 'timestamp')
class EventReceiver(ConsumerMixin):
    """Capture events.

    Arguments:
        channel: Connection or channel to the broker.  The value is passed
            through :func:`kombu.connection.maybe_channel` before use.
        handlers (Mapping[Callable]): Event handlers.
            This is a map of event type names and their handlers.
            The special handler `"*"` captures all events that don't have a
            handler.  Defaults to an empty mapping.
        routing_key (str): Routing key used to bind the event queue
            (default ``'#'``).
        node_id (str): Suffix of the queue name; a fresh uuid is generated
            when omitted.
        app: App instance to use; falls back to the class-level ``app``
            attribute and then to :func:`app_or_default`.
        queue_prefix (str): Prefix of the queue name; defaults to the
            ``event_queue_prefix`` setting.
        accept (Set[str]): Accepted content types; defaults to the
            configured ``event_serializer`` plus ``'json'``.
        queue_ttl: Passed to the queue as ``message_ttl``; defaults to the
            ``event_queue_ttl`` setting.
        queue_expires: Passed to the queue as ``expires``; defaults to the
            ``event_queue_expires`` setting.
    """

    # Class-level default app; subclasses may set it so instances don't
    # have to pass ``app`` explicitly.
    app = None

    def __init__(self, channel, handlers=None, routing_key='#',
                 node_id=None, app=None, queue_prefix=None,
                 accept=None, queue_ttl=None, queue_expires=None):
        self.app = app_or_default(app or self.app)
        # ``self.connection`` (property below) reads ``self.channel``, so the
        # channel must be assigned before the exchange is resolved.
        self.channel = maybe_channel(channel)
        self.handlers = {} if handlers is None else handlers
        self.routing_key = routing_key
        self.node_id = node_id or uuid()
        self.queue_prefix = queue_prefix or self.app.conf.event_queue_prefix
        self.exchange = get_exchange(
            self.connection or self.app.connection_for_write())

        # Only ``None`` triggers the configured defaults, so an explicit
        # falsy value supplied by the caller is respected.
        if queue_ttl is None:
            queue_ttl = self.app.conf.event_queue_ttl
        if queue_expires is None:
            queue_expires = self.app.conf.event_queue_expires

        self.queue = Queue(
            '.'.join([self.queue_prefix, self.node_id]),
            exchange=self.exchange,
            routing_key=self.routing_key,
            auto_delete=True, durable=False,
            message_ttl=queue_ttl,
            expires=queue_expires,
        )

        # Cache the bound clock methods used for every received event.
        self.clock = self.app.clock
        self.adjust_clock = self.clock.adjust
        self.forward_clock = self.clock.forward

        if accept is None:
            accept = {self.app.conf.event_serializer, 'json'}
        self.accept = accept

    def process(self, type, event):
        """Process event by dispatching to configured handler.

        The handler registered for ``type`` is used when present, otherwise
        the catch-all ``'*'`` handler; events with neither are dropped.
        """
        handler = self.handlers.get(type) or self.handlers.get('*')
        if handler:
            handler(event)

    def get_consumers(self, Consumer, channel):
        """Return the single consumer reading from the event queue.

        Messages are consumed with ``no_ack=True`` and delivered to
        :meth:`_receive`.
        """
        return [Consumer(queues=[self.queue],
                         callbacks=[self._receive], no_ack=True,
                         accept=self.accept)]

    def on_consume_ready(self, connection, channel, consumers,
                         wakeup=True, **kwargs):
        """Ask workers for a heartbeat once consuming starts, if ``wakeup``."""
        if wakeup:
            self.wakeup_workers(channel=channel)

    def itercapture(self, limit=None, timeout=None, wakeup=True):
        """Return the ``consume()`` iterator without exhausting it."""
        return self.consume(limit=limit, timeout=timeout, wakeup=wakeup)

    def capture(self, limit=None, timeout=None, wakeup=True):
        """Open up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`should_stop` is set to True on this receiver, or
        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.

        Returns:
            list: Every item yielded by ``consume()``.
        """
        # NOTE(review): the list keeps one entry per item yielded by
        # consume(), so an unbounded capture (no limit/timeout) makes it grow
        # for as long as the loop runs.  The return value is kept for
        # backward compatibility; long-running callers that don't need it
        # can iterate ``itercapture()`` instead.
        return list(self.consume(limit=limit, timeout=timeout, wakeup=wakeup))

    def wakeup_workers(self, channel=None):
        """Broadcast a ``heartbeat`` control command over ``channel``."""
        self.app.control.broadcast('heartbeat',
                                   connection=self.connection,
                                   channel=channel)

    def event_from_message(self, body, localize=True,
                           now=time.time, tzfields=_TZGETTER,
                           adjust_timestamp=adjust_timestamp,
                           CLIENT_CLOCK_SKEW=CLIENT_CLOCK_SKEW):
        """Normalize a raw event body and return ``(type, body)``.

        ``body`` is updated in place: its ``clock`` is reconciled with the
        receiver's clock, its ``timestamp`` is passed through
        ``adjust_timestamp`` together with its ``utcoffset`` (when
        ``localize`` is true and both fields are present), and
        ``local_received`` is set to ``now()``.

        The keyword arguments after ``localize`` bind module globals as
        defaults; callers normally leave them alone.
        """
        event_type = body['type']
        if event_type == 'task-sent':
            # clients never sync so cannot use their clock value
            _c = body['clock'] = (self.clock.value or 1) + CLIENT_CLOCK_SKEW
            self.adjust_clock(_c)
        else:
            try:
                clock = body['clock']
            except KeyError:
                # Sender supplied no clock: stamp the event with ours.
                body['clock'] = self.forward_clock()
            else:
                self.adjust_clock(clock)

        if localize:
            try:
                offset, timestamp = tzfields(body)
            except KeyError:
                # Missing utcoffset/timestamp: leave the body untouched.
                pass
            else:
                body['timestamp'] = adjust_timestamp(timestamp, offset)
        body['local_received'] = now()
        return event_type, body

    def _receive(self, body, message, list=list, isinstance=isinstance):
        """Consumer callback: dispatch one event or a batch of events."""
        if isinstance(body, list):  # celery 4.0: List of events
            process, from_message = self.process, self.event_from_message
            for event in body:
                process(*from_message(event))
        else:
            self.process(*self.event_from_message(body))

    @property
    def connection(self):
        """Client owning the channel's connection, or None without a channel."""
        return self.channel.connection.client if self.channel else None
|