This file is indexed.

/usr/lib/python3/dist-packages/os_win/utils/hostutils10.py is in python3-os-win 3.0.0-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import re

from os_win._i18n import _
from os_win import exceptions
from os_win.utils import hostutils
from oslo_log import log as logging

LOG = logging.getLogger(__name__)


class HostUtils10(hostutils.HostUtils):

    _HGS_NAMESPACE = '//%s/Root/Microsoft/Windows/Hgs'

    _PCI_VENDOR_ID_REGEX = re.compile('VEN_(.*)&DEV')
    _PCI_PRODUCT_ID_REGEX = re.compile('DEV_(.*)&SUBSYS')
    _PCI_ADDRESS_REGEX = re.compile(r'\b\d+\b')

    def __init__(self, host='.'):
        super(HostUtils10, self).__init__(host)
        self._conn_hgs_attr = None

    @property
    def _conn_hgs(self):
        if not self._conn_hgs_attr:
            try:
                namespace = self._HGS_NAMESPACE % self._host
                self._conn_hgs_attr = self._get_wmi_conn(namespace)
            except Exception:
                raise exceptions.OSWinException(
                    _("Namespace %(namespace)s is not supported on this "
                      "Windows version.") %
                    {'namespace': namespace})

        return self._conn_hgs_attr

    def is_host_guarded(self):
        """Checks the host is guarded so it can run Shielded VMs"""

        (return_code,
         host_config) = self._conn_hgs.MSFT_HgsClientConfiguration.Get()
        if return_code:
            LOG.warning('Retrieving the local Host Guardian Service '
                        'Client configuration failed with code: %s',
                        return_code)
            return False
        return host_config.IsHostGuarded

    def supports_nested_virtualization(self):
        """Checks if the host supports nested virtualization.

        :returns: True, Windows / Hyper-V Server 2016 or newer supports nested
            virtualization.
        """
        return True

    def get_pci_passthrough_devices(self):
        """Get host's assignable PCI devices.

        :returns: a list of the assignable PCI devices.
        """
        # NOTE(claudiub): pci_device_objects contains all available PCI
        # devices. When a PCI device is used, another object containing the
        # same devices_instance_path is added.
        pci_device_objects = self._conn.Msvm_PciExpress()

        pci_devices = []
        processed_pci_dev_path = []
        for pci_obj in pci_device_objects:
            pci_path = pci_obj.DeviceInstancePath
            if pci_path in processed_pci_dev_path:
                continue

            address = self._get_pci_device_address(pci_path)
            vendor_id = self._PCI_VENDOR_ID_REGEX.findall(pci_path)
            product_id = self._PCI_PRODUCT_ID_REGEX.findall(pci_path)

            if not (address and vendor_id and product_id):
                # vendor_id / product_id / address not found.
                # skip this PCI device.
                continue

            pci_devices.append({
                'address': address,
                'vendor_id': vendor_id[0],
                'product_id': product_id[0],
                'dev_id': pci_obj.DeviceID,
            })
            processed_pci_dev_path.append(pci_path)

        return pci_devices

    def _get_pci_device_address(self, pci_device_path):
        pnp_device = self._conn_cimv2.Win32_PnPEntity(DeviceID=pci_device_path)
        (return_code, pnp_device_props) = pnp_device[0].GetDeviceProperties()
        if return_code:
            # The properties of the Plug and Play device could not be retrieved
            LOG.debug('Failed to get PnP Device Properties for the PCI '
                      'device: %(pci_dev)s. (return_code=%(return_code)s',
                      {'pci_dev': pci_device_path, 'return_code': return_code})
            return None

        pnp_props = {prop.KeyName: prop.Data for prop in pnp_device_props}
        location_info = pnp_props.get('DEVPKEY_Device_LocationInfo')
        slot = pnp_props.get('DEVPKEY_Device_Address')

        try:
            [bus, domain, funct] = self._PCI_ADDRESS_REGEX.findall(
                location_info)
            address = "%04x:%02x:%02x.%1x" % (
                int(domain), int(bus), int(slot), int(funct))
            return address
        except Exception as ex:
            LOG.debug('Failed to get PCI device address. Device path: '
                      '%(device_path)s. Exception: %(ex)s',
                      {'device_path': pci_device_path, 'ex': ex})
            return None