/usr/lib/python3/dist-packages/celery/tests/worker/test_components.py is in python3-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from __future__ import absolute_import
# some of these are tested in test_worker, so I've only written tests
# here to complete coverage. Should move everything to this module at some
# point [-ask]
from celery.worker.components import (
Queues,
Pool,
)
from celery.tests.case import AppCase, Mock
class test_Queues(AppCase):
    """Coverage test for the ``Queues`` worker component."""

    def test_create_when_eventloop(self):
        # Mock worker with the event loop, put-locks and a
        # semaphore-using pool class all switched on.
        worker = Mock()
        worker.use_eventloop = True
        worker.pool_putlocks = True
        worker.pool_cls.uses_semaphore = True

        Queues(worker).create(worker)

        # After create() the worker's task handler must be the very same
        # object as its ``_process_task_sem`` attribute.
        self.assertIs(worker.process_task, worker._process_task_sem)
class test_Pool(AppCase):
    """Coverage test for ``close``/``terminate`` on the ``Pool`` component."""

    def test_close_terminate(self):
        worker = Mock()
        component = Pool(worker)
        fake_pool = Mock()
        worker.pool = fake_pool

        # With a pool attached, each call is forwarded to the pool
        # without arguments.
        component.close(worker)
        fake_pool.close.assert_called_with()

        component.terminate(worker)
        fake_pool.terminate.assert_called_with()

        # With no pool attached, both calls must complete without raising.
        worker.pool = None
        component.close(worker)
        component.terminate(worker)
|