/usr/lib/python3/dist-packages/tango/asyncio_executor.py is in python3-tango 9.2.2-1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | # -----------------------------------------------------------------------------
# This file is part of PyTango (http://pytango.rtfd.io)
#
# Copyright 2006-2012 CELLS / ALBA Synchrotron, Bellaterra, Spain
# Copyright 2013-2014 European Synchrotron Radiation Facility, Grenoble, France
#
# Distributed under the terms of the GNU Lesser General Public License,
# either version 3 of the License, or (at your option) any later version.
# See LICENSE.txt for more info.
# -----------------------------------------------------------------------------
# Future imports
from __future__ import absolute_import
# Imports
import functools
try:
from threading import get_ident
except:
from threading import _get_ident as get_ident
# Asyncio imports
try:
import asyncio
except ImportError:
import trollius as asyncio
try:
from asyncio import run_coroutine_threadsafe
except ImportError:
from .asyncio_tools import run_coroutine_threadsafe
# Tango imports
from .green import AbstractExecutor
__all__ = ["AsyncioExecutor", "get_global_executor", "set_global_executor"]
# Asyncio compatibility
ensure_future = getattr(asyncio, 'ensure_future', getattr(asyncio, 'async'))
# Global executor
_EXECUTOR = None
def get_global_executor():
global _EXECUTOR
if _EXECUTOR is None:
_EXECUTOR = AsyncioExecutor()
return _EXECUTOR
def set_global_executor(executor):
global _EXECUTOR
_EXECUTOR = executor
# Asyncio executor
class AsyncioExecutor(AbstractExecutor):
"""Asyncio tango executor"""
asynchronous = True
default_wait = False
def __init__(self, loop=None, subexecutor=None):
if loop is None:
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = loop
self.subexecutor = subexecutor
def delegate(self, fn, *args, **kwargs):
"""Return the given operation as an asyncio future."""
callback = functools.partial(fn, *args, **kwargs)
coro = self.loop.run_in_executor(self.subexecutor, callback)
return ensure_future(coro)
def access(self, accessor, timeout=None):
"""Return a result from an asyncio future."""
if self.loop.is_running():
raise RuntimeError("Loop is already running")
coro = asyncio.wait_for(accessor, timeout, loop=self.loop)
return self.loop.run_until_complete(coro)
def submit(self, fn, *args, **kwargs):
"""Submit an operation"""
corofn = asyncio.coroutine(lambda: fn(*args, **kwargs))
return run_coroutine_threadsafe(corofn(), self.loop)
def execute(self, fn, *args, **kwargs):
"""Execute an operation and return the result."""
if self.loop._thread_id == get_ident():
corofn = asyncio.coroutine(lambda: fn(*args, **kwargs))
return corofn()
future = self.submit(fn, *args, **kwargs)
return future.result()
|