/usr/lib/python2.7/dist-packages/celery/tests/worker/test_state.py is in python-celery 3.1.6-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | from __future__ import absolute_import
import pickle
from time import time
from celery.datastructures import LimitedSet
from celery.exceptions import SystemTerminate
from celery.worker import state
from celery.tests.case import AppCase, Mock, patch
class StateResetCase(AppCase):
def setup(self):
self.reset_state()
def teardown(self):
self.reset_state()
def reset_state(self):
state.active_requests.clear()
state.revoked.clear()
state.total_count.clear()
class MockShelve(dict):
filename = None
in_sync = False
closed = False
def open(self, filename, **kwargs):
self.filename = filename
return self
def sync(self):
self.in_sync = True
def close(self):
self.closed = True
class MyPersistent(state.Persistent):
storage = MockShelve()
class test_maybe_shutdown(AppCase):
def teardown(self):
state.should_stop = False
state.should_terminate = False
def test_should_stop(self):
state.should_stop = True
with self.assertRaises(SystemExit):
state.maybe_shutdown()
def test_should_terminate(self):
state.should_terminate = True
with self.assertRaises(SystemTerminate):
state.maybe_shutdown()
class test_Persistent(StateResetCase):
def setup(self):
self.reset_state()
self.p = MyPersistent(state, filename='celery-state')
def test_close_twice(self):
self.p._is_open = False
self.p.close()
def test_constructor(self):
self.assertDictEqual(self.p.db, {})
self.assertEqual(self.p.db.filename, self.p.filename)
def test_save(self):
self.p.db['foo'] = 'bar'
self.p.save()
self.assertTrue(self.p.db.in_sync)
self.assertTrue(self.p.db.closed)
def add_revoked(self, *ids):
for id in ids:
self.p.db.setdefault('revoked', LimitedSet()).add(id)
def test_merge(self, data=['foo', 'bar', 'baz']):
self.add_revoked(*data)
self.p.merge()
for item in data:
self.assertIn(item, state.revoked)
def test_merge_dict(self):
self.p.clock = Mock()
self.p.clock.adjust.return_value = 626
d = {'revoked': {'abc': time()}, 'clock': 313}
self.p._merge_with(d)
self.p.clock.adjust.assert_called_with(313)
self.assertEqual(d['clock'], 626)
self.assertIn('abc', state.revoked)
def test_sync_clock_and_purge(self):
passthrough = Mock()
passthrough.side_effect = lambda x: x
with patch('celery.worker.state.revoked') as revoked:
d = {'clock': 0}
self.p.clock = Mock()
self.p.clock.forward.return_value = 627
self.p._dumps = passthrough
self.p.compress = passthrough
self.p._sync_with(d)
revoked.purge.assert_called_with()
self.assertEqual(d['clock'], 627)
self.assertNotIn('revoked', d)
self.assertIs(d['zrevoked'], revoked)
def test_sync(self, data1=['foo', 'bar', 'baz'],
data2=['baz', 'ini', 'koz']):
self.add_revoked(*data1)
for item in data2:
state.revoked.add(item)
self.p.sync()
self.assertTrue(self.p.db['zrevoked'])
pickled = self.p.decompress(self.p.db['zrevoked'])
self.assertTrue(pickled)
saved = pickle.loads(pickled)
for item in data2:
self.assertIn(item, saved)
class SimpleReq(object):
def __init__(self, name):
self.name = name
class test_state(StateResetCase):
def test_accepted(self, requests=[SimpleReq('foo'),
SimpleReq('bar'),
SimpleReq('baz'),
SimpleReq('baz')]):
for request in requests:
state.task_accepted(request)
for req in requests:
self.assertIn(req, state.active_requests)
self.assertEqual(state.total_count['foo'], 1)
self.assertEqual(state.total_count['bar'], 1)
self.assertEqual(state.total_count['baz'], 2)
def test_ready(self, requests=[SimpleReq('foo'),
SimpleReq('bar')]):
for request in requests:
state.task_accepted(request)
self.assertEqual(len(state.active_requests), 2)
for request in requests:
state.task_ready(request)
self.assertEqual(len(state.active_requests), 0)
|