This file is indexed.

/usr/share/pyshared/celery/apps/beat.py is in python-celery 2.5.3-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import atexit
import socket
import sys

from .. import __version__, platforms
from .. import beat
from ..app import app_or_default
from ..app.abstract import configurated, from_config
from ..utils import LOG_LEVELS, qualname
from ..utils.timeutils import humanize_seconds

STARTUP_INFO_FMT = """
Configuration ->
    . broker -> %(conninfo)s
    . loader -> %(loader)s
    . scheduler -> %(scheduler)s
%(scheduler_info)s
    . logfile -> %(logfile)s@%(loglevel)s
    . maxinterval -> %(hmax_interval)s (%(max_interval)ss)
""".strip()


class Beat(configurated):
    Service = beat.Service

    loglevel = from_config("log_level")
    logfile = from_config("log_file")
    schedule = from_config("schedule_filename")
    scheduler_cls = from_config("scheduler")
    redirect_stdouts = from_config()
    redirect_stdouts_level = from_config()

    def __init__(self, max_interval=None, app=None,
            socket_timeout=30, pidfile=None, **kwargs):
        """Starts the celerybeat task scheduler."""
        self.app = app = app_or_default(app)
        self.setup_defaults(kwargs, namespace="celerybeat")

        self.max_interval = max_interval
        self.socket_timeout = socket_timeout
        self.colored = app.log.colored(self.logfile)
        self.pidfile = pidfile

        if not isinstance(self.loglevel, int):
            self.loglevel = LOG_LEVELS[self.loglevel.upper()]

    def run(self):
        logger = self.setup_logging()
        print(str(self.colored.cyan(
                    "celerybeat v%s is starting." % __version__)))
        self.init_loader()
        self.set_process_title()
        self.start_scheduler(logger)

    def setup_logging(self):
        handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
                                                       logfile=self.logfile)
        logger = self.app.log.get_default_logger(name="celery.beat")
        if self.redirect_stdouts and not handled:
            self.app.log.redirect_stdouts_to_logger(logger,
                    loglevel=self.redirect_stdouts_level)
        return logger

    def start_scheduler(self, logger=None):
        c = self.colored
        if self.pidfile:
            pidlock = platforms.create_pidlock(self.pidfile).acquire()
            atexit.register(pidlock.release)
        beat = self.Service(app=self.app,
                            logger=logger,
                            max_interval=self.max_interval,
                            scheduler_cls=self.scheduler_cls,
                            schedule_filename=self.schedule)

        print(str(c.blue("__    ", c.magenta("-"),
                  c.blue("    ... __   "), c.magenta("-"),
                  c.blue("        _\n"),
                  c.reset(self.startup_info(beat)))))
        if self.socket_timeout:
            logger.debug("Setting default socket timeout to %r",
                         self.socket_timeout)
            socket.setdefaulttimeout(self.socket_timeout)
        try:
            self.install_sync_handler(beat)
            beat.start()
        except Exception, exc:
            logger.critical("celerybeat raised exception %s: %r",
                            exc.__class__, exc,
                            exc_info=True)

    def init_loader(self):
        # Run the worker init handler.
        # (Usually imports task modules and such.)
        self.app.loader.init_worker()

    def startup_info(self, beat):
        scheduler = beat.get_scheduler(lazy=True)
        return STARTUP_INFO_FMT % {
            "conninfo": self.app.broker_connection().as_uri(),
            "logfile": self.logfile or "[stderr]",
            "loglevel": LOG_LEVELS[self.loglevel],
            "loader": qualname(self.app.loader),
            "scheduler": qualname(scheduler),
            "scheduler_info": scheduler.info,
            "hmax_interval": humanize_seconds(beat.max_interval),
            "max_interval": beat.max_interval,
        }

    def set_process_title(self):
        arg_start = "manage" in sys.argv[0] and 2 or 1
        platforms.set_process_title("celerybeat",
                               info=" ".join(sys.argv[arg_start:]))

    def install_sync_handler(self, beat):
        """Install a `SIGTERM` + `SIGINT` handler that saves
        the celerybeat schedule."""

        def _sync(signum, frame):
            beat.sync()
            raise SystemExit()

        platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)