/usr/lib/python3/dist-packages/celery/tests/concurrency/test_concurrency.py is in python3-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
import os
from itertools import count
from celery.concurrency.base import apply_target, BasePool
from celery.tests.case import AppCase, Mock
class test_BasePool(AppCase):
    """Tests for ``apply_target`` and the default ``BasePool`` interface."""

    def test_apply_target(self):
        # Every generated callback records, under its own name, the order in
        # which it was invoked (taken from a shared counter) together with
        # the positional arguments it received.
        scratch = {}
        counter = count(0)

        def gen_callback(name, retval=None):

            def callback(*args):
                scratch[name] = (next(counter), args)
                return retval

            return callback

        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=gen_callback('accept_callback'))

        # ``assertDictContainsSubset`` is deprecated since Python 3.2 and
        # removed in 3.12, so the subset relation is checked key by key.
        # The target must run second with the supplied args, and the
        # callback third with the target's return value.
        expected = {'target': (1, (8, 16)), 'callback': (2, (42, ))}
        for key, value in expected.items():
            self.assertIn(key, scratch)
            self.assertEqual(scratch[key], value)

        # The accept callback must fire first, receiving the current pid
        # followed by a truthy second argument
        # (presumably a timestamp -- TODO confirm against apply_target).
        pa1 = scratch['accept_callback']
        self.assertEqual(0, pa1[0])
        self.assertEqual(pa1[1][0], os.getpid())
        self.assertTrue(pa1[1][1])

        # No accept callback
        scratch.clear()
        apply_target(gen_callback('target', 42),
                     args=(8, 16),
                     callback=gen_callback('callback'),
                     accept_callback=None)
        # The shared counter keeps running across both calls, hence 3 and 4.
        self.assertDictEqual(scratch,
                             {'target': (3, (8, 16)),
                              'callback': (4, (42, ))})

    def test_does_not_debug(self):
        # apply_async must complete without raising when debugging is off.
        x = BasePool(10)
        x._does_debug = False
        x.apply_async(object)

    def test_num_processes(self):
        # The limit given to the constructor is exposed as num_processes.
        self.assertEqual(BasePool(7).num_processes, 7)

    def test_interface_on_start(self):
        # The default hook is callable without arguments and does not raise.
        BasePool(10).on_start()

    def test_interface_on_stop(self):
        # The default hook is callable without arguments and does not raise.
        BasePool(10).on_stop()

    def test_interface_on_apply(self):
        # The default hook is callable without arguments and does not raise.
        BasePool(10).on_apply()

    def test_interface_info(self):
        # The base pool reports an empty info mapping.
        self.assertDictEqual(BasePool(10).info, {})

    def test_active(self):
        # A fresh pool is inactive; it is active once its state is RUN.
        p = BasePool(10)
        self.assertFalse(p.active)
        p._state = p.RUN
        self.assertTrue(p.active)

    def test_restart(self):
        # The base pool does not implement restart.
        p = BasePool(10)
        with self.assertRaises(NotImplementedError):
            p.restart()

    def test_interface_on_terminate(self):
        # The default hook is callable without arguments and does not raise.
        p = BasePool(10)
        p.on_terminate()

    def test_interface_terminate_job(self):
        # The base pool does not implement terminate_job.
        with self.assertRaises(NotImplementedError):
            BasePool(10).terminate_job(101)

    def test_interface_did_start_ok(self):
        # The base pool reports a truthy start status.
        self.assertTrue(BasePool(10).did_start_ok())

    def test_interface_register_with_event_loop(self):
        # The default implementation accepts a loop and returns None.
        self.assertIsNone(
            BasePool(10).register_with_event_loop(Mock()),
        )

    def test_interface_on_soft_timeout(self):
        # The default soft-timeout handler returns None.
        self.assertIsNone(BasePool(10).on_soft_timeout(Mock()))

    def test_interface_on_hard_timeout(self):
        # The default hard-timeout handler returns None.
        self.assertIsNone(BasePool(10).on_hard_timeout(Mock()))

    def test_interface_close(self):
        # close() must move the pool to CLOSE and call on_close with no
        # arguments; on_close is replaced by a Mock to observe the call.
        p = BasePool(10)
        p.on_close = Mock()
        p.close()
        self.assertEqual(p._state, p.CLOSE)
        p.on_close.assert_called_with()

    def test_interface_no_close(self):
        # The default on_close hook returns None.
        self.assertIsNone(BasePool(10).on_close())
|