This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/bin/test_beat.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from __future__ import absolute_import

import logging
import sys

from collections import defaultdict

from celery import beat
from celery import platforms
from celery.bin import beat as beat_bin
from celery.apps import beat as beatapp

from celery.tests.case import AppCase, Mock, patch, restore_logging
from kombu.tests.case import redirect_stdouts


class MockedShelveModule(object):
    """In-memory object exposing an ``open`` call that returns plain dicts.

    Every distinct filename maps to its own dict; asking for the same
    filename again returns the very same dict object.
    """

    # Class-level registry (shared by all instances): filename -> dict.
    shelves = defaultdict(dict)

    def open(self, filename, *args, **kwargs):
        """Return the dict registered for ``filename``, creating it if new.

        Extra positional and keyword arguments are accepted and ignored.
        """
        registry = self.shelves
        return registry[filename]


mocked_shelve = MockedShelveModule()


class MockService(beat.Service):
    """``beat.Service`` subclass that records calls on class-level flags.

    ``start`` and ``sync`` do nothing except set ``started`` / ``in_sync``
    to ``True`` on the instance's class, so tests can check them afterwards.
    """

    persistence = mocked_shelve  # in-memory stand-in defined in this module
    in_sync = False
    started = False

    def sync(self):
        """Record that ``sync`` was called."""
        klass = self.__class__
        klass.in_sync = True

    def start(self):
        """Record that ``start`` was called."""
        klass = self.__class__
        klass.started = True


class MockBeat(beatapp.Beat):
    """``Beat`` subclass whose ``run`` only records that it was invoked."""

    # Set to True on ``MockBeat`` itself by ``run``; tests reset it.
    running = False

    def run(self):
        """Flag the invocation on ``MockBeat``; nothing else is executed."""
        MockBeat.running = True


class MockBeat2(beatapp.Beat):
    """``Beat`` subclass using ``MockService`` and a no-op sync handler."""

    Service = MockService

    def install_sync_handler(self, b):
        """Do nothing; ``b`` is accepted and ignored."""


class MockBeat3(beatapp.Beat):
    """``Beat`` subclass using ``MockService`` with a failing sync handler."""

    Service = MockService

    def install_sync_handler(self, b):
        """Always raise ``TypeError``; ``b`` is accepted and ignored."""
        raise TypeError('xxx')


class test_Beat(AppCase):
    """Tests for ``beatapp.Beat`` and the mock subclasses in this module."""

    def test_loglevel_string(self):
        """A level given by name or as a number yields ``logging.DEBUG``."""
        b = beatapp.Beat(app=self.app, loglevel='DEBUG',
                         redirect_stdouts=False)
        self.assertEqual(b.loglevel, logging.DEBUG)

        b2 = beatapp.Beat(app=self.app, loglevel=logging.DEBUG,
                          redirect_stdouts=False)
        self.assertEqual(b2.loglevel, logging.DEBUG)

    def test_colorize(self):
        """``no_color=True`` reaches ``app.log.setup`` as ``colorize=False``."""
        self.app.log.setup = Mock()
        b = beatapp.Beat(app=self.app, no_color=True,
                         redirect_stdouts=False)
        b.setup_logging()
        self.assertTrue(self.app.log.setup.called)
        # call_args[1] holds the keyword arguments of the recorded call.
        self.assertEqual(self.app.log.setup.call_args[1]['colorize'], False)

    def test_init_loader(self):
        """Smoke test: ``init_loader`` runs without raising."""
        b = beatapp.Beat(app=self.app, redirect_stdouts=False)
        b.init_loader()

    def test_process_title(self):
        """Smoke test: ``set_process_title`` runs without raising."""
        b = beatapp.Beat(app=self.app, redirect_stdouts=False)
        b.set_process_title()

    def test_run(self):
        """``run`` ends up calling ``start`` on the configured service."""
        b = MockBeat2(app=self.app, redirect_stdouts=False)
        MockService.started = False
        try:
            b.run()
            self.assertTrue(MockService.started)
        finally:
            # The flag lives on the class; reset it even when the test
            # fails so it cannot leak into other tests.
            MockService.started = False

    def psig(self, fun, *args, **kwargs):
        """Call ``fun`` and return the signal handlers it registered.

        ``platforms.signals`` is temporarily replaced by an object whose
        item assignment stores the handler in a local dict.  The original
        object is put back even if ``fun`` raises.

        Returns a dict mapping each signal key to the handler assigned.
        """
        handlers = {}

        class Signals(platforms.Signals):

            def __setitem__(self, sig, handler):
                handlers[sig] = handler

        p, platforms.signals = platforms.signals, Signals()
        try:
            fun(*args, **kwargs)
            return handlers
        finally:
            platforms.signals = p

    def test_install_sync_handler(self):
        """The installed SIGINT handler syncs the service, then exits."""
        b = beatapp.Beat(app=self.app, redirect_stdouts=False)
        clock = MockService(app=self.app)
        MockService.in_sync = False
        try:
            handlers = self.psig(b.install_sync_handler, clock)
            with self.assertRaises(SystemExit):
                handlers['SIGINT']('SIGINT', object())
            self.assertTrue(MockService.in_sync)
        finally:
            # Class-level flag: always reset, including on failure.
            MockService.in_sync = False

    def test_setup_logging(self):
        """With stdout redirection off, ``sys.stdout`` gets no ``logger``."""
        with restore_logging():
            try:
                # py3k
                delattr(sys.stdout, 'logger')
            except AttributeError:
                # Nothing to remove: the attribute was not set.
                pass
            b = beatapp.Beat(app=self.app, redirect_stdouts=False)
            b.redirect_stdouts = False
            b.app.log.already_setup = False
            b.setup_logging()
            with self.assertRaises(AttributeError):
                sys.stdout.logger

    @redirect_stdouts
    @patch('celery.apps.beat.logger')
    def test_logs_errors(self, logger, stdout, stderr):
        """A failing sync-handler install is logged at critical level."""
        with restore_logging():
            b = MockBeat3(
                app=self.app, redirect_stdouts=False, socket_timeout=None,
            )
            b.start_scheduler()
            self.assertTrue(logger.critical.called)

    @redirect_stdouts
    @patch('celery.platforms.create_pidlock')
    def test_use_pidfile(self, create_pidlock, stdout, stderr):
        """Passing ``pidfile`` makes ``start_scheduler`` create a pidlock."""
        b = MockBeat2(app=self.app, pidfile='pidfilelockfilepid',
                      socket_timeout=None, redirect_stdouts=False)
        b.start_scheduler()
        self.assertTrue(create_pidlock.called)


class MockDaemonContext(object):
    """Context-manager double that records open/close on class flags.

    Constructor arguments are accepted and ignored.  ``open`` and ``close``
    set ``opened`` / ``closed`` on the instance's class, not on the
    instance itself.
    """

    closed = False
    opened = False

    def __init__(self, *args, **kwargs):
        """Accept any arguments; nothing is stored."""

    def open(self):
        """Mark the class as opened and return this instance."""
        type(self).opened = True
        return self

    def close(self, *args):
        """Mark the class as closed; any arguments are ignored."""
        type(self).closed = True

    # Context-manager protocol reuses the two methods above.
    __enter__ = open
    __exit__ = close


class test_div(AppCase):
    """Tests for the ``beat_bin`` command module, run against mocks.

    ``setup`` replaces ``beatapp.Beat`` with ``MockBeat`` and
    ``beat_bin.detached`` with ``MockDaemonContext``; ``teardown`` puts
    both originals back.
    """

    def setup(self):
        """Install the mocks, remembering the objects they replace."""
        self.prev, beatapp.Beat = beatapp.Beat, MockBeat
        self.ctx, beat_bin.detached = (
            beat_bin.detached, MockDaemonContext,
        )

    def teardown(self):
        """Restore everything that ``setup`` replaced."""
        beatapp.Beat = self.prev
        # Without this the mock would stay installed on ``beat_bin`` for
        # every test that runs afterwards.
        beat_bin.detached = self.ctx

    def test_main(self):
        """``main`` ends up running the (mocked) Beat application."""
        prev_argv = sys.argv
        sys.argv = [sys.argv[0], '-s', 'foo']
        try:
            beat_bin.main(app=self.app)
            self.assertTrue(MockBeat.running)
        finally:
            # Undo the process-wide changes made by this test.
            sys.argv = prev_argv
            MockBeat.running = False

    def test_detach(self):
        """``run(detach=True)`` opens and closes the detached context."""
        # The flags live on the class; clear them first so values left
        # over from an earlier use cannot make the assertions pass.
        MockDaemonContext.opened = MockDaemonContext.closed = False
        cmd = beat_bin.beat()
        cmd.app = self.app
        try:
            cmd.run(detach=True)
            self.assertTrue(MockDaemonContext.opened)
            self.assertTrue(MockDaemonContext.closed)
        finally:
            MockDaemonContext.opened = MockDaemonContext.closed = False

    def test_parse_options(self):
        """``-s foo`` is parsed into ``options.schedule``."""
        cmd = beat_bin.beat()
        cmd.app = self.app
        options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
        self.assertEqual(options.schedule, 'foo')