/usr/lib/python3/dist-packages/aioprocessing/managers.py is in python3-aioprocessing 1.0.0-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 | import asyncio
from multiprocessing.util import register_after_fork
from queue import Queue
from threading import (Barrier, BoundedSemaphore, Condition, Event,
Lock, RLock, Semaphore)
from multiprocessing.managers import (SyncManager, MakeProxyType,
BarrierProxy, EventProxy, ConditionProxy,
AcquirerProxy)
from aioprocessing.locks import _ContextManager
from .executor import _ExecutorMixin
AioBaseQueueProxy = MakeProxyType('AioQueueProxy', (
'task_done', 'get', 'qsize', 'put', 'put_nowait',
'get_nowait', 'empty', 'join', '_qsize', 'full'
))
class _AioProxyMixin(_ExecutorMixin):
_obj = None
def _async_call(self, method, *args, loop=None, **kwargs):
return asyncio.async(self.run_in_executor(self._callmethod, method,
args, kwargs, loop=loop))
class ProxyCoroBuilder(type):
""" Build coroutines to proxy functions. """
def __new__(cls, clsname, bases, dct):
coro_list = dct.get('coroutines', [])
existing_coros = set()
def find_existing_coros(d):
for attr in d:
if attr.startswith("coro_") or attr.startswith("thread_"):
existing_coros.add(attr)
# Determine if any bases include the coroutines attribute, or
# if either this class or a base class provides an actual
# implementation for a coroutine method.
find_existing_coros(dct)
for b in bases:
b_dct = b.__dict__
coro_list.extend(b_dct.get('coroutines', []))
find_existing_coros(b_dct)
bases += (_AioProxyMixin,)
for func in coro_list:
coro_name = 'coro_{}'.format(func)
if coro_name not in existing_coros:
dct[coro_name] = cls.coro_maker(func)
return super().__new__(cls, clsname, bases, dct)
@staticmethod
def coro_maker(func):
def coro_func(self, *args, loop=None, **kwargs):
return self._async_call(func, *args, loop=loop,
**kwargs)
return coro_func
class AioQueueProxy(AioBaseQueueProxy, metaclass=ProxyCoroBuilder):
""" A Proxy object for AioQueue.
Provides coroutines for calling 'get' and 'put' on the
proxy.
"""
coroutines = ['get', 'put']
class AioAcquirerProxy(AcquirerProxy, metaclass=ProxyCoroBuilder):
pool_workers = 1
coroutines = ['acquire', 'release']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._threaded_acquire = False
def _after_fork(obj):
obj._threaded_acquire = False
register_after_fork(self, _after_fork)
def coro_acquire(self, *args, **kwargs):
""" Non-blocking acquire.
We need a custom implementation here, because we need to
set the _threaded_acquire attribute to True once we have
the lock. This attribute is used by release() to determine
whether the lock should be released in the main thread,
or in the Executor thread.
"""
def lock_acquired(fut):
if fut.result():
self._threaded_acquire = True
out = self.run_in_executor(self.acquire, *args, **kwargs)
out.add_done_callback(lock_acquired)
return out
def __getstate__(self):
state = super().__getstate__()
state['_threaded_acquire'] = False
return state
def __setstate__(self, state):
super().__setstate__(state)
def release(self):
""" Release the lock.
If the lock was acquired in the same process via
coro_acquire, we need to release the lock in the
ThreadPoolExecutor's thread.
"""
if self._threaded_acquire:
out = self.run_in_thread(super().release)
else:
out = super().release()
self._threaded_acquire = False
return out
@asyncio.coroutine
def __aenter__(self):
yield from self.coro_acquire()
return None
@asyncio.coroutine
def __aexit__(self, *args, **kwargs):
self.release()
def __iter__(self):
yield from self.coro_acquire()
return _ContextManager(self)
class AioBarrierProxy(BarrierProxy, metaclass=ProxyCoroBuilder):
coroutines = ['wait']
class AioEventProxy(EventProxy, metaclass=ProxyCoroBuilder):
coroutines = ['wait']
class AioConditionProxy(ConditionProxy, metaclass=ProxyCoroBuilder):
coroutines = ['wait', 'wait_for']
class AioSyncManager(SyncManager):
""" A mp.Manager that provides asyncio-friendly objects. """
pass
AioSyncManager.register("AioQueue", Queue, AioQueueProxy)
AioSyncManager.register("AioBarrier", Barrier, AioQueueProxy)
AioSyncManager.register("AioBoundedSemaphore", BoundedSemaphore, AioAcquirerProxy)
AioSyncManager.register("AioCondition", Condition, AioConditionProxy)
AioSyncManager.register("AioEvent", Event, AioQueueProxy)
AioSyncManager.register("AioLock", Lock, AioAcquirerProxy)
AioSyncManager.register("AioRLock", RLock, AioAcquirerProxy)
AioSyncManager.register("AioSemaphore", Semaphore, AioAcquirerProxy)
|