/usr/lib/python3/dist-packages/castellan/tests/unit/key_manager/mock_key_manager.py is in python3-castellan 0.17.0-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | # Copyright (c) 2015 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A mock implementation of a key manager that stores keys in a dictionary.
This key manager implementation is primarily intended for testing. In
particular, it does not store keys persistently. Lack of a centralized key
store also makes this implementation unsuitable for use among different
services.
Note: Instantiating this class multiple times will create separate key stores.
Keys created in one instance will not be accessible from other instances of
this class.
"""
import binascii
import copy
import random
from cryptography.hazmat import backends
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from oslo_utils import uuidutils
from castellan.common import exception
from castellan.common.objects import private_key as pri_key
from castellan.common.objects import public_key as pub_key
from castellan.common.objects import symmetric_key as sym_key
from castellan.key_manager import key_manager
class MockKeyManager(key_manager.KeyManager):
"""Mocking manager for integration tests.
This mock key manager implementation supports all the methods specified
by the key manager interface. This implementation stores keys within a
dictionary, and as a result, it is not acceptable for use across different
services. Side effects (e.g., raising exceptions) for each method are
handled as specified by the key manager interface.
This key manager is not suitable for use in production deployments.
"""
def __init__(self, configuration=None):
self.conf = configuration
self.keys = {}
def _generate_hex_key(self, key_length):
# hex digit => 4 bits
length = int(key_length / 4)
hex_encoded = self._generate_password(length=length,
symbolgroups='0123456789ABCDEF')
return hex_encoded
def _generate_key(self, **kwargs):
name = kwargs.get('name', None)
algorithm = kwargs.get('algorithm', 'AES')
key_length = kwargs.get('length', 256)
_hex = self._generate_hex_key(key_length)
return sym_key.SymmetricKey(
algorithm,
key_length,
bytes(binascii.unhexlify(_hex)),
name)
def create_key(self, context, **kwargs):
"""Creates a symmetric key.
This implementation returns a UUID for the created key. The algorithm
for the key will always be AES. A Forbidden exception is raised if the
specified context is None.
"""
if context is None:
raise exception.Forbidden()
key = self._generate_key(**kwargs)
return self.store(context, key)
def _generate_public_and_private_key(self, length, name):
crypto_private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=length,
backend=backends.default_backend())
private_der = crypto_private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
crypto_public_key = crypto_private_key.public_key()
public_der = crypto_public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
private_key = pri_key.PrivateKey(
algorithm='RSA',
bit_length=length,
key=bytearray(private_der),
name=name)
public_key = pub_key.PublicKey(
algorithm='RSA',
bit_length=length,
key=bytearray(public_der),
name=name)
return private_key, public_key
def create_key_pair(self, context, algorithm, length,
expiration=None, name=None):
"""Creates an asymmetric key pair.
This implementation returns UUIDs for the created keys in the order:
(private, public)
Forbidden is raised if the context is None.
"""
if context is None:
raise exception.Forbidden()
if algorithm.lower() != 'rsa':
msg = 'Invalid algorithm: {}, only RSA supported'.format(algorithm)
raise ValueError(msg)
valid_lengths = [2048, 3072, 4096]
if length not in valid_lengths:
msg = 'Invalid bit length: {}, only {} supported'.format(
length, valid_lengths)
raise ValueError(msg)
private_key, public_key = self._generate_public_and_private_key(length,
name)
private_key_uuid = self.store(context, private_key)
public_key_uuid = self.store(context, public_key)
return private_key_uuid, public_key_uuid
def _generate_key_id(self):
key_id = uuidutils.generate_uuid()
while key_id in self.keys:
key_id = uuidutils.generate_uuid()
return key_id
def store(self, context, managed_object, **kwargs):
"""Stores (i.e., registers) a key with the key manager."""
if context is None:
raise exception.Forbidden()
key_id = self._generate_key_id()
managed_object._id = key_id
self.keys[key_id] = managed_object
return key_id
def get(self, context, managed_object_id, metadata_only=False, **kwargs):
"""Retrieves the key identified by the specified id.
This implementation returns the key that is associated with the
specified UUID. A Forbidden exception is raised if the specified
context is None; a KeyError is raised if the UUID is invalid.
"""
if context is None:
raise exception.Forbidden()
obj = copy.deepcopy(self.keys[managed_object_id])
if metadata_only:
if hasattr(obj, "_key"):
obj._key = None
if hasattr(obj, "_data"):
obj._data = None
if hasattr(obj, "_passphrase"):
obj._passphrase = None
return obj
def delete(self, context, managed_object_id, **kwargs):
"""Deletes the object identified by the specified id.
A Forbidden exception is raised if the context is None and a
KeyError is raised if the UUID is invalid.
"""
if context is None:
raise exception.Forbidden()
del self.keys[managed_object_id]
def _generate_password(self, length, symbolgroups):
"""Generate a random password from the supplied symbol groups.
At least one symbol from each group will be included. Unpredictable
results if length is less than the number of symbol groups.
Believed to be reasonably secure (with a reasonable password length!)
"""
# NOTE(jerdfelt): Some password policies require at least one character
# from each group of symbols, so start off with one random character
# from each symbol group
password = [random.choice(s) for s in symbolgroups]
# If length < len(symbolgroups), the leading characters will only
# be from the first length groups. Try our best to not be predictable
# by shuffling and then truncating.
random.shuffle(password)
password = password[:length]
length -= len(password)
# then fill with random characters from all symbol groups
symbols = ''.join(symbolgroups)
password.extend([random.choice(symbols) for _i in range(length)])
# Finally, shuffle to ensure first x characters aren't from a
# predictable group
random.shuffle(password)
return ''.join(password)
def list(self, context, object_type=None, metadata_only=False):
"""Retrieves a list of managed objects that match the criteria.
A Forbidden exception is raised if the context is None.
If no search criteria is given, all objects are returned.
"""
if context is None:
raise exception.Forbidden()
objects = []
for obj_id in self.keys:
obj = self.get(context, obj_id, metadata_only=metadata_only)
if type(obj) == object_type or object_type is None:
objects.append(obj)
return objects
|