This file is indexed.

/usr/share/pyshared/async/thread.py is in python-async 0.6.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# -*- coding: utf-8 -*-
"""Module with threading utilities"""
__docformat__ = "restructuredtext"
import threading
import inspect
import Queue

import sys

__all__ = ('do_terminate_threads', 'terminate_threads', 'TerminatableThread', 
			'WorkerThread') 
		

#{ Decorators

def do_terminate_threads(whitelist=list()):
	"""Simple function which terminates all of our threads
	:param whitelist: If whitelist is given, only the given threads will be terminated"""
	for t in threading.enumerate():
		if not isinstance(t, TerminatableThread):
			continue
		if whitelist and t not in whitelist:
			continue
		t.schedule_termination()
		t.stop_and_join()
	# END for each thread

def terminate_threads( func ):
	"""Kills all worker threads the method has created by sending the quit signal.
	This takes over in case of an error in the main function"""
	def wrapper(*args, **kwargs):
		cur_threads = set(threading.enumerate())
		try:
			return func(*args, **kwargs)
		finally:
			do_terminate_threads(set(threading.enumerate()) - cur_threads)
		# END finally shutdown threads
	# END wrapper 
	wrapper.__name__ = func.__name__
	return wrapper

#} END decorators

#{ Classes
	
class TerminatableThread(threading.Thread):
	"""A simple thread able to terminate itself on behalf of the user.
	
	Terminate a thread as follows:
	
	t.stop_and_join()
	
	Derived classes call _should_terminate() to determine whether they should 
	abort gracefully
	"""
	__slots__ = '_terminate'
	
	def __init__(self):
		super(TerminatableThread, self).__init__()
		self._terminate = False
		
		
	#{ Subclass Interface
	def _should_terminate(self):
		""":return: True if this thread should terminate its operation immediately"""
		return self._terminate
		
	def _terminated(self):
		"""Called once the thread terminated. Its called in the main thread
		and may perform cleanup operations"""
		pass

	def start(self):
		"""Start the thread and return self"""
		super(TerminatableThread, self).start()
		return self
	
	#} END subclass interface
		
	#{ Interface
	def schedule_termination(self):
		"""Schedule this thread to be terminated as soon as possible.
		:note: this method does not block."""
		self._terminate = True
		
	def stop_and_join(self):
		"""Ask the thread to stop its operation and wait for it to terminate
		:note: Depending on the implenetation, this might block a moment"""
		self._terminate = True
		self.join()
		self._terminated()
	#} END interface
	
	
class StopProcessing(Exception):
	"""If thrown in a function processed by a WorkerThread, it will terminate"""
	

class WorkerThread(TerminatableThread):
	""" This base allows to call functions on class instances natively.
	As it is meant to work with a pool, the result of the call must be 
	handled by the callee.
	The thread runs forever unless it receives the terminate signal using 
	its task queue.
	
	Tasks could be anything, but should usually be class methods and arguments to
	allow the following:
	
	inq = Queue()
	w = WorkerThread(inq)
	w.start()
	inq.put((WorkerThread.<method>, args, kwargs))
	
	finally we call quit to terminate asap.
	
	alternatively, you can make a call more intuitively - the output is the output queue
	allowing you to get the result right away or later
	w.call(arg, kwarg='value').get()
	
	inq.put(WorkerThread.quit)
	w.join()
	
	You may provide the following tuples as task:
	t[0] = class method, function or instance method
	t[1] = optional, tuple or list of arguments to pass to the routine
	t[2] = optional, dictionary of keyword arguments to pass to the routine
	"""
	__slots__ = ('inq')
	
	
	# define how often we should check for a shutdown request in case our 
	# taskqueue is empty
	shutdown_check_time_s = 0.5
	
	def __init__(self, inq = None):
		super(WorkerThread, self).__init__()
		self.inq = inq
		if inq is None:
			self.inq = Queue.Queue()
	
	@classmethod
	def stop(cls, *args):
		"""If send via the inq of the thread, it will stop once it processed the function"""
		raise StopProcessing
	
	def run(self):
		"""Process input tasks until we receive the quit signal"""
		gettask = self.inq.get
		while True:
			if self._should_terminate():
				break
			# END check for stop request
			
			# note: during shutdown, this turns None in the middle of waiting 
			# for an item to be put onto it - we can't du anything about it - 
			# even if we catch everything and break gracefully, the parent 
			# call will think we failed with an empty exception.
			# Hence we just don't do anything about it. Alternatively
			# we could override the start method to get our own bootstrapping, 
			# which would mean repeating plenty of code in of the threading module.
			tasktuple = gettask()
				
			# needing exactly one function, and one arg
			routine, arg = tasktuple
			
			try:
				try:
					rval = None
					if inspect.ismethod(routine):
						if routine.im_self is None:
							rval = routine(self, arg)
						else:
							rval = routine(arg)
					elif inspect.isroutine(routine):
						rval = routine(arg)
					else:
						# ignore unknown items
						sys.stderr.write("%s: task %s was not understood - terminating\n" % (self.getName(), str(tasktuple)))
						break
					# END make routine call
				finally:
					# make sure we delete the routine to release the reference as soon
					# as possible. Otherwise objects might not be destroyed 
					# while we are waiting
					del(routine)
					del(tasktuple)
			except StopProcessing:
				break
			except Exception,e:
				sys.stderr.write("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !\n" % (self.getName(), str(tasktuple), str(e)))
				continue	# just continue 
			# END routine exception handling
		
			# END handle routine release
		# END endless loop
	
	def stop_and_join(self):
		"""Send stop message to ourselves - we don't block, the thread will terminate 
		once it has finished processing its input queue to receive our termination
		event"""
		# DONT call superclass as it will try to join - join's don't work for 
		# some reason, as python apparently doesn't switch threads (so often)
		# while waiting ... I don't know, but the threads respond properly, 
		# but only if dear python switches to them
		self.inq.put((self.stop, None))
#} END classes