This file is indexed.

/usr/share/pyshared/celery/events/snapshot.py is in python-celery 2.4.6-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# -*- coding: utf-8 -*-
"""
    celery.events.snapshot
    ~~~~~~~~~~~~~~~~~~~~~~

    Consuming the events as a stream is not always suitable
    so this module implements a system to take snapshots of the
    state of a cluster at regular intervals.  There is a full
    implementation of this writing the snapshots to a database
    in :mod:`djcelery.snapshots` in the `django-celery` distribution.

    :copyright: (c) 2009 - 2011 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

import atexit

from .. import platforms
from ..app import app_or_default
from ..datastructures import TokenBucket
from ..utils import timer2, instantiate, LOG_LEVELS
from ..utils.dispatch import Signal
from ..utils.timeutils import rate


class Polaroid(object):
    timer = timer2
    shutter_signal = Signal(providing_args=("state", ))
    cleanup_signal = Signal()
    clear_after = False

    _tref = None
    _ctref = None

    def __init__(self, state, freq=1.0, maxrate=None,
            cleanup_freq=3600.0, logger=None, timer=None, app=None):
        self.app = app_or_default(app)
        self.state = state
        self.freq = freq
        self.cleanup_freq = cleanup_freq
        self.timer = timer or self.timer
        self.logger = logger or \
                self.app.log.get_default_logger(name="celery.cam")
        self.maxrate = maxrate and TokenBucket(rate(maxrate))

    def install(self):
        self._tref = self.timer.apply_interval(self.freq * 1000.0,
                                               self.capture)
        self._ctref = self.timer.apply_interval(self.cleanup_freq * 1000.0,
                                                self.cleanup)

    def on_shutter(self, state):
        pass

    def on_cleanup(self):
        pass

    def cleanup(self):
        self.logger.debug("Cleanup: Running...")
        self.cleanup_signal.send(None)
        self.on_cleanup()

    def shutter(self):
        if self.maxrate is None or self.maxrate.can_consume():
            self.logger.debug("Shutter: %s", self.state)
            self.shutter_signal.send(self.state)
            self.on_shutter(self.state)

    def capture(self):
        self.state.freeze_while(self.shutter, clear_after=self.clear_after)

    def cancel(self):
        if self._tref:
            self._tref()  # flush all received events.
            self._tref.cancel()
        if self._ctref:
            self._ctref.cancel()

    def __enter__(self):
        self.install()
        return self

    def __exit__(self, *exc_info):
        self.cancel()


def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
        logfile=None, pidfile=None, timer=None, app=None):
    app = app_or_default(app)

    if pidfile:
        pidlock = platforms.create_pidlock(pidfile).acquire()
        atexit.register(pidlock.release)

    if not isinstance(loglevel, int):
        loglevel = LOG_LEVELS[loglevel.upper()]
    logger = app.log.setup_logger(loglevel=loglevel,
                                  logfile=logfile,
                                  name="celery.evcam")

    logger.info(
        "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
            camera, freq))
    state = app.events.State()
    cam = instantiate(camera, state, app=app,
                      freq=freq, maxrate=maxrate, logger=logger,
                      timer=timer)
    cam.install()
    conn = app.broker_connection()
    recv = app.events.Receiver(conn, handlers={"*": state.event})
    try:
        try:
            recv.capture(limit=None)
        except KeyboardInterrupt:
            raise SystemExit
    finally:
        cam.cancel()
        conn.close()