This file is indexed.

/usr/lib/python3/dist-packages/celery/worker/pidbox.py is in python3-celery 4.1.0-2ubuntu1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Worker Pidbox (remote control)."""
from __future__ import absolute_import, unicode_literals
import socket
import threading
from kombu.common import ignore_errors
from kombu.utils.encoding import safe_str
from celery.utils.collections import AttributeDict
from celery.utils.functional import pass1
from celery.utils.log import get_logger
from . import control

__all__ = ['Pidbox', 'gPidbox']

logger = get_logger(__name__)
debug, error, info = logger.debug, logger.error, logger.info


class Pidbox(object):
    """Worker mailbox."""

    consumer = None

    def __init__(self, c):
        self.c = c
        self.hostname = c.hostname
        self.node = c.app.control.mailbox.Node(
            safe_str(c.hostname),
            handlers=control.Panel.data,
            state=AttributeDict(
                app=c.app,
                hostname=c.hostname,
                consumer=c,
                tset=pass1 if c.controller.use_eventloop else set),
        )
        self._forward_clock = self.c.app.clock.forward

    def on_message(self, body, message):
        # just increase clock as clients usually don't
        # have a valid clock to adjust with.
        self._forward_clock()
        try:
            self.node.handle_message(body, message)
        except KeyError as exc:
            error('No such control command: %s', exc)
        except Exception as exc:
            error('Control command error: %r', exc, exc_info=True)
            self.reset()

    def start(self, c):
        self.node.channel = c.connection.channel()
        self.consumer = self.node.listen(callback=self.on_message)
        self.consumer.on_decode_error = c.on_decode_error

    def on_stop(self):
        pass

    def stop(self, c):
        self.on_stop()
        self.consumer = self._close_channel(c)

    def reset(self):
        self.stop(self.c)
        self.start(self.c)

    def _close_channel(self, c):
        if self.node and self.node.channel:
            ignore_errors(c, self.node.channel.close)

    def shutdown(self, c):
        self.on_stop()
        if self.consumer:
            debug('Canceling broadcast consumer...')
            ignore_errors(c, self.consumer.cancel)
        self.stop(self.c)


class gPidbox(Pidbox):
    """Worker pidbox (greenlet)."""

    _node_shutdown = None
    _node_stopped = None
    _resets = 0

    def start(self, c):
        c.pool.spawn_n(self.loop, c)

    def on_stop(self):
        if self._node_stopped:
            self._node_shutdown.set()
            debug('Waiting for broadcast thread to shutdown...')
            self._node_stopped.wait()
            self._node_stopped = self._node_shutdown = None

    def reset(self):
        self._resets += 1

    def _do_reset(self, c, connection):
        self._close_channel(c)
        self.node.channel = connection.channel()
        self.consumer = self.node.listen(callback=self.on_message)
        self.consumer.consume()

    def loop(self, c):
        resets = [self._resets]
        shutdown = self._node_shutdown = threading.Event()
        stopped = self._node_stopped = threading.Event()
        try:
            with c.connection_for_read() as connection:
                info('pidbox: Connected to %s.', connection.as_uri())
                self._do_reset(c, connection)
                while not shutdown.is_set() and c.connection:
                    if resets[0] < self._resets:
                        resets[0] += 1
                        self._do_reset(c, connection)
                    try:
                        connection.drain_events(timeout=1.0)
                    except socket.timeout:
                        pass
        finally:
            stopped.set()