/usr/lib/python3/dist-packages/aioprocessing/locks.py is in python3-aioprocessing 1.0.0-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
import asyncio
from .executor import CoroBuilder
from multiprocessing import (Event, Lock, RLock, BoundedSemaphore,
Condition, Semaphore, Barrier)
from multiprocessing.util import register_after_fork
# Public API of this module: the names bound by a star-import of it.
__all__ = ["AioLock", "AioRLock", "AioBarrier", "AioCondition", "AioEvent",
"AioSemaphore", "AioBoundedSemaphore"]
class _ContextManager:
    """One-shot helper that releases an already-acquired lock on exit.

    It backs the generator-based idiom::

        with (yield from lock):
            <block>

    where the ``yield from`` performs the acquisition and this object is
    only responsible for the matching ``release()``.
    """

    def __init__(self, lock):
        # Object whose release() is called when the with-block ends.
        self._lock = lock

    def __enter__(self):
        # Nothing useful to bind in an "as ..." clause for a lock,
        # so the with-statement target is simply None.
        return None

    def __exit__(self, *exc_info):
        # Drop our reference first so the manager cannot be reused, then
        # release; the reference is cleared even if release() raises.
        held, self._lock = self._lock, None
        held.release()
class AioBaseLock(metaclass=CoroBuilder):
    """Base class for the asyncio-aware lock-like wrappers in this module.

    The wrapped primitive is reached through ``self._obj``, which is set up
    outside this class (NOTE(review): presumably by ``CoroBuilder`` from the
    subclass's ``delegate`` attribute -- confirm in ``executor.py``).

    ``_threaded_acquire`` records whether the most recent successful
    acquisition went through ``coro_acquire``; ``release()`` uses it to
    decide where the release has to run.
    """

    pool_workers = 1
    coroutines = ['acquire', 'release']

    def __init__(self, *args, **kwargs):
        """Start with no executor-thread acquisition recorded.

        ``*args`` and ``**kwargs`` are accepted but not used by this method.
        """
        self._threaded_acquire = False

        def _after_fork(obj):
            # Reset the flag on the copy of this object that lives in a
            # forked child process.
            obj._threaded_acquire = False

        register_after_fork(self, _after_fork)

    def coro_acquire(self, *args, **kwargs):
        """ Non-blocking acquire.

        We need a custom implementation here, because we need to
        set the _threaded_acquire attribute to True once we have
        the lock. This attribute is used by release() to determine
        whether the lock should be released in the main thread,
        or in the Executor thread.

        All arguments are forwarded to ``self._obj.acquire``.

        Returns:
            The object returned by ``self.run_in_executor`` (it must
            support ``add_done_callback``).
        """
        def lock_acquired(fut):
            # Asking a cancelled or failed future for its result would
            # raise inside this callback. In either case we cannot tell
            # that the lock was obtained, so leave the flag untouched.
            # cancelled() must be tested first: exception() itself raises
            # on a cancelled future.
            if fut.cancelled() or fut.exception() is not None:
                return
            if fut.result():
                self._threaded_acquire = True

        out = self.run_in_executor(self._obj.acquire, *args, **kwargs)
        out.add_done_callback(lock_acquired)
        return out

    def __getstate__(self):
        """Return the pickling state with ``_threaded_acquire`` cleared."""
        state = super().__getstate__()
        state['_threaded_acquire'] = False
        return state

    def __setstate__(self, state):
        """Restore pickled state by delegating to the parent class."""
        super().__setstate__(state)

    def release(self):
        """ Release the lock.

        If the lock was acquired in the same process via
        coro_acquire, we need to release the lock in the
        ThreadPoolExecutor's thread.

        Returns:
            Whatever the underlying release call returned.
        """
        if self._threaded_acquire:
            out = self.run_in_thread(self._obj.release)
        else:
            out = self._obj.release()
        # Only reached when the release did not raise.
        self._threaded_acquire = False
        return out

    # Native coroutines are used for the async context-manager protocol:
    # the ``asyncio.coroutine`` decorator no longer exists on Python 3.11+,
    # and merely referencing it would make this class body fail to execute.
    async def __aenter__(self):
        """Acquire via ``coro_acquire`` on entry to ``async with``."""
        await self.coro_acquire()
        return None

    async def __aexit__(self, *args, **kwargs):
        """Release on exit from ``async with``; exceptions are not suppressed."""
        self.release()

    def __enter__(self):
        """Enter the wrapped object's own (synchronous) context manager."""
        return self._obj.__enter__()

    def __exit__(self, *args, **kwargs):
        """Exit the wrapped object's own (synchronous) context manager."""
        return self._obj.__exit__(*args, **kwargs)

    def __iter__(self):
        """Support the legacy ``with (yield from lock):`` idiom.

        Delegates to ``coro_acquire()`` and then returns a
        ``_ContextManager`` that releases this lock when the block ends.
        """
        yield from self.coro_acquire()
        return _ContextManager(self)
class AioBaseWaiter(metaclass=CoroBuilder):
    """Shared base for the wrappers in this module that offer ``wait``.

    NOTE(review): ``coroutines`` and ``pool_workers`` are presumably
    consumed by ``CoroBuilder`` -- confirm in ``executor.py``.
    """

    # Names of delegate methods to be offered in coroutine form.
    coroutines = ["wait"]
    pool_workers = 1
class AioBarrier(AioBaseWaiter):
    """Waiter wrapper whose delegate is ``multiprocessing.Barrier``."""

    delegate = Barrier
class AioCondition(AioBaseLock, AioBaseWaiter):
    """Wrapper whose delegate is ``multiprocessing.Condition``.

    Combines the lock behaviour of ``AioBaseLock`` with the waiter base
    ``AioBaseWaiter`` and lists three further delegate methods of its own.
    """

    delegate = Condition
    pool_workers = 1
    # Kept as a list literal on purpose; do not turn it into a tuple.
    coroutines = [
        "wait_for",
        "notify",
        "notify_all",
    ]
class AioEvent(AioBaseWaiter):
    """Waiter wrapper whose delegate is ``multiprocessing.Event``."""

    delegate = Event
class AioLock(AioBaseLock):
    """Lock wrapper whose delegate is ``multiprocessing.Lock``."""

    delegate = Lock
class AioRLock(AioBaseLock):
    """Lock wrapper whose delegate is ``multiprocessing.RLock``."""

    delegate = RLock
class AioSemaphore(AioBaseLock):
    """Lock wrapper whose delegate is ``multiprocessing.Semaphore``."""

    delegate = Semaphore
class AioBoundedSemaphore(AioBaseLock):
    """Lock wrapper whose delegate is ``multiprocessing.BoundedSemaphore``."""

    delegate = BoundedSemaphore
|