This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/worker/test_state.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from __future__ import absolute_import

import pickle

from time import time

from celery.datastructures import LimitedSet
from celery.exceptions import WorkerShutdown, WorkerTerminate
from celery.worker import state

from celery.tests.case import AppCase, Mock, patch


class StateResetCase(AppCase):

    def setup(self):
        self.reset_state()

    def teardown(self):
        self.reset_state()

    def reset_state(self):
        state.active_requests.clear()
        state.revoked.clear()
        state.total_count.clear()


class MockShelve(dict):
    filename = None
    in_sync = False
    closed = False

    def open(self, filename, **kwargs):
        self.filename = filename
        return self

    def sync(self):
        self.in_sync = True

    def close(self):
        self.closed = True


class MyPersistent(state.Persistent):
    storage = MockShelve()


class test_maybe_shutdown(AppCase):

    def teardown(self):
        state.should_stop = False
        state.should_terminate = False

    def test_should_stop(self):
        state.should_stop = True
        with self.assertRaises(WorkerShutdown):
            state.maybe_shutdown()

    def test_should_terminate(self):
        state.should_terminate = True
        with self.assertRaises(WorkerTerminate):
            state.maybe_shutdown()


class test_Persistent(StateResetCase):

    def setup(self):
        self.reset_state()
        self.p = MyPersistent(state, filename='celery-state')

    def test_close_twice(self):
        self.p._is_open = False
        self.p.close()

    def test_constructor(self):
        self.assertDictEqual(self.p.db, {})
        self.assertEqual(self.p.db.filename, self.p.filename)

    def test_save(self):
        self.p.db['foo'] = 'bar'
        self.p.save()
        self.assertTrue(self.p.db.in_sync)
        self.assertTrue(self.p.db.closed)

    def add_revoked(self, *ids):
        for id in ids:
            self.p.db.setdefault('revoked', LimitedSet()).add(id)

    def test_merge(self, data=['foo', 'bar', 'baz']):
        self.add_revoked(*data)
        self.p.merge()
        for item in data:
            self.assertIn(item, state.revoked)

    def test_merge_dict(self):
        self.p.clock = Mock()
        self.p.clock.adjust.return_value = 626
        d = {'revoked': {'abc': time()}, 'clock': 313}
        self.p._merge_with(d)
        self.p.clock.adjust.assert_called_with(313)
        self.assertEqual(d['clock'], 626)
        self.assertIn('abc', state.revoked)

    def test_sync_clock_and_purge(self):
        passthrough = Mock()
        passthrough.side_effect = lambda x: x
        with patch('celery.worker.state.revoked') as revoked:
            d = {'clock': 0}
            self.p.clock = Mock()
            self.p.clock.forward.return_value = 627
            self.p._dumps = passthrough
            self.p.compress = passthrough
            self.p._sync_with(d)
            revoked.purge.assert_called_with()
            self.assertEqual(d['clock'], 627)
            self.assertNotIn('revoked', d)
            self.assertIs(d['zrevoked'], revoked)

    def test_sync(self, data1=['foo', 'bar', 'baz'],
                  data2=['baz', 'ini', 'koz']):
        self.add_revoked(*data1)
        for item in data2:
            state.revoked.add(item)
        self.p.sync()

        self.assertTrue(self.p.db['zrevoked'])
        pickled = self.p.decompress(self.p.db['zrevoked'])
        self.assertTrue(pickled)
        saved = pickle.loads(pickled)
        for item in data2:
            self.assertIn(item, saved)


class SimpleReq(object):

    def __init__(self, name):
        self.name = name


class test_state(StateResetCase):

    def test_accepted(self, requests=[SimpleReq('foo'),
                                      SimpleReq('bar'),
                                      SimpleReq('baz'),
                                      SimpleReq('baz')]):
        for request in requests:
            state.task_accepted(request)
        for req in requests:
            self.assertIn(req, state.active_requests)
        self.assertEqual(state.total_count['foo'], 1)
        self.assertEqual(state.total_count['bar'], 1)
        self.assertEqual(state.total_count['baz'], 2)

    def test_ready(self, requests=[SimpleReq('foo'),
                                   SimpleReq('bar')]):
        for request in requests:
            state.task_accepted(request)
        self.assertEqual(len(state.active_requests), 2)
        for request in requests:
            state.task_ready(request)
        self.assertEqual(len(state.active_requests), 0)