/usr/lib/python3/dist-packages/celery/tests/security/test_certificate.py is in python3-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | from __future__ import absolute_import
from celery.exceptions import SecurityError
from celery.security.certificate import Certificate, CertStore, FSCertStore
from . import CERT1, CERT2, KEY1
from .case import SecurityCase
from celery.tests.case import Mock, SkipTest, mock_open, patch
class test_Certificate(SecurityCase):
def test_valid_certificate(self):
Certificate(CERT1)
Certificate(CERT2)
def test_invalid_certificate(self):
self.assertRaises((SecurityError, TypeError), Certificate, None)
self.assertRaises(SecurityError, Certificate, '')
self.assertRaises(SecurityError, Certificate, 'foo')
self.assertRaises(SecurityError, Certificate, CERT1[:20] + CERT1[21:])
self.assertRaises(SecurityError, Certificate, KEY1)
def test_has_expired(self):
raise SkipTest('cert actually expired')
self.assertFalse(Certificate(CERT1).has_expired())
class test_CertStore(SecurityCase):
def test_itercerts(self):
cert1 = Certificate(CERT1)
cert2 = Certificate(CERT2)
certstore = CertStore()
for c in certstore.itercerts():
self.assertTrue(False)
certstore.add_cert(cert1)
certstore.add_cert(cert2)
for c in certstore.itercerts():
self.assertIn(c, (cert1, cert2))
def test_duplicate(self):
cert1 = Certificate(CERT1)
certstore = CertStore()
certstore.add_cert(cert1)
self.assertRaises(SecurityError, certstore.add_cert, cert1)
class test_FSCertStore(SecurityCase):
@patch('os.path.isdir')
@patch('glob.glob')
@patch('celery.security.certificate.Certificate')
def test_init(self, Certificate, glob, isdir):
cert = Certificate.return_value = Mock()
cert.has_expired.return_value = False
isdir.return_value = True
glob.return_value = ['foo.cert']
with mock_open():
cert.get_id.return_value = 1
x = FSCertStore('/var/certs')
self.assertIn(1, x._certs)
glob.assert_called_with('/var/certs/*')
# they both end up with the same id
glob.return_value = ['foo.cert', 'bar.cert']
with self.assertRaises(SecurityError):
x = FSCertStore('/var/certs')
glob.return_value = ['foo.cert']
cert.has_expired.return_value = True
with self.assertRaises(SecurityError):
x = FSCertStore('/var/certs')
isdir.return_value = False
with self.assertRaises(SecurityError):
x = FSCertStore('/var/certs')
|