This file is indexed.

/usr/share/pyshared/application/python/queue.py is in python-application 1.2.8-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# Copyright (C) 2006-2007 Dan Pascu. See LICENSE for details.
#

"""Event processing queues, that process the events in a distinct thread"""

__all__ = ['EventQueue', 'CumulativeEventQueue']

import Queue
from threading import Thread, Event, Lock

from application import log

# Special events that control the queue operation (for internal use)
class StopProcessing:
    """Marker placed on the queue to make the processing loop exit."""


class ProcessEvents:
    """Marker asking a CumulativeEventQueue to handle its accumulated events."""


class DiscardEvents:
    """Marker asking a CumulativeEventQueue to drop its accumulated events."""


class EventQueue(Thread):
    """Simple event processing queue that processes one event at a time.

    Events are taken from an internal FIFO queue and passed, one by one, to
    the handler callable from within run(), which is meant to execute in the
    queue's own (daemon) thread.
    """

    def __init__(self, handler, name=None, preload=()):
        """Create the queue without starting its thread.

        handler -- callable invoked with each event taken from the queue
        name    -- thread name (defaults to the class name)
        preload -- iterable of events to place on the queue right away

        Raises TypeError if handler is not callable.
        """
        if not callable(handler):
            raise TypeError("handler should be a callable")
        Thread.__init__(self, name=name or self.__class__.__name__)
        self.setDaemon(True)
        self._exit = Event()    # set by stop(force_exit=True) to leave the loop early
        self._active = Event()  # cleared while the queue is paused
        self._pause_counter = 0 # number of pause() calls not yet undone
        self._pause_lock = Lock()
        self._accepting_events = True
        self.queue = Queue.Queue()
        self.handle = handler   # overrides the class level handle() placeholder
        self.load(preload)
        self._active.set()

    def run(self):
        """Run the event queue processing loop in its own thread"""
        while not self._exit.isSet():
            # The pause check happens before the blocking get(), so a thread
            # that is already waiting in get() still handles the next event
            # that arrives, even if pause() was called in the meantime.
            self._active.wait()
            event = self.queue.get()
            if event is StopProcessing:
                break
            try:
                self.handle(event)
            except:
                # a failing handler must not terminate the processing thread
                log.error("exception happened during event handling")
                log.err()
            finally:
                del event # do not reference this event until the next event arrives, in order to allow it to be released

    def stop(self, force_exit=False):
        """Terminate the event processing loop/thread (force_exit=True skips processing events already on queue)"""
        if force_exit:
            self._exit.set()
        self.queue.put(StopProcessing)
        ## resume processing in case it is paused
        self._pause_lock.acquire()
        try:
            self._pause_counter = 0
            self._active.set()
        finally:
            self._pause_lock.release()

    def pause(self):
        """Pause processing events (calls nest; each one needs a matching unpause)"""
        self._pause_lock.acquire()
        try:
            self._pause_counter += 1
            self._active.clear()
        finally:
            self._pause_lock.release()

    def unpause(self):
        """Undo one pause() call; processing resumes when all of them are undone"""
        self._pause_lock.acquire()
        try:
            if self._pause_counter == 0:
                return ## already active
            self._pause_counter -= 1
            if self._pause_counter == 0:
                self._active.set()
        finally:
            self._pause_lock.release()

    def resume(self, events=()):
        """Add events on the queue and resume processing (will unpause and enable accepting events)."""
        # The events are queued directly, so they are added even while
        # the queue is set to ignore events.
        for event in events:
            self.queue.put(event)
        self.unpause()
        self.accept_events()

    def accept_events(self):
        """Accept events for processing"""
        self._accepting_events = True

    def ignore_events(self):
        """Ignore events for processing (put() and load() become no-ops)"""
        self._accepting_events = False

    def put(self, event):
        """Add an event on the queue (silently dropped while ignoring events)"""
        if self._accepting_events:
            self.queue.put(event)

    def load(self, events):
        """Add multiple events on the queue (silently dropped while ignoring events)"""
        if self._accepting_events:
            for event in events:
                self.queue.put(event)

    def empty(self):
        """Discard all events that are present on the queue.

        A pending stop request is preserved: if the StopProcessing marker is
        found among the discarded items it is put back, otherwise the
        processing thread would wait on the empty queue forever.
        """
        self.pause()
        stop_requested = False
        try:
            while True:
                if self.queue.get_nowait() is StopProcessing:
                    stop_requested = True
        except Queue.Empty:
            if stop_requested:
                self.queue.put(StopProcessing)
        finally:
            # always undo our own pause(), even if draining failed unexpectedly
            self.unpause()

    def get_unhandled(self):
        """Get unhandled events after the queue is stopped (events are removed from queue).

        Raises RuntimeError if the processing thread is still alive.
        """
        if self.isAlive():
            raise RuntimeError("Queue is still running")
        unhandled = []
        try:
            while True:
                event = self.queue.get_nowait()
                if event is not StopProcessing:
                    unhandled.append(event)
        except Queue.Empty:
            pass
        return unhandled

    def handle(self, event):
        """Placeholder; every instance replaces it with the handler given to __init__"""
        raise RuntimeError("unhandled event")


class CumulativeEventQueue(EventQueue):
    """An event queue that accumulates events and processes all of them together when its process method is called.

    The handler is called with a list of events. It must return either None
    or a list with the events it did not handle; those are kept and passed
    again on the next processing round. Events that have a true
    `high_priority` attribute are not accumulated, they are passed to the
    handler right away as a single element list.
    """

    def __init__(self, handler, name=None, preload=()):
        """Create the queue; the arguments are passed on to EventQueue.__init__"""
        EventQueue.__init__(self, handler, name, preload)
        self._waiting = []  # events accumulated until the next ProcessEvents marker

    def run(self):
        """Run the event queue processing loop in its own thread"""
        while not self._exit.isSet():
            self._active.wait()
            event = self.queue.get()
            if event is StopProcessing:
                break
            elif event is ProcessEvents:
                if self._waiting:
                    # If the handler fails, nothing is preserved and the
                    # accumulated events are dropped.
                    preserved = []
                    try:
                        unhandled = self.handle(self._waiting)
                        if not isinstance(unhandled, (list, type(None))):
                            raise ValueError("%s handler must return a list of unhandled events or None" % self.__class__.__name__)
                        if unhandled is not None:
                            preserved = unhandled ## preserve the unhandled events that the handler returned
                    except:
                        # a failing handler must not terminate the processing thread
                        log.error("exception happened during event handling")
                        log.err()
                    self._waiting = preserved
            elif event is DiscardEvents:
                self._waiting = []
            else:
                if getattr(event, 'high_priority', False):
                    # high priority events bypass accumulation; the return
                    # value of the handler is ignored in this case
                    try:
                        self.handle([event])
                    except:
                        log.error("exception happened during high priority event handling")
                        log.err()
                    finally:
                        del event # do not reference this event until the next event arrives, in order to allow it to be released
                else:
                    self._waiting.append(event)

    def process(self):
        """Trigger accumulated event processing. The processing is done on the queue thread"""
        if self._accepting_events:
            self.queue.put(ProcessEvents)

    def empty(self):
        """Discard any events present on the queue"""
        EventQueue.empty(self)
        # the already accumulated events are dropped by the queue thread
        # when it reaches this marker
        self.queue.put(DiscardEvents)

    def get_unhandled(self):
        """Get unhandled events after the queue is stopped (events are removed from queue).

        Returns the accumulated events followed by the events still on the
        queue, without any of the internal control markers. Raises
        RuntimeError (from EventQueue.get_unhandled) if the thread is alive.
        """
        pending = self._waiting + EventQueue.get_unhandled(self)
        self._waiting = []
        # ProcessEvents and DiscardEvents are internal markers, not user
        # events; neither of them must leak to the caller
        return [e for e in pending if e is not ProcessEvents and e is not DiscardEvents]