/usr/lib/python2.7/dist-packages/celery/tests/app/test_routes.py is in python-celery 3.1.23-7.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
from kombu import Exchange
from kombu.utils.functional import maybe_evaluate
from celery.app import routes
from celery.exceptions import QueueNotFound
from celery.utils.functional import LRUCache
from celery.tests.case import AppCase
def Router(app, *args, **kwargs):
    """Build a ``routes.Router`` for *app*.

    All positional and keyword arguments are forwarded unchanged; *app* is
    always handed over as the ``app`` keyword argument.
    """
    kwargs['app'] = app
    return routes.Router(*args, **kwargs)
def E(app, queues):
    """Return a function that expands a route answer against *queues*.

    Every call of the returned function builds a fresh router for *app*
    (with an empty route list) and passes the answer through that router's
    ``expand_destination``.
    """
    def expand(answer):
        router = Router(app, [], queues)
        return router.expand_destination(answer)

    return expand
def set_queues(app, **queues):
    """Install *queues* (name -> options) on *app*.

    The mapping is stored as ``app.conf.CELERY_QUEUES`` and the result of
    ``app.amqp.Queues(queues)`` replaces ``app.amqp.queues``.
    """
    app.conf.CELERY_QUEUES = queues
    declared = app.amqp.Queues(queues)
    app.amqp.queues = declared
class RouteCase(AppCase):
    """Base case providing queue option dicts and a task for routing tests."""

    def setup(self):
        # Two queue definitions with their own exchange, type and key.
        self.a_queue = dict(
            exchange='fooexchange',
            exchange_type='fanout',
            routing_key='xuzzy',
        )
        self.b_queue = dict(
            exchange='barexchange',
            exchange_type='topic',
            routing_key='b.b.#',
        )

        # A queue definition assembled from the app's default settings.
        conf = self.app.conf
        self.d_queue = dict(
            exchange=conf.CELERY_DEFAULT_EXCHANGE,
            exchange_type=conf.CELERY_DEFAULT_EXCHANGE_TYPE,
            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
        )

        # Task registered on this app only (shared=False); the tests use
        # its name as the routing key into the route maps.
        @self.app.task(shared=False)
        def mytask():
            pass

        self.mytask = mytask
class test_MapRoute(RouteCase):
    """Checks ``routes.MapRoute`` lookups and how their answers expand."""

    def test_route_for_task_expanded_route(self):
        # A route that only names a queue must expand to a destination
        # whose queue is called 'foo'; unknown tasks have no route.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        task_name = self.mytask.name
        route = routes.MapRoute({task_name: {'queue': 'foo'}})

        destination = expand(route.route_for_task(task_name))
        self.assertEqual(destination['queue'].name, 'foo')
        self.assertIsNone(route.route_for_task('celery.awesome'))

    def test_route_for_task(self):
        # A route given as full queue options must come back from
        # expansion still containing every one of those options.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        task_name = self.mytask.name
        route = routes.MapRoute({task_name: self.b_queue})

        expanded = expand(route.route_for_task(task_name))
        self.assertDictContainsSubset(self.b_queue, expanded)
        self.assertIsNone(route.route_for_task('celery.awesome'))

    def test_expand_route_not_found(self):
        # NOTE(review): the positional ``False`` presumably disables
        # creation of missing queues -- confirm against ``amqp.Queues``.
        queues = self.app.amqp.Queues(self.app.conf.CELERY_QUEUES, False)
        expand = E(self.app, queues)
        route = routes.MapRoute({'a': {'queue': 'x'}})

        # Queue 'x' is not declared, so expansion is expected to fail.
        with self.assertRaises(QueueNotFound):
            expand(route.route_for_task('a'))
class test_lookup_route(RouteCase):
    """Checks ``Router`` route lookup and destination expansion."""

    def test_init_queues(self):
        # Passing queues=None should leave the router with an empty mapping.
        router = Router(self.app, queues=None)
        self.assertDictEqual(router.queues, {})

    def test_lookup_takes_first(self):
        # Both route maps match the task; the first one ('bar') must win.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        task_name = self.mytask.name
        R = routes.prepare((
            {task_name: {'queue': 'bar'}},
            {task_name: {'queue': 'foo'}},
        ))
        router = Router(self.app, R, self.app.amqp.queues)

        chosen = router.route({}, task_name, args=[1, 2], kwargs={})
        self.assertEqual(chosen['queue'].name, 'bar')

    def test_expands_queue_in_options(self):
        set_queues(self.app)
        R = routes.prepare(())
        router = Router(
            self.app, R, self.app.amqp.queues, create_missing=True,
        )

        # apply_async forwards every argument, including exchange=None and
        # the like, so the merge with the expanded queue must cope with it.
        options = {
            'queue': 'testq',
            'exchange': None,
            'routing_key': None,
            'immediate': False,
        }
        route = router.route(
            options, self.mytask.name, args=[1, 2], kwargs={},
        )

        queue = route['queue']
        self.assertEqual(queue.name, 'testq')
        self.assertEqual(queue.exchange, Exchange('testq'))
        self.assertEqual(queue.routing_key, 'testq')
        self.assertEqual(route['immediate'], False)

    def test_expand_destination_string(self):
        # A bare queue name is expanded into a destination for that queue.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        router = Router(self.app, {}, self.app.amqp.queues)

        dest = router.expand_destination('foo')
        self.assertEqual(dest['queue'].name, 'foo')

    def test_lookup_paths_traversed(self):
        default_queue = self.app.conf.CELERY_DEFAULT_QUEUE
        set_queues(
            self.app, foo=self.a_queue, bar=self.b_queue,
            **{default_queue: self.d_queue}
        )
        task_name = self.mytask.name
        R = routes.prepare((
            {'celery.xaza': {'queue': 'bar'}},
            {task_name: {'queue': 'foo'}},
        ))
        router = Router(self.app, R, self.app.amqp.queues)

        # The first map has no entry for the task, so the second map's
        # answer ('foo') is expected.
        matched = router.route({}, task_name, args=[1, 2], kwargs={})
        self.assertEqual(matched['queue'].name, 'foo')

        # A task named in no map is expected to land on the default queue.
        unmatched = router.route({}, 'celery.poza')
        self.assertEqual(unmatched['queue'].name, default_queue)
class test_prepare(AppCase):
    """Checks how ``routes.prepare`` normalises route specifications."""

    def test_prepare(self):
        sentinel = object()
        spec = [
            {'foo': 'bar'},
            'celery.utils.functional.LRUCache',
            sentinel,
        ]
        prepared = routes.prepare(spec)

        # dict -> MapRoute; dotted name -> something that evaluates to an
        # instance of the named class; any other object is kept as-is.
        self.assertIsInstance(prepared[0], routes.MapRoute)
        self.assertIsInstance(maybe_evaluate(prepared[1]), LRUCache)
        self.assertIs(prepared[2], sentinel)

        # A single non-sequence item is wrapped in a list.
        self.assertEqual(routes.prepare(sentinel), [sentinel])

    def test_prepare_item_is_dict(self):
        # A bare dict (not inside a sequence) still yields a MapRoute first.
        spec = {'foo': 'bar'}
        prepared = routes.prepare(spec)
        self.assertIsInstance(prepared[0], routes.MapRoute)
|