/usr/lib/python2.7/dist-packages/elixir/ext/encrypted.py is in python-elixir 0.7.1-4build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | '''
An encryption plugin for Elixir utilizing the excellent PyCrypto library, which
can be downloaded here: http://www.amk.ca/python/code/crypto
Values for columns that are specified to be encrypted will be transparently
encrypted and safely encoded for storage in a unicode column using the powerful
and secure Blowfish Cipher using a specified "secret" which can be passed into
the plugin at class declaration time.
Example usage:
.. sourcecode:: python
from elixir import *
from elixir.ext.encrypted import acts_as_encrypted
class Person(Entity):
name = Field(Unicode)
password = Field(Unicode)
ssn = Field(Unicode)
acts_as_encrypted(for_fields=['password', 'ssn'],
with_secret='secret')
The above Person entity will automatically encrypt and decrypt the password and
ssn columns on save, update, and load. Different secrets can be specified on
an entity by entity basis, for added security.
**Important note**: instance attributes are encrypted in-place. This means that
if one of the encrypted attributes of an instance is accessed after the
instance has been flushed to the database (and thus encrypted), the value for
that attribute will be crypted in the in-memory object in addition to the
database row.
'''
import sys
import os
from Crypto.Cipher import Blowfish, AES
from elixir.statements import Statement
from sqlalchemy.orm import MapperExtension, EXT_CONTINUE, EXT_STOP
try:
from sqlalchemy.orm import EXT_PASS
SA05orlater = False
except ImportError:
SA05orlater = True
__all__ = ['acts_as_encrypted']
__doc_all__ = []
#
# encryption and decryption functions
#
# WARNING!!! Blowfish encryption method is vulnerable to attacks
# because it doesn't properly use random seed. It is provided just for
# backward compatibility needed to migrate data. Use AES instead!
def encrypt_value(value, secret):
return Blowfish.new(secret, Blowfish.MODE_CFB) \
.encrypt(value).encode('string_escape')
def decrypt_value(value, secret):
return Blowfish.new(secret, Blowfish.MODE_CFB) \
.decrypt(value.decode('string_escape'))
# Crypto.Cipher.AES is AES128
def encrypt_value_aes(value, secret):
iv = os.urandom(AES.block_size)
pad_len = AES.block_size - len(value) % AES.block_size
padded_value = value + pad_len * chr(pad_len)
res = iv + AES.new(secret, AES.MODE_CBC, iv).encrypt(padded_value)
return res.encode('string_escape')
def decrypt_value_aes(value, secret):
value = value.decode('string_escape')
iv = value[:AES.block_size]
encrypted = value[AES.block_size:]
padded_value = AES.new(secret, AES.MODE_CBC, iv).decrypt(encrypted)
pad_len = ord(padded_value[-1])
assert pad_len >= 1 and pad_len <= AES.block_size
return padded_value[:-pad_len]
#
# acts_as_encrypted statement
#
class ActsAsEncrypted(object):
def __init__(self, entity, for_fields=[], with_secret='abcdef', with_aes=False):
if not with_aes:
sys.stderr.write("""******* WARNING!!! ********
Blowfish encryption method is vulnerable to attacks.
Migrate your data and use with_aes=True\n""")
def perform_encryption(instance, encrypt=True):
encrypted = getattr(instance, '_elixir_encrypted', None)
if encrypted is encrypt:
# skipping encryption or decryption, as it is already done
return
else:
# marking instance as already encrypted/decrypted
instance._elixir_encrypted = encrypt
if encrypt:
if with_aes:
func = encrypt_value_aes
else:
func = encrypt_value
else:
if with_aes:
func = decrypt_value_aes
else:
func = decrypt_value
for column_name in for_fields:
current_value = getattr(instance, column_name)
if current_value:
setattr(instance, column_name,
func(current_value, with_secret))
def perform_decryption(instance):
perform_encryption(instance, encrypt=False)
class EncryptedMapperExtension(MapperExtension):
def before_insert(self, mapper, connection, instance):
perform_encryption(instance)
return EXT_CONTINUE
def before_update(self, mapper, connection, instance):
perform_encryption(instance)
return EXT_CONTINUE
if SA05orlater:
def reconstruct_instance(self, mapper, instance):
perform_decryption(instance)
# no special return value is required for
# reconstruct_instance, but you never know...
return EXT_CONTINUE
else:
def populate_instance(self, mapper, selectcontext, row,
instance, *args, **kwargs):
mapper.populate_instance(selectcontext, instance, row,
*args, **kwargs)
perform_decryption(instance)
# EXT_STOP because we already did populate the instance and
# the normal processing should not happen
return EXT_STOP
# make sure that the entity's mapper has our mapper extension
entity._descriptor.add_mapper_extension(EncryptedMapperExtension())
acts_as_encrypted = Statement(ActsAsEncrypted)
|