This file is indexed.

/usr/lib/python3/dist-packages/maasserver/security.py is in python3-django-maas 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# Copyright 2014-2015 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Security-related code, primarily relating to TLS."""

__all__ = [
    "get_region_certificate",
    "get_shared_secret",
]

from datetime import datetime
import os

from maasserver import locks
from maasserver.models.config import Config
from maasserver.utils import synchronised
from maasserver.utils.orm import (
    transactional,
    with_connection,
)
from maasserver.utils.threads import deferToDatabase
from provisioningserver.security import (
    get_shared_secret_filesystem_path,
    get_shared_secret_from_filesystem,
    set_shared_secret_on_filesystem,
    to_bin,
    to_hex,
)
from provisioningserver.utils.twisted import (
    asynchronous,
    synchronous,
)
from pytz import UTC
from twisted.internet import ssl


def get_serial():
    ref = datetime(2012, 1, 16, tzinfo=UTC)
    now = datetime.now(tz=UTC)
    serial = (now - ref).total_seconds()
    return int(serial)


def load_region_certificate():
    upem = Config.objects.get_config("rpc_region_certificate")
    if upem is None:
        return None
    else:
        # The certificate will be returned as a unicode string. However,
        # it's in PEM form, a base-64 encoded certificate and key, so we
        # need to get back to bytes, then parse it.
        pem = upem.encode("ascii")
        return ssl.PrivateCertificate.loadPEM(pem)


def save_region_certificate(cert):
    assert isinstance(cert, ssl.PrivateCertificate)
    # We'll store the PEM dump of the certificate in the database. We'll
    # get this as a byte-string, so we need to decode to unicode.
    upem = cert.dumpPEM().decode("ascii")
    Config.objects.set_config("rpc_region_certificate", upem)


def generate_region_certificate():
    key = ssl.KeyPair.generate(size=2048)
    return key.selfSignedCert(serialNumber=get_serial(), CN=b"MAAS Region")


@synchronous
@with_connection  # Needed by the following lock.
@synchronised(locks.security)  # Region-wide lock.
@transactional
def get_region_certificate():
    cert = load_region_certificate()
    if cert is None:
        cert = generate_region_certificate()
        save_region_certificate(cert)
    return cert


@synchronous
@with_connection  # Needed by the following lock.
@synchronised(locks.security)  # Region-wide lock.
@transactional
def get_shared_secret_txn():
    # Load secret from database, if it exists.
    secret_in_db_hex = Config.objects.get_config("rpc_shared_secret")
    if secret_in_db_hex is None:
        secret_in_db = None
    else:
        secret_in_db = to_bin(secret_in_db_hex)
    # Load secret from the filesystem, if it exists.
    secret_on_fs = get_shared_secret_from_filesystem()

    if secret_in_db is None and secret_on_fs is None:
        secret = os.urandom(16)  # 16-bytes of crypto-standard noise.
        Config.objects.set_config("rpc_shared_secret", to_hex(secret))
        set_shared_secret_on_filesystem(secret)
    elif secret_in_db is None:
        secret = secret_on_fs
        Config.objects.set_config("rpc_shared_secret", to_hex(secret))
    elif secret_on_fs is None:
        secret = secret_in_db
        set_shared_secret_on_filesystem(secret)
    elif secret_in_db == secret_on_fs:
        secret = secret_in_db  # or secret_on_fs.
    else:
        raise AssertionError(
            "The secret stored in the database does not match the secret "
            "stored on the filesystem at %s. Please investigate." %
            get_shared_secret_filesystem_path())

    return secret


@asynchronous(timeout=10)
def get_shared_secret():
    """Get the shared-secret.

    It may need to generate a new secret and commit it to the database, hence
    this always runs in a separate transaction in a separate thread.

    If called from the IO thread (a.k.a. the reactor), it will return a
    `Deferred` that'll fire with the secret (a byte string).

    If called from another thread it will return the secret directly, but may
    block for up to 10 seconds. If it times-out, an exception is raised.

    :return: The shared-secret, a short byte string, or a `Deferred` if
        called from the IO/reactor thread.
    :raises crochet.TimeoutError: when it times-out after being called from
        thread other than the IO/reactor thread.
    """
    return deferToDatabase(get_shared_secret_txn)