/usr/lib/python3/dist-packages/provisioningserver/rpc/common.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2014-2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Common RPC classes and utilties."""
# Public names of this module, i.e. what a star-import of it exposes.
__all__ = [
"Authenticate",
"Client",
"Identify",
"RPCProtocol",
]
from os import getpid
from socket import gethostname
from provisioningserver.logger import LegacyLogger
from provisioningserver.rpc.interfaces import (
IConnection,
IConnectionToRegion,
)
from provisioningserver.utils.twisted import asynchronous
from twisted.internet.defer import Deferred
from twisted.protocols import amp
from twisted.python.failure import Failure
# Module-level logger; `RPCProtocol` uses it to report unhandled failures.
log = LegacyLogger()
class Identify(amp.Command):
    """Ask the remote side to identify itself, e.g. by its UUID.

    Takes no arguments; the reply carries a single Unicode `ident` value.

    :since: 1.5
    """

    response = [
        (b"ident", amp.Unicode()),
    ]
class Authenticate(amp.Command):
    """Authenticate the remote side.

    The exchange proceeds as follows:

    1. When a new connection is established, the region and the cluster
       each call `Authenticate` on the other, passing a random chunk of
       data as `message`. The message must be unique to avoid replay
       attacks.

    2. The remote side adds some salt to the message and calculates an HMAC
       digest, keyed with the shared secret.

       The salt is intended to prevent replay attacks: it stops an intruder
       from authenticating itself by calling `Authenticate` on the caller
       (or on another endpoint in the same MAAS installation) with the same
       message, receiving the digest, and passing it back to the caller.

    3. The remote side returns the digest together with the salt. The
       caller performs the same calculation and compares the digests:

       - if they match, the connection is put into rotation;

       - if they do not match, the connection is closed immediately and an
         error is logged.

    :since: 1.7
    """

    arguments = [(b"message", amp.String())]

    response = [
        (b"digest", amp.String()),
        # Is 'salt' the right term here?
        (b"salt", amp.String()),
    ]

    errors = []
class Client:
    """Thin facade over an :class:`amp.AMP` connection.

    Exposes only a subset of :class:`amp.AMP`'s behaviour, altered so that
    it is suitable for use from a thread outside of the reactor.
    """

    def __init__(self, conn):
        """Wrap `conn`, which must provide `IConnection`."""
        super().__init__()
        assert IConnection.providedBy(conn), (
            "%r does not provide IConnection" % (conn,))
        self._conn = conn

    @property
    def ident(self):
        """Something that identifies the far end of the connection."""
        connection = self._conn
        return connection.ident

    @property
    def localIdent(self):
        """Something that identifies this end of the connection.

        :raise NotImplementedError: If the wrapped connection does not
            provide `IConnectionToRegion`.
        """
        # Checking the interface here is a wart. A dedicated client for the
        # rack would be cleaner; better still, the region's client would
        # provide the same interface and have its own `localIdent`.
        connection = self._conn
        if not IConnectionToRegion.providedBy(connection):
            raise NotImplementedError(
                "Client localIdent is only available in the rack.")
        return connection.localIdent

    @property
    def address(self):
        """Return the address of the far end of the connection.

        :raise NotImplementedError: If the wrapped connection does not
            provide `IConnectionToRegion`.
        """
        # Checking the interface here is a wart. A dedicated client for the
        # rack would be cleaner; better still, the region's client would
        # provide the same interface and have its own `address`.
        connection = self._conn
        if not IConnectionToRegion.providedBy(connection):
            raise NotImplementedError(
                "Client address is only available in the rack.")
        return connection.address

    @asynchronous
    def __call__(self, cmd, *args, **kwargs):
        """Call a remote RPC method.

        This is how the client is normally used.

        :note:
            Positional arguments appear in the signature, but using them is
            an error. They are accepted only so that this method can detect
            them and raise a more helpful error than Python would.

            Python's own message for a signature mismatch is not great at
            best, and it is hard to work out the receiver when the
            `TypeError` is raised in a different stack from the caller's,
            e.g. when calling into the Twisted reactor from a thread.

        :param cmd: The `amp.Command` child class representing the remote
            method to be invoked.
        :param kwargs: Any parameters to the remote method. Only keyword
            arguments are accepted.
        :return: A deferred result. Call its `wait` method (with a timeout
            in seconds) to block on the call's completion.
        :raise TypeError: If any positional arguments are given after
            `cmd`.
        """
        if args:
            receiver_name = "{}.{}".format(
                self.__module__, self.__class__.__name__)
            template = (
                "%s called with %d positional arguments, %r, but positional "
                "arguments are not supported. Usage: client(command, arg1="
                "value1, ...)")
            raise TypeError(template % (receiver_name, len(args), args))
        return self._conn.callRemote(cmd, **kwargs)

    @asynchronous
    def getHostCertificate(self):
        """Return the wrapped connection's `hostCertificate`."""
        connection = self._conn
        return connection.hostCertificate

    @asynchronous
    def getPeerCertificate(self):
        """Return the wrapped connection's `peerCertificate`."""
        connection = self._conn
        return connection.peerCertificate

    @asynchronous
    def isSecure(self):
        """Return true if the connection has a peer certificate."""
        peer_certificate = self._conn.peerCertificate
        return peer_certificate is not None

    def __eq__(self, other):
        """Clients are equal when they are of the same type and wrap the
        very same connection object."""
        if type(other) is not type(self):
            return False
        return other._conn is self._conn

    def __hash__(self):
        """Hash on the wrapped connection, consistent with `__eq__`."""
        return hash(self._conn)
def make_command_ref(box):
    """Build a textual reference for an AMP command box.

    The reference helps to correlate exceptions between the distributed
    parts of MAAS. It has the form::

        $hostname:pid=$pid:cmd=$command_name:ask=$ask_sequence

    where:

    * ``hostname`` is the hostname of the machine on which the error
      occurred.

    * ``pid`` is the ID of the process in which the error originated.

    * ``command_name`` is the AMP command name.

    * ``ask_sequence`` is the sequence number used for RPC calls that
      expect a reply (see http://amp-protocol.net/ for details), or
      ``none`` when the box carries no such key.

    A possible extension would be a ``make_box_ref`` function returning
    unambiguous references for command, answer, and error boxes alike.

    :param box: The AMP command box; it must contain the command key.
    :return: The reference, as a string.
    """
    hostname = gethostname()
    pid = getpid()
    # Box keys and values are bytes; decode them for the textual reference.
    command_name = box[amp.COMMAND].decode("ascii")
    ask_sequence = box.get(amp.ASK, b"none").decode("ascii")
    return "%s:pid=%d:cmd=%s:ask=%s" % (
        hostname, pid, command_name, ask_sequence)
class RPCProtocol(amp.AMP, object):
    """A specialisation of `amp.AMP`.

    It's hard to track exactly when an `amp.AMP` protocol is connected to its
    transport, or disconnected, from the "outside". It's necessary to subclass
    and override `connectionMade` and `connectionLost` and signal from there,
    which is what this class does.

    :ivar onConnectionMade: A `Deferred` that fires when `connectionMade` has
        been called, i.e. this protocol is now connected.
    :ivar onConnectionLost: A `Deferred` that fires when `connectionLost` has
        been called, i.e. this protocol is no longer connected.
    """

    def __init__(self):
        super(RPCProtocol, self).__init__()
        self.onConnectionMade = Deferred()
        self.onConnectionLost = Deferred()

    def connectionMade(self):
        """Call up, then fire `onConnectionMade` with ``None``."""
        super(RPCProtocol, self).connectionMade()
        self.onConnectionMade.callback(None)

    def connectionLost(self, reason):
        """Call up, then fire `onConnectionLost` with ``None``.

        :param reason: Passed through unchanged to the superclass.
        """
        super(RPCProtocol, self).connectionLost(reason)
        self.onConnectionLost.callback(None)

    def dispatchCommand(self, box):
        """Call up, but coerce errors into non-fatal failures.

        This is called by `_commandReceived`, which is responsible for
        capturing unhandled errors and transmitting them back to the remote
        side. It does this within a :class:`amp.QuitBox` which immediately
        disconnects the transport after being transmitted.

        Here we capture all errors before `_commandReceived` sees them and
        wrap them with :class:`amp.RemoteAmpError`. This prevents the
        disconnecting behaviour.

        :param box: The AMP command box to dispatch.
        :return: The `Deferred` from the superclass with the coercing
            errback attached.
        """
        d = super(RPCProtocol, self).dispatchCommand(box)

        def coerce_error(failure):
            # Failures that are already `RemoteAmpError`s pass through
            # untouched; only other failures are wrapped.
            if failure.check(amp.RemoteAmpError):
                return failure
            else:
                # The command name in the box is bytes. Decode it so that the
                # log reads "the Foo command" rather than "the b'Foo'
                # command", and so that bytes are never interpolated into a
                # str (which triggers BytesWarning under ``python -b``).
                command = box[amp.COMMAND].decode("ascii", "replace")
                command_ref = make_command_ref(box)
                log.err(failure, (
                    "Unhandled failure dispatching AMP command. This is "
                    "probably a bug. Please ensure that this error is handled "
                    "within application code or declared in the signature of "
                    "the %s command. [%s]") % (command, command_ref))
                # Non-fatal, so the connection stays up; the original
                # failure is retained as the local cause.
                return Failure(amp.RemoteAmpError(
                    amp.UNHANDLED_ERROR_CODE, b"Unknown Error [%s]" %
                    command_ref.encode("ascii"), fatal=False, local=failure))

        return d.addErrback(coerce_error)

    def unhandledError(self, failure):
        """Terminal errback, after application code has seen the failure.

        `amp.BoxDispatcher.unhandledError` calls the `amp.IBoxSender`'s
        `unhandledError`. In the default implementation this disconnects the
        transport.

        Here we instead log the failure but do *not* disconnect because it's
        too disruptive to the running of MAAS.

        :param failure: The unhandled failure to log.
        """
        log.err(failure, (
            "Unhandled failure during AMP request. This is probably a bug. "
            "Please ensure that this error is handled within application "
            "code."))
|