This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/boot/windows.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# Copyright 2014-2016 Canonical Ltd.
# Copyright 2014 Cloudbase Solutions SRL.
# This software is licensed under the GNU Affero General Public License
# version 3 (see the file LICENSE).

"""Windows PXE Boot Method"""

__all__ = [
    'WindowsPXEBootMethod',
    ]

import os.path
import re
import shutil
import sys

from provisioningserver.boot import (
    BootMethod,
    BootMethodError,
    BytesReader,
    get_remote_mac,
)
from provisioningserver.config import ClusterConfiguration
from provisioningserver.logger import get_maas_logger
from provisioningserver.rpc import getRegionClient
from provisioningserver.rpc.exceptions import NoSuchNode
from provisioningserver.rpc.region import RequestNodeInfoByMACAddress
from provisioningserver.utils import (
    tftp,
    typed,
)
from provisioningserver.utils.fs import tempdir
from provisioningserver.utils.twisted import (
    asynchronous,
    deferred,
)
from tftp.backend import FilesystemReader
from twisted.internet.defer import (
    inlineCallbacks,
    returnValue,
    succeed,
)
from twisted.python.filepath import FilePath


# Module-level logger, obtained from get_maas_logger under the name "windows".
maaslog = get_maas_logger("windows")


# These files do not exist in the tftproot. WindowsPXEBootMethod
# handles access to these files, returning the correct version
# of the file for the booting version of Windows.
#
# Note: Each version of Windows can have different content for
# these files.
#
# Entries must be lower-case: WindowsPXEBootMethod.match_path compares
# the lower-cased request path against this list. The backslash-separated
# entries are converted to unix-style paths by
# WindowsPXEBootMethod.clean_path before being served.
STATIC_FILES = [
    'pxeboot.0',
    'bootmgr.exe',
    '\\boot\\bcd',
    '\\boot\\winpe.wim',
    '\\boot\\boot.sdi',
    '\\boot\\font\\wgl4_boot.ttf',
    ]


def get_hivex_module():
    """Return the ``hivex`` module, or None when it cannot be imported.

    python-hivex is an optional dependency, but MAAS cannot boot
    Windows without it.
    """
    try:
        # Fast path: hivex has already been imported (or registered).
        return sys.modules['hivex']
    except KeyError:
        pass
    try:
        __import__('hivex')
    except ImportError:
        return None
    return sys.modules['hivex']


def load_hivex(*args, **kwargs):
    """Build a ``Hivex`` object from the given arguments.

    All positional and keyword arguments are forwarded unchanged to the
    ``Hivex`` constructor. Returns None when the hivex module is not
    available.
    """
    hivex = get_hivex_module()
    return None if hivex is None else hivex.Hivex(*args, **kwargs)


@asynchronous
def request_node_info_by_mac_address(mac_address):
    """Ask the region for node information matching a MAC address.

    :param mac_address: The MAC Address of the node of the event.
    :type mac_address: unicode
    :return: The result of the region call with an errback attached that
        turns `NoSuchNode` into None; when `mac_address` is None, an
        already-fired Deferred carrying None.
    """
    def no_such_node(failure):
        # Only a missing node is tolerated here; trap() lets any other
        # failure continue down the errback chain.
        failure.trap(NoSuchNode)
        maaslog.debug("Node doesn't exist for MAC address: %s", mac_address)
        return None

    if mac_address is None:
        maaslog.debug("Cannot determine node; MAC address is unknown.")
        return succeed(None)

    region_client = getRegionClient()
    pending = region_client(
        RequestNodeInfoByMACAddress, mac_address=mac_address)
    return pending.addErrback(no_such_node)


class Bcd:
    """Allows modification of the load options in a Windows boot
    configuration data file.

    The file is opened for writing through hivex. On construction the
    objects below the hive's ``Objects`` key are indexed by name and the
    default boot loader is looked up from the boot manager's display
    order, so that `set_load_options` can edit that loader's options.

    References:
        http://msdn.microsoft.com/en-us/library/windows/desktop/
            - aa362652(v=vs.85).aspx
            - aa362641(v=vs.85).aspx
    """

    # Name of the Windows boot manager object below ``Objects``.
    GUID_WINDOWS_BOOTMGR = '{9dea862c-5cdd-4e70-acc1-f32b344d4795}'
    # Element of the boot manager holding the ordered list of loaders.
    BOOT_MGR_DISPLAY_ORDER = '24000001'
    # Element of a loader object holding its load options.
    LOAD_OPTIONS = '12000030'

    def __init__(self, filename):
        """Open the hive at `filename` and locate the default loader.

        :param filename: Path of the BCD file; it is modified in place
            when `set_load_options` commits.
        """
        self.hive = load_hivex(filename, write=True)

        # Map each object name below ``Objects`` to its child nodes.
        objects = self._get_root_objects()
        self.uids = {
            self.hive.node_name(node): self.hive.node_children(node)
            for node in self.hive.node_children(objects)
        }

        # Default bootloader. The second child of an object is used as
        # the container of its elements (presumably the "Elements" key;
        # verify against a real BCD hive).
        mgr = self.uids[self.GUID_WINDOWS_BOOTMGR][1]
        bootmgr_elems = {
            self.hive.node_name(node): node
            for node in self.hive.node_children(mgr)
        }
        self.loader = self._get_loader(bootmgr_elems)

    def _get_root_elements(self):
        """Return a dict mapping the name of each child of the hive
        root to its node.
        """
        root = self.hive.root()
        return {
            self.hive.node_name(node): node
            for node in self.hive.node_children(root)
        }

    def _get_root_objects(self):
        """Return the ``Objects`` node found below the hive root."""
        return self._get_root_elements()['Objects']

    def _get_loader(self, bootmgr_elems):
        """Return the name of the default boot loader.

        This is the first entry of the boot manager's display order.

        :param bootmgr_elems: dict of element name to node for the boot
            manager object.
        """
        # The display-order node is expected to carry exactly one value;
        # the unpacking fails loudly otherwise.
        (val,) = self.hive.node_values(
            bootmgr_elems[self.BOOT_MGR_DISPLAY_ORDER])
        return self.hive.value_multiple_strings(val)[0]

    def _get_loader_elems(self):
        """Get elements present in default boot loader. We need this
        in order to determine the loadoptions key.
        """
        return {
            self.hive.node_name(node): node
            for node in self.hive.node_children(self.uids[self.loader][1])
        }

    def _get_load_options_key(self):
        """Return the node containing the load options we want to edit,
        or None when the default loader has no such node.
        """
        return self._get_loader_elems().get(self.LOAD_OPTIONS, None)

    def set_load_options(self, value):
        """Set the load options of the default loader and commit the hive.

        :param value: The load options, either as `str` or as UTF-8
            encoded `bytes`.
        """
        h = self._get_load_options_key()
        if h is None:
            # No load options key in the hive, add the key
            # so the value can be set.
            h = self.hive.node_add_child(
                self.uids[self.loader][1], self.LOAD_OPTIONS)
        # On Python 3 a str has no decode(); only decode when the caller
        # actually handed over bytes.
        if isinstance(value, bytes):
            value = value.decode('utf-8')
        data = {
            # Value type 1 (presumably the registry string type, REG_SZ;
            # confirm against the hivex documentation).
            't': 1,
            'key': "Element",
            # Windows only accepts utf-16le in load options.
            'value': value.encode('utf-16le'),
            }
        self.hive.node_set_value(h, data)
        self.hive.commit(None)


class WindowsPXEBootMethod(BootMethod):
    """Boot method that hands out the Windows PXE boot files."""

    name = "windows"
    bios_boot_method = "pxe"
    template_subdir = "windows"
    bootloader_path = "pxeboot.0"
    arch_octet = None

    @deferred
    def get_node_info(self):
        """Request node information for the MAC address of the remote
        host.
        """
        return request_node_info_by_mac_address(get_remote_mac())

    def clean_path(self, path):
        """Lower-case a Windows path, turn its backslashes into forward
        slashes and drop a leading boot subdirectory.
        """
        boot_prefix = "/boot/"
        unix_path = path.lower().replace('\\', '/')
        if unix_path.startswith(boot_prefix):
            return unix_path[len(boot_prefix):]
        return unix_path

    @inlineCallbacks
    def match_path(self, backend, path):
        """Decide whether this boot method should serve the requested
        file.

        :param backend: requesting backend
        :param path: requested path
        :return: dict of match params from path, None if no match
        """
        local_host, _local_port = tftp.get_local_address()

        if path != 'pxelinux.0':
            # Fix the paths for the other static files, Windows requests.
            if path.lower() in STATIC_FILES:
                returnValue({
                    'mac': get_remote_mac(),
                    'path': self.clean_path(path),
                    'local_host': local_host,
                    })
            returnValue(None)

        # The node is requesting the initial bootloader, so find out
        # whether this node is set to boot Windows first.
        data = yield self.get_node_info()
        if data is None:
            returnValue(None)

        # Only provide the Windows bootloader when installing;
        # PXELINUX chainloading will work for the rest of the time.
        if data.get('purpose') != 'install':
            returnValue(None)

        if data.get('osystem') != 'windows':
            returnValue(None)

        # python-hivex is needed to continue.
        if get_hivex_module() is None:
            raise BootMethodError('python-hivex package is missing.')

        returnValue({
            'mac': data.get('mac'),
            'path': self.bootloader_path,
            'local_host': local_host,
            })

    def get_reader(self, backend, kernel_params, **extra):
        """Return a reader for the file selected by `match_path`.

        The ``bcd`` path is composed for the booting node; every other
        path is served from the static files of the Windows release.

        :param backend: requesting backend
        :param kernel_params: An instance of `KernelParameters`.
        :param extra: Allow for other arguments. This is a safety valve;
            parameters generated in another component (for example, see
            `TFTPBackend.get_boot_method_reader`) won't cause this to break.
        """
        requested = extra['path']
        if requested != 'bcd':
            return self.output_static(kernel_params, requested)
        return self.compose_bcd(kernel_params, extra['local_host'])

    @typed
    def link_bootloader(self, destination: str):
        """Installs the required files for Windows booting into the
        tftproot.

        Intentionally a no-op: the Windows bootloader files have to be
        installed manually, due to licensing.
        """

    def compose_preseed_url(self, url):
        """Rewrite the url so that it survives the Windows boot load
        options: forward slashes become backslashes and every upper-case
        character is prefixed with the ^ character.

        Boot load options of Windows will all be upper-cased
        as Windows does not care about case, and what gets exposed in the
        registry is all uppercase. MAAS requires a case-sensitive url.

        The Windows install script extracts the preseed url and any character
        that starts with ^ is then uppercased, so that the URL is correct.
        """
        backslashed = url.replace('/', '\\')
        return re.sub(r"([A-Z])", r"^\1", backslashed)

    def get_resource_path(self, kernel_params, path):
        """Build the path of a Windows resource below the tftp root, using
        the arch, subarch, release and label of the kernel params.
        """
        with ClusterConfiguration.open() as config:
            tftp_root = config.tftp_root
        components = (
            'windows', kernel_params.arch, kernel_params.subarch,
            kernel_params.release, kernel_params.label, path)
        return os.path.join(tftp_root, *components)

    def compose_bcd(self, kernel_params, local_host):
        """Composes the Windows boot configuration data.

        A copy of the release's bcd template is edited in a temporary
        directory, so the template itself is never modified.

        :param kernel_params: An instance of `KernelParameters`.
        :param local_host: Host used to build the remote install path.
        :return: Binary data
        :raises BootMethodError: When the bcd template is not a file.
        """
        preseed = self.compose_preseed_url(kernel_params.preseed_url)
        release_source = "%s\\source" % kernel_params.release
        remote_share = "\\\\%s\\reminst" % local_host
        load_options = "%s;%s;%s" % (remote_share, release_source, preseed)

        # Generate the bcd file.
        template = self.get_resource_path(kernel_params, "bcd")
        if not os.path.isfile(template):
            raise BootMethodError(
                "Failed to find bcd template: %s" % template)
        with tempdir() as workdir:
            working_copy = os.path.join(workdir, "bcd")
            shutil.copyfile(template, working_copy)

            editor = Bcd(working_copy)
            editor.set_load_options(load_options)

            with open(working_copy, 'rb') as stream:
                contents = stream.read()
            return BytesReader(contents)

    def output_static(self, kernel_params, path):
        """Return a reader for the static file of the booting version of
        Windows.
        """
        return FilesystemReader(
            FilePath(self.get_resource_path(kernel_params, path)))