This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/app/test_routes.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from __future__ import absolute_import

from kombu import Exchange
from kombu.utils.functional import maybe_evaluate

from celery.app import routes
from celery.exceptions import QueueNotFound
from celery.utils.functional import LRUCache

from celery.tests.case import AppCase


def Router(app, *args, **kwargs):
    """Build a ``routes.Router`` bound to ``app``.

    All remaining positional and keyword arguments are forwarded to
    ``routes.Router`` unchanged; ``app`` is always passed by keyword.
    """
    options = dict(kwargs, app=app)
    return routes.Router(*args, **options)


def E(app, queues):
    """Return a one-argument function that expands a route answer.

    Each call of the returned function builds a new router for ``app``
    with an empty route list and the given ``queues``, then delegates to
    its ``expand_destination`` method.
    """
    def expand(answer):
        router = Router(app, [], queues)
        return router.expand_destination(answer)

    return expand


def set_queues(app, **queues):
    """Install ``queues`` (name -> declaration) on ``app``.

    The mapping is stored as ``app.conf.CELERY_QUEUES`` and the result of
    ``app.amqp.Queues(queues)`` is stored as ``app.amqp.queues``.
    """
    # Configuration is written first, before the amqp object is touched.
    app.conf.CELERY_QUEUES = queues
    amqp = app.amqp
    amqp.queues = amqp.Queues(queues)


class RouteCase(AppCase):
    """Shared fixture: three queue declarations and one registered task."""

    def setup(self):
        # Two explicitly described queues with different exchange types.
        self.a_queue = dict(
            exchange='fooexchange',
            exchange_type='fanout',
            routing_key='xuzzy',
        )
        self.b_queue = dict(
            exchange='barexchange',
            exchange_type='topic',
            routing_key='b.b.#',
        )

        # A declaration assembled from the app's default exchange settings.
        conf = self.app.conf
        self.d_queue = dict(
            exchange=conf.CELERY_DEFAULT_EXCHANGE,
            exchange_type=conf.CELERY_DEFAULT_EXCHANGE_TYPE,
            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
        )

        # No-op task used by the tests as the routing subject; it is
        # registered on the test app with ``shared=False``.
        def mytask():
            pass

        self.mytask = self.app.task(shared=False)(mytask)


class test_MapRoute(RouteCase):
    """Checks ``routes.MapRoute`` lookups combined with route expansion."""

    def test_route_for_task_expanded_route(self):
        # A route that only names a queue must expand to that queue.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        mapping = routes.MapRoute({self.mytask.name: {'queue': 'foo'}})
        destination = expand(mapping.route_for_task(self.mytask.name))
        self.assertEqual(destination['queue'].name, 'foo')
        # A task name missing from the map yields no route at all.
        self.assertIsNone(mapping.route_for_task('celery.awesome'))

    def test_route_for_task(self):
        # A route given as a full queue declaration keeps every one of its
        # keys after expansion.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        expand = E(self.app, self.app.amqp.queues)
        mapping = routes.MapRoute({self.mytask.name: self.b_queue})
        expected = self.b_queue
        expanded = expand(mapping.route_for_task(self.mytask.name))
        self.assertDictContainsSubset(expected, expanded)
        self.assertIsNone(mapping.route_for_task('celery.awesome'))

    def test_expand_route_not_found(self):
        # NOTE(review): the positional ``False`` presumably disables
        # creation of missing queues -- confirm against ``app.amqp.Queues``.
        queues = self.app.amqp.Queues(self.app.conf.CELERY_QUEUES, False)
        expand = E(self.app, queues)
        mapping = routes.MapRoute({'a': {'queue': 'x'}})
        # Expanding a route that names the queue 'x' must fail here.
        with self.assertRaises(QueueNotFound):
            expand(mapping.route_for_task('a'))


class test_lookup_route(RouteCase):
    """Checks ``Router`` lookups across route lists and publish options."""

    def test_init_queues(self):
        # Passing ``queues=None`` must leave the router with an empty dict.
        self.assertDictEqual(Router(self.app, queues=None).queues, {})

    def test_lookup_takes_first(self):
        # Both route maps match the task; the earlier one ('bar') wins.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        prepared = routes.prepare((
            {self.mytask.name: {'queue': 'bar'}},
            {self.mytask.name: {'queue': 'foo'}},
        ))
        router = Router(self.app, prepared, self.app.amqp.queues)
        chosen = router.route(
            {}, self.mytask.name, args=[1, 2], kwargs={},
        )
        self.assertEqual(chosen['queue'].name, 'bar')

    def test_expands_queue_in_options(self):
        set_queues(self.app)
        prepared = routes.prepare(())
        router = Router(
            self.app, prepared, self.app.amqp.queues, create_missing=True,
        )
        # apply_async forwards all arguments, even exchange=None etc,
        # so need to make sure it's merged correctly.
        options = {
            'queue': 'testq',
            'exchange': None,
            'routing_key': None,
            'immediate': False,
        }
        route = router.route(
            options, self.mytask.name, args=[1, 2], kwargs={},
        )
        queue = route['queue']
        self.assertEqual(queue.name, 'testq')
        self.assertEqual(queue.exchange, Exchange('testq'))
        self.assertEqual(queue.routing_key, 'testq')
        self.assertEqual(route['immediate'], False)

    def test_expand_destination_string(self):
        # A bare queue name is expanded to the matching declared queue.
        set_queues(self.app, foo=self.a_queue, bar=self.b_queue)
        router = Router(self.app, {}, self.app.amqp.queues)
        destination = router.expand_destination('foo')
        self.assertEqual(destination['queue'].name, 'foo')

    def test_lookup_paths_traversed(self):
        default_queue = self.app.conf.CELERY_DEFAULT_QUEUE
        set_queues(
            self.app, foo=self.a_queue, bar=self.b_queue,
            **{default_queue: self.d_queue}
        )
        prepared = routes.prepare((
            {'celery.xaza': {'queue': 'bar'}},
            {self.mytask.name: {'queue': 'foo'}},
        ))
        router = Router(self.app, prepared, self.app.amqp.queues)

        # The first map has no entry for the task, so the second applies.
        matched = router.route(
            {}, self.mytask.name, args=[1, 2], kwargs={},
        )
        self.assertEqual(matched['queue'].name, 'foo')

        # A task that no map mentions ends up on the default queue.
        unmatched = router.route({}, 'celery.poza')
        self.assertEqual(unmatched['queue'].name, default_queue)


class test_prepare(AppCase):
    """Checks how ``routes.prepare`` normalises route specifications."""

    def test_prepare(self):
        sentinel = object()
        spec = [
            {'foo': 'bar'},
            'celery.utils.functional.LRUCache',
            sentinel,
        ]
        prepared = routes.prepare(spec)
        # A dict entry becomes a MapRoute.
        self.assertIsInstance(prepared[0], routes.MapRoute)
        # The dotted-path entry must evaluate to an LRUCache instance.
        self.assertIsInstance(maybe_evaluate(prepared[1]), LRUCache)
        # Any other object is kept as the very same object.
        self.assertIs(prepared[2], sentinel)

        # A single non-sequence object is wrapped in a list.
        self.assertEqual(routes.prepare(sentinel), [sentinel])

    def test_prepare_item_is_dict(self):
        # A bare dict (not inside a sequence) still yields a MapRoute first.
        prepared = routes.prepare({'foo': 'bar'})
        self.assertIsInstance(prepared[0], routes.MapRoute)