This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/drivers/power/amt.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
# Copyright 2015-2016 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""AMT Power Driver."""

__all__ = []

from itertools import chain
from os.path import (
    dirname,
    join,
)
import re
from subprocess import (
    PIPE,
    Popen,
)
from time import sleep

from lxml import etree
from provisioningserver.drivers import (
    make_ip_extractor,
    make_setting_field,
)
from provisioningserver.drivers.power import (
    is_power_parameter_set,
    PowerActionError,
    PowerAuthError,
    PowerConnError,
    PowerDriver,
    PowerSettingError,
)
from provisioningserver.utils import (
    shell,
    typed,
)


AMT_ERRORS = {
    '401 Unauthorized': {
        'message': (
            "Incorrect password.  Check BMC configuration and try again."),
        'exception': PowerAuthError
    },
    "500 Can't connect": {
        'message': (
            "Could not connect to BMC.  "
            "Check BMC configuration and try again."),
        'exception': PowerConnError
    },
}


REQUIRED_PACKAGES = [["amttool", "amtterm"], ["wsman", "wsmancli"]]


class AMTPowerDriver(PowerDriver):

    name = 'amt'
    description = "Intel AMT"
    settings = [
        make_setting_field(
            'power_pass', "Power password", field_type='password'),
        make_setting_field('power_address', "Power address", required=True),
    ]
    ip_extractor = make_ip_extractor('power_address')

    def detect_missing_packages(self):
        missing_packages = []
        for binary, package in REQUIRED_PACKAGES:
            if not shell.has_command_available(binary):
                missing_packages.append(package)
        return missing_packages

    @typed
    def _render_wsman_state_xml(self, power_change) -> bytes:
        """Render wsman state XML."""
        wsman_state_filename = join(dirname(__file__), "amt.wsman-state.xml")
        wsman_state_ns = {
            "p": (
                "http://schemas.dmtf.org/wbem/wscim/1/cim-schema"
                "/2/CIM_PowerManagementService"
            ),
        }
        tree = etree.parse(wsman_state_filename)
        [ps] = tree.xpath("//p:PowerState", namespaces=wsman_state_ns)
        power_states = {'on': '2', 'off': '8', 'restart': '10'}
        ps.text = power_states[power_change]
        return etree.tostring(tree)

    @typed
    def _parse_multiple_xml_docs(self, xml: bytes):
        """Parse multiple XML documents.

        Each document must commence with an XML document declaration, i.e.
        <?xml ...

        Works around a weird decision in `wsman` where it returns multiple XML
        documents in a single stream.
        """
        xmldecl = re.compile(b'<[?]xml\\s')
        xmldecls = xmldecl.finditer(xml)
        starts = [match.start() for match in xmldecls]
        ends = starts[1:] + [len(xml)]
        frags = (xml[start:end] for start, end in zip(starts, ends))
        return (etree.fromstring(frag) for frag in frags)

    @typed
    def get_power_state(self, xml: bytes) -> str:
        """Get PowerState text from XML."""
        namespaces = {
            "h": (
                "http://schemas.dmtf.org/wbem/wscim/1/cim-schema"
                "/2/CIM_AssociatedPowerManagementService"
            ),
        }
        state = next(chain.from_iterable(
            doc.xpath('//h:PowerState/text()', namespaces=namespaces)
            for doc in self._parse_multiple_xml_docs(xml)
        ))
        return state

    def _get_amt_environment(self, power_pass):
        """Set and return environment for AMT."""
        env = shell.get_env_with_locale()
        env['AMT_PASSWORD'] = power_pass
        return env

    def _set_pxe_boot(self, ip_address, power_pass):
        """Set to PXE for next boot."""
        wsman_pxe_options = {
            'ChangeBootOrder': (
                join(dirname(__file__), "amt.wsman-pxe.xml"),
                ('http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/'
                 'CIM_BootConfigSetting?InstanceID="Intel(r) '
                 'AMT: Boot Configuration 0"')),
            'SetBootConfigRole': (
                join(dirname(__file__), "amt.wsman-boot-config.xml"),
                ('http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/'
                 'CIM_BootService?SystemCreationClassName='
                 '"CIM_ComputerSystem"&SystemName="Intel(r) AMT"'
                 '&CreationClassName="CIM_BootService"&Name="Intel(r)'
                 ' AMT Boot Service"')),
        }
        wsman_opts = (
            '--port', '16992', '--hostname', ip_address, '--username',
            'admin', '--password', power_pass,
            '--noverifypeer', '--noverifyhost'
        )
        # Change boot order to PXE and enable boot config request
        for method, (schema_file, schema_uri) in wsman_pxe_options.items():
            with open(schema_file, "rb") as fd:
                wsman_opts += ('--input', '-')
                action = ('invoke', '--method', method, schema_uri)
                command = ('wsman',) + wsman_opts + action
                self._run(command, power_pass, stdin=fd.read())

    @typed
    def _run(
            self, command: tuple, power_pass: str,
            stdin: bytes=None) -> bytes:
        """Run a subprocess with stdin."""
        env = self._get_amt_environment(power_pass)
        process = Popen(
            command, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
        stdout, stderr = process.communicate(stdin)
        if process.returncode != 0:
            raise PowerActionError(
                "Failed to run command: %s with error: %s" % (
                    command, stderr.decode("utf-8", "replace")))
        return stdout

    @typed
    def _issue_amttool_command(
            self, cmd: str, ip_address: str, power_pass: str,
            amttool_boot_mode=None, stdin=None) -> bytes:
        """Perform a command using amttool."""
        command = ('amttool', ip_address, cmd)
        if cmd in ('power-cycle', 'powerup'):
            command += (amttool_boot_mode,)
        return self._run(command, power_pass, stdin=stdin)

    @typed
    def _issue_wsman_command(
            self, power_change: str, ip_address: str,
            power_pass: str) -> bytes:
        """Perform a command using wsman."""
        wsman_power_schema_uri = (
            'http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/'
            'CIM_PowerManagementService?SystemCreationClassName='
            '"CIM_ComputerSystem"&SystemName="Intel(r) AMT"'
            '&CreationClassName="CIM_PowerManagementService"&Name='
            '"Intel(r) AMT Power Management Service"'
        )
        wsman_query_schema_uri = (
            'http://schemas.dmtf.org/wbem/wscim/1/cim-schema/2/'
            'CIM_AssociatedPowerManagementService'
        )
        wsman_opts = (
            '--port', '16992', '--hostname', ip_address, '--username',
            'admin', '--password', power_pass,
            '--noverifypeer', '--noverifyhost'
        )
        if power_change in ('on', 'off', 'restart'):
            stdin = self._render_wsman_state_xml(power_change)
            wsman_opts += ('--input', '-',)
            action = (
                'invoke', '--method', 'RequestPowerStateChange',
                wsman_power_schema_uri)
            command = ('wsman',) + wsman_opts + action
        elif power_change == 'query':
            stdin = None  # No input for query
            wsman_opts += ('--optimize', '--encoding', 'utf-8',)
            action = ('enumerate', wsman_query_schema_uri)
            command = ('wsman',) + wsman_opts + action
        return self._run(command, power_pass, stdin=stdin)

    def amttool_query_state(self, ip_address, power_pass):
        """Ask for node's power state: 'on' or 'off', via amttool."""
        # Retry the state if it fails because it often fails the first time
        output = None
        for _ in range(10):
            output = self._issue_amttool_command(
                'info', ip_address, power_pass)
            if output is not None and len(output) > 0:
                break
            # Wait 1 second between retries.  AMT controllers are generally
            # very light and may not be comfortable with more frequent
            # queries.
            sleep(1)

        if output is None:
            raise PowerActionError("amttool power querying failed.")

        # Ensure that from this point forward that output is a str.
        output = output.decode("utf-8")

        # Wide awake (S0), or asleep (S1-S4), but not a clean slate that
        # will lead to a fresh boot.
        if 'S5' in output:
            return 'off'
        for state in ('S0', 'S1', 'S2', 'S3', 'S4'):
            if state in output:
                return 'on'
        raise PowerActionError(
            "Got unknown power state from node: %s" % state)

    def wsman_query_state(self, ip_address, power_pass):
        """Ask for node's power state: 'on' or 'off', via wsman."""
        # Retry the state if it fails because it often fails the first time.
        output = None
        for _ in range(10):
            output = self._issue_wsman_command('query', ip_address, power_pass)
            if output is not None and len(output) > 0:
                break
            # Wait 1 second between retries.  AMT controllers are generally
            # very light and may not be comfortable with more frequent
            # queries.
            sleep(1)

        if output is None:
            raise PowerActionError("wsman power querying failed.")
        else:
            state = self.get_power_state(output)
            # There are a LOT of possible power states
            # 1: Other                    9: Power Cycle (Off-Hard)
            # 2: On                       10: Master Bus Reset
            # 3: Sleep - Light            11: Diagnostic Interrupt (NMI)
            # 4: Sleep - Deep             12: Off - Soft Graceful
            # 5: Power Cycle (Off - Soft) 13: Off - Hard Graceful
            # 6: Off - Hard               14: Master Bus Reset Graceful
            # 7: Hibernate (Off - Soft)   15: Power Cycle (Off-Soft Graceful)
            # 8: Off - Soft               16: Power Cycle (Off-Hard Graceful)
            #                             17: Diagnostic Interrupt (INIT)

            # These are all power states that indicate that the system is
            # either ON or will resume function in an ON or Powered Up
            # state (e.g. being power cycled currently)
            if state in (
                    '2', '3', '4', '5', '7', '9', '10', '14', '15', '16'):
                return 'on'
            elif state in ('6', '8', '12', '13'):
                return 'off'
            else:
                raise PowerActionError(
                    "Got unknown power state from node: %s" % state)

    def amttool_restart(self, ip_address, power_pass, amttool_boot_mode):
        """Restart the node via amttool."""
        self._issue_amttool_command(
            'power_cycle', ip_address, power_pass,
            amttool_boot_mode=amttool_boot_mode, stdin=b'yes')

    def amttool_power_on(self, ip_address, power_pass, amttool_boot_mode):
        """Power on the node via amttool."""
        # Try several times.  Power commands often fail the first time.
        for _ in range(10):
            # Issue the AMT command; amttool will prompt for confirmation.
            self._issue_amttool_command(
                'powerup', ip_address, power_pass,
                amttool_boot_mode=amttool_boot_mode, stdin=b'yes')
            if self.amttool_query_state(ip_address, power_pass) == 'on':
                return
            sleep(1)
        raise PowerActionError("Machine is not powering on.  Giving up.")

    def wsman_power_on(self, ip_address, power_pass, restart=False):
        """Power on the node via wsman."""
        power_command = 'restart' if restart else 'on'
        self._set_pxe_boot(ip_address, power_pass)
        self._issue_wsman_command(power_command, ip_address, power_pass)
        # Check power state several times.  It usually takes a second or
        # two to get the correct state.
        for _ in range(10):
            if self.wsman_query_state(ip_address, power_pass) == 'on':
                return  # Success.  Machine is on.
            sleep(1)
        raise PowerActionError("Machine is not powering on.  Giving up.")

    def amttool_power_off(self, ip_address, power_pass):
        """Power off the node via amttool."""
        # Try several times.  Power commands often fail the first time.
        for _ in range(10):
            if self.amttool_query_state(ip_address, power_pass) == 'off':
                # Success.  Machine is off.
                return
                # Issue the AMT command; amttool will prompt for confirmation.
            self._issue_amttool_command(
                'powerdown', ip_address, power_pass, stdin=b'yes')
            sleep(1)
        raise PowerActionError("Machine is not powering off.  Giving up.")

    def wsman_power_off(self, ip_address, power_pass):
        """Power off the node via wsman."""
        # Issue the wsman command to change power state.
        self._issue_wsman_command('off', ip_address, power_pass)
        # Check power state several times.  It usually takes a second or
        # two to get the correct state.
        for _ in range(10):
            if self.wsman_query_state(ip_address, power_pass) == 'off':
                return  # Success.  Machine is off.
            else:
                sleep(1)
        raise PowerActionError("Machine is not powering off.  Giving up.")

    def _get_amt_command(self, ip_address, power_pass):
        """Retrieve AMT command to use, either amttool or wsman
        (if AMT version > 8), for the given system.
        """
        # XXX bug=1331214
        # Check if the AMT ver > 8
        # If so, we need wsman, not amttool
        env = self._get_amt_environment(power_pass)
        process = Popen(
            ('amttool', ip_address, 'info'), stdout=PIPE, stderr=PIPE, env=env)
        stdout, stderr = process.communicate()
        stdout = stdout.decode("utf-8")
        stderr = stderr.decode("utf-8")
        if stdout == "" or stdout.isspace():
            for error, error_info in AMT_ERRORS.items():
                if error in stderr:
                    raise error_info.get(
                        'exception')(error_info.get('message'))
            raise PowerConnError(
                "Unable to retrieve AMT version: %s" % stderr)
        else:
            match = re.search("AMT version:\s*([0-9]+)", stdout)
            if match is None:
                raise PowerActionError(
                    "Unable to extract AMT version from "
                    "amttool output: %s" % stdout)
            else:
                version = match.group(1)
                if int(version) > 8:
                    return 'wsman'
                else:
                    return 'amttool'

    def _get_amttool_boot_mode(self, boot_mode):
        """Set amttool boot mode."""
        # boot_mode tells us whether we're pxe booting or local booting.
        # For local booting, the argument to amttool must be empty
        # (NOT 'hd', it doesn't work!).
        if boot_mode == 'local':
            return ''
        else:
            return boot_mode

    def _get_ip_address(self, power_address, ip_address):
        """Get the IP address of the AMT BMC."""
        # The user specified power_address overrides any automatically
        # determined ip_address.
        if (is_power_parameter_set(power_address) and not
                is_power_parameter_set(ip_address)):
            return power_address
        elif is_power_parameter_set(ip_address):
            return ip_address
        else:
            raise PowerSettingError(
                "No IP address provided.  "
                "Please update BMC configuration and try again.")

    def power_on(self, system_id, context):
        """Power on AMT node."""
        ip_address = self._get_ip_address(
            context.get('power_address'), context.get('ip_address'))
        power_pass = context.get('power_pass')
        amt_command = self._get_amt_command(ip_address, power_pass)
        if amt_command == 'amttool':
            amttool_boot_mode = self._get_amttool_boot_mode(
                context.get('boot_mode'))
            if self.amttool_query_state(ip_address, power_pass) == 'on':
                self.amttool_restart(ip_address, power_pass, amttool_boot_mode)
            else:
                self.amttool_power_on(
                    ip_address, power_pass, amttool_boot_mode)
        elif amt_command == 'wsman':
            if self.wsman_query_state(ip_address, power_pass) == 'on':
                self.wsman_power_on(ip_address, power_pass, restart=True)
            else:
                self.wsman_power_on(ip_address, power_pass)

    def power_off(self, system_id, context):
        """Power off AMT node."""
        ip_address = self._get_ip_address(
            context.get('power_address'), context.get('ip_address'))
        power_pass = context.get('power_pass')
        amt_command = self._get_amt_command(ip_address, power_pass)
        if amt_command == 'amttool':
            if self.amttool_query_state(ip_address, power_pass) != 'off':
                self.amttool_power_off(ip_address, power_pass)
        elif amt_command == 'wsman':
            if self.wsman_query_state(ip_address, power_pass) != 'off':
                self.wsman_power_off(ip_address, power_pass)

    def power_query(self, system_id, context):
        """Power query AMT node."""
        ip_address = self._get_ip_address(
            context.get('power_address'), context.get('ip_address'))
        power_pass = context.get('power_pass')
        amt_command = self._get_amt_command(ip_address, power_pass)
        if amt_command == 'amttool':
            return self.amttool_query_state(ip_address, power_pass)
        elif amt_command == 'wsman':
            return self.wsman_query_state(ip_address, power_pass)