/usr/lib/python3/dist-packages/aioprocessing/connection.py is in python3-aioprocessing 1.0.0-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | import multiprocessing
from multiprocessing.connection import (Listener, Client, deliver_challenge,
answer_challenge, wait)
from .executor import CoroBuilder
from .util import run_in_executor
# Names exported by a star-import of this module. Only AioConnection is
# listed, so the other definitions here (AioClient, AioListener and the
# coro_* helpers) must be imported by name.
__all__ = ['AioConnection']
class AioConnection(metaclass=CoroBuilder):
    """ Wrapper around a multiprocessing Connection object.

    The wrapped connection is kept on ``self._obj``. The method names in
    ``coroutines`` are handed to the CoroBuilder metaclass (presumably to
    generate asynchronous variants -- confirm in .executor).

    """
    coroutines = [
        'recv',
        'poll',
        'send_bytes',
        'recv_bytes',
        'recv_bytes_into',
        'send',
    ]

    def __init__(self, obj):
        """ Initialize the AioConnection.

        obj - a multiprocessing.Connection object.

        """
        super().__init__()
        self._obj = obj

    def __enter__(self):
        """ Enter the wrapped connection's context and return this wrapper. """
        self._obj.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        """ Forward context exit to the wrapped connection.

        The wrapped object's result is discarded, so this returns None.

        """
        self._obj.__exit__(*args, **kwargs)
def AioClient(*args, **kwargs):
    """ Open a multiprocessing Client connection wrapped in AioConnection.

    All arguments are passed straight through to
    multiprocessing.connection.Client.

    """
    return AioConnection(Client(*args, **kwargs))
class AioListener(metaclass=CoroBuilder):
    """ Wrapper around multiprocessing.connection.Listener.

    ``delegate`` names the wrapped class and ``coroutines`` the methods
    handed to the CoroBuilder metaclass. ``self._obj`` is not assigned in
    this class; presumably the metaclass machinery builds it from
    ``delegate`` -- confirm in .executor.

    """
    delegate = Listener
    coroutines = ['accept']

    def accept(self):
        """ Accept a connection and return it wrapped in an AioConnection. """
        return AioConnection(self._obj.accept())

    def __enter__(self):
        """ Enter the wrapped listener's context and return this wrapper. """
        self._obj.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        """ Forward context exit to the wrapped listener.

        The wrapped object's result is discarded, so this returns None.

        """
        self._obj.__exit__(*args, **kwargs)
def coro_deliver_challenge(*args, **kwargs):
    """ Schedule multiprocessing's deliver_challenge via run_in_executor.

    args, kwargs - passed to run_in_executor together with
    deliver_challenge. A new single-worker ThreadPoolExecutor is created
    on every call.

    Returns whatever run_in_executor returns (presumably an awaitable
    future -- confirm against .util).

    """
    # The module-level imports do not provide ThreadPoolExecutor, so
    # without this local import every call fails with NameError.
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=1)
    return run_in_executor(executor, deliver_challenge, *args, **kwargs)
def coro_answer_challenge(*args, **kwargs):
    """ Schedule multiprocessing's answer_challenge via run_in_executor.

    args, kwargs - passed to run_in_executor together with
    answer_challenge. A new single-worker ThreadPoolExecutor is created
    on every call.

    Returns whatever run_in_executor returns (presumably an awaitable
    future -- confirm against .util).

    """
    # The module-level imports do not provide ThreadPoolExecutor, so
    # without this local import every call fails with NameError.
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=1)
    return run_in_executor(executor, answer_challenge, *args, **kwargs)
def coro_wait(*args, **kwargs):
    """ Schedule multiprocessing.connection.wait via run_in_executor.

    args, kwargs - passed to run_in_executor together with wait. A new
    single-worker ThreadPoolExecutor is created on every call.

    Returns whatever run_in_executor returns (presumably an awaitable
    future -- confirm against .util).

    """
    # The module-level imports do not provide ThreadPoolExecutor, so
    # without this local import every call fails with NameError.
    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=1)
    return run_in_executor(executor, wait, *args, **kwargs)
|