/usr/lib/python3/dist-packages/celery/tests/concurrency/test_pool.py is in python3-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
import time
import itertools
from billiard.einfo import ExceptionInfo
from celery.tests.case import AppCase, SkipTest
def do_something(i):
    """Return ``i`` multiplied by itself.

    Used by the pool tests as a trivial task whose result is easy to
    predict (e.g. ``10`` -> ``100``).
    """
    return i * i
def long_something():
    """Block the calling process for one second and return ``None``."""
    time.sleep(1)
def raise_something(i):
    """Return an ``ExceptionInfo`` created while a ``KeyError`` is handled.

    The ``KeyError('FOO EXCEPTION')`` is raised and caught locally, so the
    function never propagates an exception; it *returns* the
    ``ExceptionInfo`` object instead.

    :param i: Accepted so the function can be called like the other task
        helpers; its value is not used.
    """
    try:
        raise KeyError('FOO EXCEPTION')
    except KeyError:
        # NOTE(review): ExceptionInfo() is called without arguments inside
        # the handler -- presumably it captures the active exception;
        # confirm against billiard.einfo.
        return ExceptionInfo()
class test_TaskPool(AppCase):
    """Tests for ``celery.concurrency.prefork.TaskPool``."""

    def setup(self):
        """Skip when ``multiprocessing`` is missing, else load ``TaskPool``.

        The pool class is imported lazily (after the availability check)
        and stored on ``self.TaskPool`` for the test methods to use.
        """
        try:
            __import__('multiprocessing')
        except ImportError:
            raise SkipTest('multiprocessing not supported')
        from celery.concurrency.prefork import TaskPool
        self.TaskPool = TaskPool

    def test_attrs(self):
        """A freshly constructed pool reports its limit and has no ``_pool``."""
        p = self.TaskPool(2)
        self.assertEqual(p.limit, 2)
        self.assertIsNone(p._pool)

    def x_apply(self):
        """Exercise ``apply_async`` with callbacks and check the results.

        Every callback invocation is recorded in ``scratchpad`` under a
        sequential integer key, so the assertions below can check both
        the returned value and the order in which callbacks fired.

        NOTE(review): the ``x_`` prefix (instead of ``test_``) presumably
        keeps test runners from collecting this method -- confirm before
        renaming.
        """
        p = self.TaskPool(2)
        p.start()
        # Stop the pool even when an assertion fails, so a failing run
        # does not leave the started pool behind.
        try:
            scratchpad = {}
            proc_counter = itertools.count()

            def mycallback(ret_value):
                # Record each invocation under the next sequential index.
                process = next(proc_counter)
                scratchpad[process] = {}
                scratchpad[process]['ret_value'] = ret_value
            # The same recorder is used as the error callback.
            myerrback = mycallback

            res = p.apply_async(do_something, args=[10],
                                callback=mycallback)
            res2 = p.apply_async(raise_something, args=[10],
                                 errback=myerrback)
            res3 = p.apply_async(do_something, args=[20],
                                 callback=mycallback)

            self.assertEqual(res.get(), 100)
            # Presumably gives the pool time to invoke the callback before
            # the scratchpad is inspected.
            time.sleep(0.5)
            self.assertDictContainsSubset({'ret_value': 100},
                                          scratchpad.get(0))

            self.assertIsInstance(res2.get(), ExceptionInfo)
            self.assertTrue(scratchpad.get(1))
            time.sleep(1)
            self.assertIsInstance(scratchpad[1]['ret_value'],
                                  ExceptionInfo)
            self.assertEqual(scratchpad[1]['ret_value'].exception.args,
                             ('FOO EXCEPTION', ))

            self.assertEqual(res3.get(), 400)
            time.sleep(0.5)
            self.assertDictContainsSubset({'ret_value': 400},
                                          scratchpad.get(2))

            res3 = p.apply_async(do_something, args=[30],
                                 callback=mycallback)

            self.assertEqual(res3.get(), 900)
            time.sleep(0.5)
            self.assertDictContainsSubset({'ret_value': 900},
                                          scratchpad.get(3))
        finally:
            p.stop()
|