This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/worker/test_heartbeat.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from __future__ import absolute_import

from celery.worker.heartbeat import Heart
from celery.tests.case import AppCase


class MockDispatcher(object):
    heart = None
    next_iter = 0

    def __init__(self):
        self.sent = []
        self.on_enabled = set()
        self.on_disabled = set()
        self.enabled = True

    def send(self, msg, **_fields):
        self.sent.append(msg)
        if self.heart:
            if self.next_iter > 10:
                self.heart._shutdown.set()
            self.next_iter += 1


class MockDispatcherRaising(object):

    def send(self, msg):
        if msg == 'worker-offline':
            raise Exception('foo')


class MockTimer(object):

    def call_repeatedly(self, secs, fun, args=(), kwargs={}):

        class entry(tuple):
            cancelled = False

            def cancel(self):
                self.cancelled = True

        return entry((secs, fun, args, kwargs))

    def cancel(self, entry):
        entry.cancel()


class test_Heart(AppCase):

    def test_start_stop(self):
        timer = MockTimer()
        eventer = MockDispatcher()
        h = Heart(timer, eventer, interval=1)
        h.start()
        self.assertTrue(h.tref)
        h.stop()
        self.assertIsNone(h.tref)
        h.stop()

    def test_start_when_disabled(self):
        timer = MockTimer()
        eventer = MockDispatcher()
        eventer.enabled = False
        h = Heart(timer, eventer)
        h.start()
        self.assertFalse(h.tref)

    def test_stop_when_disabled(self):
        timer = MockTimer()
        eventer = MockDispatcher()
        eventer.enabled = False
        h = Heart(timer, eventer)
        h.stop()