/usr/lib/python2.7/dist-packages/stopit/threadstop.py is in python-stopit 1.1.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | # -*- coding: utf-8 -*-
"""
=================
stopit.threadstop
=================
Raise asynchronous exceptions in other thread, control the timeout of blocks
or callables with a context manager or a decorator.
"""
import ctypes
import threading
from .utils import TimeoutException, BaseTimeout, base_timeoutable
def async_raise(target_tid, exception):
"""Raises an asynchronous exception in another thread.
Read http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
for further enlightenments.
:param target_tid: target thread identifier
:param exception: Exception class to be raised in that thread
"""
# Ensuring and releasing GIL are useless since we're not in C
# gil_state = ctypes.pythonapi.PyGILState_Ensure()
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid),
ctypes.py_object(exception))
# ctypes.pythonapi.PyGILState_Release(gil_state)
if ret == 0:
raise ValueError("Invalid thread ID {}".format(target_tid))
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(target_tid), None)
raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadingTimeout(BaseTimeout):
"""Context manager for limiting in the time the execution of a block
using asynchronous threads launching exception.
See :class:`stopit.utils.BaseTimeout` for more information
"""
def __init__(self, seconds, swallow_exc=True):
super(ThreadingTimeout, self).__init__(seconds, swallow_exc)
self.target_tid = threading.current_thread().ident
self.timer = None # PEP8
def stop(self):
"""Called by timer thread at timeout. Raises a Timeout exception in the
caller thread
"""
self.state = BaseTimeout.TIMED_OUT
async_raise(self.target_tid, TimeoutException)
# Required overrides
def setup_interrupt(self):
"""Setting up the resource that interrupts the block
"""
self.timer = threading.Timer(self.seconds, self.stop)
self.timer.start()
def suppress_interrupt(self):
"""Removing the resource that interrupts the block
"""
self.timer.cancel()
class threading_timeoutable(base_timeoutable): #noqa
"""A function or method decorator that raises a ``TimeoutException`` to
decorated functions that should not last a certain amount of time.
this one uses ``ThreadingTimeout`` context manager.
See :class:`.utils.base_timoutable`` class for further comments.
"""
to_ctx_mgr = ThreadingTimeout
|