This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/concurrency/test_concurrency.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
from __future__ import absolute_import

import os

from itertools import count

from celery.concurrency.base import apply_target, BasePool
from celery.tests.case import AppCase, Mock


class test_BasePool(AppCase):

    def test_apply_target(self):

        scratch = {}
        counter = count(0)

        def gen_callback(name, retval=None):

            def callback(*args):
                scratch[name] = (next(counter), args)
                return retval

            return callback

        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=gen_callback('accept_callback'))

        self.assertDictContainsSubset(
            {'target': (1, (8, 16)), 'callback': (2, (42, ))},
            scratch,
        )
        pa1 = scratch['accept_callback']
        self.assertEqual(0, pa1[0])
        self.assertEqual(pa1[1][0], os.getpid())
        self.assertTrue(pa1[1][1])

        # No accept callback
        scratch.clear()
        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=None)
        self.assertDictEqual(scratch,
                             {'target': (3, (8, 16)),
                              'callback': (4, (42, ))})

    def test_does_not_debug(self):
        x = BasePool(10)
        x._does_debug = False
        x.apply_async(object)

    def test_num_processes(self):
        self.assertEqual(BasePool(7).num_processes, 7)

    def test_interface_on_start(self):
        BasePool(10).on_start()

    def test_interface_on_stop(self):
        BasePool(10).on_stop()

    def test_interface_on_apply(self):
        BasePool(10).on_apply()

    def test_interface_info(self):
        self.assertDictEqual(BasePool(10).info, {})

    def test_active(self):
        p = BasePool(10)
        self.assertFalse(p.active)
        p._state = p.RUN
        self.assertTrue(p.active)

    def test_restart(self):
        p = BasePool(10)
        with self.assertRaises(NotImplementedError):
            p.restart()

    def test_interface_on_terminate(self):
        p = BasePool(10)
        p.on_terminate()

    def test_interface_terminate_job(self):
        with self.assertRaises(NotImplementedError):
            BasePool(10).terminate_job(101)

    def test_interface_did_start_ok(self):
        self.assertTrue(BasePool(10).did_start_ok())

    def test_interface_register_with_event_loop(self):
        self.assertIsNone(
            BasePool(10).register_with_event_loop(Mock()),
        )

    def test_interface_on_soft_timeout(self):
        self.assertIsNone(BasePool(10).on_soft_timeout(Mock()))

    def test_interface_on_hard_timeout(self):
        self.assertIsNone(BasePool(10).on_hard_timeout(Mock()))

    def test_interface_close(self):
        p = BasePool(10)
        p.on_close = Mock()
        p.close()
        self.assertEqual(p._state, p.CLOSE)
        p.on_close.assert_called_with()

    def test_interface_no_close(self):
        self.assertIsNone(BasePool(10).on_close())