This file is indexed.

/usr/lib/python3/dist-packages/shade/meta.py is in python3-shade 1.7.0-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import munch
import ipaddress
import six

from shade import exc
from shade import _log


# Types that obj_to_dict and warlock_to_dict treat as plain data: a value is
# only copied into the resulting Munch when it is an instance of one of these.
# six.string_types is itself a tuple of types; isinstance accepts such a
# nested tuple.
NON_CALLABLES = (six.string_types, bool, dict, int, float, list, type(None))

# Module-level logger, created through shade's logging helper.
log = _log.setup_logging(__name__)


def find_nova_addresses(addresses, ext_tag=None, key_name=None, version=4):
    """Collect addresses of one IP version from a Nova ``addresses`` mapping.

    :param addresses: mapping of network name to a list of interface dicts.
                      Each interface dict is read for 'version' and 'addr',
                      and for 'OS-EXT-IPS:type' when ext_tag is given.
    :param ext_tag: when not None, only interfaces whose 'OS-EXT-IPS:type'
                    equals this value are considered
    :param key_name: when not None, only the network with this name is
                     searched
    :param version: value the interface's 'version' entry must equal
    :return: list of the matching 'addr' values, in iteration order
    """
    matches = []
    for network_name, interfaces in addresses.items():
        # When a network name was requested, every other network is ignored.
        if key_name is not None and network_name != key_name:
            continue

        for spec in interfaces:
            # An interface without the tag is skipped just like one with a
            # different tag. A missing tag suggests the cloud does not
            # support OS-EXT-IPS at all, but checking that explicitly (e.g.
            # cloud._has_nova_extension('OS-EXT-IPS')) would require the
            # cloud to be passed to this function.
            if ext_tag is not None and spec.get('OS-EXT-IPS:type') != ext_tag:
                continue

            if spec['version'] == version:
                matches.append(spec['addr'])

    return matches


def get_server_ip(server, **kwargs):
    """Return the first address find_nova_addresses finds for the server.

    :param server: server dict; its 'addresses' entry is searched
    :param kwargs: filters forwarded unchanged to find_nova_addresses
    :return: the first matching address, or None when nothing matches
    """
    matches = find_nova_addresses(server['addresses'], **kwargs)
    return matches[0] if matches else None


def get_server_private_ip(server, cloud=None):
    """Find the private IP address of a server.

    The lookup goes through the following steps and stops at the first hit:

    * If a cloud is given and it reports that the internal network is not
      to be used, there is no private IP and None is returned.
    * If a cloud is given, each network returned by
      cloud.get_internal_networks() is looked up by name in the server's
      addresses.
    * An address tagged as 'fixed' on the network named 'private'.
    * Last resort, ignore the IP type and just look for an address on the
      'private' network (e.g., Rackspace).

    :param server: the server dict whose 'addresses' are searched
    :param cloud: optional cloud used to consult the internal networks
    :return: a string containing the address, or None
    """
    if cloud:
        if not cloud.use_internal_network():
            return None

        # The cloud's list of internal networks is tried first, by name.
        for network in cloud.get_internal_networks():
            candidate = get_server_ip(server, key_name=network['name'])
            if candidate is not None:
                return candidate

    fixed_ip = get_server_ip(server, ext_tag='fixed', key_name='private')
    # Last resort, and Rackspace: any address on the 'private' network
    return fixed_ip or get_server_ip(server, key_name='private')


def get_server_external_ipv4(cloud, server):
    """Find an externally routable IP for the server.

    There are 5 different scenarios we have to account for:

    * Cloud has externally routable IP from neutron but neutron APIs don't
      work (only info available is in nova server record) (rackspace)
    * Cloud has externally routable IP from neutron (runabove, ovh)
    * Cloud has externally routable IP from neutron AND supports optional
      private tenant networks (vexxhost, unitedstack)
    * Cloud only has private tenant network provided by neutron and requires
      floating-ip for external routing (dreamhost, hp)
    * Cloud only has private tenant network provided by nova-network and
      requires floating-ip for external routing (auro)

    The candidates are tried in this order: the server's accessIPv4, an
    address on one of the cloud's external networks, a floating IP, an
    address on a network named 'public', and finally any IPv4 address that
    is not private.

    :param cloud: the cloud we're working with
    :param server: the server dict from which we want to get an IPv4 address
    :return: a string containing the IPv4 address or None
    """

    if not cloud.use_external_network():
        return None

    # Read with .get() so that a server record lacking the accessIPv4 key
    # is handled like one where the value is empty, instead of raising
    # KeyError before any of the other lookups get a chance to run.
    access_ip = server.get('accessIPv4')
    if access_ip:
        return access_ip

    # Look the cloud's external networks up by name in the server record
    for ext_net in cloud.get_external_networks():
        ext_ip = get_server_ip(server, key_name=ext_net['name'])
        if ext_ip is not None:
            return ext_ip

    # Try to get a floating IP address
    # Much as I might find floating IPs annoying, if it has one, that's
    # almost certainly the one that wants to be used
    ext_ip = get_server_ip(server, ext_tag='floating')
    if ext_ip is not None:
        return ext_ip

    # The server might have fixed addresses that are reachable from outside
    # the cloud (e.g. Rax): try to get an address from a network named
    # 'public'
    ext_ip = get_server_ip(server, key_name='public')
    if ext_ip is not None:
        return ext_ip

    # Nothing else works, try to find a globally routable IP address
    for interfaces in server['addresses'].values():
        for interface in interfaces:
            try:
                ip = ipaddress.ip_address(interface['addr'])
            except Exception:
                # Deliberately broad: we're looking for a working ip - if
                # the cloud returns garbage, it wouldn't be the first weird
                # thing but it still doesn't meet the requirement of "be a
                # working ip address"
                continue
            if ip.version == 4 and not ip.is_private:
                return str(ip)

    return None


def get_server_external_ipv6(server):
    """ Get an IPv6 address reachable from outside the cloud.

    This function assumes that if a server has an IPv6 address, that address
    is reachable from outside the cloud.

    The server's accessIPv6 is preferred; otherwise the first version 6
    address found in the server's 'addresses' is used.

    :param server: the server from which we want to get an IPv6 address
    :return: a string containing the IPv6 address or None
    """
    # Read with .get() so that a server record lacking the accessIPv6 key
    # is handled like one where the value is empty, instead of raising
    # KeyError.
    access_ip = server.get('accessIPv6')
    if access_ip:
        return access_ip
    addresses = find_nova_addresses(addresses=server['addresses'], version=6)
    if addresses:
        return addresses[0]
    return None


def get_groups_from_server(cloud, server, server_vars):
    """Build the list of group names a server belongs to.

    The groups are emitted in this order: cloud name, region,
    cloud_region, the 'group' metadata value, each non-empty entry of the
    comma separated 'groups' metadata value, 'instance-<id>',
    'flavor-<name>' and 'image-<name>' when a name is known, one
    'meta-<key>_<value>' per metadata item, and - when server_vars has an
    'az' - the az, region_az and cloud_region_az.

    :param cloud: object providing the 'name' and 'region_name' attributes
    :param server: server dict; its 'metadata' and 'id' are read
    :param server_vars: dict providing 'flavor', 'image' and optionally 'az'
    :return: list of group name strings
    """
    region = cloud.region_name
    cloud_name = cloud.name
    metadata = server['metadata']

    # One group for the cloud, one for the region, one for cloud_region
    groups = [cloud_name, region, "%s_%s" % (cloud_name, region)]

    # Groups requested explicitly through the server's metadata
    single_group = metadata.get('group')
    if single_group:
        groups.append(single_group)
    groups.extend(
        name for name in metadata.get('groups', '').split(',') if name)

    groups.append('instance-%s' % server['id'])

    groups.extend(
        '%s-%s' % (kind, server_vars[kind]['name'])
        for kind in ('flavor', 'image')
        if 'name' in server_vars[kind])

    groups.extend(
        'meta-%s_%s' % (key, value) for key, value in metadata.items())

    az = server_vars.get('az', None)
    if az:
        # Make groups for az, region_az and cloud_region_az
        groups.extend([
            az,
            '%s_%s' % (region, az),
            '%s_%s_%s' % (cloud_name, region, az),
        ])
    return groups


def expand_server_vars(cloud, server):
    """Backwards compatibility function.

    Forwards both arguments to add_server_interfaces and returns its result
    unchanged.

    :param cloud: the cloud the server belongs to
    :param server: the server dict to add interface information to
    :return: whatever add_server_interfaces returns for these arguments
    """
    return add_server_interfaces(cloud, server)


def add_server_interfaces(cloud, server):
    """Add network interface information to server.

    Query the cloud as necessary to add information to the server record
    about the network information needed to interface with the server.

    Ensures that public_v4, public_v6, private_v4, interface_ip, accessIPv4
    and accessIPv6 are always set; each of them is '' when no matching
    address exists. private_v6 is not computed here.

    interface_ip is the private IPv4 address when the cloud is private and
    the server has one; otherwise the public IPv6 address when the server
    has one, the cloud has local IPv6 and IPv4 is not forced; otherwise the
    public IPv4 address.

    :param cloud: the cloud the server belongs to
    :param server: the server dict; it is updated in place
    :return: the same server dict that was passed in
    """
    # First, add an IP address. Set it to '' rather than None if it does
    # not exist to remain consistent with the pre-existing missing values
    server['public_v4'] = get_server_external_ipv4(cloud, server) or ''
    server['public_v6'] = get_server_external_ipv6(server) or ''
    server['private_v4'] = get_server_private_ip(server, cloud) or ''

    prefer_private = cloud.private and server['private_v4']
    if prefer_private:
        interface_ip = server['private_v4']
    elif (server['public_v6'] and cloud._local_ipv6
            and not cloud.force_ipv4):
        interface_ip = server['public_v6']
    else:
        interface_ip = server['public_v4']
    # Always write the key, using '' as the "no address" placeholder like
    # the other fields, so consumers can index it unconditionally and a
    # value left over from an earlier call cannot linger.
    server['interface_ip'] = interface_ip or ''

    # Some clouds do not set these, but they're a regular part of the Nova
    # server record. Since we know them, go ahead and set them. In the case
    # where they were set previous, we use the values, so this will not break
    # clouds that provide the information
    if prefer_private:
        server['accessIPv4'] = server['private_v4']
    else:
        server['accessIPv4'] = server['public_v4']
    server['accessIPv6'] = server['public_v6']

    return server


def expand_server_security_groups(cloud, server):
    """Store the server's security groups under server['security_groups'].

    The groups come from cloud.list_server_security_groups(server). When
    that call raises OpenStackCloudException an empty list is stored.

    :param cloud: the cloud to query
    :param server: the server dict; it is updated in place
    """
    try:
        found = cloud.list_server_security_groups(server)
    except exc.OpenStackCloudException:
        # Best effort: a failed lookup is recorded as "no groups"
        server['security_groups'] = []
    else:
        server['security_groups'] = found


def get_hostvars_from_server(cloud, server, mounts=None):
    """Expand additional server information useful for ansible inventory.

    On top of what add_server_interfaces provides, this fills in the flavor
    name, the security groups, the image name and the server's volumes.
    Doing so may make additional cloud queries, making it more expensive to
    call than expand_server_vars if caching is not set up. If caching is set
    up, the extra cost should be minimal.

    :param cloud: the cloud to query
    :param server: the server dict to expand
    :param mounts: optional list of dicts; for every volume whose
                   'display_name' equals the dict's 'display_name', the
                   dict's 'mount' (when present) is copied onto the volume
    :return: the expanded server variables
    """
    server_vars = add_server_interfaces(cloud, server)

    flavor_name = cloud.get_flavor_name(server['flavor']['id'])
    if flavor_name:
        server_vars['flavor']['name'] = flavor_name

    expand_server_security_groups(cloud, server)

    # OpenStack can return image as a string when you've booted from volume;
    # in that case the string is turned into a dict holding it as the id
    image = server['image']
    if str(image) == image:
        image_id = image
        server_vars['image'] = {'id': image_id}
    else:
        image_id = image.get('id', None)
    image_name = cloud.get_image_name(image_id) if image_id else None
    if image_name:
        server_vars['image']['name'] = image_name

    attached = []
    if cloud.has_service('volume'):
        try:
            for volume in cloud.get_volumes(server):
                # Make things easier to consume elsewhere: surface the
                # device of the first attachment on the volume itself
                first_attachment = volume['attachments'][0]
                volume['device'] = first_attachment['device']
                attached.append(volume)
        except exc.OpenStackCloudException:
            # Best effort: keep whatever was collected before the failure
            pass
    server_vars['volumes'] = attached

    for mount in mounts or ():
        for volume in attached:
            same_name = volume['display_name'] == mount['display_name']
            if same_name and 'mount' in mount:
                volume['mount'] = mount['mount']

    return server_vars


def _add_request_id(obj, request_id):
    """Record a request id on obj, when one is given.

    :param obj: mapping to annotate; it is modified in place
    :param request_id: the id to store, or None to leave obj untouched
    :return: obj itself
    """
    if request_id is None:
        return obj
    obj['x_openstack_request_ids'] = [request_id]
    return obj


def obj_to_dict(obj, request_id=None):
    """ Turn an object with attributes into a dict suitable for serializing.

    Some of the things that are returned in OpenStack are objects with
    attributes. That's awesome - except when you want to expose them as JSON
    structures. We use this as the basis of get_hostvars_from_server above so
    that we can just have a plain dict of all of the values that exist in the
    nova metadata for a server.

    :param obj: the object to convert. None, Munch instances and Mock
                objects are returned as they are.
    :param request_id: optional request id, recorded on the converted
                       result under 'x_openstack_request_ids'
    :return: None, obj itself, or a Munch holding the public attributes of
             obj whose values are plain data (see NON_CALLABLES)
    """
    if obj is None:
        return None
    elif isinstance(obj, munch.Munch) or hasattr(obj, 'mock_add_spec'):
        # If we obj_to_dict twice, don't fail, just return the munch
        # Also, don't try to modify Mock objects - that way lies madness
        return obj
    elif hasattr(obj, 'schema') and hasattr(obj, 'validate'):
        # It's a warlock
        return _add_request_id(warlock_to_dict(obj), request_id)
    elif isinstance(obj, dict):
        # The new request-id tracking spec:
        # https://specs.openstack.org/openstack/nova-specs/specs/juno/approved/log-request-id-mappings.html
        # adds a request-ids attribute to returned objects. It does this even
        # with dicts, which now become dict subclasses. So we want to convert
        # the dict we get, but we also want it to fall through to object
        # attribute processing so that we can also get the request_ids
        # data into our resulting object.
        instance = munch.Munch(obj)
    else:
        instance = munch.Munch()

    for key in dir(obj):
        # Private names are never copied, so don't even evaluate them
        if key.startswith('_'):
            continue
        try:
            value = getattr(obj, key)
        except AttributeError:
            # dir() can list names that cannot actually be read, e.g. an
            # attribute defined as a property that raises. One unreadable
            # attribute should not abort the whole conversion.
            continue
        if isinstance(value, NON_CALLABLES):
            instance[key] = value
    return _add_request_id(instance, request_id)


def obj_list_to_dict(obj_list, request_id=None):
    """Enumerate through lists of objects and return lists of dictionaries.

    Some of the objects returned in OpenStack are actually lists of objects,
    and in order to expose the data structures as JSON, we need to facilitate
    the conversion to lists of dictionaries.

    :param obj_list: iterable of objects to convert
    :param request_id: optional request id, passed to obj_to_dict for every
                       element
    :return: a new list with the obj_to_dict result for each element
    """
    return [obj_to_dict(item, request_id=request_id) for item in obj_list]


def warlock_to_dict(obj):
    """Convert a warlock object into a Munch of its plain data items.

    glanceclient v2 uses warlock to construct its objects. Warlock does
    deep black magic to attribute look up to support validation things that
    means we cannot use normal obj_to_dict, so the items are read through
    obj.items() instead.

    :param obj: the warlock object to convert
    :return: a Munch with every item whose value is plain data (see
             NON_CALLABLES) and whose key does not start with '_'
    """
    converted = munch.Munch()
    for key, value in obj.items():
        if not isinstance(value, NON_CALLABLES):
            continue
        if key.startswith('_'):
            continue
        converted[key] = value
    return converted