/usr/share/pyshared/celery/security/serialization.py is in python-celery 2.5.3-4.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | from __future__ import absolute_import
import base64
import sys
from kombu.serialization import registry, encode, decode
from ..exceptions import SecurityError
from ..utils.encoding import bytes_to_str, str_to_bytes
from .certificate import Certificate, FSCertStore
from .key import PrivateKey
def b64encode(s):
return bytes_to_str(base64.b64encode(str_to_bytes(s)))
def b64decode(s):
return base64.b64decode(str_to_bytes(s))
class SecureSerializer(object):
def __init__(self, key=None, cert=None, cert_store=None,
digest="sha1", serializer="json"):
self._key = key
self._cert = cert
self._cert_store = cert_store
self._digest = digest
self._serializer = serializer
def serialize(self, data):
"""serialize data structure into string"""
assert self._key is not None
assert self._cert is not None
try:
content_type, content_encoding, body = encode(
data, serializer=self._serializer)
# What we sign is the serialized body, not the body itself.
# this way the receiver doesn't have to decode the contents
# to verify the signature (and thus avoiding potential flaws
# in the decoding step).
return self._pack(body, content_type, content_encoding,
signature=self._key.sign(body, self._digest),
signer=self._cert.get_id())
except Exception, exc:
raise SecurityError, SecurityError(
"Unable to serialize: %r" % (exc, )), sys.exc_info()[2]
def deserialize(self, data):
"""deserialize data structure from string"""
assert self._cert_store is not None
try:
payload = self._unpack(data)
signature, signer, body = (payload["signature"],
payload["signer"],
payload["body"])
self._cert_store[signer].verify(body,
signature, self._digest)
except Exception, exc:
raise SecurityError, SecurityError(
"Unable to deserialize: %r" % (exc, )), sys.exc_info()[2]
return decode(body, payload["content_type"],
payload["content_encoding"], force=True)
def _pack(self, body, content_type, content_encoding, signer, signature,
sep='\x00\x01'):
return b64encode(sep.join([signer, signature,
content_type, content_encoding, body]))
def _unpack(self, payload, sep='\x00\x01',
fields=("signer", "signature", "content_type",
"content_encoding", "body")):
return dict(zip(fields, b64decode(payload).split(sep)))
def register_auth(key=None, cert=None, store=None, digest="sha1",
serializer="json"):
"""register security serializer"""
s = SecureSerializer(key and PrivateKey(key),
cert and Certificate(cert),
store and FSCertStore(store),
digest=digest, serializer=serializer)
registry.register("auth", s.serialize, s.deserialize,
content_type="application/data",
content_encoding="utf-8")
|