/usr/lib/python2.7/dist-packages/azure/storage/blob/_encryption.py is in python-azure-storage 0.33.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#-------------------------------------------------------------------------
# Copyright (c) Microsoft. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#--------------------------------------------------------------------------
from json import (
    dumps,
    loads,
)
from os import urandom

from cryptography.hazmat.primitives.padding import PKCS7

from .._encryption import (
    _generate_encryption_data_dict,
    _generate_AES_CBC_cipher,
    _dict_to_encryption_data,
    _validate_and_unwrap_cek,
    _EncryptionAlgorithm,
)
from .._error import (
    _validate_not_none,
    _validate_key_encryption_key_wrap,
    _ERROR_DATA_NOT_ENCRYPTED,
    _ERROR_UNSUPPORTED_ENCRYPTION_ALGORITHM,
)
def _encrypt_blob(blob, key_encryption_key):
    '''
    Encrypts a whole blob in one shot with AES256 in CBC mode, PKCS7-padding
    the plaintext to the 128 bit block size first. A fresh content-encryption-key
    (cek) and initialization vector are generated for every call, and the
    encryption metadata is built from the cek, the IV and the user-provided
    key-encryption-key (kek). Intended only for blobs small enough for a single
    shot upload; larger blobs are encrypted as part of the
    _upload_blob_chunks method.

    :param bytes blob:
        The blob to be encrypted.
    :param object key_encryption_key:
        The user-provided key-encryption-key. Must implement the following methods:
        wrap_key(key)--wraps the specified key using an algorithm of the user's choice.
        get_key_wrap_algorithm()--returns the algorithm used to wrap the specified symmetric key.
        get_kid()--returns a string key id for this key-encryption-key.
    :return: A tuple of json-formatted string containing the encryption metadata and the encrypted blob data.
    :rtype: (str, bytes)
    '''
    _validate_not_none('blob', blob)
    _validate_not_none('key_encryption_key', key_encryption_key)
    _validate_key_encryption_key_wrap(key_encryption_key)

    # AES256 needs a 256 bit (32 byte) key; the block size, and therefore
    # the IV, is always 16 bytes.
    cek = urandom(32)
    iv = urandom(16)
    aes_encryptor = _generate_AES_CBC_cipher(cek, iv).encryptor()

    # CBC only accepts whole blocks, so pad the plaintext to a multiple of
    # 16 bytes (128 bits) with PKCS7 before encrypting.
    pkcs7 = PKCS7(128).padder()
    plaintext = pkcs7.update(blob) + pkcs7.finalize()
    ciphertext = aes_encryptor.update(plaintext) + aes_encryptor.finalize()

    # Build the metadata that is returned alongside the ciphertext.
    metadata = _generate_encryption_data_dict(key_encryption_key, cek, iv)
    metadata['EncryptionMode'] = 'FullBlob'

    return dumps(metadata), ciphertext
def _generate_blob_encryption_data(key_encryption_key):
'''
Generates the encryption_metadata for the blob.
:param bytes key_encryption_key:
The key-encryption-key used to wrap the cek associate with this blob.
:return: A tuple containing the cek and iv for this blob as well as the
serialized encryption metadata for the blob.
:rtype: (bytes, bytes, str)
'''
encryption_data = None
content_encryption_key = None
initialization_vector = None
if key_encryption_key:
_validate_key_encryption_key_wrap(key_encryption_key)
content_encryption_key = urandom(32)
initialization_vector = urandom(16)
encryption_data = _generate_encryption_data_dict(key_encryption_key,
content_encryption_key,
initialization_vector)
encryption_data['EncryptionMode'] = 'FullBlob'
encryption_data = dumps(encryption_data)
return (content_encryption_key, initialization_vector, encryption_data)
def _decrypt_blob(require_encryption, key_encryption_key, key_resolver,
                  response, start_offset, end_offset):
    '''
    Decrypts the given blob contents and returns only the requested range.

    :param bool require_encryption:
        Whether or not the calling blob service requires objects to be decrypted.
        When True, a response without usable encryption metadata is an error;
        when False, such a response's body is returned untouched.
    :param object key_encryption_key:
        The user-provided key-encryption-key. It is passed unchanged to
        _validate_and_unwrap_cek together with key_resolver.
        NOTE(review): this docstring previously listed the wrapping interface
        (wrap_key / get_key_wrap_algorithm / get_kid); confirm which methods
        the unwrap helper actually requires.
    :param key_resolver(kid):
        The user-provided key resolver. Uses the kid string to return a key-encryption-key
        implementing the interface defined above.
    :param response:
        The service response. Its body holds the blob content and its headers
        are read for 'x-ms-meta-encryptiondata', 'x-ms-blob-type' and,
        when present, 'content-range'.
    :param int start_offset:
        Number of leading bytes to drop from the decrypted content. For a
        ranged response, when this is at least 16 the first 16 bytes of the
        body are consumed as the IV and the offset is reduced by 16.
    :param int end_offset:
        Number of trailing bytes to drop from the decrypted content.
    :return: The decrypted blob content.
    :rtype: bytes
    :raises ValueError:
        If require_encryption is set but the encryption metadata cannot be
        read, or if the metadata names an algorithm other than AES_CBC_256.
    '''
    _validate_not_none('response', response)
    content = response.body
    _validate_not_none('content', content)

    try:
        encryption_data = _dict_to_encryption_data(
            loads(response.headers['x-ms-meta-encryptiondata']))
    except Exception:
        # Missing or unparseable metadata is treated as "not encrypted".
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        if require_encryption:
            raise ValueError(_ERROR_DATA_NOT_ENCRYPTED)
        return content

    if not (encryption_data.encryption_agent.encryption_algorithm ==
            _EncryptionAlgorithm.AES_CBC_256):
        raise ValueError(_ERROR_UNSUPPORTED_ENCRYPTION_ALGORITHM)

    blob_type = response.headers['x-ms-blob-type']

    if 'content-range' in response.headers:
        # Format: 'bytes x-y/size'. Only y (last byte returned) and size
        # (total blob length) are needed below.
        content_range = response.headers['content-range']
        byte_span = content_range.split(' ')[1]
        last_and_size = byte_span.split('-')[1].split('/')
        end_range = int(last_and_size[0])
        blob_size = int(last_and_size[1])

        if start_offset >= 16:
            # The body starts with an extra 16 byte block that is used as
            # the IV for the rest (presumably the ciphertext block preceding
            # the requested data, as CBC chaining requires -- confirm
            # against the caller that builds the range).
            iv = content[:16]
            content = content[16:]
            start_offset -= 16
        else:
            iv = encryption_data.content_encryption_IV

        # Padding only exists at the very end of the blob, so strip it only
        # when the returned range reaches the last byte.
        unpad = end_range == blob_size - 1
    else:
        # Whole blob returned: use the stored IV and strip the padding.
        unpad = True
        iv = encryption_data.content_encryption_IV

    # Page blobs are never unpadded.
    if blob_type == 'PageBlob':
        unpad = False

    content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    cipher = _generate_AES_CBC_cipher(content_encryption_key, iv)
    decryptor = cipher.decryptor()

    content = decryptor.update(content) + decryptor.finalize()
    if unpad:
        unpadder = PKCS7(128).unpadder()
        content = unpadder.update(content) + unpadder.finalize()

    # Trim the extra bytes that were fetched around the requested range.
    return content[start_offset : len(content) - end_offset]
def _get_blob_encryptor_and_padder(cek, iv, should_pad):
encryptor = None
padder = None
if cek is not None and iv is not None:
cipher = _generate_AES_CBC_cipher(cek, iv)
encryptor = cipher.encryptor()
padder = PKCS7(128).padder() if should_pad else None
return encryptor, padder
|