/usr/lib/python2.7/dist-packages/eventlet/greenpool.py is in python-eventlet 0.13.0-1ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | import itertools
import traceback
from eventlet import event
from eventlet import greenthread
from eventlet import queue
from eventlet import semaphore
from eventlet.support import greenlets as greenlet
__all__ = ['GreenPool', 'GreenPile']
DEBUG = True
class GreenPool(object):
"""The GreenPool class is a pool of green threads.
"""
def __init__(self, size=1000):
self.size = size
self.coroutines_running = set()
self.sem = semaphore.Semaphore(size)
self.no_coros_running = event.Event()
def resize(self, new_size):
""" Change the max number of greenthreads doing work at any given time.
If resize is called when there are more than *new_size* greenthreads
already working on tasks, they will be allowed to complete but no new
tasks will be allowed to get launched until enough greenthreads finish
their tasks to drop the overall quantity below *new_size*. Until
then, the return value of free() will be negative.
"""
size_delta = new_size - self.size
self.sem.counter += size_delta
self.size = new_size
def running(self):
""" Returns the number of greenthreads that are currently executing
functions in the GreenPool."""
return len(self.coroutines_running)
def free(self):
""" Returns the number of greenthreads available for use.
If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will
block the calling greenthread until a slot becomes available."""
return self.sem.counter
def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
object that is running the function, which can be used to retrieve the
results.
If the pool is currently at capacity, ``spawn`` will block until one of
the running greenthreads completes its task and frees up a slot.
This function is reentrant; *function* can call ``spawn`` on the same
pool without risk of deadlocking the whole thing.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
# a bit hacky to use the GT without switching to it
gt = greenthread.GreenThread(current)
gt.main(function, args, kwargs)
return gt
else:
self.sem.acquire()
gt = greenthread.spawn(function, *args, **kwargs)
if not self.coroutines_running:
self.no_coros_running = event.Event()
self.coroutines_running.add(gt)
gt.link(self._spawn_done)
return gt
def _spawn_n_impl(self, func, args, kwargs, coro):
try:
try:
func(*args, **kwargs)
except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit):
raise
except:
if DEBUG:
traceback.print_exc()
finally:
if coro is None:
return
else:
coro = greenthread.getcurrent()
self._spawn_done(coro)
def spawn_n(self, function, *args, **kwargs):
"""Create a greenthread to run the *function*, the same as
:meth:`spawn`. The difference is that :meth:`spawn_n` returns
None; the results of *function* are not retrievable.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = greenthread.getcurrent()
if self.sem.locked() and current in self.coroutines_running:
self._spawn_n_impl(function, args, kwargs, None)
else:
self.sem.acquire()
g = greenthread.spawn_n(self._spawn_n_impl,
function, args, kwargs, True)
if not self.coroutines_running:
self.no_coros_running = event.Event()
self.coroutines_running.add(g)
def waitall(self):
"""Waits until all greenthreads in the pool are finished working."""
assert greenthread.getcurrent() not in self.coroutines_running, \
"Calling waitall() from within one of the "\
"GreenPool's greenthreads will never terminate."
if self.running():
self.no_coros_running.wait()
def _spawn_done(self, coro):
self.sem.release()
if coro is not None:
self.coroutines_running.remove(coro)
# if done processing (no more work is waiting for processing),
# we can finish off any waitall() calls that might be pending
if self.sem.balance == self.size:
self.no_coros_running.send(None)
def waiting(self):
"""Return the number of greenthreads waiting to spawn.
"""
if self.sem.balance < 0:
return -self.sem.balance
else:
return 0
def _do_map(self, func, it, gi):
for args in it:
gi.spawn(func, *args)
gi.spawn(return_stop_iteration)
def starmap(self, function, iterable):
"""This is the same as :func:`itertools.starmap`, except that *func* is
executed in a separate green thread for each item, with the concurrency
limited by the pool's size. In operation, starmap consumes a constant
amount of memory, proportional to the size of the pool, and is thus
suited for iterating over extremely long input lists.
"""
if function is None:
function = lambda *a: a
gi = GreenMap(self.size)
greenthread.spawn_n(self._do_map, function, iterable, gi)
return gi
def imap(self, function, *iterables):
"""This is the same as :func:`itertools.imap`, and has the same
concurrency and memory behavior as :meth:`starmap`.
It's quite convenient for, e.g., farming out jobs from a file::
def worker(line):
return do_something(line)
pool = GreenPool()
for result in pool.imap(worker, open("filename", 'r')):
print result
"""
return self.starmap(function, itertools.izip(*iterables))
def return_stop_iteration():
return StopIteration()
class GreenPile(object):
"""GreenPile is an abstraction representing a bunch of I/O-related tasks.
Construct a GreenPile with an existing GreenPool object. The GreenPile will
then use that pool's concurrency as it processes its jobs. There can be
many GreenPiles associated with a single GreenPool.
A GreenPile can also be constructed standalone, not associated with any
GreenPool. To do this, construct it with an integer size parameter instead
of a GreenPool.
It is not advisable to iterate over a GreenPile in a different greenthread
than the one which is calling spawn. The iterator will exit early in that
situation.
"""
def __init__(self, size_or_pool=1000):
if isinstance(size_or_pool, GreenPool):
self.pool = size_or_pool
else:
self.pool = GreenPool(size_or_pool)
self.waiters = queue.LightQueue()
self.used = False
self.counter = 0
def spawn(self, func, *args, **kw):
"""Runs *func* in its own green thread, with the result available by
iterating over the GreenPile object."""
self.used = True
self.counter += 1
try:
gt = self.pool.spawn(func, *args, **kw)
self.waiters.put(gt)
except:
self.counter -= 1
raise
def __iter__(self):
return self
def next(self):
"""Wait for the next result, suspending the current greenthread until it
is available. Raises StopIteration when there are no more results."""
if self.counter == 0 and self.used:
raise StopIteration()
try:
return self.waiters.get().wait()
finally:
self.counter -= 1
# this is identical to GreenPile but it blocks on spawn if the results
# aren't consumed, and it doesn't generate its own StopIteration exception,
# instead relying on the spawning process to send one in when it's done
class GreenMap(GreenPile):
def __init__(self, size_or_pool):
super(GreenMap, self).__init__(size_or_pool)
self.waiters = queue.LightQueue(maxsize=self.pool.size)
def next(self):
try:
val = self.waiters.get().wait()
if isinstance(val, StopIteration):
raise val
else:
return val
finally:
self.counter -= 1
|