This file is indexed.

/usr/lib/python2.7/dist-packages/celery/contrib/testing/worker.py is in python-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""Embedded workers for integration tests."""
from __future__ import absolute_import, unicode_literals
import os
import threading
from contextlib import contextmanager
from celery import worker
from celery.result import allow_join_result, _set_task_join_will_block
from celery.utils.dispatch import Signal
from celery.utils.nodenames import anon_nodename

WORKER_LOGLEVEL = os.environ.get('WORKER_LOGLEVEL', 'error')

test_worker_starting = Signal(
    name='test_worker_starting',
    providing_args={},
)
test_worker_started = Signal(
    name='test_worker_started',
    providing_args={'worker', 'consumer'},
)
test_worker_stopped = Signal(
    name='test_worker_stopped',
    providing_args={'worker'},
)


class TestWorkController(worker.WorkController):
    """Worker that can synchronize on being fully started."""

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        self._on_started = threading.Event()
        super(TestWorkController, self).__init__(*args, **kwargs)

    def on_consumer_ready(self, consumer):
        # type: (celery.worker.consumer.Consumer) -> None
        """Callback called when the Consumer blueprint is fully started."""
        self._on_started.set()
        test_worker_started.send(
            sender=self.app, worker=self, consumer=consumer)

    def ensure_started(self):
        # type: () -> None
        """Wait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        """
        self._on_started.wait()


@contextmanager
def start_worker(app,
                 concurrency=1,
                 pool='solo',
                 loglevel=WORKER_LOGLEVEL,
                 logfile=None,
                 perform_ping_check=True,
                 ping_task_timeout=10.0,
                 **kwargs):
    # type: (Celery, int, str, Union[str, int],
    #        str, bool, float, **Any) -> # Iterable
    """Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    """
    test_worker_starting.send(sender=app)

    with _start_worker_thread(app,
                              concurrency=concurrency,
                              pool=pool,
                              loglevel=loglevel,
                              logfile=logfile,
                              **kwargs) as worker:
        if perform_ping_check:
            from .tasks import ping
            with allow_join_result():
                assert ping.delay().get(timeout=ping_task_timeout) == 'pong'

        yield worker
    test_worker_stopped.send(sender=app, worker=worker)


@contextmanager
def _start_worker_thread(app,
                         concurrency=1,
                         pool='solo',
                         loglevel=WORKER_LOGLEVEL,
                         logfile=None,
                         WorkController=TestWorkController,
                         **kwargs):
    # type: (Celery, int, str, Union[str, int], str, Any, **Any) -> Iterable
    """Start Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    """
    setup_app_for_worker(app, loglevel, logfile)
    assert 'celery.ping' in app.tasks
    # Make sure we can connect to the broker
    with app.connection() as conn:
        conn.default_channel.queue_declare

    worker = WorkController(
        app=app,
        concurrency=concurrency,
        hostname=anon_nodename(),
        pool=pool,
        loglevel=loglevel,
        logfile=logfile,
        # not allowed to override TestWorkController.on_consumer_ready
        ready_callback=None,
        without_heartbeat=True,
        without_mingle=True,
        without_gossip=True,
        **kwargs)

    t = threading.Thread(target=worker.start)
    t.start()
    worker.ensure_started()
    _set_task_join_will_block(False)

    yield worker

    from celery.worker import state
    state.should_terminate = 0
    t.join(10)
    state.should_terminate = None


@contextmanager
def _start_worker_process(app,
                          concurrency=1,
                          pool='solo',
                          loglevel=WORKER_LOGLEVEL,
                          logfile=None,
                          **kwargs):
    # type (Celery, int, str, Union[int, str], str, **Any) -> Iterable
    """Start worker in separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    """
    from celery.apps.multi import Cluster, Node

    app.set_current()
    cluster = Cluster([Node('testworker1@%h')])
    cluster.start()
    yield
    cluster.stopwait()


def setup_app_for_worker(app, loglevel, logfile):
    # type: (Celery, Union[str, int], str) -> None
    """Setup the app to be used for starting an embedded worker."""
    app.finalize()
    app.set_current()
    app.set_default()
    type(app.log)._setup = False
    app.log.setup(loglevel=loglevel, logfile=logfile)