This file is indexed.

/usr/lib/python3/dist-packages/aioprocessing/managers.py is in python3-aioprocessing 1.0.0-3.

This file is owned by root:root, with mode 0o644.
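A quick way to confirm the ownership and mode on an installed system (an illustrative sketch; the path is the one shown above):

import grp
import os
import pwd
import stat

path = "/usr/lib/python3/dist-packages/aioprocessing/managers.py"
st = os.stat(path)
print(pwd.getpwuid(st.st_uid).pw_name,    # expected: root
      grp.getgrgid(st.st_gid).gr_name,    # expected: root
      oct(stat.S_IMODE(st.st_mode)))      # expected: 0o644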

The actual contents of the file can be viewed below.

import asyncio
from multiprocessing.util import register_after_fork
from queue import Queue
from threading import (Barrier, BoundedSemaphore, Condition, Event,
                       Lock, RLock, Semaphore)
from multiprocessing.managers import (SyncManager, MakeProxyType,
                                      BarrierProxy, EventProxy, ConditionProxy,
                                      AcquirerProxy)

from aioprocessing.locks import _ContextManager
from .executor import _ExecutorMixin


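# Base proxy built with multiprocessing's MakeProxyType: it exposes the
# standard Queue methods; AioQueueProxy below layers coroutine wrappers
# (coro_get, coro_put) on top of it.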
AioBaseQueueProxy = MakeProxyType('AioQueueProxy', (
    'task_done', 'get', 'qsize', 'put', 'put_nowait',
    'get_nowait', 'empty', 'join', '_qsize', 'full'
))


class _AioProxyMixin(_ExecutorMixin):
    _obj = None

    def _async_call(self, method, *args, loop=None, **kwargs):
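        # Run the proxy's _callmethod in the executor thread (so the
        # blocking IPC call doesn't stall the event loop) and wrap the
        # result in a future that callers can await or yield from.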
        return asyncio.ensure_future(self.run_in_executor(
            self._callmethod, method, args, kwargs, loop=loop))


class ProxyCoroBuilder(type):
    """ Build coroutines to proxy functions. """
    def __new__(cls, clsname, bases, dct):
        coro_list = dct.get('coroutines', [])
        existing_coros = set()

        def find_existing_coros(d):
            for attr in d:
                if attr.startswith("coro_") or attr.startswith("thread_"):
                    existing_coros.add(attr)

        # Determine if any bases include the coroutines attribute, or
        # if either this class or a base class provides an actual
        # implementation for a coroutine method.
        find_existing_coros(dct)
        for b in bases:
            b_dct = b.__dict__
            coro_list.extend(b_dct.get('coroutines', []))
            find_existing_coros(b_dct)

        bases += (_AioProxyMixin,)

        for func in coro_list:
            coro_name = 'coro_{}'.format(func)
            if coro_name not in existing_coros:
                dct[coro_name] = cls.coro_maker(func)
        return super().__new__(cls, clsname, bases, dct)

    @staticmethod
    def coro_maker(func):
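        # Build a proxy method that simply forwards ``func`` (with any
        # arguments) through _async_call, returning an awaitable.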
        def coro_func(self, *args, loop=None, **kwargs):
            return self._async_call(func, *args, loop=loop, **kwargs)
        return coro_func


class AioQueueProxy(AioBaseQueueProxy, metaclass=ProxyCoroBuilder):
    """ A Proxy object for AioQueue.
    
    Provides coroutines for calling 'get' and 'put' on the
    proxy.
    
    """
    coroutines = ['get', 'put']


class AioAcquirerProxy(AcquirerProxy, metaclass=ProxyCoroBuilder):
    pool_workers = 1
    coroutines = ['acquire', 'release']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._threaded_acquire = False
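        # A freshly forked child cannot hold a lock acquired in this
        # process's executor thread, so reset the flag in the child.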
        def _after_fork(obj):
            obj._threaded_acquire = False
        register_after_fork(self, _after_fork)

    def coro_acquire(self, *args, **kwargs):
        """ Non-blocking acquire.

        We need a custom implementation here, because we need to
        set the _threaded_acquire attribute to True once we have
        the lock. This attribute is used by release() to determine
        whether the lock should be released in the main thread,
        or in the Executor thread.

        """
        def lock_acquired(fut):
            if fut.result():
                self._threaded_acquire = True
        out = self.run_in_executor(self.acquire, *args, **kwargs)
        out.add_done_callback(lock_acquired)
        return out

    def __getstate__(self):
        state = super().__getstate__()
        state['_threaded_acquire'] = False
        return state

    def __setstate__(self, state):
        super().__setstate__(state)

    def release(self):
        """ Release the lock.

        If the lock was acquired in the same process via
        coro_acquire, we need to release the lock in the
        ThreadPoolExecutor's thread.

        """
        if self._threaded_acquire:
            out = self.run_in_thread(super().release)
        else:
            out = super().release()
        self._threaded_acquire = False
        return out

    @asyncio.coroutine
    def __aenter__(self):
        yield from self.coro_acquire()
        return None

    @asyncio.coroutine
    def __aexit__(self, *args, **kwargs):
        self.release()

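    # Support the pre-3.5 ``with (yield from lock):`` idiom: yielding from
    # the proxy acquires it and returns a context manager that releases it
    # on exit.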
    def __iter__(self):
        yield from self.coro_acquire()
        return _ContextManager(self)


class AioBarrierProxy(BarrierProxy, metaclass=ProxyCoroBuilder):
    coroutines = ['wait']


class AioEventProxy(EventProxy, metaclass=ProxyCoroBuilder):
    coroutines = ['wait']


class AioConditionProxy(ConditionProxy, metaclass=ProxyCoroBuilder):
    coroutines = ['wait', 'wait_for']


class AioSyncManager(SyncManager):
    """ A mp.Manager that provides asyncio-friendly objects. """
    pass


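# Register the asyncio-friendly type IDs: for each name, the manager
# process creates the given threading/queue primitive and clients receive
# the corresponding Aio proxy.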
AioSyncManager.register("AioQueue", Queue, AioQueueProxy)
AioSyncManager.register("AioBarrier", Barrier, AioQueueProxy)
AioSyncManager.register("AioBoundedSemaphore", BoundedSemaphore, AioAcquirerProxy)
AioSyncManager.register("AioCondition", Condition, AioConditionProxy)
AioSyncManager.register("AioEvent", Event, AioQueueProxy)
AioSyncManager.register("AioLock", Lock, AioAcquirerProxy)
AioSyncManager.register("AioRLock", RLock, AioAcquirerProxy)
AioSyncManager.register("AioSemaphore", Semaphore, AioAcquirerProxy)