/usr/lib/python3/dist-packages/celery/concurrency/eventlet.py is in python3-celery 3.1.13-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | # -*- coding: utf-8 -*-
"""
celery.concurrency.eventlet
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Eventlet pool implementation.
"""
from __future__ import absolute_import
import sys
from time import time
__all__ = ['TaskPool']
W_RACE = """\
Celery module with %s imported before eventlet patched\
"""
RACE_MODS = ('billiard.', 'celery.', 'kombu.')
#: Warn if we couldn't patch early enough,
#: and thread/socket depending celery modules have already been loaded.
for mod in (mod for mod in sys.modules if mod.startswith(RACE_MODS)):
for side in ('thread', 'threading', 'socket'): # pragma: no cover
if getattr(mod, side, None):
import warnings
warnings.warn(RuntimeWarning(W_RACE % side))
from celery import signals
from celery.utils import timer2
from . import base
def apply_target(target, args=(), kwargs={}, callback=None,
accept_callback=None, getpid=None):
return base.apply_target(target, args, kwargs, callback, accept_callback,
pid=getpid())
class Schedule(timer2.Schedule):
def __init__(self, *args, **kwargs):
from eventlet.greenthread import spawn_after
from greenlet import GreenletExit
super(Schedule, self).__init__(*args, **kwargs)
self.GreenletExit = GreenletExit
self._spawn_after = spawn_after
self._queue = set()
def _enter(self, eta, priority, entry):
secs = max(eta - time(), 0)
g = self._spawn_after(secs, entry)
self._queue.add(g)
g.link(self._entry_exit, entry)
g.entry = entry
g.eta = eta
g.priority = priority
g.cancelled = False
return g
def _entry_exit(self, g, entry):
try:
try:
g.wait()
except self.GreenletExit:
entry.cancel()
g.cancelled = True
finally:
self._queue.discard(g)
def clear(self):
queue = self._queue
while queue:
try:
queue.pop().cancel()
except (KeyError, self.GreenletExit):
pass
@property
def queue(self):
return self._queue
class Timer(timer2.Timer):
Schedule = Schedule
def ensure_started(self):
pass
def stop(self):
self.schedule.clear()
def cancel(self, tref):
try:
tref.cancel()
except self.schedule.GreenletExit:
pass
def start(self):
pass
class TaskPool(base.BasePool):
Timer = Timer
signal_safe = False
is_green = True
task_join_will_block = False
def __init__(self, *args, **kwargs):
from eventlet import greenthread
from eventlet.greenpool import GreenPool
self.Pool = GreenPool
self.getcurrent = greenthread.getcurrent
self.getpid = lambda: id(greenthread.getcurrent())
self.spawn_n = greenthread.spawn_n
super(TaskPool, self).__init__(*args, **kwargs)
def on_start(self):
self._pool = self.Pool(self.limit)
signals.eventlet_pool_started.send(sender=self)
self._quick_put = self._pool.spawn_n
self._quick_apply_sig = signals.eventlet_pool_apply.send
def on_stop(self):
signals.eventlet_pool_preshutdown.send(sender=self)
if self._pool is not None:
self._pool.waitall()
signals.eventlet_pool_postshutdown.send(sender=self)
def on_apply(self, target, args=None, kwargs=None, callback=None,
accept_callback=None, **_):
self._quick_apply_sig(
sender=self, target=target, args=args, kwargs=kwargs,
)
self._quick_put(apply_target, target, args, kwargs,
callback, accept_callback,
self.getpid)
def grow(self, n=1):
limit = self.limit + n
self._pool.resize(limit)
self.limit = limit
def shrink(self, n=1):
limit = self.limit - n
self._pool.resize(limit)
self.limit = limit
def _get_info(self):
return {
'max-concurrency': self.limit,
'free-threads': self._pool.free(),
'running-threads': self._pool.running(),
}
|