/usr/lib/python2.7/dist-packages/celery/security/certificate.py is in python-celery 3.1.6-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.security.certificate
~~~~~~~~~~~~~~~~~~~~~~~~~~~
X.509 certificates.
"""
from __future__ import absolute_import
import glob
import os
from kombu.utils.encoding import bytes_to_str
from celery.exceptions import SecurityError
from celery.five import values
from .utils import crypto, reraise_errors
__all__ = ['Certificate', 'CertStore', 'FSCertStore']
class Certificate(object):
    """Wrapper around a PEM-encoded X.509 certificate.

    The parsed certificate object returned by ``crypto.load_certificate``
    is kept in ``self._cert``; every method delegates to it.
    """

    def __init__(self, cert):
        """Parse ``cert`` (PEM text) into a certificate object.

        Parsing runs inside ``reraise_errors``, a helper from ``.utils``
        that is not visible here; presumably it converts crypto errors
        into ``SecurityError`` using the given message -- confirm there.
        """
        # NOTE(review): ``assert`` statements are removed under
        # ``python -O``, so this guard for a missing crypto backend only
        # exists in non-optimised runs.
        assert crypto is not None
        with reraise_errors('Invalid certificate: {0!r}'):
            parsed = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
            self._cert = parsed

    def has_expired(self):
        """Report whether the wrapped certificate has expired."""
        expired = self._cert.has_expired()
        return expired

    def get_serial_number(self):
        """Return the serial number stored in the certificate."""
        serial = self._cert.get_serial_number()
        return serial

    def get_issuer(self):
        """Return the issuer (CA) as a single space-separated string.

        Only the second element of each issuer component is used (in
        pyOpenSSL ``get_components()`` yields (name, value) pairs), so
        attribute names such as CN or O are dropped.  The result is a
        lookup string, not a canonical distinguished name.
        """
        components = self._cert.get_issuer().get_components()
        pieces = [bytes_to_str(part[1]) for part in components]
        return ' '.join(pieces)

    def get_id(self):
        """Return ``"<issuer> <serial>"``, used to identify the certificate."""
        issuer = self.get_issuer()
        serial = self.get_serial_number()
        return '{0} {1}'.format(issuer, serial)

    def verify(self, data, signature, digest):
        """Check ``signature`` over ``data`` with this certificate's key.

        Returns nothing; a failed check surfaces as an exception raised
        through ``reraise_errors`` with a "Bad signature" message.

        This only checks the signature.  Expiry, issuer chain and
        revocation are not examined in this method, and ``digest`` is
        passed to ``crypto.verify`` unchanged, so the caller chooses the
        hash algorithm.
        """
        with reraise_errors('Bad signature: {0!r}'):
            crypto.verify(self._cert, signature, data, digest)
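class CertStore(object):
    """Base class for certificate stores.

    Certificates are held in a dict keyed by the value of
    ``cert.get_id()``.  Lookups of unknown ids and attempts to register
    a second certificate under an existing id both raise
    ``SecurityError``.
    """

    def __init__(self):
        # Maps certificate id -> certificate object.
        self._certs = {}

    def itercerts(self):
        """Iterate over the stored certificates."""
        for c in values(self._certs):
            yield c

    def __getitem__(self, id):
        """Return the certificate registered under ``id``.

        Raises ``SecurityError`` (not ``KeyError``) when the id is not in
        the store.
        """
        try:
            return self._certs[id]
        except KeyError:
            raise SecurityError('Unknown certificate: {0!r}'.format(id))

    def add_cert(self, cert):
        """Register ``cert`` under its ``get_id()`` value.

        Raises ``SecurityError`` if a certificate with the same id is
        already stored; the existing entry is left untouched.
        """
        cert_id = cert.get_id()
        if cert_id in self._certs:
            # Report the colliding certificate id.  The previous code
            # formatted the builtin ``id`` function here, which hid the
            # offending certificate from logs and error reports.
            raise SecurityError(
                'Duplicate certificate: {0!r}'.format(cert_id))
        self._certs[cert_id] = cert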
class FSCertStore(CertStore):
    """Certificate store populated from the file system.

    ``path`` is a glob pattern; when it names a directory, every entry
    directly inside it is loaded.  Each matched file is parsed as a PEM
    certificate and trusted as-is: nothing in this class checks who
    issued it, so write access to ``path`` decides which certificates
    are accepted.

    Expiry is checked once, while loading.  An expired certificate
    aborts construction with ``SecurityError``; a certificate that
    expires later stays in the store until it is rebuilt.
    """

    def __init__(self, path):
        CertStore.__init__(self)
        # A directory means "everything directly inside it".
        pattern = os.path.join(path, '*') if os.path.isdir(path) else path
        for filename in glob.glob(pattern):
            with open(filename) as handle:
                pem_text = handle.read()
            loaded = Certificate(pem_text)
            if loaded.has_expired():
                raise SecurityError(
                    'Expired certificate: {0!r}'.format(loaded.get_id()))
            # add_cert raises SecurityError for a duplicate id.
            self.add_cert(loaded)