/usr/share/pyshared/celery/events/snapshot.py is in python-celery 2.4.6-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.events.snapshot
~~~~~~~~~~~~~~~~~~~~~~
Consuming the events as a stream is not always suitable
so this module implements a system to take snapshots of the
state of a cluster at regular intervals. There is a full
implementation of this writing the snapshots to a database
in :mod:`djcelery.snapshots` in the `django-celery` distribution.
:copyright: (c) 2009 - 2011 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import atexit
from .. import platforms
from ..app import app_or_default
from ..datastructures import TokenBucket
from ..utils import timer2, instantiate, LOG_LEVELS
from ..utils.dispatch import Signal
from ..utils.timeutils import rate
class Polaroid(object):
    """Take periodic snapshots of a cluster state object.

    :meth:`install` schedules two recurring timer entries: one that calls
    :meth:`capture` every ``freq`` seconds and one that calls
    :meth:`cleanup` every ``cleanup_freq`` seconds.  Subclasses hook in by
    overriding :meth:`on_shutter` and :meth:`on_cleanup`; other code can
    connect to :attr:`shutter_signal` and :attr:`cleanup_signal`.

    Instances also work as context managers: entering installs the timers
    and leaving cancels them.

    :param state: State object to snapshot.  It must provide
        ``freeze_while(fun, clear_after=...)``.
    :keyword freq: Interval between snapshots.  The value is multiplied
        by 1000 before it is handed to the timer.
    :keyword maxrate: Optional rate limit specification, converted with
        ``rate()`` and enforced through a ``TokenBucket``.  Any false
        value (``None``, ``0``, ``""``) disables rate limiting.
    :keyword cleanup_freq: Interval between cleanup runs, scaled the same
        way as ``freq``.
    :keyword logger: Logger to use.  Defaults to the app's default logger
        named ``"celery.cam"``.
    :keyword timer: Object providing ``apply_interval``.  Defaults to the
        class attribute :attr:`timer`.
    :keyword app: App instance, resolved through ``app_or_default``.

    """
    # Default scheduler; can be replaced per instance via the ``timer``
    # argument.
    timer = timer2

    # Sent (with the state as sender) every time a snapshot is taken.
    shutter_signal = Signal(providing_args=("state", ))
    # Sent (with ``None`` as sender) every time cleanup runs.
    cleanup_signal = Signal()

    # Passed to ``state.freeze_while`` as ``clear_after`` on every capture.
    clear_after = False

    # Timer entries for the capture and cleanup intervals; set by install().
    _tref = None
    _ctref = None

    def __init__(self, state, freq=1.0, maxrate=None,
            cleanup_freq=3600.0, logger=None, timer=None, app=None):
        self.app = app_or_default(app)
        self.state = state
        self.freq = freq
        self.cleanup_freq = cleanup_freq
        self.timer = timer or self.timer
        self.logger = logger or \
                self.app.log.get_default_logger(name="celery.cam")
        # Normalise every false value to None.  shutter() only bypasses
        # the limiter when ``maxrate is None``, so keeping e.g. ``0`` or
        # ``""`` here would crash on ``.can_consume()`` later.
        if maxrate:
            self.maxrate = TokenBucket(rate(maxrate))
        else:
            self.maxrate = None

    def install(self):
        """Schedule the recurring capture and cleanup timer entries."""
        self._tref = self.timer.apply_interval(self.freq * 1000.0,
                                               self.capture)
        self._ctref = self.timer.apply_interval(self.cleanup_freq * 1000.0,
                                                self.cleanup)

    def on_shutter(self, state):
        """Hook called with the state for every snapshot.  No-op here."""
        pass

    def on_cleanup(self):
        """Hook called on every cleanup run.  No-op here."""
        pass

    def cleanup(self):
        """Log, send :attr:`cleanup_signal` and call :meth:`on_cleanup`."""
        self.logger.debug("Cleanup: Running...")
        self.cleanup_signal.send(None)
        self.on_cleanup()

    def shutter(self):
        """Emit one snapshot unless the rate limit forbids it.

        When allowed, logs the state, sends :attr:`shutter_signal` with
        the state as sender and calls :meth:`on_shutter`.

        """
        if self.maxrate is None or self.maxrate.can_consume():
            self.logger.debug("Shutter: %s", self.state)
            self.shutter_signal.send(self.state)
            self.on_shutter(self.state)

    def capture(self):
        """Run :meth:`shutter` through ``state.freeze_while``."""
        self.state.freeze_while(self.shutter, clear_after=self.clear_after)

    def cancel(self):
        """Cancel the timer entries created by :meth:`install`.

        The capture entry is invoked one last time before it is cancelled.

        """
        if self._tref:
            self._tref()  # flush all received events.
            self._tref.cancel()
        if self._ctref:
            self._ctref.cancel()

    def __enter__(self):
        self.install()
        return self

    def __exit__(self, *exc_info):
        # Returns None, so exceptions from the ``with`` body propagate.
        self.cancel()
def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
        logfile=None, pidfile=None, timer=None, app=None):
    """Run a snapshot camera against the event stream until interrupted.

    Sets up logging, creates a fresh ``app.events.State()``, instantiates
    ``camera`` around it, installs the camera's timers and then blocks in
    ``Receiver.capture(limit=None)``, feeding every event to
    ``state.event``.

    :param camera: Camera to instantiate; passed to ``instantiate``
        together with the state and the keyword options below.
    :keyword freq: Snapshot interval forwarded to the camera (logged as
        seconds).
    :keyword maxrate: Snapshot rate limit forwarded to the camera.
    :keyword loglevel: Either an int or a level name; names are
        upper-cased and looked up in ``LOG_LEVELS``.
    :keyword logfile: Log file forwarded to ``app.log.setup_logger``.
    :keyword pidfile: If given, a pidlock is created and acquired for it
        and its release is registered with :mod:`atexit`.
    :keyword timer: Timer forwarded to the camera.
    :keyword app: App instance, resolved through ``app_or_default``.

    :raises SystemExit: when capturing is stopped by
        :exc:`KeyboardInterrupt`.

    """
    app = app_or_default(app)

    if pidfile:
        pidlock = platforms.create_pidlock(pidfile).acquire()
        atexit.register(pidlock.release)

    if not isinstance(loglevel, int):
        loglevel = LOG_LEVELS[loglevel.upper()]
    logger = app.log.setup_logger(loglevel=loglevel,
                                  logfile=logfile,
                                  name="celery.evcam")
    # Lazy %-args: same emitted text, formatted only if the record is
    # actually handled.
    logger.info("-> evcam: Taking snapshots with %s (every %s secs.)\n",
                camera, freq)

    state = app.events.State()
    cam = instantiate(camera, state, app=app,
                      freq=freq, maxrate=maxrate, logger=logger,
                      timer=timer)
    cam.install()

    # From here on the camera's timers are live, so everything that can
    # fail is inside the try block and cleanup is guaranteed.
    conn = None
    try:
        conn = app.broker_connection()
        recv = app.events.Receiver(conn, handlers={"*": state.event})
        try:
            recv.capture(limit=None)
        except KeyboardInterrupt:
            raise SystemExit
    finally:
        # Same order as before (cancel, then close), but the connection
        # is closed even when the camera's final flush raises.
        try:
            cam.cancel()
        finally:
            if conn is not None:
                conn.close()
|