This file is indexed.

/usr/lib/python2.7/dist-packages/cinder/rpc.py is in python-cinder 2:12.0.0-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# Copyright 2013 Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

# Names exported by a star-import of this module.  Some other module-level
# names defined in this file (``initialized``, ``assert_min_rpc_version`` and
# ``RPCAPI``) are not listed here and must be imported explicitly.
__all__ = [
    'init',
    'cleanup',
    'set_defaults',
    'add_extra_exmods',
    'clear_extra_exmods',
    'get_allowed_exmods',
    'RequestContextSerializer',
    'get_client',
    'get_server',
    'get_notifier',
]

from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_utils import importutils
profiler = importutils.try_import('osprofiler.profiler')
import six

import cinder.context
import cinder.exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import base
from cinder import utils

# Global oslo.config object and the logger for this module.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# Module-wide messaging state: assigned by init() and reset to None by
# cleanup().
TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None

# Module names handed to the transports as ``allowed_remote_exmods`` (combined
# with EXTRA_EXMODS by get_allowed_exmods()).
ALLOWED_EXMODS = [
    cinder.exception.__name__,
]
# Additional entries registered at runtime through add_extra_exmods() and
# emptied by clear_extra_exmods().
EXTRA_EXMODS = []


def init(conf):
    """Create the messaging transports and the notifier from ``conf``.

    Assigns the module-level TRANSPORT, NOTIFICATION_TRANSPORT and NOTIFIER
    globals.  NOTIFIER becomes ``utils.DO_NOTHING`` when
    ``utils.notifications_enabled(conf)`` is false.

    :param conf: configuration object passed to oslo.messaging.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    allowed = get_allowed_exmods()
    TRANSPORT = messaging.get_rpc_transport(
        conf, allowed_remote_exmods=allowed)
    NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
        conf, allowed_remote_exmods=allowed)

    # The oslo_messaging_notifications config group has been loaded by
    # get_notification_transport, so only now can we check whether
    # notifications are actually enabled.
    if not utils.notifications_enabled(conf):
        NOTIFIER = utils.DO_NOTHING
        return

    payload_serializer = messaging.JsonPayloadSerializer()
    NOTIFIER = messaging.Notifier(
        NOTIFICATION_TRANSPORT,
        serializer=RequestContextSerializer(payload_serializer))


def initialized():
    """Return True when neither TRANSPORT nor NOTIFIER is None."""
    required = (TRANSPORT, NOTIFIER)
    return None not in required


def cleanup():
    """Clean up both transports and reset the module-level messaging state.

    Calls ``cleanup()`` on TRANSPORT and NOTIFICATION_TRANSPORT when they are
    set, then resets TRANSPORT, NOTIFICATION_TRANSPORT and NOTIFIER to None.
    A transport that is still None (init() never ran, failed part-way, or
    cleanup() was already called) is skipped so the globals are always reset
    instead of the function dying with AttributeError half-way through.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    if NOTIFIER is None:
        # We are not inside an exception handler, so there is no traceback
        # to attach; LOG.exception would only append a bogus "None" trace.
        LOG.error("RPC cleanup: NOTIFIER is None")
    for transport in (TRANSPORT, NOTIFICATION_TRANSPORT):
        if transport is not None:
            transport.cleanup()
    TRANSPORT = NOTIFICATION_TRANSPORT = NOTIFIER = None


def set_defaults(control_exchange):
    """Pass ``control_exchange`` to oslo.messaging's transport defaults."""
    messaging.set_transport_defaults(control_exchange)


def add_extra_exmods(*args):
    """Append every positional argument to EXTRA_EXMODS, in order."""
    for exmod in args:
        EXTRA_EXMODS.append(exmod)


def clear_extra_exmods():
    """Empty EXTRA_EXMODS in place (the list object itself is kept)."""
    EXTRA_EXMODS[:] = []


def get_allowed_exmods():
    """Return a new list: ALLOWED_EXMODS followed by EXTRA_EXMODS."""
    allowed = list(ALLOWED_EXMODS)
    allowed.extend(EXTRA_EXMODS)
    return allowed


class RequestContextSerializer(messaging.Serializer):
    """Serializer for request contexts that delegates entity handling.

    Entities are handed to the wrapped ``base`` serializer when one was
    given and are returned unchanged otherwise.  Contexts are converted to
    and from dicts; when osprofiler could be imported, profiler trace
    information travels along under the ``trace_info`` key.
    """

    def __init__(self, base):
        # ``base`` may be None (or otherwise falsy), in which case entities
        # pass through untouched.
        self._base = base

    def serialize_entity(self, context, entity):
        """Serialize ``entity`` via the wrapped serializer, if any."""
        if self._base:
            return self._base.serialize_entity(context, entity)
        return entity

    def deserialize_entity(self, context, entity):
        """Deserialize ``entity`` via the wrapped serializer, if any."""
        if self._base:
            return self._base.deserialize_entity(context, entity)
        return entity

    def serialize_context(self, context):
        """Return ``context.to_dict()``, plus trace info when profiling."""
        ctxt_dict = context.to_dict()
        active_profiler = profiler.get() if profiler is not None else None
        if active_profiler:
            trace_info = {
                "hmac_key": active_profiler.hmac_key,
                "base_id": active_profiler.get_base_id(),
                "parent_id": active_profiler.get_id()
            }
            ctxt_dict.update({"trace_info": trace_info})
        return ctxt_dict

    def deserialize_context(self, context):
        """Rebuild a RequestContext from the ``context`` dict.

        The ``trace_info`` key is always removed from ``context``; when it
        holds a value and osprofiler is available, the profiler is
        initialised from it.
        """
        trace_info = context.pop("trace_info", None)
        if trace_info and profiler is not None:
            profiler.init(**trace_info)

        return cinder.context.RequestContext.from_dict(context)


def get_client(target, version_cap=None, serializer=None):
    """Build an RPC client for ``target`` on the module-level TRANSPORT.

    ``serializer`` (possibly None) is wrapped in a RequestContextSerializer
    before being handed to the client.  TRANSPORT must already be set.
    """
    assert TRANSPORT is not None
    return messaging.RPCClient(
        TRANSPORT,
        target,
        version_cap=version_cap,
        serializer=RequestContextSerializer(serializer))


def get_server(target, endpoints, serializer=None):
    """Build an eventlet RPC server for ``target`` on TRANSPORT.

    ``serializer`` (possibly None) is wrapped in a RequestContextSerializer
    and the dispatcher's DefaultRPCAccessPolicy is used.  TRANSPORT must
    already be set.
    """
    assert TRANSPORT is not None
    server_options = {
        'executor': 'eventlet',
        'serializer': RequestContextSerializer(serializer),
        'access_policy': dispatcher.DefaultRPCAccessPolicy,
    }
    return messaging.get_rpc_server(TRANSPORT, target, endpoints,
                                    **server_options)


@utils.if_notifications_enabled
def get_notifier(service=None, host=None, publisher_id=None):
    """Return NOTIFIER prepared with a publisher id.

    When ``publisher_id`` is empty it is built as ``<service>.<host>``,
    with ``host`` falling back to ``CONF.host``.  NOTIFIER must already be
    set.
    """
    assert NOTIFIER is not None
    effective_id = publisher_id or "%s.%s" % (service, host or CONF.host)
    return NOTIFIER.prepare(publisher_id=effective_id)


def assert_min_rpc_version(min_ver, exc=None):
    """Decorator to block RPC calls when version cap is lower than min_ver.

    :param min_ver: RPC API version the decorated method needs; checked via
                    ``self.client.can_send_version``.
    :param exc: exception class raised when the call is blocked; defaults
                to ``cinder.exception.ServiceTooOld``.
    """
    error_cls = cinder.exception.ServiceTooOld if exc is None else exc

    def decorator(f):
        @six.wraps(f)
        def _wrapper(self, *args, **kwargs):
            # Happy path first: the version cap allows this call.
            if self.client.can_send_version(min_ver):
                return f(self, *args, **kwargs)

            template = _('One of %(binary)s services is too old to accept '
                         '%(method)s request. Required RPC API version is '
                         '%(version)s. Are you running mixed versions of '
                         '%(binary)ss?')
            raise error_cls(template % {'binary': self.BINARY,
                                        'version': min_ver,
                                        'method': f.__name__})
        return _wrapper
    return decorator


# Caches of the version caps chosen by RPCAPI.determine_rpc_version_cap() and
# RPCAPI.determine_obj_version_cap(), keyed by the class's BINARY value.
LAST_RPC_VERSIONS = {}
LAST_OBJ_VERSIONS = {}


class RPCAPI(object):
    """Mixin class aggregating methods related to RPC API compatibility."""

    RPC_API_VERSION = '1.0'
    RPC_DEFAULT_VERSION = '1.0'
    TOPIC = ''
    BINARY = ''

    def __init__(self):
        """Create ``self.client`` with RPC and object version caps applied."""
        rpc_target = messaging.Target(topic=self.TOPIC,
                                      version=self.RPC_API_VERSION)
        obj_serializer = base.CinderObjectSerializer(
            self.determine_obj_version_cap())
        self.client = get_client(
            rpc_target,
            version_cap=self.determine_rpc_version_cap(),
            serializer=obj_serializer)

    def _compat_ver(self, current, *legacy):
        """Return the first version the client can send.

        Versions are tried in the order given; the last one is never
        checked and serves as the fallback.
        """
        candidates = (current,) + legacy
        fallback = candidates[-1]
        sendable = (ver for ver in candidates[:-1]
                    if self.client.can_send_version(ver))
        return next(sendable, fallback)

    def _get_cctxt(self, version=None, **kwargs):
        """Prepare client context

        ``version`` may be a single version string or a tuple of strings;
        a tuple is resolved to one compatible version.  When omitted,
        RPC_DEFAULT_VERSION is used.  The version that was picked can be
        read back afterwards:
            cctxt = _get_cctxt(...)
            version = cctxt.target.version
        """
        requested = self.RPC_DEFAULT_VERSION if version is None else version
        if isinstance(requested, tuple):
            requested = self._compat_ver(*requested)
        return self.client.prepare(version=requested, **kwargs)

    @classmethod
    def determine_rpc_version_cap(cls):
        """Return the RPC version cap for ``cls.BINARY``, caching it.

        The value is looked up once per binary through
        ``objects.Service.get_minimum_rpc_version`` and then served from
        LAST_RPC_VERSIONS.
        """
        binary = cls.BINARY
        if binary not in LAST_RPC_VERSIONS:
            # If there is no service we assume they will come up later and
            # will have the same version as we do.
            cap = objects.Service.get_minimum_rpc_version(
                cinder.context.get_admin_context(),
                binary) or cls.RPC_API_VERSION
            LOG.info('Automatically selected %(binary)s RPC version '
                     '%(version)s as minimum service version.',
                     {'binary': binary, 'version': cap})
            LAST_RPC_VERSIONS[binary] = cap
        return LAST_RPC_VERSIONS[binary]

    @classmethod
    def determine_obj_version_cap(cls):
        """Return the objects version cap for ``cls.BINARY``, caching it.

        The value is looked up once per binary through
        ``objects.Service.get_minimum_obj_version`` and then served from
        LAST_OBJ_VERSIONS.
        """
        binary = cls.BINARY
        if binary not in LAST_OBJ_VERSIONS:
            # If there is no service we assume they will come up later and
            # will have the same version as we do.
            cap = objects.Service.get_minimum_obj_version(
                cinder.context.get_admin_context(),
                binary) or base.OBJ_VERSIONS.get_current()
            LOG.info('Automatically selected %(binary)s objects version '
                     '%(version)s as minimum service version.',
                     {'binary': binary, 'version': cap})
            LAST_OBJ_VERSIONS[binary] = cap
        return LAST_OBJ_VERSIONS[binary]