This file is indexed.

/usr/lib/python2.7/dist-packages/celery/tests/concurrency/test_pool.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from __future__ import absolute_import

import time
import itertools

from billiard.einfo import ExceptionInfo

from celery.tests.case import AppCase, SkipTest


def do_something(i):
    return i * i


def long_something():
    time.sleep(1)


def raise_something(i):
    try:
        raise KeyError('FOO EXCEPTION')
    except KeyError:
        return ExceptionInfo()


class test_TaskPool(AppCase):

    def setup(self):
        try:
            __import__('multiprocessing')
        except ImportError:
            raise SkipTest('multiprocessing not supported')
        from celery.concurrency.prefork import TaskPool
        self.TaskPool = TaskPool

    def test_attrs(self):
        p = self.TaskPool(2)
        self.assertEqual(p.limit, 2)
        self.assertIsNone(p._pool)

    def x_apply(self):
        p = self.TaskPool(2)
        p.start()
        scratchpad = {}
        proc_counter = itertools.count()

        def mycallback(ret_value):
            process = next(proc_counter)
            scratchpad[process] = {}
            scratchpad[process]['ret_value'] = ret_value

        myerrback = mycallback

        res = p.apply_async(do_something, args=[10], callback=mycallback)
        res2 = p.apply_async(raise_something, args=[10], errback=myerrback)
        res3 = p.apply_async(do_something, args=[20], callback=mycallback)

        self.assertEqual(res.get(), 100)
        time.sleep(0.5)
        self.assertDictContainsSubset({'ret_value': 100},
                                      scratchpad.get(0))

        self.assertIsInstance(res2.get(), ExceptionInfo)
        self.assertTrue(scratchpad.get(1))
        time.sleep(1)
        self.assertIsInstance(scratchpad[1]['ret_value'],
                              ExceptionInfo)
        self.assertEqual(scratchpad[1]['ret_value'].exception.args,
                         ('FOO EXCEPTION', ))

        self.assertEqual(res3.get(), 400)
        time.sleep(0.5)
        self.assertDictContainsSubset({'ret_value': 400},
                                      scratchpad.get(2))

        res3 = p.apply_async(do_something, args=[30], callback=mycallback)

        self.assertEqual(res3.get(), 900)
        time.sleep(0.5)
        self.assertDictContainsSubset({'ret_value': 900},
                                      scratchpad.get(3))
        p.stop()