This file is indexed.

/usr/lib/python2.7/dist-packages/neutron/manager.py is in python-neutron 2:8.0.0-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# Copyright 2011 VMware, Inc
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import weakref

from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import periodic_task
import six

from neutron._i18n import _, _LI
from neutron.common import utils
from neutron.plugins.common import constants


LOG = logging.getLogger(__name__)

CORE_PLUGINS_NAMESPACE = 'neutron.core_plugins'


class Manager(periodic_task.PeriodicTasks):

    # Set RPC API version to 1.0 by default.
    target = oslo_messaging.Target(version='1.0')

    def __init__(self, host=None):
        if not host:
            host = cfg.CONF.host
        self.host = host
        conf = getattr(self, "conf", cfg.CONF)
        super(Manager, self).__init__(conf)

    def periodic_tasks(self, context, raise_on_error=False):
        self.run_periodic_tasks(context, raise_on_error=raise_on_error)

    def init_host(self):
        """Handle initialization if this is a standalone service.

        Child classes should override this method.

        """
        pass

    def after_start(self):
        """Handler post initialization stuff.

        Child classes can override this method.
        """
        pass


def validate_post_plugin_load():
    """Checks if the configuration variables are valid.

    If the configuration is invalid then the method will return an error
    message. If all is OK then it will return None.
    """
    if ('dhcp_agents_per_network' in cfg.CONF and
        cfg.CONF.dhcp_agents_per_network <= 0):
        msg = _("dhcp_agents_per_network must be >= 1. '%s' "
                "is invalid.") % cfg.CONF.dhcp_agents_per_network
        return msg


def validate_pre_plugin_load():
    """Checks if the configuration variables are valid.

    If the configuration is invalid then the method will return an error
    message. If all is OK then it will return None.
    """
    if cfg.CONF.core_plugin is None:
        msg = _('Neutron core_plugin not configured!')
        return msg


class NeutronManager(object):
    """Neutron's Manager class.

    Neutron's Manager class is responsible for parsing a config file and
    instantiating the correct plugin that concretely implements
    neutron_plugin_base class.
    The caller should make sure that NeutronManager is a singleton.
    """
    _instance = None

    def __init__(self, options=None, config_file=None):
        # If no options have been provided, create an empty dict
        if not options:
            options = {}

        msg = validate_pre_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # NOTE(jkoelker) Testing for the subclass with the __subclasshook__
        #                breaks tach monitoring. It has been removed
        #                intentionally to allow v2 plugins to be monitored
        #                for performance metrics.
        plugin_provider = cfg.CONF.core_plugin
        LOG.info(_LI("Loading core plugin: %s"), plugin_provider)
        self.plugin = self._get_plugin_instance(CORE_PLUGINS_NAMESPACE,
                                                plugin_provider)
        msg = validate_post_plugin_load()
        if msg:
            LOG.critical(msg)
            raise Exception(msg)

        # core plugin as a part of plugin collection simplifies
        # checking extensions
        # TODO(enikanorov): make core plugin the same as
        # the rest of service plugins
        self.service_plugins = {constants.CORE: self.plugin}
        self._load_service_plugins()
        # Used by pecan WSGI
        self.resource_plugin_mappings = {}
        self.resource_controller_mappings = {}

    @staticmethod
    def load_class_for_provider(namespace, plugin_provider):
        """Loads plugin using alias or class name
        :param namespace: namespace where alias is defined
        :param plugin_provider: plugin alias or class name
        :returns plugin that is loaded
        :raises ImportError if fails to load plugin
        """

        try:
            return utils.load_class_by_alias_or_classname(namespace,
                    plugin_provider)
        except ImportError:
            raise ImportError(_("Plugin '%s' not found.") % plugin_provider)

    def _get_plugin_instance(self, namespace, plugin_provider):
        plugin_class = self.load_class_for_provider(namespace, plugin_provider)
        return plugin_class()

    def _load_services_from_core_plugin(self):
        """Puts core plugin in service_plugins for supported services."""
        LOG.debug("Loading services supported by the core plugin")

        # supported service types are derived from supported extensions
        for ext_alias in getattr(self.plugin,
                                 "supported_extension_aliases", []):
            if ext_alias in constants.EXT_TO_SERVICE_MAPPING:
                service_type = constants.EXT_TO_SERVICE_MAPPING[ext_alias]
                self.service_plugins[service_type] = self.plugin
                LOG.info(_LI("Service %s is supported by the core plugin"),
                         service_type)

    def _get_default_service_plugins(self):
        """Get default service plugins to be loaded."""
        return constants.DEFAULT_SERVICE_PLUGINS.keys()

    def _load_service_plugins(self):
        """Loads service plugins.

        Starts from the core plugin and checks if it supports
        advanced services then loads classes provided in configuration.
        """
        # load services from the core plugin first
        self._load_services_from_core_plugin()

        plugin_providers = cfg.CONF.service_plugins
        plugin_providers.extend(self._get_default_service_plugins())
        LOG.debug("Loading service plugins: %s", plugin_providers)
        for provider in plugin_providers:
            if provider == '':
                continue

            LOG.info(_LI("Loading Plugin: %s"), provider)
            plugin_inst = self._get_plugin_instance('neutron.service_plugins',
                                                    provider)

            # only one implementation of svc_type allowed
            # specifying more than one plugin
            # for the same type is a fatal exception
            if plugin_inst.get_plugin_type() in self.service_plugins:
                raise ValueError(_("Multiple plugins for service "
                                   "%s were configured") %
                                 plugin_inst.get_plugin_type())

            self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst

            # search for possible agent notifiers declared in service plugin
            # (needed by agent management extension)
            if (hasattr(self.plugin, 'agent_notifiers') and
                    hasattr(plugin_inst, 'agent_notifiers')):
                self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)

            LOG.debug("Successfully loaded %(type)s plugin. "
                      "Description: %(desc)s",
                      {"type": plugin_inst.get_plugin_type(),
                       "desc": plugin_inst.get_plugin_description()})

    @classmethod
    @utils.synchronized("manager")
    def _create_instance(cls):
        if not cls.has_instance():
            cls._instance = cls()

    @classmethod
    def has_instance(cls):
        return cls._instance is not None

    @classmethod
    def clear_instance(cls):
        cls._instance = None

    @classmethod
    def get_instance(cls):
        # double checked locking
        if not cls.has_instance():
            cls._create_instance()
        return cls._instance

    @classmethod
    def get_plugin(cls):
        # Return a weakref to minimize gc-preventing references.
        return weakref.proxy(cls.get_instance().plugin)

    @classmethod
    def get_service_plugins(cls):
        # Return weakrefs to minimize gc-preventing references.
        service_plugins = cls.get_instance().service_plugins
        return dict((x, weakref.proxy(y))
                    for x, y in six.iteritems(service_plugins))

    @classmethod
    def get_unique_service_plugins(cls):
        service_plugins = cls.get_instance().service_plugins
        return tuple(weakref.proxy(x) for x in set(service_plugins.values()))

    @classmethod
    def set_plugin_for_resource(cls, resource, plugin):
        cls.get_instance().resource_plugin_mappings[resource] = plugin

    @classmethod
    def get_plugin_for_resource(cls, resource):
        return cls.get_instance().resource_plugin_mappings.get(resource)

    @classmethod
    def set_controller_for_resource(cls, resource, controller):
        cls.get_instance().resource_controller_mappings[resource] = controller

    @classmethod
    def get_controller_for_resource(cls, resource):
        res_ctrl_mappings = cls.get_instance().resource_controller_mappings
        # If no controller is found for resource, try replacing dashes with
        # underscores
        return res_ctrl_mappings.get(
            resource,
            res_ctrl_mappings.get(resource.replace('-', '_')))

    @classmethod
    def get_service_plugin_by_path_prefix(cls, path_prefix):
        service_plugins = cls.get_unique_service_plugins()
        for service_plugin in service_plugins:
            plugin_path_prefix = getattr(service_plugin, 'path_prefix', None)
            if plugin_path_prefix and plugin_path_prefix == path_prefix:
                return service_plugin