/usr/lib/python3/dist-packages/maasserver/security.py is in python3-django-maas 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | # Copyright 2014-2015 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Security-related code, primarily relating to TLS."""
__all__ = [
"get_region_certificate",
"get_shared_secret",
]
from datetime import datetime
import os
from maasserver import locks
from maasserver.models.config import Config
from maasserver.utils import synchronised
from maasserver.utils.orm import (
transactional,
with_connection,
)
from maasserver.utils.threads import deferToDatabase
from provisioningserver.security import (
get_shared_secret_filesystem_path,
get_shared_secret_from_filesystem,
set_shared_secret_on_filesystem,
to_bin,
to_hex,
)
from provisioningserver.utils.twisted import (
asynchronous,
synchronous,
)
from pytz import UTC
from twisted.internet import ssl
def get_serial():
ref = datetime(2012, 1, 16, tzinfo=UTC)
now = datetime.now(tz=UTC)
serial = (now - ref).total_seconds()
return int(serial)
def load_region_certificate():
upem = Config.objects.get_config("rpc_region_certificate")
if upem is None:
return None
else:
# The certificate will be returned as a unicode string. However,
# it's in PEM form, a base-64 encoded certificate and key, so we
# need to get back to bytes, then parse it.
pem = upem.encode("ascii")
return ssl.PrivateCertificate.loadPEM(pem)
def save_region_certificate(cert):
assert isinstance(cert, ssl.PrivateCertificate)
# We'll store the PEM dump of the certificate in the database. We'll
# get this as a byte-string, so we need to decode to unicode.
upem = cert.dumpPEM().decode("ascii")
Config.objects.set_config("rpc_region_certificate", upem)
def generate_region_certificate():
key = ssl.KeyPair.generate(size=2048)
return key.selfSignedCert(serialNumber=get_serial(), CN=b"MAAS Region")
@synchronous
@with_connection # Needed by the following lock.
@synchronised(locks.security) # Region-wide lock.
@transactional
def get_region_certificate():
cert = load_region_certificate()
if cert is None:
cert = generate_region_certificate()
save_region_certificate(cert)
return cert
@synchronous
@with_connection # Needed by the following lock.
@synchronised(locks.security) # Region-wide lock.
@transactional
def get_shared_secret_txn():
# Load secret from database, if it exists.
secret_in_db_hex = Config.objects.get_config("rpc_shared_secret")
if secret_in_db_hex is None:
secret_in_db = None
else:
secret_in_db = to_bin(secret_in_db_hex)
# Load secret from the filesystem, if it exists.
secret_on_fs = get_shared_secret_from_filesystem()
if secret_in_db is None and secret_on_fs is None:
secret = os.urandom(16) # 16-bytes of crypto-standard noise.
Config.objects.set_config("rpc_shared_secret", to_hex(secret))
set_shared_secret_on_filesystem(secret)
elif secret_in_db is None:
secret = secret_on_fs
Config.objects.set_config("rpc_shared_secret", to_hex(secret))
elif secret_on_fs is None:
secret = secret_in_db
set_shared_secret_on_filesystem(secret)
elif secret_in_db == secret_on_fs:
secret = secret_in_db # or secret_on_fs.
else:
raise AssertionError(
"The secret stored in the database does not match the secret "
"stored on the filesystem at %s. Please investigate." %
get_shared_secret_filesystem_path())
return secret
@asynchronous(timeout=10)
def get_shared_secret():
"""Get the shared-secret.
It may need to generate a new secret and commit it to the database, hence
this always runs in a separate transaction in a separate thread.
If called from the IO thread (a.k.a. the reactor), it will return a
`Deferred` that'll fire with the secret (a byte string).
If called from another thread it will return the secret directly, but may
block for up to 10 seconds. If it times-out, an exception is raised.
:return: The shared-secret, a short byte string, or a `Deferred` if
called from the IO/reactor thread.
:raises crochet.TimeoutError: when it times-out after being called from
thread other than the IO/reactor thread.
"""
return deferToDatabase(get_shared_secret_txn)
|