/usr/lib/python2.7/dist-packages/celery/contrib/testing/worker.py is in python-celery 4.1.0-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | """Embedded workers for integration tests."""
from __future__ import absolute_import, unicode_literals
import os
import threading
from contextlib import contextmanager
from celery import worker
from celery.result import allow_join_result, _set_task_join_will_block
from celery.utils.dispatch import Signal
from celery.utils.nodenames import anon_nodename
WORKER_LOGLEVEL = os.environ.get('WORKER_LOGLEVEL', 'error')
test_worker_starting = Signal(
name='test_worker_starting',
providing_args={},
)
test_worker_started = Signal(
name='test_worker_started',
providing_args={'worker', 'consumer'},
)
test_worker_stopped = Signal(
name='test_worker_stopped',
providing_args={'worker'},
)
class TestWorkController(worker.WorkController):
"""Worker that can synchronize on being fully started."""
def __init__(self, *args, **kwargs):
# type: (*Any, **Any) -> None
self._on_started = threading.Event()
super(TestWorkController, self).__init__(*args, **kwargs)
def on_consumer_ready(self, consumer):
# type: (celery.worker.consumer.Consumer) -> None
"""Callback called when the Consumer blueprint is fully started."""
self._on_started.set()
test_worker_started.send(
sender=self.app, worker=self, consumer=consumer)
def ensure_started(self):
# type: () -> None
"""Wait for worker to be fully up and running.
Warning:
Worker must be started within a thread for this to work,
or it will block forever.
"""
self._on_started.wait()
@contextmanager
def start_worker(app,
concurrency=1,
pool='solo',
loglevel=WORKER_LOGLEVEL,
logfile=None,
perform_ping_check=True,
ping_task_timeout=10.0,
**kwargs):
# type: (Celery, int, str, Union[str, int],
# str, bool, float, **Any) -> # Iterable
"""Start embedded worker.
Yields:
celery.app.worker.Worker: worker instance.
"""
test_worker_starting.send(sender=app)
with _start_worker_thread(app,
concurrency=concurrency,
pool=pool,
loglevel=loglevel,
logfile=logfile,
**kwargs) as worker:
if perform_ping_check:
from .tasks import ping
with allow_join_result():
assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
yield worker
test_worker_stopped.send(sender=app, worker=worker)
@contextmanager
def _start_worker_thread(app,
concurrency=1,
pool='solo',
loglevel=WORKER_LOGLEVEL,
logfile=None,
WorkController=TestWorkController,
**kwargs):
# type: (Celery, int, str, Union[str, int], str, Any, **Any) -> Iterable
"""Start Celery worker in a thread.
Yields:
celery.worker.Worker: worker instance.
"""
setup_app_for_worker(app, loglevel, logfile)
assert 'celery.ping' in app.tasks
# Make sure we can connect to the broker
with app.connection() as conn:
conn.default_channel.queue_declare
worker = WorkController(
app=app,
concurrency=concurrency,
hostname=anon_nodename(),
pool=pool,
loglevel=loglevel,
logfile=logfile,
# not allowed to override TestWorkController.on_consumer_ready
ready_callback=None,
without_heartbeat=True,
without_mingle=True,
without_gossip=True,
**kwargs)
t = threading.Thread(target=worker.start)
t.start()
worker.ensure_started()
_set_task_join_will_block(False)
yield worker
from celery.worker import state
state.should_terminate = 0
t.join(10)
state.should_terminate = None
@contextmanager
def _start_worker_process(app,
concurrency=1,
pool='solo',
loglevel=WORKER_LOGLEVEL,
logfile=None,
**kwargs):
# type (Celery, int, str, Union[int, str], str, **Any) -> Iterable
"""Start worker in separate process.
Yields:
celery.app.worker.Worker: worker instance.
"""
from celery.apps.multi import Cluster, Node
app.set_current()
cluster = Cluster([Node('testworker1@%h')])
cluster.start()
yield
cluster.stopwait()
def setup_app_for_worker(app, loglevel, logfile):
# type: (Celery, Union[str, int], str) -> None
"""Setup the app to be used for starting an embedded worker."""
app.finalize()
app.set_current()
app.set_default()
type(app.log)._setup = False
app.log.setup(loglevel=loglevel, logfile=logfile)
|