/usr/lib/python2.7/dist-packages/celery/tests/concurrency/test_threads.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | from __future__ import absolute_import
from celery.concurrency.threads import NullDict, TaskPool, apply_target
from celery.tests.case import AppCase, Case, Mock, mask_modules, mock_module
class test_NullDict(Case):
def test_setitem(self):
x = NullDict()
x['foo'] = 1
with self.assertRaises(KeyError):
x['foo']
class test_TaskPool(AppCase):
def test_without_threadpool(self):
with mask_modules('threadpool'):
with self.assertRaises(ImportError):
TaskPool()
def test_with_threadpool(self):
with mock_module('threadpool'):
x = TaskPool()
self.assertTrue(x.ThreadPool)
self.assertTrue(x.WorkRequest)
def test_on_start(self):
with mock_module('threadpool'):
x = TaskPool()
x.on_start()
self.assertTrue(x._pool)
self.assertIsInstance(x._pool.workRequests, NullDict)
def test_on_stop(self):
with mock_module('threadpool'):
x = TaskPool()
x.on_start()
x.on_stop()
x._pool.dismissWorkers.assert_called_with(x.limit, do_join=True)
def test_on_apply(self):
with mock_module('threadpool'):
x = TaskPool()
x.on_start()
callback = Mock()
accept_callback = Mock()
target = Mock()
req = x.on_apply(target, args=(1, 2), kwargs={'a': 10},
callback=callback,
accept_callback=accept_callback)
x.WorkRequest.assert_called_with(
apply_target,
(target, (1, 2), {'a': 10}, callback, accept_callback),
)
x._pool.putRequest.assert_called_with(req)
x._pool._results_queue.queue.clear.assert_called_with()
|