/usr/lib/python3/dist-packages/tango/gevent_executor.py is in python3-tango 9.2.2-1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# ------------------------------------------------------------------------------
# This file is part of PyTango (http://pytango.rtfd.io)
#
# Copyright 2006-2012 CELLS / ALBA Synchrotron, Bellaterra, Spain
# Copyright 2013-2014 European Synchrotron Radiation Facility, Grenoble, France
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
# ------------------------------------------------------------------------------
# Future imports
from __future__ import absolute_import
# Imports
import sys
import six
import types
import functools
# Compatibility imports
try:
    from threading import get_ident
except ImportError:
    # Older interpreters only expose the private spelling of this function.
    from threading import _get_ident as get_ident
# Gevent imports
import gevent.queue
# Tango imports
from .green import AbstractExecutor
# Public names of this module.
__all__ = ["get_global_executor", "set_global_executor", "GeventExecutor"]
# Global executor
# Module-wide executor instance. It stays None until get_global_executor()
# lazily creates a GeventExecutor or set_global_executor() installs one.
_EXECUTOR = None
def get_global_executor():
    """Return the module-wide executor.

    A GeventExecutor is created and remembered the first time this is
    called while no executor has been installed yet.
    """
    global _EXECUTOR
    current = _EXECUTOR
    if current is not None:
        return current
    # First use: build the default executor and cache it for later calls.
    current = _EXECUTOR = GeventExecutor()
    return current
def set_global_executor(executor):
    """Install *executor* as the module-wide executor.

    Passing None clears the stored executor, so the next call to
    get_global_executor() builds a fresh default one.
    """
    # Rebind the module-level name directly through the module namespace.
    globals()["_EXECUTOR"] = executor
# Patch for gevent threadpool
def get_global_threadpool():
    """Return the threadpool of the current gevent hub.

    Before gevent-1.1.0, patch the spawn method to propagate exception
    raised in the loop to the AsyncResult. The patch is applied at most
    once per threadpool: the original method is kept as ``_spawn`` and its
    presence marks the pool as already patched.
    """
    threadpool = gevent.get_hub().threadpool
    if gevent.version_info < (1, 1) and not hasattr(threadpool, '_spawn'):
        threadpool._spawn = threadpool.spawn
        # Bind with the two-argument form of MethodType: the three-argument
        # (function, instance, class) form only exists on Python 2 and
        # raises TypeError on Python 3.
        threadpool.spawn = types.MethodType(spawn, threadpool)
    return threadpool
class ExceptionWrapper:
    """Plain container for the three values describing a raised exception.

    wrap_errors() fills it positionally from ``sys.exc_info()`` and
    get_with_exception() hands the three fields to ``six.reraise``.
    """

    def __init__(self, exception, error_string, tb):
        # Store the triple untouched; no validation or conversion happens.
        self.exception, self.error_string, self.tb = (
            exception, error_string, tb)
def wrap_errors(func):
    """Return a version of *func* that never raises.

    The returned callable forwards its arguments to *func*. When *func*
    raises, the current ``sys.exc_info()`` triple is captured in an
    ExceptionWrapper and returned instead of propagating.
    """
    def wrapper(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except:  # noqa: E722 - everything is forwarded to the caller
            exc_type, exc_value, exc_tb = sys.exc_info()
            outcome = ExceptionWrapper(exc_type, exc_value, exc_tb)
        return outcome

    # Copy name, docstring, etc. from the wrapped callable.
    return functools.wraps(func)(wrapper)
def get_with_exception(result, block=True, timeout=None):
    """Fetch the value of *result*, re-raising a captured exception.

    The value is read through ``result._get(block, timeout)``. If it is an
    ExceptionWrapper, the stored exception is re-raised here; any other
    value is returned as is.
    """
    outcome = result._get(block, timeout)
    if not isinstance(outcome, ExceptionWrapper):
        return outcome
    # Raise the exception using the caller context
    six.reraise(outcome.exception, outcome.error_string, outcome.tb)
def spawn(threadpool, fn, *args, **kwargs):
    """Spawn *fn* on *threadpool* and return a result that re-raises errors.

    *fn* is wrapped with wrap_errors() and started through the pool's
    original ``_spawn`` method. The returned result keeps its original
    ``get`` as ``_get``, and ``get`` is replaced by get_with_exception().
    """
    # The gevent threadpool does not raise exceptions through its async
    # results, so we have to wrap the callable.
    fn = wrap_errors(fn)
    result = threadpool._spawn(fn, *args, **kwargs)
    result._get = result.get
    # Two-argument MethodType: the (function, instance, class) form is
    # Python 2 only and raises TypeError on Python 3.
    result.get = types.MethodType(get_with_exception, result)
    return result
# Gevent task and event loop
class GeventTask:
    """A deferred call whose outcome can be awaited through an event.

    The task stores a callable with its arguments. run() executes it and
    records either the return value or the ``sys.exc_info()`` triple, then
    sets the event. result() waits on that event.
    """

    def __init__(self, event, func, *args, **kwargs):
        self.event = event
        self.func, self.args, self.kwargs = func, args, kwargs
        # Outcome slots, filled in by run().
        self.value = None
        self.exception = None

    def run(self):
        """Call the stored function and record its outcome."""
        try:
            outcome = self.func(*self.args, **self.kwargs)
        except:  # noqa: E722 - any failure is kept for result()
            self.exception = sys.exc_info()
        else:
            self.value = outcome
        finally:
            # Always release waiters, whether the call succeeded or not.
            self.event.set()

    def spawn(self):
        """Start run() in a new greenlet and return that greenlet."""
        return gevent.spawn(self.run)

    def result(self):
        """Wait for completion, then return the value or re-raise."""
        self.event.wait()
        exc_info = self.exception
        if not exc_info:
            return self.value
        six.reraise(*exc_info)
class GeventLoop:
    """Dispatcher that runs submitted tasks as greenlets.

    The identifier of the constructing thread is recorded, and a greenlet
    is spawned that keeps taking tasks from a gevent queue and spawning
    them.
    """

    def __init__(self):
        self.thread_id = get_ident()
        self.tasks = gevent.queue.Queue()
        self.loop = gevent.spawn(self.run)

    def run(self):
        """Take queued tasks forever, spawning each as its own greenlet."""
        while True:
            self.tasks.get().spawn()

    def is_gevent_thread(self):
        """Return True when called from the thread that built this loop."""
        return self.thread_id == get_ident()

    def submit(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` and return its GeventTask.

        The caller can block on ``task.result()`` to obtain the outcome.
        """
        # NOTE(review): gevent._threading is a private gevent module;
        # confirm Event is still available in the gevent version in use.
        event = gevent._threading.Event()
        task = GeventTask(event, func, *args, **kwargs)
        self.tasks.put_nowait(task)
        # Signal the hub's event loop through an async watcher.
        # `async` is a reserved word from Python 3.7 on, so writing
        # `loop.async()` is a SyntaxError there. Look the factory up by
        # name, preferring the `async_` spelling of newer gevent releases.
        hub_loop = self.tasks.hub.loop
        make_watcher = getattr(hub_loop, "async_", None)
        if make_watcher is None:
            make_watcher = getattr(hub_loop, "async")
        make_watcher().send()
        return task
# Gevent executor
class GeventExecutor(AbstractExecutor):
    """Tango executor backed by gevent.

    Combines a loop, used by submit() and execute(), with a subexecutor
    whose ``spawn`` method is used by delegate().
    """

    asynchronous = True
    default_wait = True

    def __init__(self, loop=None, subexecutor=None):
        """Store the given loop and subexecutor, creating defaults if None.

        The defaults are a new GeventLoop and the result of
        get_global_threadpool().
        """
        loop = GeventLoop() if loop is None else loop
        subexecutor = (
            get_global_threadpool() if subexecutor is None else subexecutor)
        self.loop, self.subexecutor = loop, subexecutor

    def delegate(self, fn, *args, **kwargs):
        """Spawn *fn* on the subexecutor and return what ``spawn`` returns."""
        spawner = self.subexecutor.spawn
        return spawner(fn, *args, **kwargs)

    def access(self, accessor, timeout=None):
        """Return ``accessor.get(timeout=timeout)``."""
        return accessor.get(timeout=timeout)

    def submit(self, fn, *args, **kwargs):
        """Hand *fn* to the loop and return the resulting task."""
        return self.loop.submit(fn, *args, **kwargs)

    def execute(self, fn, *args, **kwargs):
        """Run *fn* and return its result.

        Outside the loop's thread the call is submitted to the loop and
        its result awaited. Inside that thread it is called directly.
        """
        if not self.loop.is_gevent_thread():
            return self.submit(fn, *args, **kwargs).result()
        return fn(*args, **kwargs)
|