/usr/lib/python2.7/dist-packages/celery/concurrency/gevent.py is in python-celery 3.1.20-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# -*- coding: utf-8 -*-
"""
celery.concurrency.gevent
~~~~~~~~~~~~~~~~~~~~~~~~~
gevent pool implementation.
"""
from __future__ import absolute_import
from time import time
try:
from gevent import Timeout
except ImportError: # pragma: no cover
Timeout = None # noqa
from celery.utils import timer2
from .base import apply_target, BasePool
__all__ = ['TaskPool']
def apply_timeout(target, args=(), kwargs=None, callback=None,
                  accept_callback=None, pid=None, timeout=None,
                  timeout_callback=None, Timeout=Timeout,
                  apply_target=apply_target, **rest):
    """Run ``target`` via ``apply_target`` inside a ``Timeout`` block.

    :param target: callable handed to ``apply_target``.
    :param args: positional arguments for ``target``; ``None`` is treated
        as an empty tuple.
    :param kwargs: keyword arguments for ``target``; ``None`` (the default)
        is treated as a fresh empty dict.
    :param callback: forwarded unchanged to ``apply_target``.
    :param accept_callback: forwarded unchanged to ``apply_target``.
    :param pid: forwarded unchanged to ``apply_target``.
    :param timeout: value passed to ``Timeout(...)``.
    :param timeout_callback: called as ``timeout_callback(False, timeout)``
        when ``Timeout`` is raised; skipped when it is ``None``.
    :param Timeout: timeout context manager / exception class.  Defaults to
        the module-level ``Timeout`` (``gevent.Timeout``, or ``None`` when
        gevent could not be imported, in which case this function cannot
        be used).
    :param apply_target: function that actually invokes ``target``.
    :param rest: extra keyword arguments forwarded to ``apply_target``.
    :returns: whatever ``apply_target`` returns, or the result of
        ``timeout_callback`` on timeout (``None`` if there is no callback).
    """
    # Normalise here instead of using ``kwargs={}`` in the signature: a
    # mutable default is one dict shared by every call and passed on to
    # downstream callables.  ``None`` is also what ``TaskPool.on_apply``
    # forwards when its caller omits these arguments.
    args = () if args is None else args
    kwargs = {} if kwargs is None else kwargs
    try:
        with Timeout(timeout):
            # NOTE(review): ``propagate`` presumably tells apply_target to
            # re-raise Timeout rather than handle it -- confirm in
            # celery.concurrency.base.
            return apply_target(target, args, kwargs, callback,
                                accept_callback, pid,
                                propagate=(Timeout, ), **rest)
    except Timeout:
        # The callback is optional; calling ``None`` here would replace the
        # timeout with an unrelated TypeError.
        if timeout_callback is None:
            return None
        return timeout_callback(False, timeout)
class Schedule(timer2.Schedule):
    """Schedule whose entries are delayed gevent greenlets.

    Every entered item is spawned with ``spawn_later`` and tracked in a
    set until the greenlet exits or the schedule is cleared.
    """

    def __init__(self, *args, **kwargs):
        # gevent is imported here rather than at module level, so it is
        # only needed once a Schedule is actually created.
        from gevent.greenlet import Greenlet, GreenletExit

        class _Greenlet(Greenlet):
            # Expose ``kill`` under the name ``cancel`` as well.
            cancel = Greenlet.kill

        self._Greenlet = _Greenlet
        self._GreenletExit = GreenletExit
        super(Schedule, self).__init__(*args, **kwargs)
        # Greenlets that have been scheduled and have not exited yet.
        self._queue = set()

    def _enter(self, eta, priority, entry):
        """Spawn ``entry`` as a greenlet due at ``eta`` and return it."""
        # An eta that already passed is clamped to "run immediately".
        delay = max(eta - time(), 0)
        greenlet = self._Greenlet.spawn_later(delay, entry)
        self._queue.add(greenlet)
        # Drop the greenlet from the tracking set once it finishes.
        greenlet.link(self._entry_exit)
        # Scheduling metadata is stored on the greenlet itself.
        greenlet.entry, greenlet.eta, greenlet.priority = (
            entry, eta, priority)
        greenlet.cancelled = False
        return greenlet

    def _entry_exit(self, g):
        """Link callback: kill ``g`` and stop tracking it."""
        try:
            g.kill()
        finally:
            # Always forget the greenlet, even if kill() raised.
            self._queue.discard(g)

    def clear(self):
        """Kill every tracked greenlet, emptying the queue."""
        pending = self._queue
        while pending:
            try:
                greenlet = pending.pop()
                greenlet.kill()
            except KeyError:
                # Ignored on purpose; the loop re-checks the set.
                continue

    @property
    def queue(self):
        """The set of greenlets currently tracked by this schedule."""
        return self._queue
class Timer(timer2.Timer):
    """Timer that uses the greenlet-based ``Schedule`` defined above."""

    Schedule = Schedule

    def start(self):
        """Do nothing; this override is intentionally empty."""

    def ensure_started(self):
        """Do nothing; this override is intentionally empty."""

    def stop(self):
        """Stop the timer by clearing its schedule."""
        self.schedule.clear()
class TaskPool(BasePool):
    """Task pool that runs work in a ``gevent.pool.Pool``."""

    Timer = Timer

    signal_safe = False
    is_green = True
    task_join_will_block = False

    def __init__(self, *args, **kwargs):
        # gevent is imported lazily so it is only required when a pool is
        # actually instantiated.
        from gevent import spawn_raw
        from gevent.pool import Pool

        # Pool-wide default timeout, used when a call supplies none.
        self.timeout = kwargs.get('timeout')
        self.Pool = Pool
        self.spawn_n = spawn_raw
        super(TaskPool, self).__init__(*args, **kwargs)

    def on_start(self):
        """Create the gevent pool, sized by ``self.limit``."""
        pool = self.Pool(self.limit)
        self._pool = pool
        self._quick_put = pool.spawn

    def on_stop(self):
        """Wait for the pool to finish, if one was created."""
        pool = self._pool
        if pool is None:
            return
        pool.join()

    def on_apply(self, target, args=None, kwargs=None, callback=None,
                 accept_callback=None, timeout=None,
                 timeout_callback=None, **_):
        """Spawn ``target`` in the pool and return the spawn result.

        A per-call ``timeout`` overrides the pool-wide one.  With a truthy
        timeout the call goes through ``apply_timeout``; otherwise it goes
        straight through ``apply_target``.
        """
        if timeout is None:
            timeout = self.timeout
        if timeout:
            runner = apply_timeout
        else:
            runner = apply_target
        return self._quick_put(
            runner, target, args, kwargs, callback, accept_callback,
            timeout=timeout, timeout_callback=timeout_callback,
        )

    def grow(self, n=1):
        """Raise the pool's capacity by ``n`` slots."""
        pool = self._pool
        # Adjusts the pool's internal semaphore and its size together.
        pool._semaphore.counter += n
        pool.size += n

    def shrink(self, n=1):
        """Lower the pool's capacity by ``n`` slots."""
        pool = self._pool
        pool._semaphore.counter -= n
        pool.size -= n

    @property
    def num_processes(self):
        """``len()`` of the underlying pool."""
        return len(self._pool)
|