This file is indexed.

/usr/lib/python2.7/dist-packages/celery/concurrency/eventlet.py is in python-celery 3.1.20-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
"""
    celery.concurrency.eventlet
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~

    Eventlet pool implementation.

"""
from __future__ import absolute_import

import sys

from time import time

__all__ = ['TaskPool']

W_RACE = """\
Celery module with %s imported before eventlet patched\
"""
RACE_MODS = ('billiard.', 'celery.', 'kombu.')


#: Warn if we couldn't patch early enough,
#: and thread/socket depending celery modules have already been loaded.
for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
    for side in ('thread', 'threading', 'socket'):  # pragma: no cover
        if getattr(mod, side, None):
            import warnings
            warnings.warn(RuntimeWarning(W_RACE % side))


from celery import signals  # noqa
from celery.utils import timer2  # noqa

from . import base  # noqa


def apply_target(target, args=(), kwargs={}, callback=None,
                 accept_callback=None, getpid=None):
    return base.apply_target(target, args, kwargs, callback, accept_callback,
                             pid=getpid())


class Schedule(timer2.Schedule):

    def __init__(self, *args, **kwargs):
        from eventlet.greenthread import spawn_after
        from greenlet import GreenletExit
        super(Schedule, self).__init__(*args, **kwargs)

        self.GreenletExit = GreenletExit
        self._spawn_after = spawn_after
        self._queue = set()

    def _enter(self, eta, priority, entry):
        secs = max(eta - time(), 0)
        g = self._spawn_after(secs, entry)
        self._queue.add(g)
        g.link(self._entry_exit, entry)
        g.entry = entry
        g.eta = eta
        g.priority = priority
        g.cancelled = False
        return g

    def _entry_exit(self, g, entry):
        try:
            try:
                g.wait()
            except self.GreenletExit:
                entry.cancel()
                g.cancelled = True
        finally:
            self._queue.discard(g)

    def clear(self):
        queue = self._queue
        while queue:
            try:
                queue.pop().cancel()
            except (KeyError, self.GreenletExit):
                pass

    @property
    def queue(self):
        return self._queue


class Timer(timer2.Timer):
    Schedule = Schedule

    def ensure_started(self):
        pass

    def stop(self):
        self.schedule.clear()

    def cancel(self, tref):
        try:
            tref.cancel()
        except self.schedule.GreenletExit:
            pass

    def start(self):
        pass


class TaskPool(base.BasePool):
    Timer = Timer

    signal_safe = False
    is_green = True
    task_join_will_block = False

    def __init__(self, *args, **kwargs):
        from eventlet import greenthread
        from eventlet.greenpool import GreenPool
        self.Pool = GreenPool
        self.getcurrent = greenthread.getcurrent
        self.getpid = lambda: id(greenthread.getcurrent())
        self.spawn_n = greenthread.spawn_n

        super(TaskPool, self).__init__(*args, **kwargs)

    def on_start(self):
        self._pool = self.Pool(self.limit)
        signals.eventlet_pool_started.send(sender=self)
        self._quick_put = self._pool.spawn_n
        self._quick_apply_sig = signals.eventlet_pool_apply.send

    def on_stop(self):
        signals.eventlet_pool_preshutdown.send(sender=self)
        if self._pool is not None:
            self._pool.waitall()
        signals.eventlet_pool_postshutdown.send(sender=self)

    def on_apply(self, target, args=None, kwargs=None, callback=None,
                 accept_callback=None, **_):
        self._quick_apply_sig(
            sender=self, target=target, args=args, kwargs=kwargs,
        )
        self._quick_put(apply_target, target, args, kwargs,
                        callback, accept_callback,
                        self.getpid)

    def grow(self, n=1):
        limit = self.limit + n
        self._pool.resize(limit)
        self.limit = limit

    def shrink(self, n=1):
        limit = self.limit - n
        self._pool.resize(limit)
        self.limit = limit

    def _get_info(self):
        return {
            'max-concurrency': self.limit,
            'free-threads': self._pool.free(),
            'running-threads': self._pool.running(),
        }