/usr/lib/python3/dist-packages/aioprocessing/pool.py is in python3-aioprocessing 1.0.0-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
from multiprocessing import Pool
from asyncio import Future
import asyncio
from .executor import CoroBuilder
__all__ = ['AioPool']
class AioPool(metaclass=CoroBuilder):
    """Asyncio-friendly wrapper around ``multiprocessing.Pool``.

    The ``coro_*`` methods start the matching ``*_async`` pool operation
    and return an ``asyncio`` future that is resolved through the pool's
    ``callback`` / ``error_callback`` hooks.

    NOTE(review): ``delegate`` and ``coroutines`` are presumably consumed by
    the ``CoroBuilder`` metaclass, which is also expected to provide
    ``self._obj`` (the wrapped pool instance) -- confirm in ``executor.py``.
    """

    delegate = Pool
    coroutines = ['join']

    def _coro_func(self, funcname, *args, loop=None, **kwargs):
        """Call the given function, and wrap the result in a Future.

        funcname should be the name of a function which takes `callback`
        and `error_callback` keyword arguments (e.g. apply_async).

        Positional and keyword arguments are forwarded to that function.
        `loop` is the event loop the returned future belongs to; when it
        is None the loop from ``asyncio.get_event_loop()`` is used.

        Returns a future that receives the value passed to `callback`, or
        the exception passed to `error_callback`.
        """
        if loop is None:
            loop = asyncio.get_event_loop()
        # Create the future on the same loop the callbacks are scheduled on;
        # a bare Future() would attach to whatever loop is current instead.
        fut = loop.create_future()

        # The callbacks hand the outcome over with call_soon_threadsafe
        # because the pool is expected to invoke them outside the loop's
        # thread (see the multiprocessing.pool callback documentation).
        def set_result(result):
            loop.call_soon_threadsafe(fut.set_result, result)

        def set_exc(exc):
            loop.call_soon_threadsafe(fut.set_exception, exc)

        func = getattr(self._obj, funcname)
        func(*args, callback=set_result, error_callback=set_exc, **kwargs)
        return fut

    def coro_apply(self, func, args=(), kwds=None, *, loop=None):
        """Run ``apply_async(func, args, kwds)`` and return a future.

        `kwds` defaults to an empty mapping; None is accepted and treated
        the same way.
        """
        if kwds is None:
            # Avoid a shared mutable default; apply_async unpacks a mapping.
            kwds = {}
        return self._coro_func('apply_async', func,
                               args=args, kwds=kwds, loop=loop)

    def coro_map(self, func, iterable, chunksize=None, *, loop=None):
        """Run ``map_async(func, iterable, chunksize)`` and return a future."""
        return self._coro_func('map_async', func, iterable,
                               chunksize=chunksize, loop=loop)

    def coro_starmap(self, func, iterable, chunksize=None, *, loop=None):
        """Run ``starmap_async(func, iterable, chunksize)`` and return a future."""
        return self._coro_func('starmap_async', func, iterable,
                               chunksize=chunksize, loop=loop)

    def __enter__(self):
        """Enter the wrapped pool's context and return this wrapper."""
        self._obj.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        """Forward context exit to the wrapped pool."""
        self._obj.__exit__(*args, **kwargs)
|