This file is indexed.

/usr/lib/python2.7/dist-packages/oslo_messaging/tests/drivers/zmq/test_zmq_async.py is in python-oslo.messaging 4.6.1-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import mock
import testtools

from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils

zmq = zmq_async.import_zmq()


class TestImportZmq(test_utils.BaseTestCase):

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        super(TestImportZmq, self).setUp()

    def test_config_short_names_are_converted_to_correct_module_names(self):
        mock_try_import = mock.Mock()
        zmq_async.importutils.try_import = mock_try_import

        zmq_async.importutils.try_import.return_value = 'mock zmq module'
        self.assertEqual('mock zmq module', zmq_async.import_zmq('native'))
        mock_try_import.assert_called_with('zmq', default=None)

        zmq_async.importutils.try_import.return_value = 'mock eventlet module'
        self.assertEqual('mock eventlet module',
                         zmq_async.import_zmq('eventlet'))
        mock_try_import.assert_called_with('eventlet.green.zmq', default=None)

    def test_when_no_args_then_default_zmq_module_is_loaded(self):
        mock_try_import = mock.Mock()
        zmq_async.importutils.try_import = mock_try_import

        zmq_async.import_zmq()

        mock_try_import.assert_called_with('eventlet.green.zmq', default=None)

    def test_invalid_config_value_raise_ValueError(self):
        invalid_opt = 'x'

        errmsg = 'Invalid zmq_concurrency value: x'
        with self.assertRaisesRegexp(ValueError, errmsg):
            zmq_async.import_zmq(invalid_opt)


class TestGetPoller(test_utils.BaseTestCase):

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        super(TestGetPoller, self).setUp()

    def test_when_no_arg_to_get_poller_then_return_default_poller(self):
        zmq_async._is_eventlet_zmq_available = lambda: True

        actual = zmq_async.get_poller()

        self.assertTrue(isinstance(actual, green_poller.GreenPoller))

    def test_when_native_poller_requested_then_return_ThreadingPoller(self):
        actual = zmq_async.get_poller('native')

        self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))

    def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
        zmq_async._is_eventlet_zmq_available = lambda: False

        actual = zmq_async.get_poller('eventlet')

        self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))

    def test_when_eventlet_is_available_then_return_GreenPoller(self):
        zmq_async._is_eventlet_zmq_available = lambda: True

        actual = zmq_async.get_poller('eventlet')

        self.assertTrue(isinstance(actual, green_poller.GreenPoller))

    def test_invalid_config_value_raise_ValueError(self):
        invalid_opt = 'x'

        errmsg = 'Invalid zmq_concurrency value: x'
        with self.assertRaisesRegexp(ValueError, errmsg):
            zmq_async.get_poller(invalid_opt)


class TestGetReplyPoller(test_utils.BaseTestCase):

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        super(TestGetReplyPoller, self).setUp()

    def test_default_reply_poller_is_HoldReplyPoller(self):
        zmq_async._is_eventlet_zmq_available = lambda: True

        actual = zmq_async.get_reply_poller()

        self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))

    def test_when_eventlet_is_available_then_return_HoldReplyPoller(self):
        zmq_async._is_eventlet_zmq_available = lambda: True

        actual = zmq_async.get_reply_poller('eventlet')

        self.assertTrue(isinstance(actual, green_poller.HoldReplyPoller))

    def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
        zmq_async._is_eventlet_zmq_available = lambda: False

        actual = zmq_async.get_reply_poller('eventlet')

        self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))

    def test_invalid_config_value_raise_ValueError(self):
        invalid_opt = 'x'

        errmsg = 'Invalid zmq_concurrency value: x'
        with self.assertRaisesRegexp(ValueError, errmsg):
            zmq_async.get_reply_poller(invalid_opt)


class TestGetExecutor(test_utils.BaseTestCase):

    @testtools.skipIf(zmq is None, "zmq not available")
    def setUp(self):
        super(TestGetExecutor, self).setUp()

    def test_default_executor_is_GreenExecutor(self):
        zmq_async._is_eventlet_zmq_available = lambda: True

        executor = zmq_async.get_executor('any method')

        self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
        self.assertEqual('any method', executor._method)

    def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
        zmq_async._is_eventlet_zmq_available = lambda: True

        executor = zmq_async.get_executor('any method', 'eventlet')

        self.assertTrue(isinstance(executor, green_poller.GreenExecutor))
        self.assertEqual('any method', executor._method)

    def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
        zmq_async._is_eventlet_zmq_available = lambda: False

        executor = zmq_async.get_executor('any method', 'eventlet')

        self.assertTrue(isinstance(executor,
                                   threading_poller.ThreadingExecutor))
        self.assertEqual('any method', executor._method)

    def test_invalid_config_value_raise_ValueError(self):
        invalid_opt = 'x'

        errmsg = 'Invalid zmq_concurrency value: x'
        with self.assertRaisesRegexp(ValueError, errmsg):
            zmq_async.get_executor('any method', invalid_opt)