This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/utils/test_threads.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
from __future__ import absolute_import

from celery.utils.threads import (
    _LocalStack,
    _FastLocalStack,
    LocalManager,
    Local,
    bgThread,
)

from celery.tests.case import Case, override_stdouts, patch


class test_bgThread(Case):
    """Tests for ``bgThread``: crash handling in ``run`` and the ``body`` hook."""

    def test_crash(self):
        # Subclass whose body() always fails, so run() has an error to handle.
        class T(bgThread):

            def body(self):
                raise KeyError()

        with patch('os._exit') as exit_mock:
            # NOTE(review): override_stdouts presumably captures the standard
            # streams while the crash is reported -- confirm in
            # celery.tests.case.
            with override_stdouts():
                # Make the patched os._exit raise, so that the call shows up
                # as a ValueError propagating out of run().
                exit_mock.side_effect = ValueError()
                thread = T()
                self.assertRaises(ValueError, thread.run)
                exit_mock.assert_called_with(1)

    def test_interface(self):
        # The base class body() is expected to raise NotImplementedError.
        thread = bgThread()
        self.assertRaises(NotImplementedError, thread.body)


class test_Local(Case):
    """Tests for ``Local``: attribute storage, iteration and calling."""

    def test_iter(self):
        local = Local()
        local.foo = 'bar'
        ident = local.__ident_func__()
        # Iterating is expected to yield (ident, attribute-dict) pairs.
        entry = (ident, {'foo': 'bar'})
        self.assertIn(entry, list(iter(local)))

        # Once the attribute is deleted the pair must be gone ...
        del local.foo
        self.assertNotIn(entry, list(iter(local)))
        # ... and deleting it a second time must fail.
        with self.assertRaises(AttributeError):
            del local.foo

        # Calling the Local with a callable must return something non-None.
        self.assertIsNotNone(local(lambda: 'foo'))


class test_LocalStack(Case):
    """Tests for ``_LocalStack``: push/pop and access through ``stack()``."""

    def test_stack(self):
        stack = _LocalStack()

        def first_item():
            # Index into whatever calling the stack returns.
            return stack()[0]

        # Popping an empty stack returns None instead of raising.
        self.assertIsNone(stack.pop())
        stack.__release_local__()
        # Exercise both reading and assigning ``__ident_func__``.
        stack.__ident_func__ = stack.__ident_func__

        # Nothing pushed yet: indexing through the call must fail.
        self.assertRaises(RuntimeError, first_item)

        stack.push(['foo'])
        self.assertEqual(first_item(), 'foo')

        # After popping the only item the stack is empty again.
        stack.pop()
        self.assertRaises(RuntimeError, first_item)


class test_FastLocalStack(Case):
    """Tests for ``_FastLocalStack``: ``push``, ``pop``, ``top`` and ``len``."""

    def test_stack(self):
        stack = _FastLocalStack()
        for item in (['foo'], ['bar']):
            stack.push(item)

        # The most recently pushed item is on top.
        self.assertEqual(stack.top, ['bar'])
        self.assertEqual(len(stack), 2)

        stack.pop()
        self.assertEqual(stack.top, ['foo'])

        # With everything popped, ``top`` is None.
        stack.pop()
        self.assertIsNone(stack.top)


class test_LocalManager(Case):
    """Tests for ``LocalManager`` construction, ``cleanup`` and ``repr``."""

    def test_init(self):
        # Default construction: no locals, but a truthy ident function.
        manager = LocalManager()
        self.assertListEqual(manager.locals, [])
        self.assertTrue(manager.ident_func)

        def ident():
            return 1

        local = Local()

        # A list of locals and a single bare local must both end up as a list.
        manager = LocalManager([local], ident_func=ident)
        self.assertListEqual(manager.locals, [local])
        manager = LocalManager(local, ident_func=ident)
        self.assertListEqual(manager.locals, [local])

        # The custom ident function is kept by the manager, installed on the
        # managed local, and used by get_ident().
        self.assertIs(manager.ident_func, ident)
        self.assertIs(manager.locals[0].__ident_func__, ident)
        self.assertEqual(manager.get_ident(), 1)

        # cleanup() must hand the managed local to release_local.
        with patch('celery.utils.threads.release_local') as release_mock:
            manager.cleanup()
            release_mock.assert_called_with(local)

        self.assertTrue(repr(manager))