This file is indexed.

/usr/lib/python3/dist-packages/tango/gevent_executor.py is in python3-tango 9.2.2-1build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# ------------------------------------------------------------------------------
# This file is part of PyTango (http://pytango.rtfd.io)
#
# Copyright 2006-2012 CELLS / ALBA Synchrotron, Bellaterra, Spain
# Copyright 2013-2014 European Synchrotron Radiation Facility, Grenoble, France
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
# ------------------------------------------------------------------------------

# Future imports
from __future__ import absolute_import

# Imports
import sys
import six
import types
import functools

# Compatibility imports
try:
    from threading import get_ident
except:
    from threading import _get_ident as get_ident

# Gevent imports
import gevent.queue

# Tango imports
from .green import AbstractExecutor

__all__ = ["get_global_executor", "set_global_executor", "GeventExecutor"]

# Global executor

# Module-wide executor instance: stays None until set_global_executor()
# stores one or get_global_executor() creates a GeventExecutor on demand.
_EXECUTOR = None


def get_global_executor():
    """Return the module-wide executor, creating it on first use.

    When no executor has been stored yet, a GeventExecutor built with its
    default arguments is created, remembered and returned.
    """
    global _EXECUTOR
    # Fast path: an executor was already created or explicitly set
    if _EXECUTOR is not None:
        return _EXECUTOR
    _EXECUTOR = GeventExecutor()
    return _EXECUTOR


def set_global_executor(executor):
    """Replace the module-wide executor with the given one.

    Passing None clears it, so the next get_global_executor() call creates
    a fresh GeventExecutor.
    """
    global _EXECUTOR
    _EXECUTOR = executor


# Patch for gevent threadpool

def get_global_threadpool():
    """Return the threadpool of the current gevent hub.

    Before gevent-1.1.0, the pool's ``spawn`` method is patched (once per
    pool) so that an exception raised by the submitted function is
    propagated to the AsyncResult instead of being lost. The original
    method stays reachable as ``threadpool._spawn``, which also serves as
    the "already patched" marker.
    """
    threadpool = gevent.get_hub().threadpool
    if gevent.version_info < (1, 1) and not hasattr(threadpool, '_spawn'):
        threadpool._spawn = threadpool.spawn
        # Two-argument form on purpose: Python 3 rejects the legacy
        # (function, instance, class) signature with a TypeError, while
        # Python 2 accepts both forms.
        threadpool.spawn = types.MethodType(spawn, threadpool)
    return threadpool


class ExceptionWrapper:
    """Plain container carrying a captured exception as a return value.

    In this module it is built from ``sys.exc_info()`` by ``wrap_errors``,
    so despite the attribute names the three fields hold, in order, the
    exception type, the exception instance and the traceback.
    """

    def __init__(self, exception, error_string, tb):
        # Stored verbatim; no validation or conversion is performed
        self.exception, self.error_string, self.tb = (
            exception, error_string, tb)


def wrap_errors(func):
    """Decorate ``func`` so that it returns its errors instead of raising.

    The decorated callable forwards all arguments to ``func``. On success
    the result is returned unchanged; if anything is raised, the current
    ``sys.exc_info()`` triple is packed into an ExceptionWrapper and that
    object is returned instead.
    """
    @functools.wraps(func)
    def guarded(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except:
            # Deliberately catches everything: the error is handed back
            # as a value for the consumer to re-raise later
            return ExceptionWrapper(*sys.exc_info())
        else:
            return outcome

    return guarded


def get_with_exception(result, block=True, timeout=None):
    """Fetch the value of an async result, re-raising wrapped errors.

    Calls the saved original getter ``result._get(block, timeout)``. A
    regular value is returned as is; an ExceptionWrapper is unpacked and
    its exception is re-raised with the recorded traceback.
    """
    outcome = result._get(block, timeout)
    if not isinstance(outcome, ExceptionWrapper):
        return outcome
    # Raise the exception using the caller context
    six.reraise(outcome.exception, outcome.error_string, outcome.tb)


def spawn(threadpool, fn, *args, **kwargs):
    """Replacement for ``threadpool.spawn`` that propagates exceptions.

    ``fn`` is wrapped with ``wrap_errors`` and submitted through the
    original ``threadpool._spawn``. The ``get`` method of the returned
    async result is then replaced by ``get_with_exception`` (the original
    is kept as ``result._get``), so an error raised by ``fn`` is re-raised
    when the result is fetched.
    """
    # The gevent threadpool does not raise exceptions through its async
    # results, so the callable has to be wrapped
    fn = wrap_errors(fn)
    result = threadpool._spawn(fn, *args, **kwargs)
    result._get = result.get
    # Two-argument form on purpose: Python 3 rejects the legacy
    # (function, instance, class) signature with a TypeError, while
    # Python 2 accepts both forms.
    result.get = types.MethodType(get_with_exception, result)
    return result


# Gevent task and event loop

class GeventTask:
    """A callable with its arguments, plus a waitable record of its outcome.

    ``event`` must provide ``set()`` and ``wait()``; it is set once ``run``
    has finished, whether the call succeeded or failed.
    """

    def __init__(self, event, func, *args, **kwargs):
        self.event, self.func = event, func
        self.args, self.kwargs = args, kwargs
        # Outcome slots, filled in by run()
        self.value = self.exception = None

    def run(self):
        """Call the function, record its result or error, then set the event."""
        try:
            outcome = self.func(*self.args, **self.kwargs)
        except:
            # Catch everything on purpose: the exc_info triple is kept so
            # that result() can re-raise it for the waiting caller
            self.exception = sys.exc_info()
        else:
            self.value = outcome
        finally:
            self.event.set()

    def spawn(self):
        """Start ``run`` in a new greenlet and return that greenlet."""
        return gevent.spawn(self.run)

    def result(self):
        """Wait for completion; return the value or re-raise the stored error."""
        self.event.wait()
        if not self.exception:
            return self.value
        six.reraise(*self.exception)


class GeventLoop:
    """Queue-fed dispatcher that runs submitted tasks as greenlets.

    The constructor records the identifier of the creating thread, creates
    the task queue and spawns the ``run`` greenlet that consumes it.
    """

    def __init__(self):
        self.thread_id = get_ident()
        self.tasks = gevent.queue.Queue()
        self.loop = gevent.spawn(self.run)

    def run(self):
        """Forever take tasks from the queue and spawn each one."""
        while True:
            self.tasks.get().spawn()

    def is_gevent_thread(self):
        """Return True when called from the thread that created this loop."""
        return self.thread_id == get_ident()

    @staticmethod
    def _async_watcher(loop):
        """Create an async watcher on the given gevent event loop.

        The factory is looked up by name because ``async`` is a reserved
        keyword since Python 3.7, which makes ``loop.async()`` a syntax
        error there. ``async_`` (the spelling used by newer gevent
        releases) is preferred, with ``async`` as fallback for older ones.
        """
        factory = getattr(loop, "async_", None)
        if factory is None:
            factory = getattr(loop, "async")
        return factory()

    def submit(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` and return its GeventTask.

        The call does not wait for the task; use ``task.result()`` for that.
        """
        # NOTE(review): gevent._threading is a private gevent module;
        # verify that it still provides Event on the targeted gevent version.
        event = gevent._threading.Event()
        task = GeventTask(event, func, *args, **kwargs)
        self.tasks.put_nowait(task)
        # Signal the hub's event loop through an async watcher; presumably
        # required because submit() may be called from a thread other than
        # the hub's - TODO confirm
        self._async_watcher(self.tasks.hub.loop).send()
        return task


# Gevent executor

class GeventExecutor(AbstractExecutor):
    """Tango executor backed by a gevent loop and a gevent threadpool."""

    # Class-level flags; presumably consumed by AbstractExecutor or its
    # users - verify against tango.green
    asynchronous = True
    default_wait = True

    def __init__(self, loop=None, subexecutor=None):
        """Store the loop and sub-executor, creating defaults when omitted.

        A missing ``loop`` becomes a new GeventLoop; a missing
        ``subexecutor`` becomes the pool from get_global_threadpool().
        """
        self.loop = GeventLoop() if loop is None else loop
        self.subexecutor = (
            get_global_threadpool() if subexecutor is None else subexecutor)

    def delegate(self, fn, *args, **kwargs):
        """Spawn the operation on the sub-executor and return its future."""
        pool = self.subexecutor
        return pool.spawn(fn, *args, **kwargs)

    def access(self, accessor, timeout=None):
        """Fetch the result of a gevent future, honouring ``timeout``."""
        return accessor.get(timeout=timeout)

    def submit(self, fn, *args, **kwargs):
        """Hand the operation to the loop and return the resulting task."""
        return self.loop.submit(fn, *args, **kwargs)

    def execute(self, fn, *args, **kwargs):
        """Run an operation and return its result.

        From any thread other than the loop's own, the operation is
        submitted to the loop and its result awaited; from the loop's own
        thread it is simply called in place.
        """
        if not self.loop.is_gevent_thread():
            return self.submit(fn, *args, **kwargs).result()
        return fn(*args, **kwargs)