This file is indexed.

/usr/lib/python3/dist-packages/provisioningserver/security.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# Copyright 2014-2017 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Cluster security code."""

__all__ = [
    "calculate_digest",
    "get_shared_secret_filesystem_path",
    "get_shared_secret_from_filesystem",
]

from base64 import (
    urlsafe_b64decode,
    urlsafe_b64encode,
)
import binascii
from binascii import (
    a2b_hex,
    b2a_hex,
)
import errno
from hashlib import sha256
from hmac import HMAC
from os import (
    fchmod,
    makedirs,
)
from os.path import dirname
from sys import (
    stderr,
    stdin,
)
from threading import Lock

from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from provisioningserver.path import get_data_path
from provisioningserver.utils.fs import (
    FileLock,
    read_text_file,
    write_text_file,
)


class MissingSharedSecret(RuntimeError):
    """Raised when the MAAS shared secret is missing."""


def to_hex(b):
    """Convert byte string to hex encoding."""
    assert isinstance(b, bytes), "%r is not a byte string" % (b,)
    return b2a_hex(b).decode("ascii")


def to_bin(u):
    """Convert ASCII-only unicode string to hex encoding."""
    assert isinstance(u, str), "%r is not a unicode string" % (u,)
    # Strip ASCII whitespace from u before converting.
    return a2b_hex(u.encode("ascii").strip())


def get_shared_secret_filesystem_path():
    """Return the path to shared-secret on the filesystem."""
    return get_data_path("var", "lib", "maas", "secret")


def get_shared_secret_from_filesystem():
    """Load the secret from the filesystem.

    `get_shared_secret_filesystem_path` defines where the file will be
    written. If the directory does not already exist, this will attempt to
    create it, including all parent directories.

    :return: A byte string of arbitrary length.
    """
    secret_path = get_shared_secret_filesystem_path()
    makedirs(dirname(secret_path), exist_ok=True)
    with FileLock(secret_path).wait(10):
        # Load secret from the filesystem, if it exists.
        try:
            secret_hex = read_text_file(secret_path)
        except IOError as e:
            if e.errno == errno.ENOENT:
                return None
            else:
                raise
        else:
            return to_bin(secret_hex)


def set_shared_secret_on_filesystem(secret):
    """Write the secret to the filesystem.

    `get_shared_secret_filesystem_path` defines where the file will be
    written. If the directory does not already exist, this will attempt to
    create it, including all parent directories.

    :type secret: A byte string of arbitrary length.
    """
    secret_path = get_shared_secret_filesystem_path()
    makedirs(dirname(secret_path), exist_ok=True)
    secret_hex = to_hex(secret)
    with FileLock(secret_path).wait(10):
        # Ensure that the file has sensible permissions.
        with open(secret_path, "ab") as secret_f:
            fchmod(secret_f.fileno(), 0o640)
        # Write secret to the filesystem.
        write_text_file(secret_path, secret_hex)


def calculate_digest(secret, message, salt):
    """Calculate a SHA-256 HMAC digest for the given data."""
    assert isinstance(secret, bytes), "%r is not a byte string." % (secret,)
    assert isinstance(message, bytes), "%r is not byte string." % (message,)
    assert isinstance(salt, bytes), "%r is not a byte string." % (salt,)
    hmacr = HMAC(secret, digestmod=sha256)
    hmacr.update(message)
    hmacr.update(salt)
    return hmacr.digest()


# Cache the Fernet pre-shared key, since it's expensive to derive the key.
# Note: this will need to change to become a dictionary if salts are supported.
_fernet_psk = None
_fernet_lock = Lock()

# Warning: this should not generally be changed; a MAAS server will not be able
# to communicate with any peers using this value unless it matches. This value
# should be set relatively high, in order to make a brute-force attack to
# determine the MAAS secret impractical.
DEFAULT_ITERATION_COUNT = 100000


def _get_or_create_fernet_psk():
    """Gets or creates a pre-shared key to be used with the Fernet algorithm.

    The pre-shared key is cached in a global to prevent the expense of
    recalculating it.

    Uses the MAAS secret (typically /var/lib/maas/secret) to derive the key.

    :return: A pre-shared key suitable for use with the Fernet class.
    """
    with _fernet_lock:
        global _fernet_psk
        if _fernet_psk is None:
            secret = get_shared_secret_from_filesystem()
            if secret is None:
                raise MissingSharedSecret("MAAS shared secret not found.")
            # Keying material is required by PBKDF2 to be a byte string.
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                # XXX: It might be better to use the maas_id for the salt.
                # But that requires the maas_id to be known in advance by all
                # parties to the encrypted communication. The format of the
                # cached pre-shared key would also need to change.
                salt=b"",
                # XXX: an infrequently-changing variable iteration count might
                # be nice, but that would require protocol support, and
                # changing the way the PSK is cached.
                iterations=DEFAULT_ITERATION_COUNT,
                backend=default_backend()
            )
            key = kdf.derive(secret)
            key = urlsafe_b64encode(key)
            _fernet_psk = key
        else:
            key = _fernet_psk
    return key


def _get_fernet_context():
    """Returns a Fernet context based on the MAAS secret."""
    key = _get_or_create_fernet_psk()
    f = Fernet(key)
    return f


def fernet_encrypt_psk(message, raw=False):
    """Encrypts the specified message using the Fernet format.

    Returns the encrypted token, as a byte string.

    Note that a Fernet token includes the current time. Users decrypting a
    the token can specify a TTL (in seconds) indicating how long the encrypted
    message should be valid. So the system clock must be correct before calling
    this function.

    :param message: The message to encrypt.
    :type message: Must be of type 'bytes' or a UTF-8 'str'.
    :param raw: if True, returns the decoded base64 bytes representing the
        Fernet token. The bytes must be converted back to base64 to be
        decrypted. (Or the 'raw' argument on the corresponding
        fernet_decrypt_psk() function can be used.)
    :return: the encryption token, as a base64-encoded byte string.
    """
    fernet = _get_fernet_context()
    if isinstance(message, str):
        message = message.encode("utf-8")
    token = fernet.encrypt(message)
    if raw is True:
        token = urlsafe_b64decode(token)
    return token


def fernet_decrypt_psk(token, ttl=None, raw=False):
    """Decrypts the specified Fernet token using the MAAS secret.

    Returns the decrypted token as a byte string; the user is responsible for
    converting it to the correct format or encoding.

    :param message: The token to decrypt.
    :type token: Must be of type 'bytes', or an ASCII base64 string.
    :param ttl: Optional amount of time (in seconds) allowed to have elapsed
        before the message is rejected upon decryption. Note that the Fernet
        library considers times up to 60 seconds into the future (beyond the
        TTL) to be valid.
    :param raw: if True, treats the string as the decoded base64 bytes of a
        Fernet token, and attempts to encode them (as expected by the Fernet
        APIs) before decrypting.
    :return: bytes
    """
    if raw is True:
        token = urlsafe_b64encode(token)
    f = _get_fernet_context()
    if isinstance(token, str):
        token = token.encode("ascii")
    return f.decrypt(token, ttl=ttl)


class InstallSharedSecretScript:
    """Install a shared-secret onto a cluster.

    This class conforms to the contract that :py:func:`MainScript.register`
    requires.
    """

    @staticmethod
    def add_arguments(parser):
        """Initialise options for storing a shared-secret.

        :param parser: An instance of :class:`ArgumentParser`.
        """

    @staticmethod
    def run(args):
        """Install a shared-secret to this cluster.

        When invoked interactively, you'll be prompted to enter the secret.
        Otherwise the secret will be read from the first line of stdin.

        In both cases, the secret must be hex/base16 encoded.
        """
        # Obtain the secret from the invoker.
        if stdin.isatty():
            try:
                secret_hex = input("Secret (hex/base16 encoded): ")
            except EOFError:
                print()  # So that the shell prompt appears on the next line.
                raise SystemExit(1)
            except KeyboardInterrupt:
                print()  # So that the shell prompt appears on the next line.
                raise
        else:
            secret_hex = stdin.readline()
        # Decode and install the secret.
        try:
            secret = to_bin(secret_hex.strip())
        except binascii.Error as error:
            print("Secret could not be decoded:", str(error), file=stderr)
            raise SystemExit(1)
        else:
            set_shared_secret_on_filesystem(secret)
            shared_secret_path = get_shared_secret_filesystem_path()
            print("Secret installed to %s." % shared_secret_path)
            raise SystemExit(0)


class CheckForSharedSecretScript:
    """Check for the presence of a shared-secret on a cluster.

    This class conforms to the contract that :py:func:`MainScript.register`
    requires.
    """

    @staticmethod
    def add_arguments(parser):
        """Initialise options for checking the presence of a shared-secret.

        :param parser: An instance of :class:`ArgumentParser`.
        """

    @staticmethod
    def run(args):
        """Check for the presence of a shared-secret on this cluster.

        Exits 0 (zero) if a shared-secret has been installed.
        """
        if get_shared_secret_from_filesystem() is None:
            print("Shared-secret is NOT installed.")
            raise SystemExit(1)
        else:
            print("Shared-secret is installed.")
            raise SystemExit(0)