This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/boot/__init__.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# Copyright 2012-2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Boot Methods."""

__all__ = [
    "BootMethod",
    "BootMethodRegistry",
    ]

from abc import (
    ABCMeta,
    abstractproperty,
)
from errno import ENOENT
from functools import lru_cache
from io import BytesIO
import os
from typing import Dict

from provisioningserver.boot.tftppath import compose_image_path
from provisioningserver.events import (
    EVENT_TYPES,
    try_send_rack_event,
)
from provisioningserver.kernel_opts import compose_kernel_command_line
from provisioningserver.logger import get_maas_logger
from provisioningserver.rpc import getRegionClient
from provisioningserver.rpc.region import GetArchiveMirrors
from provisioningserver.utils import (
    locate_template,
    tftp,
    typed,
)
from provisioningserver.utils.fs import (
    atomic_copy,
    atomic_symlink,
)
from provisioningserver.utils.network import find_mac_via_arp
from provisioningserver.utils.registry import Registry
from provisioningserver.utils.twisted import asynchronous
import tempita
from tftp.backend import IReader
from twisted.internet.defer import (
    inlineCallbacks,
    returnValue,
)
from zope.interface import implementer


maaslog = get_maas_logger('bootloaders')


@asynchronous
def get_archive_mirrors():
    client = getRegionClient()
    return client(GetArchiveMirrors)


@asynchronous(timeout=10)
@inlineCallbacks
def get_main_archive_url():
    mirrors = yield get_archive_mirrors()
    main_url = mirrors['main'].geturl()
    returnValue(main_url)


@asynchronous(timeout=10)
@inlineCallbacks
def get_ports_archive_url():
    mirrors = yield get_archive_mirrors()
    ports_url = mirrors['ports'].geturl()
    returnValue(ports_url)


@implementer(IReader)
class BytesReader:

    def __init__(self, data):
        super(BytesReader, self).__init__()
        self.buffer = BytesIO(data)
        self.size = len(data)

    def read(self, size):
        return self.buffer.read(size)

    def finish(self):
        self.buffer.close()


class BootMethodError(Exception):
    """Exception raised for errors from a BootMethod."""


@typed
def get_parameters(match) -> Dict[str, str]:
    """Helper that gets the matched parameters from the regex match."""
    return {
        key: value.decode("ascii")
        for key, value in match.groupdict().items()
        if value is not None
        }


def gen_template_filenames(purpose, arch, subarch):
    """List possible template filenames.

    :param purpose: The boot purpose, e.g. "local".
    :param arch: Main machine architecture.
    :param subarch: Sub-architecture, or "generic" if there is none.

    Returns a list of possible PXE template filenames using the following
    lookup order:

      config.{purpose}.{arch}.{subarch}.template
      config.{purpose}.{arch}.template
      config.{purpose}.template
      config.template

    """
    elements = [purpose, arch, subarch]
    while len(elements) >= 1:
        yield "config.%s.template" % ".".join(elements)
        elements.pop()
    yield "config.template"


def get_remote_mac():
    """Gets the requestors MAC address from arp cache.

    This is used, when the dhcp lease file is not up-to-date soon enough
    to extract the MAC address from the IP address assigned by dhcp.
    """
    remote_host, remote_port = tftp.get_remote_address()
    return find_mac_via_arp(remote_host)


class BootMethod(metaclass=ABCMeta):
    """Skeleton for a boot method."""

    # Path prefix that is used for the pxelinux.cfg. Used for
    # the dhcpd.conf that is generated.  Format is "path/to/dir/".
    # relative to tftpboot directory.
    path_prefix = None

    # Arches for which this boot method needs to install boot loaders.
    bootloader_arches = []

    # Bootloader files to symlink into the root tftp directory.
    bootloader_files = []

    @abstractproperty
    def name(self):
        """Name of the boot method."""

    @abstractproperty
    def bios_boot_method(self):
        """Method used by the bios to boot. E.g. `pxe`."""

    @abstractproperty
    def template_subdir(self):
        """Name of template sub-directory."""

    @abstractproperty
    def bootloader_path(self):
        """Relative path from `path_prefix` to boot loader."""

    @abstractproperty
    def arch_octet(self):
        """Architecture type that supports this method. Used for the
        dhcpd.conf file that is generated. Must be in the format XX:XX.
        See http://www.iana.org/assignments/dhcpv6-parameters/
        dhcpv6-parameters.xhtml#processor-architecture
        """

    def match_path(self, backend, path):
        """Checks path for a file the boot method needs to handle.

        :param backend: requesting backend
        :param path: requested path
        :return: dict of match params from path, None if no match
        """
        return None

    def get_reader(self, backend, kernel_params, **extra):
        """Gets the reader the backend will use for this combination of
        boot method, kernel parameters, and extra parameters.

        :param backend: requesting backend
        :param kernel_params: An instance of `KernelParameters`.
        :param extra: Allow for other arguments. This is a safety valve;
            parameters generated in another component (for example, see
            `TFTPBackend.get_boot_method_reader`) won't cause this to break.
        """
        return None

    def _link_simplestream_bootloaders(self, stream_path, destination):
        """Link the bootloaders downloaded from the SimpleStream into the
        destination(tftp root).

        :param stream_path: The path to the bootloaders in the SimpleStream
        :param destination: The path to link the bootloaders to
        """
        for bootloader_file in self.bootloader_files:
            bootloader_src = os.path.join(stream_path, bootloader_file)
            bootloader_dst = os.path.join(destination, bootloader_file)
            if os.path.exists(bootloader_src):
                atomic_symlink(bootloader_src, bootloader_dst)
            else:
                err_msg = (
                    "SimpleStream is missing required bootloader file '%s' "
                    "from bootloader %s." % (bootloader_file, self.name))
                try_send_rack_event(EVENT_TYPES.RACK_IMPORT_ERROR, err_msg)
                maaslog.error(err_msg)

    def _find_and_copy_bootloaders(self, destination, log_missing=True):
        """Attempt to copy bootloaders from the previous snapshot

        :param destination: The path to link the bootloaders to
        :param log_missing: Log missing files, default True

        :return: True if all bootloaders have been found and copied, False
                 otherwise.
        """
        boot_sources_base = os.path.realpath(os.path.join(destination, '..'))
        previous_snapshot = os.path.join(boot_sources_base, 'current')
        files_found = True
        for bootloader_file in self.bootloader_files:
            bootloader_src = os.path.join(previous_snapshot, bootloader_file)
            bootloader_src = os.path.realpath(bootloader_src)
            bootloader_dst = os.path.join(destination, bootloader_file)
            if os.path.exists(bootloader_src):
                # Copy files if their realpath is inside the previous snapshot
                # as once we're done the previous snapshot is deleted. Symlinks
                # to other areas of the filesystem are maintained.
                if boot_sources_base in bootloader_src:
                    atomic_copy(bootloader_src, bootloader_dst)
                else:
                    atomic_symlink(bootloader_src, bootloader_dst)
            else:
                files_found = False
                if log_missing:
                    err_msg = (
                        "Unable to find a copy of %s in the SimpleStream or a "
                        "previously downloaded copy. The %s bootloader type "
                        "may not work." % (bootloader_file, self.name))
                    try_send_rack_event(EVENT_TYPES.RACK_IMPORT_ERROR, err_msg)
                    maaslog.error(err_msg)
        return files_found

    @typed
    def link_bootloader(self, destination: str):
        """Installs the required files for this boot method into the
        destination.

        :param destination: path to install bootloader
        """
        # A bootloader can be used for multiple arches but the rack only
        # stores the bootloader files in the arch listed in the SimpleStream.
        bootloaders_in_simplestream = False
        for arch in self.bootloader_arches:
            stream_path = os.path.join(
                destination, 'bootloader', self.bios_boot_method, arch)
            if os.path.exists(stream_path):
                bootloaders_in_simplestream = True
                break

        if bootloaders_in_simplestream:
            self._link_simplestream_bootloaders(stream_path, destination)
        else:
            self._find_and_copy_bootloaders(destination)

    def __init__(self):
        super(BootMethod, self).__init__()
        # Check the types of subclasses' properties.
        assert isinstance(self.name, str)
        assert isinstance(self.bios_boot_method, str)
        assert isinstance(self.bootloader_path, str)
        assert isinstance(self.template_subdir, str) or (
            self.template_subdir is None)
        assert isinstance(self.bootloader_arches, list) and all(
            isinstance(element, str) for element in self.bootloader_arches)
        assert isinstance(self.bootloader_files, list) and all(
            isinstance(element, str) for element in self.bootloader_files)
        assert isinstance(self.arch_octet, str) or self.arch_octet is None

    @lru_cache(1)
    def get_template_dir(self):
        """Gets the template directory for the boot method."""
        return locate_template("%s" % self.template_subdir)

    @lru_cache(512)
    def get_template(self, purpose, arch, subarch):
        """Gets the best avaliable template for the boot method.

        Templates are loaded each time here so that they can be changed on
        the fly without restarting the provisioning server.

        :param purpose: The boot purpose, e.g. "local".
        :param arch: Main machine architecture.
        :param subarch: Sub-architecture, or "generic" if there is none.
        :return: `tempita.Template`
        """
        pxe_templates_dir = self.get_template_dir()
        for filename in gen_template_filenames(purpose, arch, subarch):
            template_name = os.path.join(pxe_templates_dir, filename)
            try:
                return tempita.Template.from_filename(
                    template_name, encoding="UTF-8")
            except IOError as error:
                if error.errno != ENOENT:
                    raise
        else:
            error = (
                "No PXE template found in %r for:\n"
                "  Purpose: %r, Arch: %r, Subarch: %r\n"
                "This can happen if you manually power up a node when its "
                "state is not one that allows it. Is the node in the "
                "'New' or 'Ready' states? It needs to be Enlisting, "
                "Commissioning or Allocated." % (
                    pxe_templates_dir, purpose, arch, subarch))
            try_send_rack_event(EVENT_TYPES.RACK_IMPORT_ERROR, error)
            raise AssertionError(error)

    def compose_template_namespace(self, kernel_params):
        """Composes the namespace variables that are used by a boot
        method template.
        """
        dtb_subarchs = ['xgene-uboot-mustang']

        def image_dir(params):
            return compose_image_path(
                params.osystem, params.arch, params.subarch,
                params.release, params.label)

        def initrd_path(params):
            # Normally the initrd filename is the SimpleStream filetype. If
            # no filetype is given try the filetype.
            if params.initrd is not None:
                initrd = params.initrd
            else:
                initrd = 'boot-initrd'
            return "%s/%s" % (image_dir(params), initrd)

        def kernel_path(params):
            # Normally the kernel filename is the SimpleStream filetype. If
            # no filetype is given try the filetype.
            if params.kernel is not None:
                kernel = params.kernel
            else:
                kernel = 'boot-kernel'
            return "%s/%s" % (image_dir(params), kernel)

        def dtb_path(params):
            if params.subarch in dtb_subarchs:
                # Normally the dtb filename is the SimpleStream filetype. If
                # no filetype is given try the filetype.
                if params.boot_dtb is not None:
                    boot_dtb = params.boot_dtb
                else:
                    boot_dtb = 'boot-dtb'
                return "%s/%s" % (image_dir(params), boot_dtb)
            else:
                return None

        def kernel_command(params):
            return compose_kernel_command_line(params)

        namespace = {
            "initrd_path": initrd_path,
            "kernel_command": kernel_command,
            "kernel_params": kernel_params,
            "kernel_path": kernel_path,
            "dtb_path": dtb_path,
            }

        return namespace


class BootMethodRegistry(Registry):
    """Registry for boot method classes."""


# Import the supported boot methods after defining BootMethod.
from provisioningserver.boot.pxe import PXEBootMethod
from provisioningserver.boot.uefi_amd64 import UEFIAMD64BootMethod
from provisioningserver.boot.uefi_arm64 import UEFIARM64BootMethod
from provisioningserver.boot.open_firmware_ppc64el import (
    OpenFirmwarePPC64ELBootMethod
)
from provisioningserver.boot.powernv import PowerNVBootMethod
from provisioningserver.boot.windows import WindowsPXEBootMethod


builtin_boot_methods = [
    PXEBootMethod(),
    UEFIAMD64BootMethod(),
    UEFIARM64BootMethod(),
    OpenFirmwarePPC64ELBootMethod(),
    PowerNVBootMethod(),
    WindowsPXEBootMethod(),
]
for method in builtin_boot_methods:
    BootMethodRegistry.register_item(method.name, method)