This file is indexed.

/usr/share/pyshared/celery/security/serialization.py is in python-celery 2.5.3-4.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from __future__ import absolute_import

import base64
import sys

from kombu.serialization import registry, encode, decode

from ..exceptions import SecurityError
from ..utils.encoding import bytes_to_str, str_to_bytes

from .certificate import Certificate, FSCertStore
from .key import PrivateKey


def b64encode(s):
    return bytes_to_str(base64.b64encode(str_to_bytes(s)))


def b64decode(s):
    return base64.b64decode(str_to_bytes(s))


class SecureSerializer(object):

    def __init__(self, key=None, cert=None, cert_store=None,
            digest="sha1", serializer="json"):
        self._key = key
        self._cert = cert
        self._cert_store = cert_store
        self._digest = digest
        self._serializer = serializer

    def serialize(self, data):
        """serialize data structure into string"""
        assert self._key is not None
        assert self._cert is not None
        try:
            content_type, content_encoding, body = encode(
                    data, serializer=self._serializer)
            # What we sign is the serialized body, not the body itself.
            # this way the receiver doesn't have to decode the contents
            # to verify the signature (and thus avoiding potential flaws
            # in the decoding step).
            return self._pack(body, content_type, content_encoding,
                              signature=self._key.sign(body, self._digest),
                              signer=self._cert.get_id())
        except Exception, exc:
            raise SecurityError, SecurityError(
                    "Unable to serialize: %r" % (exc, )), sys.exc_info()[2]

    def deserialize(self, data):
        """deserialize data structure from string"""
        assert self._cert_store is not None
        try:
            payload = self._unpack(data)
            signature, signer, body = (payload["signature"],
                                       payload["signer"],
                                       payload["body"])
            self._cert_store[signer].verify(body,
                                            signature, self._digest)
        except Exception, exc:
            raise SecurityError, SecurityError(
                    "Unable to deserialize: %r" % (exc, )), sys.exc_info()[2]

        return decode(body, payload["content_type"],
                            payload["content_encoding"], force=True)

    def _pack(self, body, content_type, content_encoding, signer, signature,
            sep='\x00\x01'):
        return b64encode(sep.join([signer, signature,
                                   content_type, content_encoding, body]))

    def _unpack(self, payload, sep='\x00\x01',
            fields=("signer", "signature", "content_type",
                    "content_encoding", "body")):
        return dict(zip(fields, b64decode(payload).split(sep)))


def register_auth(key=None, cert=None, store=None, digest="sha1",
        serializer="json"):
    """register security serializer"""
    s = SecureSerializer(key and PrivateKey(key),
                         cert and Certificate(cert),
                         store and FSCertStore(store),
                         digest=digest, serializer=serializer)
    registry.register("auth", s.serialize, s.deserialize,
                      content_type="application/data",
                      content_encoding="utf-8")