/usr/lib/python3/dist-packages/provisioningserver/drivers/power/wedge.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | # Copyright 2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Facebook's Wedge Power Driver."""
__all__ = []
from socket import error as SOCKETError
from paramiko import (
AutoAddPolicy,
SSHClient,
SSHException,
)
from provisioningserver.drivers import (
make_ip_extractor,
make_setting_field,
)
from provisioningserver.drivers.power import (
PowerActionError,
PowerConnError,
PowerDriver,
PowerFatalError,
)
class WedgeState:
OFF = ('Microserver power is off')
ON = ('Microserver power is on')
class WedgePowerDriver(PowerDriver):
name = 'wedge'
description = "Facebook's Wedge"
settings = [
make_setting_field('power_address', "IP address", required=True),
make_setting_field('power_user', "Power user"),
make_setting_field(
'power_pass', "Power password", field_type='password'),
]
ip_extractor = make_ip_extractor('power_address')
def detect_missing_packages(self):
# uses pure-python paramiko ssh client - nothing to look for!
return []
def run_wedge_command(self, command,
power_address=None, power_user=None, power_pass=None,
**extra):
"""Run a single command and return unparsed text from stdout."""
try:
ssh_client = SSHClient()
ssh_client.set_missing_host_key_policy(AutoAddPolicy())
ssh_client.connect(power_address,
username=power_user,
password=power_pass)
_, stdout, _ = ssh_client.exec_command(command)
output = stdout.read().decode('utf-8').strip()
except (SSHException, EOFError, SOCKETError) as e:
raise PowerConnError(
"Could not make SSH connection to Wedge for "
"%s on %s - %s" % (power_user, power_address, e))
finally:
ssh_client.close()
return output
def power_on(self, system_id, context):
"""Power on Wedge."""
try:
self.run_wedge_command(
"/usr/local/bin/wedge_power.sh on",
**context)
except PowerConnError:
raise PowerActionError(
"Wedge Power Driver unable to power on")
def power_off(self, system_id, context):
"""Power off Wedge."""
try:
self.run_wedge_command(
"/usr/local/bin/wedge_power.sh off",
**context)
except PowerConnError:
raise PowerActionError(
"Wedge Power Driver unable to power off")
def power_query(self, system_id, context):
"""Power query Wedge."""
try:
power_state = self.run_wedge_command(
"/usr/local/bin/wedge_power.sh status",
**context)
except PowerConnError:
raise PowerActionError(
"Wedge Power Driver unable to power query")
else:
if power_state in WedgeState.OFF:
return 'off'
elif power_state in WedgeState.ON:
return 'on'
else:
raise PowerFatalError(
"Wedge Power Driver retrieved unknown power response %s"
% power_state)
|