This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/security/test_certificate.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from __future__ import absolute_import

from celery.exceptions import SecurityError
from celery.security.certificate import Certificate, CertStore, FSCertStore

from . import CERT1, CERT2, KEY1
from .case import SecurityCase

from celery.tests.case import Mock, SkipTest, mock_open, patch


class test_Certificate(SecurityCase):
    """Construction and validation checks for ``Certificate``."""

    def test_valid_certificate(self):
        # Both certificates imported from the test package must load
        # without raising.
        for pem in (CERT1, CERT2):
            Certificate(pem)

    def test_invalid_certificate(self):
        # ``None`` may be rejected with either SecurityError or TypeError.
        with self.assertRaises((SecurityError, TypeError)):
            Certificate(None)

        # CERT1 with the character at index 20 removed.
        corrupted = CERT1[:20] + CERT1[21:]

        # Empty text, junk text, the corrupted certificate and KEY1
        # (presumably a key rather than a certificate) must all be
        # rejected with SecurityError.
        for bogus in ('', 'foo', corrupted, KEY1):
            with self.assertRaises(SecurityError):
                Certificate(bogus)

    def test_has_expired(self):
        # Skipped unconditionally; per the skip message the certificate
        # has actually expired.  The assertion below is unreachable and
        # only records the intended check.
        raise SkipTest('cert actually expired')
        self.assertFalse(Certificate(CERT1).has_expired())


class test_CertStore(SecurityCase):
    """Checks for adding to and iterating over a ``CertStore``."""

    def test_itercerts(self):
        cert1 = Certificate(CERT1)
        cert2 = Certificate(CERT2)
        certstore = CertStore()

        # A freshly created store must not yield any certificate.
        self.assertEqual(list(certstore.itercerts()), [])

        certstore.add_cert(cert1)
        certstore.add_cert(cert2)

        # Materialise the iterator first: looping directly over
        # ``itercerts()`` would pass vacuously if it yielded nothing.
        stored = list(certstore.itercerts())
        self.assertEqual(len(stored), 2)
        for c in stored:
            self.assertIn(c, (cert1, cert2))
        # Both added certificates must actually be returned.
        for expected in (cert1, cert2):
            self.assertIn(expected, stored)

    def test_duplicate(self):
        cert1 = Certificate(CERT1)
        certstore = CertStore()
        certstore.add_cert(cert1)

        # Adding the very same certificate a second time must be refused.
        with self.assertRaises(SecurityError):
            certstore.add_cert(cert1)


class test_FSCertStore(SecurityCase):
    """Checks for ``FSCertStore`` construction from a directory path."""

    @patch('os.path.isdir')
    @patch('glob.glob')
    @patch('celery.security.certificate.Certificate')
    def test_init(self, Certificate, glob, isdir):
        store_dir = '/var/certs'

        def assert_rejected():
            # Building the store must fail with SecurityError under the
            # currently configured mocks.
            with self.assertRaises(SecurityError):
                FSCertStore(store_dir)

        # Every patched ``Certificate(...)`` call returns this one mock.
        cert = Mock()
        Certificate.return_value = cert
        cert.has_expired.return_value = False
        isdir.return_value = True
        glob.return_value = ['foo.cert']

        with mock_open():
            # Happy path: a single, unexpired certificate with id 1.
            cert.get_id.return_value = 1
            store = FSCertStore(store_dir)
            self.assertIn(1, store._certs)
            glob.assert_called_with('/var/certs/*')

            # they both end up with the same id
            glob.return_value = ['foo.cert', 'bar.cert']
            assert_rejected()
            glob.return_value = ['foo.cert']

            # An expired certificate is refused.
            cert.has_expired.return_value = True
            assert_rejected()

            # A path that is not a directory is refused.
            isdir.return_value = False
            assert_rejected()