/usr/lib/python3/dist-packages/celery/tests/worker/test_strategy.py is in python3-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
from collections import defaultdict
from contextlib import contextmanager
from kombu.utils.limits import TokenBucket
from celery.worker import state
from celery.utils.timeutils import rate
from celery.tests.case import AppCase, Mock, patch, body_from_sig
class test_default_strategy(AppCase):
    """Tests for the callable returned by a task's ``start_strategy``.

    Every test builds a signature of the ``add`` task, feeds it through the
    strategy together with a mocked consumer, and then checks which of the
    mocks (reserve callback, rate limiter, timer) was invoked.
    """

    def setup(self):
        # Register a private (``shared=False``) task on the test app.
        def add(x, y):
            return x + y

        self.add = self.app.task(shared=False)(add)

    class Context(object):
        """Holds the strategy callable plus the mocks it was started with."""

        def __init__(self, sig, s, reserved, consumer, message, body):
            (self.sig, self.s, self.reserved,
             self.consumer, self.message, self.body) = (
                sig, s, reserved, consumer, message, body)

        def __call__(self, **kwargs):
            """Run the strategy on the stored message/body and return its result.

            The message's ``ack`` and ``reject`` attributes and a fresh empty
            list are passed as the remaining positional arguments.
            """
            msg = self.message
            return self.s(msg, self.body, msg.ack, msg.reject, [], **kwargs)

        def was_reserved(self):
            """Return whether the ``reserved`` mock was called."""
            return self.reserved.called

        def was_rate_limited(self):
            """Return whether ``consumer._limit_task`` was called.

            Asserts first that the request was not reserved.
            """
            assert not self.was_reserved()
            limiter = self.consumer._limit_task
            return limiter.called

        def was_scheduled(self):
            """Return whether ``consumer.timer.call_at`` was called.

            Asserts first that the request was neither reserved nor rate
            limited.
            """
            assert not self.was_reserved()
            assert not self.was_rate_limited()
            timer = self.consumer.timer
            return timer.call_at.called

        def event_sent(self):
            """Return ``call_args`` of the event dispatcher's ``send`` mock.

            The value is used in a boolean context by the tests.
            """
            dispatcher = self.consumer.event_dispatcher
            return dispatcher.send.call_args

        def get_request(self):
            """Return the first positional argument of whichever mock fired.

            The mocks are checked in the order reserved, rate limited,
            scheduled.

            Raises:
                ValueError: if none of the three mocks was called.
            """
            if self.was_reserved():
                recorder = self.reserved
            elif self.was_rate_limited():
                recorder = self.consumer._limit_task
            elif self.was_scheduled():
                recorder = self.consumer.timer.call_at
            else:
                raise ValueError('request not handled')
            return recorder.call_args[0][0]

    @contextmanager
    def _context(self, sig,
                 rate_limits=True, events=True, utc=True, limit=None):
        """Yield a ``Context`` wrapping a freshly started strategy for *sig*.

        :param sig: task signature to build the strategy and body from.
        :param rate_limits: when false, ``consumer.disable_rate_limits`` is
            set to true.
        :param events: value assigned to ``consumer.event_dispatcher.enabled``.
        :param utc: forwarded to ``body_from_sig``.
        :param limit: optional rate string; when truthy a ``TokenBucket`` with
            capacity 1 is installed for ``sig.task``.
        """
        self.assertTrue(sig.type.Strategy)

        task_reserved = Mock()
        consumer = Mock()
        # Any task without an explicit bucket looks up as ``None``.
        consumer.task_buckets = defaultdict(lambda: None)
        if limit:
            consumer.task_buckets[sig.task] = TokenBucket(
                rate(limit), capacity=1,
            )
        consumer.disable_rate_limits = not rate_limits
        consumer.event_dispatcher.enabled = events

        strategy = sig.type.start_strategy(
            self.app, consumer, task_reserved=task_reserved,
        )
        self.assertTrue(strategy)

        yield self.Context(
            sig, strategy, task_reserved, consumer,
            Mock(), body_from_sig(self.app, sig, utc=utc),
        )

    def _assert_eta_scheduled(self, **context_options):
        # Shared body of the eta tests: a signature with a 10 second
        # countdown must end up on the timer and bump the QoS.
        delayed = self.add.s(2, 2).set(countdown=10)
        with self._context(delayed, **context_options) as ctx:
            ctx()
            self.assertTrue(ctx.was_scheduled())
            ctx.consumer.qos.increment_eventually.assert_called_with()

    def test_when_logging_disabled(self):
        # The strategy module's logger is patched to report itself as
        # disabled; ``info`` must then never be called.
        with patch('celery.worker.strategy.logger') as strategy_logger:
            strategy_logger.isEnabledFor.return_value = False
            with self._context(self.add.s(2, 2)) as ctx:
                ctx()
                self.assertFalse(strategy_logger.info.called)

    def test_task_strategy(self):
        # Plain signature: reserved, handed to on_task_request, event sent.
        with self._context(self.add.s(2, 2)) as ctx:
            ctx()
            self.assertTrue(ctx.was_reserved())
            request = ctx.get_request()
            ctx.consumer.on_task_request.assert_called_with(request)
            self.assertTrue(ctx.event_sent())

    def test_when_events_disabled(self):
        # Still reserved, but nothing goes through the event dispatcher.
        with self._context(self.add.s(2, 2), events=False) as ctx:
            ctx()
            self.assertTrue(ctx.was_reserved())
            self.assertFalse(ctx.event_sent())

    def test_eta_task(self):
        self._assert_eta_scheduled()

    def test_eta_task_utc_disabled(self):
        self._assert_eta_scheduled(utc=False)

    def test_when_rate_limited(self):
        # A '1/m' bucket with rate limits enabled routes to _limit_task.
        sig = self.add.s(2, 2)
        with self._context(sig, rate_limits=True, limit='1/m') as ctx:
            ctx()
            self.assertTrue(ctx.was_rate_limited())

    def test_when_rate_limited__limits_disabled(self):
        # Same bucket, but with rate limits disabled the task is reserved.
        sig = self.add.s(2, 2)
        with self._context(sig, rate_limits=False, limit='1/m') as ctx:
            ctx()
            self.assertTrue(ctx.was_reserved())

    def test_when_revoked(self):
        # A revoked id must not reach any of the handlers, so
        # get_request() has nothing to return.
        sig = self.add.s(2, 2)
        sig.freeze()
        state.revoked.add(sig.id)
        try:
            with self._context(sig) as ctx:
                ctx()
                with self.assertRaises(ValueError):
                    ctx.get_request()
        finally:
            # Always undo the global revoked-set mutation.
            state.revoked.discard(sig.id)
|