This file is indexed.

/usr/lib/python3/dist-packages/maasserver/x509.py is in python3-django-maas 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Copyright 2014-2016 Canonical Ltd.
# Copyright 2014 Cloudbase Solutions SRL.
# This software is licensed under the GNU Affero General Public License
# version 3 (see the file LICENSE).

__all__ = []

import getpass
import logging
import os
from os import makedirs
import random
import socket
from string import (
    ascii_lowercase,
    ascii_uppercase,
    digits,
)

import OpenSSL
from provisioningserver.utils.fs import (
    atomic_write,
    read_text_file,
)


logger = logging.getLogger(__name__)


class WinRMX509Error(Exception):
    """Error when generating x509 certificate."""


class WinRMX509:
    """Generates X509 certificates compatible with Windows WinRM."""

    KEY_SIZE = 2048
    PASSPHRASE_LENGTH = 21

    def __init__(self, cert_name, upn_name=None, cert_dir=None):
        self.store = self.get_ssl_dir(cert_dir)
        self.cert_name = cert_name
        self.upn_name = upn_name
        if self.upn_name is None:
            user = getpass.getuser()
            host = socket.getfqdn()
            self.upn_name = "%s@%s" % (user, host)

        self.pem_file = os.path.join(self.store, "%s.pem" % self.cert_name)
        self.key_file = os.path.join(self.store, "%s.key" % self.cert_name)
        self.pfx_file = os.path.join(self.store, "%s.pfx" % self.cert_name)

    def create_cert(self, print_cert=False):
        """Generate a new certifficate, and save it to disk."""
        if os.path.isfile(self.pem_file):
            raise WinRMX509Error(
                "Certificate %s already exists." % self.pem_file)

        key, cert = self.get_key_and_cert()
        self.write_cert(cert)
        self.write_privatekey(key)

        if print_cert:
            self.print_cert_details(self.pem_file)

        logger.debug("Exporting to PKCS12")
        passwd = self.generate_passphrase()
        try:
            self.export_p12(key, cert, passwd)
            logger.debug("Passphrase for exported p12: %s" % passwd)
        except OpenSSL.crypto.Error as err:
            raise WinRMX509Error("Failed to export p12: %s" % err)

    def get_key_and_cert(self):
        """Return the private key and certificate for x509."""
        key = OpenSSL.crypto.PKey()
        key.generate_key(OpenSSL.crypto.TYPE_RSA, self.KEY_SIZE)
        cert = OpenSSL.crypto.X509()
        cert.get_subject().CN = self.upn_name
        subjectAltName = OpenSSL.crypto.X509Extension(
            b"subjectAltName", True,
            ("otherName:1.3.6.1.4.1.311.20.2.3;UTF8:%s"
             % self.upn_name).encode('utf-8'))
        key_usage = OpenSSL.crypto.X509Extension(
            b"extendedKeyUsage", True, b"clientAuth")
        cert.set_serial_number(1000)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
        cert.add_extensions([subjectAltName, key_usage, ])
        cert.set_pubkey(key)
        cert.set_issuer(cert.get_subject())
        cert.sign(key, 'sha1')
        return key, cert

    def get_cert_details(self, pem_file):
        """Return a dictionary containing X509 subject, thumbprint and
        contents."""
        cert, contents = self.load_pem_file(pem_file)
        subject = cert.get_subject().CN
        thumb = cert.digest('SHA1')
        return {'subject': subject, 'thumbprint': thumb, 'contents': contents}

    def write_privatekey(self, key):
        """Write the private key to disk."""
        logger.debug("Writing key: %s" % self.key_file)
        atomic_write(
            OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key),
            self.key_file)

    def write_cert(self, cert):
        """Write the certificate to disk."""
        logger.debug("Writing certificate: %s" % self.pem_file)
        atomic_write(
            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert),
            self.pem_file)

    def print_cert_details(self, pem_file):
        """Print x509 details to stdout."""
        details = self.get_cert_details(pem_file)
        print("Certificate Subject: %s" % details['subject'])
        print("Certificate Thumbprint: %s" % details['thumbprint'])
        print("You may add the following cert in MAAS:")
        print(details['contents'])

    def load_pem_file(self, pem_file):
        """Load a PEM file. Returning `OpenSSL.crypto.X509` object and the
        contents of the file.

        :param pem_file: file to load
        """
        pem_data = read_text_file(pem_file)
        try:
            cert = OpenSSL.crypto.load_certificate(
                OpenSSL.crypto.FILETYPE_PEM, pem_data)
        except OpenSSL.crypto.Error as err:
            raise WinRMX509Error("Failed to load certificate: %s" % err)
        return cert, pem_data

    def export_p12(self, key, cert, passphrase):
        """Create a pcks12 password protected container for the generated
        certificates.

        :param key: Key file to add to PFX file
        :param cert: Certificate file to add to PFX file
        :param passphrase: export passphrase for PFX file
        """
        p12 = OpenSSL.crypto.PKCS12()
        p12.set_certificate(cert)
        p12.set_privatekey(key)
        atomic_write(
            p12.export(
                passphrase=bytes(passphrase.encode("utf-8"))), self.pfx_file)

    def get_ssl_dir(self, cert_dir=None):
        """Return the directory in which to save the certificates. This also
        ensures that the directory exists.
        """
        if cert_dir is None:
            home_dir = os.path.expanduser("~")
            cert_dir = os.path.join(home_dir, '.ssl')
        makedirs(cert_dir, exist_ok=True)
        return cert_dir

    def generate_passphrase(self):
        """Generate an alphanumeric random string to be used together with
        `export_p12`.
        """
        choices = ascii_uppercase + ascii_lowercase + digits
        return ''.join(
            random.choice(choices) for _ in range(self.PASSPHRASE_LENGTH))