/usr/lib/python2.7/dist-packages/celery/events/dumper.py is in python-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | # -*- coding: utf-8 -*-
"""Utility to dump events to screen.
This is a simple program that dumps events to the console
as they happen. Think of it like a `tcpdump` for Celery events.
"""
from __future__ import absolute_import, print_function, unicode_literals
import sys
from datetime import datetime
from celery.app import app_or_default
from celery.utils.functional import LRUCache
from celery.utils.time import humanize_seconds
__all__ = ['Dumper', 'evdump']
TASK_NAMES = LRUCache(limit=0xFFF)
HUMAN_TYPES = {
'worker-offline': 'shutdown',
'worker-online': 'started',
'worker-heartbeat': 'heartbeat',
}
CONNECTION_ERROR = """\
-> Cannot connect to %s: %s.
Trying again %s
"""
def humanize_type(type):
try:
return HUMAN_TYPES[type.lower()]
except KeyError:
return type.lower().replace('-', ' ')
class Dumper(object):
"""Monitor events."""
def __init__(self, out=sys.stdout):
self.out = out
def say(self, msg):
print(msg, file=self.out)
# need to flush so that output can be piped.
try:
self.out.flush()
except AttributeError: # pragma: no cover
pass
def on_event(self, ev):
timestamp = datetime.utcfromtimestamp(ev.pop('timestamp'))
type = ev.pop('type').lower()
hostname = ev.pop('hostname')
if type.startswith('task-'):
uuid = ev.pop('uuid')
if type in ('task-received', 'task-sent'):
task = TASK_NAMES[uuid] = '{0}({1}) args={2} kwargs={3}' \
.format(ev.pop('name'), uuid,
ev.pop('args'),
ev.pop('kwargs'))
else:
task = TASK_NAMES.get(uuid, '')
return self.format_task_event(hostname, timestamp,
type, task, ev)
fields = ', '.join(
'{0}={1}'.format(key, ev[key]) for key in sorted(ev)
)
sep = fields and ':' or ''
self.say('{0} [{1}] {2}{3} {4}'.format(
hostname, timestamp, humanize_type(type), sep, fields),
)
def format_task_event(self, hostname, timestamp, type, task, event):
fields = ', '.join(
'{0}={1}'.format(key, event[key]) for key in sorted(event)
)
sep = fields and ':' or ''
self.say('{0} [{1}] {2}{3} {4} {5}'.format(
hostname, timestamp, humanize_type(type), sep, task, fields),
)
def evdump(app=None, out=sys.stdout):
"""Start event dump."""
app = app_or_default(app)
dumper = Dumper(out=out)
dumper.say('-> evdump: starting capture...')
conn = app.connection_for_read().clone()
def _error_handler(exc, interval):
dumper.say(CONNECTION_ERROR % (
conn.as_uri(), exc, humanize_seconds(interval, 'in', ' ')
))
while 1:
try:
conn.ensure_connection(_error_handler)
recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
recv.capture()
except (KeyboardInterrupt, SystemExit):
return conn and conn.close()
except conn.connection_errors + conn.channel_errors:
dumper.say('-> Connection lost, attempting reconnect')
if __name__ == '__main__': # pragma: no cover
evdump()
|