/usr/share/pyshared/application/python/queue.py is in python-application 1.2.8-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright (C) 2006-2007 Dan Pascu. See LICENSE for details.
#
"""Event processing queues, that process the events in a distinct thread"""
__all__ = ['EventQueue', 'CumulativeEventQueue']
import Queue
from threading import Thread, Event, Lock
from application import log
# Special events that control the queue operation (for internal use)
class StopProcessing:
    """Marker placed on the queue to make the processing thread leave its loop"""


class ProcessEvents:
    """Marker asking a cumulative queue to handle the events accumulated so far"""


class DiscardEvents:
    """Marker asking a cumulative queue to drop the events accumulated so far"""
class EventQueue(Thread):
    """Simple event processing queue that processes one event at a time.

    Events placed on the queue are handed, one at a time, to the handler
    callable from within the queue's own daemon thread. The constructor does
    not start the thread; call start() to begin processing.
    """

    def __init__(self, handler, name=None, preload=()):
        """Create the queue.

        handler -- callable invoked with each event on the queue thread
        name    -- thread name (defaults to the class name)
        preload -- iterable of events to put on the queue right away

        Raises TypeError if handler is not callable.
        """
        if not callable(handler):
            raise TypeError("handler should be a callable")
        Thread.__init__(self, name=name or self.__class__.__name__)
        self.setDaemon(True)
        self._exit = Event()            # set by stop(force_exit=True)
        self._active = Event()          # cleared while the queue is paused
        self._pause_counter = 0         # number of outstanding pause() calls
        self._pause_lock = Lock()       # protects _pause_counter/_active updates
        self._accepting_events = True
        self.queue = Queue.Queue()
        self.handle = handler           # shadows the class level handle() fallback
        self.load(preload)
        self._active.set()

    def run(self):
        """Run the event queue processing loop in its own thread"""
        while not self._exit.isSet():
            self._active.wait()  # blocks here while the queue is paused
            event = self.queue.get()
            if event is StopProcessing:
                break
            try:
                self.handle(event)
            except:
                # catch-all, so that a failing handler is logged instead of
                # terminating the processing thread
                log.error("exception happened during event handling")
                log.err()
            finally:
                del event # do not reference this event until the next event arrives, in order to allow it to be released

    def stop(self, force_exit=False):
        """Terminate the event processing loop/thread (force_exit=True skips processing events already on queue)"""
        if force_exit:
            self._exit.set()
        self.queue.put(StopProcessing)
        ## resume processing in case it is paused
        self._pause_lock.acquire()
        try:
            self._pause_counter = 0
            self._active.set()
        finally:
            self._pause_lock.release()

    def pause(self):
        """Pause processing events (calls nest; each one needs a matching unpause)"""
        self._pause_lock.acquire()
        try:
            self._pause_counter += 1
            self._active.clear()
        finally:
            self._pause_lock.release()

    def unpause(self):
        """Resume processing events once every pause() has been matched by an unpause()"""
        self._pause_lock.acquire()
        try:
            if self._pause_counter == 0:
                return ## already active
            self._pause_counter -= 1
            if self._pause_counter == 0:
                self._active.set()
        finally:
            self._pause_lock.release()

    def resume(self, events=()):
        """Add events on the queue and resume processing (will unpause and enable accepting events).

        The given events are queued even if the queue is currently ignoring
        events. Only one pause level is undone.
        """
        for event in events:
            self.queue.put(event)
        self.unpause()
        self.accept_events()

    def accept_events(self):
        """Accept events for processing"""
        self._accepting_events = True

    def ignore_events(self):
        """Ignore events for processing (put and load silently drop them)"""
        self._accepting_events = False

    def put(self, event):
        """Add an event on the queue (dropped if events are being ignored)"""
        if self._accepting_events:
            self.queue.put(event)

    def load(self, events):
        """Add multiple events on the queue (dropped if events are being ignored)"""
        if self._accepting_events:
            for event in events:
                self.queue.put(event)

    def empty(self):
        """Discard all events that are present on the queue.

        Everything queued is dropped, including a StopProcessing marker that
        a previous stop() call may have queued.
        """
        self.pause()
        try:
            while True:
                self.queue.get_nowait()
        except Queue.Empty:
            pass
        finally:
            # always undo our own pause, even if draining failed unexpectedly
            self.unpause()

    def get_unhandled(self):
        """Get unhandled events after the queue is stopped (events are removed from queue).

        Raises RuntimeError if the queue thread is still running.
        """
        if self.isAlive():
            raise RuntimeError("Queue is still running")
        unhandled = []
        try:
            while True:
                event = self.queue.get_nowait()
                if event is not StopProcessing:
                    unhandled.append(event)
        except Queue.Empty:
            pass
        return unhandled

    def handle(self, event):
        """Class level fallback; __init__ replaces it on the instance with the handler"""
        raise RuntimeError("unhandled event")
class CumulativeEventQueue(EventQueue):
    """An event queue that accumulates events and processes all of them together when its process method is called.

    The handler is called with a list of events. It may return None or a
    list of the events it did not handle; the returned events are kept and
    passed again on the next processing round. Events that have a true
    high_priority attribute are not accumulated: they are passed to the
    handler at once, as a one element list.
    """

    def __init__(self, handler, name=None, preload=()):
        """Create the queue (same arguments as EventQueue)"""
        EventQueue.__init__(self, handler, name, preload)
        self._waiting = []  # events accumulated since the last processing round

    def run(self):
        """Run the event queue processing loop in its own thread"""
        while not self._exit.isSet():
            self._active.wait()  # blocks here while the queue is paused
            event = self.queue.get()
            if event is StopProcessing:
                break
            elif event is ProcessEvents:
                if self._waiting:
                    # if the handler fails, nothing is preserved: the events
                    # of this round are dropped
                    preserved = []
                    try:
                        unhandled = self.handle(self._waiting)
                        if not isinstance(unhandled, (list, type(None))):
                            raise ValueError("%s handler must return a list of unhandled events or None" % self.__class__.__name__)
                        if unhandled is not None:
                            preserved = unhandled ## preserve the unhandled events that the handler returned
                    except:
                        # catch-all, so that a failing handler is logged
                        # instead of terminating the processing thread
                        log.error("exception happened during event handling")
                        log.err()
                    self._waiting = preserved
            elif event is DiscardEvents:
                self._waiting = []
            else:
                if getattr(event, 'high_priority', False):
                    try:
                        self.handle([event])
                    except:
                        log.error("exception happened during high priority event handling")
                        log.err()
                    finally:
                        del event # do not reference this event until the next event arrives, in order to allow it to be released
                else:
                    self._waiting.append(event)

    def process(self):
        """Trigger accumulated event processing. The processing is done on the queue thread"""
        if self._accepting_events:
            self.queue.put(ProcessEvents)

    def empty(self):
        """Discard any events present on the queue, including the accumulated ones"""
        EventQueue.empty(self)
        # the accumulated events belong to the queue thread; ask it to drop them
        self.queue.put(DiscardEvents)

    def get_unhandled(self):
        """Get unhandled events after the queue is stopped (events are removed from queue).

        Returns the accumulated events followed by the ones still queued.
        The internal ProcessEvents/DiscardEvents markers are never returned,
        and a pending DiscardEvents marker is honoured the way the queue
        thread would have done: everything that precedes it is dropped.

        Raises RuntimeError (from EventQueue.get_unhandled) if the queue
        thread is still running.
        """
        pending = self._waiting + EventQueue.get_unhandled(self)
        self._waiting = []
        unhandled = []
        for event in pending:
            if event is DiscardEvents:
                unhandled = []
            elif event is not ProcessEvents:
                unhandled.append(event)
        return unhandled
|