This file is indexed.

/usr/lib/python2.7/dist-packages/azure/storage/blob/_encryption.py is in python-azure-storage 0.33.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#-------------------------------------------------------------------------
# Copyright (c) Microsoft.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#--------------------------------------------------------------------------

from json import(
    dumps,
    loads,
)
from os import urandom

from cryptography.hazmat.primitives.padding import PKCS7

from .._encryption import (
    _generate_encryption_data_dict,
    _generate_AES_CBC_cipher,
    _dict_to_encryption_data,
    _validate_and_unwrap_cek,
    _EncryptionAlgorithm,
)
from .._error import(
    _validate_not_none,
    _validate_key_encryption_key_wrap,
    _ERROR_DATA_NOT_ENCRYPTED,
    _ERROR_UNSUPPORTED_ENCRYPTION_ALGORITHM,
)

def _encrypt_blob(blob, key_encryption_key):
    '''
    Single-shot encryption of a whole blob: the data is PKCS7-padded to the
    128 bit AES block size and encrypted with AES256 in CBC mode using a freshly
    generated content-encryption-key (cek) and initialization vector. The
    encryption metadata is built from the user-provided key-encryption-key (kek),
    the cek and the IV, and is returned serialized as json. Intended only for
    blobs small enough to be uploaded in one request; larger blobs are encrypted
    as a part of the _upload_blob_chunks method.

    :param bytes blob:
        The blob to be encrypted.
    :param object key_encryption_key:
        The user-provided key-encryption-key. Must implement the following methods:
        wrap_key(key)--wraps the specified key using an algorithm of the user's choice.
        get_key_wrap_algorithm()--returns the algorithm used to wrap the specified symmetric key.
        get_kid()--returns a string key id for this key-encryption-key.
    :return: A tuple of json-formatted string containing the encryption metadata and the encrypted blob data.
    :rtype: (str, bytes)
    '''

    _validate_not_none('blob', blob)
    _validate_not_none('key_encryption_key', key_encryption_key)
    _validate_key_encryption_key_wrap(key_encryption_key)

    # A 32 byte (256 bit) key selects AES256; the AES block size, and therefore
    # the IV size, is always 16 bytes.
    cek = urandom(32)
    iv = urandom(16)

    cipher = _generate_AES_CBC_cipher(cek, iv)

    # Pad the plaintext up to a whole number of 16 byte (128 bit) blocks.
    padder = PKCS7(128).padder()
    plaintext = b''.join((padder.update(blob), padder.finalize()))

    # Run the padded plaintext through the cipher.
    encryptor = cipher.encryptor()
    ciphertext = b''.join((encryptor.update(plaintext), encryptor.finalize()))

    # Build the metadata describing how the blob was encrypted.
    metadata = _generate_encryption_data_dict(key_encryption_key, cek, iv)
    metadata['EncryptionMode'] = 'FullBlob'
    serialized_metadata = dumps(metadata)

    return serialized_metadata, ciphertext

def _generate_blob_encryption_data(key_encryption_key):
    '''
    Generates the encryption_metadata for the blob.
    
    :param bytes key_encryption_key:
        The key-encryption-key used to wrap the cek associate with this blob.
    :return: A tuple containing the cek and iv for this blob as well as the 
        serialized encryption metadata for the blob.
    :rtype: (bytes, bytes, str)
    '''
    encryption_data = None
    content_encryption_key = None
    initialization_vector = None
    if key_encryption_key:
        _validate_key_encryption_key_wrap(key_encryption_key)
        content_encryption_key = urandom(32)
        initialization_vector = urandom(16)
        encryption_data = _generate_encryption_data_dict(key_encryption_key,
                                                            content_encryption_key,
                                                            initialization_vector)
        encryption_data['EncryptionMode'] = 'FullBlob'
        encryption_data = dumps(encryption_data)

    return (content_encryption_key, initialization_vector, encryption_data)

def _decrypt_blob(require_encryption, key_encryption_key, key_resolver,
                  response, start_offset, end_offset):
    '''
    Decrypts the given blob contents and returns only the requested range.

    :param bool require_encryption:
        Whether or not the calling blob service requires objects to be decrypted.
    :param object key_encryption_key:
        The user-provided key-encryption-key. Must implement the following methods:
        wrap_key(key)--wraps the specified key using an algorithm of the user's choice.
        get_key_wrap_algorithm()--returns the algorithm used to wrap the specified symmetric key.
        get_kid()--returns a string key id for this key-encryption-key.
    :param key_resolver(kid):
        The user-provided key resolver. Uses the kid string to return a key-encryption-key 
        implementing the interface defined above.
    :param response:
        The download response. Its body attribute holds the (possibly encrypted)
        content and its headers mapping is read for 'x-ms-meta-encryptiondata',
        'x-ms-blob-type' and, for ranged downloads, 'content-range'.
    :param int start_offset:
        Number of leading bytes to drop from the decrypted content. For a ranged
        download with start_offset >= 16, the first 16 bytes of the body are
        consumed as the IV and start_offset is reduced by 16 accordingly.
    :param int end_offset:
        Number of trailing bytes to drop from the decrypted content.
    :return: The decrypted blob content. If the encryption metadata cannot be
        read and require_encryption is False, the content is returned untouched.
    :rtype: bytes
    :raises ValueError:
        If the encryption metadata cannot be read while require_encryption is
        set, or if the blob was encrypted with an unsupported algorithm.
    '''
    _validate_not_none('response', response)
    content = response.body
    _validate_not_none('content', content)

    try:
        encryption_data = _dict_to_encryption_data(loads(response.headers['x-ms-meta-encryptiondata']))
    except Exception:
        # Missing or unparseable metadata: the blob is treated as not encrypted.
        # Exception (rather than a bare except) keeps KeyboardInterrupt and
        # SystemExit from being swallowed here.
        if require_encryption:
            raise ValueError(_ERROR_DATA_NOT_ENCRYPTED)
        return content

    if not(encryption_data.encryption_agent.encryption_algorithm == _EncryptionAlgorithm.AES_CBC_256):
        raise ValueError(_ERROR_UNSUPPORTED_ENCRYPTION_ALGORITHM)

    blob_type = response.headers['x-ms-blob-type']

    if 'content-range' in response.headers:
        # Format: 'bytes x-y/size'. Drop the word 'bytes', then pull out the
        # end of the range (y) and the total blob size.
        byte_range = response.headers['content-range'].split(' ')[1]
        end_and_size = byte_range.split('-')[1].split('/')
        end_range = int(end_and_size[0])
        blob_size = int(end_and_size[1])

        if start_offset >= 16:
            # The leading 16 bytes of the body serve as the IV for the rest.
            # NOTE(review): presumably the caller widened the requested range
            # to include the preceding cipher block -- confirm against caller.
            iv = content[:16]
            content = content[16:]
            start_offset -= 16
        else:
            iv = encryption_data.content_encryption_IV

        # Padding can only be present when the range reaches the final byte.
        unpad = (end_range == blob_size-1)
    else:
        # Whole blob was downloaded: use the stored IV and strip the padding.
        unpad = True
        iv = encryption_data.content_encryption_IV

    # Page blob content is never unpadded.
    if blob_type == 'PageBlob':
        unpad = False

    content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
    cipher = _generate_AES_CBC_cipher(content_encryption_key, iv)
    decryptor = cipher.decryptor()

    content = decryptor.update(content) + decryptor.finalize()
    if unpad:
        unpadder = PKCS7(128).unpadder()
        content = unpadder.update(content) + unpadder.finalize()

    # Trim the extra leading/trailing bytes so only the requested range remains.
    return content[start_offset : len(content) - end_offset]

def _get_blob_encryptor_and_padder(cek, iv, should_pad):
    encryptor = None
    padder = None
        
    if cek is not None and iv is not None:
        cipher = _generate_AES_CBC_cipher(cek, iv)
        encryptor = cipher.encryptor()
        padder = PKCS7(128).padder() if should_pad else None

    return encryptor, padder