/usr/lib/python2.7/dist-packages/provisioningserver/rpc/common.py is in python-maas-provisioningserver 1.5.4+bzr2294-0ubuntu1.2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | # Copyright 2014 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Common RPC classes and utilties."""
from __future__ import (
absolute_import,
print_function,
unicode_literals,
)
str = None
__metaclass__ = type
__all__ = [
"Identify",
"Client",
]
from provisioningserver.rpc.interfaces import IConnection
from provisioningserver.utils import asynchronous
from twisted.protocols import amp
class Identify(amp.Command):
"""Request the identity of the remote side, e.g. its UUID."""
response = [(b"ident", amp.Unicode())]
class Client:
"""Wrapper around an :class:`amp.AMP` instance.
Limits the API to a subset of the behaviour of :class:`amp.AMP`'s,
with alterations to make it suitable for use from a thread outside
of the reactor.
"""
def __init__(self, conn):
super(Client, self).__init__()
assert IConnection.providedBy(conn), (
"%r does not provide IConnection" % (conn,))
self._conn = conn
@property
def ident(self):
"""Something that identifies the far end of the connection."""
return self._conn.ident
@asynchronous
def __call__(self, cmd, **kwargs):
return self._conn.callRemote(cmd, **kwargs)
@asynchronous
def getHostCertificate(self):
return self._conn.hostCertificate
@asynchronous
def getPeerCertificate(self):
return self._conn.peerCertificate
@asynchronous
def isSecure(self):
return self._conn.peerCertificate is not None
def __eq__(self, other):
return type(other) is type(self) and other._conn is self._conn
def __hash__(self):
return hash(self._conn)
|