/usr/lib/python3/dist-packages/prompt_toolkit/eventloop/posix.py is in python3-prompt-toolkit 1.0.9-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 | from __future__ import unicode_literals
import fcntl
import os
import random
import signal
import threading
import time
from prompt_toolkit.terminal.vt100_input import InputStream
from prompt_toolkit.utils import DummyContext, in_main_thread
from prompt_toolkit.input import Input
from .base import EventLoop, INPUT_TIMEOUT
from .callbacks import EventLoopCallbacks
from .inputhook import InputHookContext
from .posix_utils import PosixStdinReader
from .utils import TimeIt
from .select import AutoSelector, Selector, fd_to_int
__all__ = (
'PosixEventLoop',
)
_now = time.time
class PosixEventLoop(EventLoop):
"""
Event loop for posix systems (Linux, Mac os X).
"""
def __init__(self, inputhook=None, selector=AutoSelector):
assert inputhook is None or callable(inputhook)
assert issubclass(selector, Selector)
self.running = False
self.closed = False
self._running = False
self._callbacks = None
self._calls_from_executor = []
self._read_fds = {} # Maps fd to handler.
self.selector = selector()
# Create a pipe for inter thread communication.
self._schedule_pipe = os.pipe()
fcntl.fcntl(self._schedule_pipe[0], fcntl.F_SETFL, os.O_NONBLOCK)
# Create inputhook context.
self._inputhook_context = InputHookContext(inputhook) if inputhook else None
def run(self, stdin, callbacks):
"""
The input 'event loop'.
"""
assert isinstance(stdin, Input)
assert isinstance(callbacks, EventLoopCallbacks)
assert not self._running
if self.closed:
raise Exception('Event loop already closed.')
self._running = True
self._callbacks = callbacks
inputstream = InputStream(callbacks.feed_key)
current_timeout = [INPUT_TIMEOUT] # Nonlocal
# Create reader class.
stdin_reader = PosixStdinReader(stdin.fileno())
# Only attach SIGWINCH signal handler in main thread.
# (It's not possible to attach signal handlers in other threads. In
# that case we should rely on a the main thread to call this manually
# instead.)
if in_main_thread():
ctx = call_on_sigwinch(self.received_winch)
else:
ctx = DummyContext()
def read_from_stdin():
" Read user input. "
# Feed input text.
data = stdin_reader.read()
inputstream.feed(data)
# Set timeout again.
current_timeout[0] = INPUT_TIMEOUT
# Quit when the input stream was closed.
if stdin_reader.closed:
self.stop()
self.add_reader(stdin, read_from_stdin)
self.add_reader(self._schedule_pipe[0], None)
with ctx:
while self._running:
# Call inputhook.
if self._inputhook_context:
with TimeIt() as inputhook_timer:
def ready(wait):
" True when there is input ready. The inputhook should return control. "
return self._ready_for_reading(current_timeout[0] if wait else 0) != []
self._inputhook_context.call_inputhook(ready)
inputhook_duration = inputhook_timer.duration
else:
inputhook_duration = 0
# Calculate remaining timeout. (The inputhook consumed some of the time.)
if current_timeout[0] is None:
remaining_timeout = None
else:
remaining_timeout = max(0, current_timeout[0] - inputhook_duration)
# Wait until input is ready.
fds = self._ready_for_reading(remaining_timeout)
# When any of the FDs are ready. Call the appropriate callback.
if fds:
# Create lists of high/low priority tasks. The main reason
# for this is to allow painting the UI to happen as soon as
# possible, but when there are many events happening, we
# don't want to call the UI renderer 1000x per second. If
# the eventloop is completely saturated with many CPU
# intensive tasks (like processing input/output), we say
# that drawing the UI can be postponed a little, to make
# CPU available. This will be a low priority task in that
# case.
tasks = []
low_priority_tasks = []
now = None # Lazy load time. (Fewer system calls.)
for fd in fds:
# For the 'call_from_executor' fd, put each pending
# item on either the high or low priority queue.
if fd == self._schedule_pipe[0]:
for c, max_postpone_until in self._calls_from_executor:
if max_postpone_until is None:
# Execute now.
tasks.append(c)
else:
# Execute soon, if `max_postpone_until` is in the future.
now = now or _now()
if max_postpone_until < now:
tasks.append(c)
else:
low_priority_tasks.append((c, max_postpone_until))
self._calls_from_executor = []
# Flush all the pipe content.
os.read(self._schedule_pipe[0], 1024)
else:
handler = self._read_fds.get(fd)
if handler:
tasks.append(handler)
# Handle everything in random order. (To avoid starvation.)
random.shuffle(tasks)
random.shuffle(low_priority_tasks)
# When there are high priority tasks, run all these.
# Schedule low priority tasks for the next iteration.
if tasks:
for t in tasks:
t()
# Postpone low priority tasks.
for t, max_postpone_until in low_priority_tasks:
self.call_from_executor(t, _max_postpone_until=max_postpone_until)
else:
# Currently there are only low priority tasks -> run them right now.
for t, _ in low_priority_tasks:
t()
else:
# Flush all pending keys on a timeout. (This is most
# important to flush the vt100 'Escape' key early when
# nothing else follows.)
inputstream.flush()
# Fire input timeout event.
callbacks.input_timeout()
current_timeout[0] = None
self.remove_reader(stdin)
self.remove_reader(self._schedule_pipe[0])
self._callbacks = None
def _ready_for_reading(self, timeout=None):
"""
Return the file descriptors that are ready for reading.
"""
fds = self.selector.select(timeout)
return fds
def received_winch(self):
"""
Notify the event loop that SIGWINCH has been received
"""
# Process signal asynchronously, because this handler can write to the
# output, and doing this inside the signal handler causes easily
# reentrant calls, giving runtime errors..
# Furthur, this has to be thread safe. When the CommandLineInterface
# runs not in the main thread, this function still has to be called
# from the main thread. (The only place where we can install signal
# handlers.)
def process_winch():
if self._callbacks:
self._callbacks.terminal_size_changed()
self.call_from_executor(process_winch)
def run_in_executor(self, callback):
"""
Run a long running function in a background thread.
(This is recommended for code that could block the event loop.)
Similar to Twisted's ``deferToThread``.
"""
# Wait until the main thread is idle.
# We start the thread by using `call_from_executor`. The event loop
# favours processing input over `calls_from_executor`, so the thread
# will not start until there is no more input to process and the main
# thread becomes idle for an instant. This is good, because Python
# threading favours CPU over I/O -- an autocompletion thread in the
# background would cause a significantly slow down of the main thread.
# It is mostly noticable when pasting large portions of text while
# having real time autocompletion while typing on.
def start_executor():
threading.Thread(target=callback).start()
self.call_from_executor(start_executor)
def call_from_executor(self, callback, _max_postpone_until=None):
"""
Call this function in the main event loop.
Similar to Twisted's ``callFromThread``.
:param _max_postpone_until: `None` or `time.time` value. For interal
use. If the eventloop is saturated, consider this task to be low
priority and postpone maximum until this timestamp. (For instance,
repaint is done using low priority.)
"""
assert _max_postpone_until is None or isinstance(_max_postpone_until, float)
self._calls_from_executor.append((callback, _max_postpone_until))
if self._schedule_pipe:
try:
os.write(self._schedule_pipe[1], b'x')
except (AttributeError, IndexError, OSError):
# Handle race condition. We're in a different thread.
# - `_schedule_pipe` could have become None in the meantime.
# - We catch `OSError` (actually BrokenPipeError), because the
# main thread could have closed the pipe already.
pass
def stop(self):
"""
Stop the event loop.
"""
self._running = False
def close(self):
self.closed = True
# Close pipes.
schedule_pipe = self._schedule_pipe
self._schedule_pipe = None
if schedule_pipe:
os.close(schedule_pipe[0])
os.close(schedule_pipe[1])
if self._inputhook_context:
self._inputhook_context.close()
def add_reader(self, fd, callback):
" Add read file descriptor to the event loop. "
fd = fd_to_int(fd)
self._read_fds[fd] = callback
self.selector.register(fd)
def remove_reader(self, fd):
" Remove read file descriptor from the event loop. "
fd = fd_to_int(fd)
if fd in self._read_fds:
del self._read_fds[fd]
self.selector.unregister(fd)
class call_on_sigwinch(object):
"""
Context manager which Installs a SIGWINCH callback.
(This signal occurs when the terminal size changes.)
"""
def __init__(self, callback):
self.callback = callback
self.previous_callback = None
def __enter__(self):
self.previous_callback = signal.signal(signal.SIGWINCH, lambda *a: self.callback())
def __exit__(self, *a, **kw):
if self.previous_callback is None:
# Normally, `signal.signal` should never return `None`.
# For some reason it happens here:
# https://github.com/jonathanslenders/python-prompt-toolkit/pull/174
signal.signal(signal.SIGWINCH, 0)
else:
signal.signal(signal.SIGWINCH, self.previous_callback)
|