This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/bin/test_events.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from __future__ import absolute_import

from celery.bin import events

from celery.tests.case import AppCase, SkipTest, patch, _old_patch


class MockCommand(object):
    """Stand-in command object that only records that it was executed."""

    # Shared, class-level log of invocations: one ``True`` per call.
    # Tests reset it by rebinding ``MockCommand.executed`` before use.
    executed = []

    def execute_from_commandline(self, **kwargs):
        """Note one invocation; every keyword argument is ignored."""
        invocation_log = self.executed
        # Mutate in place so the list shared through the class is updated.
        invocation_log.append(True)


def proctitle(prog, info=None):
    """Remember the arguments of the latest call on ``proctitle.last``.

    The tests below install this function in place of
    ``celery.bin.events.set_process_title`` and then inspect
    ``proctitle.last`` to see which title was requested.
    """
    latest_call = (prog, info)
    proctitle.last = latest_call


# Empty tuple until the first call is recorded.
proctitle.last = ()


class test_events(AppCase):
    # Tests for the ``events`` command from ``celery.bin.events``; the
    # dumper, curses monitor and camera entry points are replaced by stubs.

    def setup(self):
        # NOTE(review): lowercase ``setup`` is presumably the per-test hook
        # that AppCase invokes -- AppCase is not visible here, confirm.
        self.ev = events.events(app=self.app)

    # dump=True must return whatever the (stubbed) evdump returns and
    # request a process title containing "celery events:dump".
    @_old_patch('celery.events.dumper', 'evdump',
                lambda **kw: 'me dumper, you?')
    @_old_patch('celery.bin.events', 'set_process_title', proctitle)
    def test_run_dump(self):
        outcome = self.ev.run(dump=True)
        self.assertEqual(outcome, 'me dumper, you?')
        requested_title = proctitle.last[0]
        self.assertIn('celery events:dump', requested_title)

    def test_run_top(self):
        # Without curses there is nothing to test.
        try:
            import curses  # noqa
        except ImportError:
            raise SkipTest('curses monitor requires curses')

        # The patches decorate a nested function instead of the test method,
        # so they are only applied once the curses check above has passed.
        # NOTE(review): presumably because applying the patch touches
        # celery.events.cursesmon, which needs curses -- confirm.
        def fake_evtop(**kw):
            return 'me top, you?'

        @_old_patch('celery.events.cursesmon', 'evtop', fake_evtop)
        @_old_patch('celery.bin.events', 'set_process_title', proctitle)
        def check_top():
            outcome = self.ev.run()
            self.assertEqual(outcome, 'me top, you?')
            requested_title = proctitle.last[0]
            self.assertIn('celery events:top', requested_title)

        return check_top()

    # The evcam stub echoes back the positional and keyword arguments it
    # was called with, so the test can inspect what run() passed on.
    @_old_patch('celery.events.snapshot', 'evcam',
                lambda *a, **k: (a, k))
    @_old_patch('celery.bin.events', 'set_process_title', proctitle)
    def test_run_cam(self):
        positional, options = self.ev.run(
            camera='foo.bar.baz', logfile='logfile',
        )
        camera_name = positional[0]
        self.assertEqual(camera_name, 'foo.bar.baz')
        self.assertEqual(options['freq'], 1.0)
        self.assertIsNone(options['maxrate'])
        self.assertEqual(options['loglevel'], 'INFO')
        self.assertEqual(options['logfile'], 'logfile')
        requested_title = proctitle.last[0]
        self.assertIn('celery events:cam', requested_title)

    # Stacked patches inject their mocks bottom-up: ``detached`` first,
    # then ``evcam``.
    @patch('celery.events.snapshot.evcam')
    @patch('celery.bin.events.detached')
    def test_run_cam_detached(self, detached_mock, evcam_mock):
        self.ev.prog_name = 'celery events'
        self.ev.run_evcam('myapp.Camera', detach=True)
        # Both the detach helper and the camera must have been invoked.
        self.assertTrue(detached_mock.called)
        self.assertTrue(evcam_mock.called)

    def test_get_options(self):
        # The command is expected to expose a non-empty set of options.
        available = self.ev.get_options()
        self.assertTrue(available)

    # main() should run the command through execute_from_commandline;
    # MockCommand records that call on its class-level list.
    @_old_patch('celery.bin.events', 'events', MockCommand)
    def test_main(self):
        # Start from a clean log so earlier runs cannot satisfy the check.
        MockCommand.executed = []
        events.main()
        self.assertTrue(MockCommand.executed)