/usr/lib/python3/dist-packages/provisioningserver/boot/windows.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 | # Copyright 2014-2016 Canonical Ltd.
# Copyright 2014 Cloudbase Solutions SRL.
# This software is licensed under the GNU Affero General Public License
# version 3 (see the file LICENSE).
"""Windows PXE Boot Method"""
__all__ = [
'WindowsPXEBootMethod',
]
import os.path
import re
import shutil
import sys
from provisioningserver.boot import (
BootMethod,
BootMethodError,
BytesReader,
get_remote_mac,
)
from provisioningserver.config import ClusterConfiguration
from provisioningserver.logger import get_maas_logger
from provisioningserver.rpc import getRegionClient
from provisioningserver.rpc.exceptions import NoSuchNode
from provisioningserver.rpc.region import RequestNodeInfoByMACAddress
from provisioningserver.utils import (
tftp,
typed,
)
from provisioningserver.utils.fs import tempdir
from provisioningserver.utils.twisted import (
asynchronous,
deferred,
)
from tftp.backend import FilesystemReader
from twisted.internet.defer import (
inlineCallbacks,
returnValue,
succeed,
)
from twisted.python.filepath import FilePath
maaslog = get_maas_logger("windows")
# These files do not exist in the tftproot. WindowsPXEBootMethod
# handles access to these files returning the correct version
# of the file for the booting version of Windows.
#
# Note: Each version of Windows can have different content for
# these files.
STATIC_FILES = [
'pxeboot.0',
'bootmgr.exe',
'\\boot\\bcd',
'\\boot\\winpe.wim',
'\\boot\\boot.sdi',
'\\boot\\font\\wgl4_boot.ttf',
]
def get_hivex_module():
"""Returns the hivex module if avaliable.
python-hivex is an optional dependency, but it is needed
before MAAS can boot Windows.
"""
if 'hivex' not in sys.modules:
try:
__import__('hivex')
except ImportError:
return None
return sys.modules['hivex']
def load_hivex(*args, **kwargs):
"""Returns the Hivex object."""
module = get_hivex_module()
if module is None:
return None
return module.Hivex(*args, **kwargs)
@asynchronous
def request_node_info_by_mac_address(mac_address):
"""Request node info for the given mac address.
:param mac_address: The MAC Address of the node of the event.
:type mac_address: unicode
"""
if mac_address is None:
maaslog.debug("Cannot determine node; MAC address is unknown.")
return succeed(None)
client = getRegionClient()
d = client(RequestNodeInfoByMACAddress, mac_address=mac_address)
def eb_request_node_info(failure):
failure.trap(NoSuchNode)
maaslog.debug("Node doesn't exist for MAC address: %s", mac_address)
return None
return d.addErrback(eb_request_node_info)
class Bcd:
"""Allows modification of the load options in a Windows boot
configuration data file.
References:
http://msdn.microsoft.com/en-us/library/windows/desktop/
- aa362652(v=vs.85).aspx
- aa362641(v=vs.85).aspx
"""
GUID_WINDOWS_BOOTMGR = '{9dea862c-5cdd-4e70-acc1-f32b344d4795}'
BOOT_MGR_DISPLAY_ORDER = '24000001'
LOAD_OPTIONS = '12000030'
def __init__(self, filename):
self.hive = load_hivex(filename, write=True)
# uids
objects = self._get_root_objects()
self.uids = {}
for i in self.hive.node_children(objects):
self.uids[self.hive.node_name(i)] = self.hive.node_children(i)
# default bootloader
mgr = self.uids[self.GUID_WINDOWS_BOOTMGR][1]
bootmgr_elems = dict([(self.hive.node_name(i), i) for i in
self.hive.node_children(mgr)])
self.loader = self._get_loader(bootmgr_elems)
def _get_root_elements(self):
"""Gets the root from the hive."""
root = self.hive.root()
r_elems = {}
for i in self.hive.node_children(root):
name = self.hive.node_name(i)
r_elems[name] = i
return r_elems
def _get_root_objects(self):
"""Gets the root objects."""
elems = self._get_root_elements()
return elems['Objects']
def _get_loader(self, bootmgr_elems):
"""Get default bootloader."""
(val,) = self.hive.node_values(
bootmgr_elems[self.BOOT_MGR_DISPLAY_ORDER])
loader = self.hive.value_multiple_strings(val)[0]
return loader
def _get_loader_elems(self):
"""Get elements present in default boot loader. We need this
in order to determine the loadoptions key.
"""
return dict(
[(self.hive.node_name(i), i)
for i in self.hive.node_children(self.uids[self.loader][1])])
def _get_load_options_key(self):
"""Gets the key containing the load options we want to edit."""
load_elem = self._get_loader_elems()
load_option_key = load_elem.get(self.LOAD_OPTIONS, None)
return load_option_key
def set_load_options(self, value):
"""Sets the loadoptions value to param:value."""
h = self._get_load_options_key()
if h is None:
# No load options key in the hive, add the key
# so the value can be set.
h = self.hive.node_add_child(
self.uids[self.loader][1], self.LOAD_OPTIONS)
k_type = 1
key = "Element"
data = {
't': k_type,
'key': key,
# Windows only accepts utf-16le in load options.
'value': value.decode('utf-8').encode('utf-16le'),
}
self.hive.node_set_value(h, data)
self.hive.commit(None)
class WindowsPXEBootMethod(BootMethod):
name = "windows"
bios_boot_method = "pxe"
template_subdir = "windows"
bootloader_path = "pxeboot.0"
arch_octet = None
@deferred
def get_node_info(self):
"""Gets node information via the remote mac."""
remote_mac = get_remote_mac()
return request_node_info_by_mac_address(remote_mac)
def clean_path(self, path):
"""Converts Windows path into a unix path and strips the
boot subdirectory from the paths.
"""
path = path.lower().replace('\\', '/')
if path[0:6] == "/boot/":
path = path[6:]
return path
@inlineCallbacks
def match_path(self, backend, path):
"""Checks path to see if the boot method should handle
the requested file.
:param backend: requesting backend
:param path: requested path
:return: dict of match params from path, None if no match
"""
# If the node is requesting the initial bootloader, then we
# need to see if this node is set to boot Windows first.
local_host, local_port = tftp.get_local_address()
if path == 'pxelinux.0':
data = yield self.get_node_info()
if data is None:
returnValue(None)
# Only provide the Windows bootloader when installing
# PXELINUX chainloading will work for the rest of the time.
purpose = data.get('purpose')
if purpose != 'install':
returnValue(None)
osystem = data.get('osystem')
if osystem == 'windows':
# python-hivex is needed to continue.
if get_hivex_module() is None:
raise BootMethodError('python-hivex package is missing.')
returnValue({
'mac': data.get('mac'),
'path': self.bootloader_path,
'local_host': local_host,
})
# Fix the paths for the other static files, Windows requests.
elif path.lower() in STATIC_FILES:
returnValue({
'mac': get_remote_mac(),
'path': self.clean_path(path),
'local_host': local_host,
})
returnValue(None)
def get_reader(self, backend, kernel_params, **extra):
"""Render a configuration file as a unicode string.
:param backend: requesting backend
:param kernel_params: An instance of `KernelParameters`.
:param extra: Allow for other arguments. This is a safety valve;
parameters generated in another component (for example, see
`TFTPBackend.get_boot_method_reader`) won't cause this to break.
"""
path = extra['path']
if path == 'bcd':
local_host = extra['local_host']
return self.compose_bcd(kernel_params, local_host)
return self.output_static(kernel_params, path)
@typed
def link_bootloader(self, destination: str):
"""Installs the required files for Windows booting into the
tftproot.
Does nothing. Windows requires manual installation of bootloader
files, due to licensing.
"""
def compose_preseed_url(self, url):
"""Modifies the url to replace all forward slashes with
backslashes, and prepends the ^ character to any upper-case
characters.
Boot load options of Windows will all be upper-cased
as Windows does not care about case, and what gets exposed in the
registry is all uppercase. MAAS requires a case-sensitive url.
The Windows install script extracts the preseed url and any character
that starts with ^ is then uppercased, so that the URL is correct.
"""
url = url.replace('/', '\\')
return re.sub(r"([A-Z])", r"^\1", url)
def get_resource_path(self, kernel_params, path):
"""Gets the resource path from the kernel param."""
with ClusterConfiguration.open() as config:
resources = config.tftp_root
return os.path.join(
resources, 'windows', kernel_params.arch, kernel_params.subarch,
kernel_params.release, kernel_params.label, path)
def compose_bcd(self, kernel_params, local_host):
"""Composes the Windows boot configuration data.
:param kernel_params: An instance of `KernelParameters`.
:return: Binary data
"""
preseed_url = self.compose_preseed_url(kernel_params.preseed_url)
release_path = "%s\\source" % kernel_params.release
remote_path = "\\\\%s\\reminst" % local_host
loadoptions = "%s;%s;%s" % \
(remote_path, release_path, preseed_url)
# Generate the bcd file.
bcd_template = self.get_resource_path(kernel_params, "bcd")
if not os.path.isfile(bcd_template):
raise BootMethodError(
"Failed to find bcd template: %s" % bcd_template)
with tempdir() as tmp:
bcd_tmp = os.path.join(tmp, "bcd")
shutil.copyfile(bcd_template, bcd_tmp)
bcd = Bcd(bcd_tmp)
bcd.set_load_options(loadoptions)
with open(bcd_tmp, 'rb') as stream:
return BytesReader(stream.read())
def output_static(self, kernel_params, path):
"""Outputs the static file based on the version of Windows."""
actual_path = self.get_resource_path(kernel_params, path)
return FilesystemReader(FilePath(actual_path))
|