/usr/lib/python2.7/dist-packages/prompt_toolkit/eventloop/asyncio_win32.py is in python-prompt-toolkit 1.0.15-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | """
Win32 asyncio event loop.
Windows notes:
- Somehow it doesn't seem to work with the 'ProactorEventLoop'.
"""
from __future__ import unicode_literals
from .base import EventLoop, INPUT_TIMEOUT
from ..terminal.win32_input import ConsoleInputReader
from .callbacks import EventLoopCallbacks
from .asyncio_base import AsyncioTimeout
import asyncio
__all__ = (
'Win32AsyncioEventLoop',
)
class Win32AsyncioEventLoop(EventLoop):
def __init__(self, loop=None):
self._console_input_reader = ConsoleInputReader()
self.running = False
self.closed = False
self.loop = loop or asyncio.get_event_loop()
@asyncio.coroutine
def run_as_coroutine(self, stdin, callbacks):
"""
The input 'event loop'.
"""
# Note: We cannot use "yield from", because this package also
# installs on Python 2.
assert isinstance(callbacks, EventLoopCallbacks)
if self.closed:
raise Exception('Event loop already closed.')
timeout = AsyncioTimeout(INPUT_TIMEOUT, callbacks.input_timeout, self.loop)
self.running = True
try:
while self.running:
timeout.reset()
# Get keys
try:
g = iter(self.loop.run_in_executor(None, self._console_input_reader.read))
while True:
yield next(g)
except StopIteration as e:
keys = e.args[0]
# Feed keys to input processor.
for k in keys:
callbacks.feed_key(k)
finally:
timeout.stop()
def stop(self):
self.running = False
def close(self):
# Note: we should not close the asyncio loop itself, because that one
# was not created here.
self.closed = True
self._console_input_reader.close()
def run_in_executor(self, callback):
self.loop.run_in_executor(None, callback)
def call_from_executor(self, callback, _max_postpone_until=None):
self.loop.call_soon_threadsafe(callback)
def add_reader(self, fd, callback):
" Start watching the file descriptor for read availability. "
self.loop.add_reader(fd, callback)
def remove_reader(self, fd):
" Stop watching the file descriptor for read availability. "
self.loop.remove_reader(fd)
|