This file is indexed.

/usr/share/pyshared/Pyro/EventService/Server.py is in pyro 1:3.14-1.1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#############################################################################
#  
#	Event Service daemon and server classes
#
#	This is part of "Pyro" - Python Remote Objects
#	which is (c) Irmen de Jong - irmen@razorvine.net
#
#############################################################################

import time, types, re, sys, traceback, os
import Pyro.core, Pyro.naming, Pyro.util, Pyro.constants
from Pyro.errors import *
from Pyro.EventService.Event import Event
import Queue
from threading import Thread

# Short module-level alias for Pyro.util.Log; used below as
# Log.msg / Log.warn / Log.error by the Event Server starter.
Log=Pyro.util.Log

# SUBSCRIBER - each subscriber has one of these worker threads
class Subscriber(Thread):
	"""Worker thread that forwards queued events to one remote subscriber.

	Events handed to send() are put on a bounded queue (its size is taken
	from Pyro.config.PYRO_ES_QUEUESIZE) and passed one at a time to the
	subscriber's event() method, which is switched to oneway mode.
	"""
	def __init__(self, remote):
		"""Remember the subscriber proxy and create the event queue.

		remote -- the subscriber object whose event() method is invoked
		          for every queued Event.
		"""
		Thread.__init__(self)
		self.remote=remote
		# set the callback method to ONEWAY mode:
		self.remote._setOneway("event")
		self.queue=Queue.Queue(Pyro.config.PYRO_ES_QUEUESIZE)
	def run(self):
		"""Deliver queued events until told to stop, then clean up.

		The loop ends when a queue item is not an Event (the event service
		sends the string "QUIT" for this) or when delivering an event
		raises ProtocolError.
		"""
		while 1:
			event=self.queue.get()
			if not isinstance(event,Event):
				break # it was no Event, so exit
			try:
				self.remote.event(event)
			except ProtocolError:
				break
		# Detach the queue first, so that running() turns false and send()
		# stops accepting events. Then read all pending items from the queue
		# so that any tasks that are blocked on the queue can continue.
		(queue, self.queue) = (self.queue, None)
		try:
			while 1:
				queue.get(block=0)
		except Queue.Empty:
			pass
		# release the remote connection
		self.remote._release()
		del self.remote
	def send(self, event):
		"""Queue an item for this worker; ignored once the worker has stopped.

		The put blocks or not depending on Pyro.config.PYRO_ES_BLOCKQUEUE;
		a non-blocking put on a full queue raises Queue.Full.
		"""
		# Read the attribute only once: run() sets self.queue to None from
		# the worker thread, so testing it and then reading it again could
		# end up calling put() on None.
		queue=self.queue
		if queue:
			queue.put(event, block=Pyro.config.PYRO_ES_BLOCKQUEUE)
	def running(self):
		"""Return a true value (the queue) while the worker accepts events, else None."""
		return self.queue

# The EVENTSERVICE is the actual Pyro server.
#
# BTW: Subscribers are remembered through their proxy class.
# This class is capable of being a correct key in a dictionary.
class EventService(Pyro.core.ObjBase):
	"""The Pyro object that routes published events to subscribers.

	Subscriptions are either exact subjects (compared case-insensitively)
	or regular expression patterns (compiled with re.IGNORECASE). Every
	subscriber gets one Subscriber worker thread that is shared by all of
	its subscriptions; the worker is stopped when its last subscription
	is removed.
	"""
	def __init__(self):
		Pyro.core.ObjBase.__init__(self)
		self.subscribers={}			# subject -> { threadname-> subscriberthread }
		self.subscribersMatch={}	# subjectPattern -> { threadname->subscriberthread }
		self.subscriptionWorkers={}	# subscriber -> subscription thread object
	def _mksequence(self, seq):
		"""Return seq itself if it is a tuple or list, else a 1-tuple holding it."""
		if isinstance(seq,(tuple,list)):
			return seq
		return (seq,)
	def _findMatchers(self, pattern):
		"""Return the registered compiled patterns whose source text equals pattern."""
		return [m for m in list(self.subscribersMatch.keys()) if m.pattern==pattern]
	def _deliver(self, event, workers):
		"""Send event to every running worker in the name->worker dict 'workers'.

		Workers that are no longer running are removed from the dict.
		"""
		# iterate over a copy because stopped workers are removed on the fly
		for (name,worker) in list(workers.items()):
			if worker.running():
				try:
					worker.send(event)
				except Queue.Full:
					pass	# this worker's queue is full: the event is skipped for it
			else:
				workers.pop(name,None)
	def getSubscriptionWorker(self, subscriber):
		"""Return the worker thread of this subscriber, creating it if needed.

		A registered worker that is no longer running (its thread stopped,
		for instance after a ProtocolError while delivering) is replaced by
		a fresh one. Otherwise a client that subscribes again would be
		handed the stopped worker and never receive anything.
		"""
		worker=self.subscriptionWorkers.get(subscriber)
		if worker is None or not worker.running():
			worker = Subscriber(subscriber)
			worker.start()
			self.subscriptionWorkers[subscriber]=worker
		return worker
	def subscribe(self, subjects, subscriber):
		"""Subscribe to one subject or a tuple/list of exact subject strings."""
		if not subjects: return
		# Subscribe into a dictionary; this way; somebody can subscribe
		# only once to this subject. Subjects are exact strings.
		worker=self.getSubscriptionWorker(subscriber)
		for subject in self._mksequence(subjects):
			self.subscribers.setdefault(subject.lower(),{}) [worker.getName()]=worker
	def subscribeMatch(self, subjects, subscriber):
		"""Subscribe to one regex pattern or a tuple/list of patterns.

		Compiling an invalid pattern raises the error from re.compile.
		"""
		if not subjects: return
		# Subscribe into a dictionary; this way; somebody can subscribe
		# only once to this subject. Subjects are regex patterns.
		for subject in self._mksequence(subjects):
			# Reuse the key of an already registered identical pattern, and
			# compile before the worker is obtained so that an invalid
			# pattern does not leave a worker thread without subscriptions.
			known=self._findMatchers(subject)
			if known:
				matcher=known[0]
			else:
				matcher=re.compile(subject,re.IGNORECASE)
			worker=self.getSubscriptionWorker(subscriber)
			self.subscribersMatch.setdefault(matcher,{}) [worker.getName()]=worker
	def unsubscribe(self, subjects, subscriber):
		"""Remove the subscriber from one subject/pattern or a tuple/list of them.

		Each entry is first treated as an exact subject; when the subscriber
		has no such subscription it is treated as a pattern. Patterns are
		located by their source text instead of by compiling the entry
		again, so an exact subject that is not a valid regex cannot raise
		re.error here, and the lookup does not rely on re.compile returning
		the very same pattern object that was used as dictionary key.
		Unknown subscribers and subjects are ignored.
		"""
		if not subjects: return
		for subject in self._mksequence(subjects):
			# Look the worker up for every subject: it is unregistered as
			# soon as its last subscription has been removed.
			worker=self.subscriptionWorkers.get(subscriber)
			if worker is None:
				return
			name=worker.getName()
			removed=False
			exact=self.subscribers.get(subject.lower())
			if exact is not None and exact.pop(name,None) is not None:
				removed=True
			else:
				for matcher in self._findMatchers(subject):
					if self.subscribersMatch[matcher].pop(name,None) is not None:
						removed=True
			if removed:
				self.killWorkerIfLastSubject(subscriber, worker)

	def publish(self, subjects, message):
		"""Publish message on one subject or a tuple/list of subjects.

		For every subject an Event is created and sent to the subscribers
		of that exact subject and to the subscribers of every pattern that
		matches it. All events of one call share the same creation time.
		"""
		if not subjects: return
		# keep the creation time, this must be the same for all events.
		creationTime=time.time()
		# publish a message. Subjects must be exact strings
		for subject in self._mksequence(subjects):
			event = Event(subject, message, creationTime)
			exact=self.subscribers.get(subject.lower())
			if exact is not None:
				self._deliver(event, exact)
			# process the subject patterns
			for (m,subs) in list(self.subscribersMatch.items()):
				if m.match(subject):
					self._deliver(event, subs)

	def killWorkerIfLastSubject(self, subscriber, worker):
		"""Stop and unregister the worker unless it still has a subscription.

		The worker is stopped by sending it the string "QUIT".
		"""
		name=worker.getName()
		for registry in (self.subscribers, self.subscribersMatch):
			for workers in list(registry.values()):
				if workers.get(name) is worker:
					return
		worker.send("QUIT")
		del self.subscriptionWorkers[subscriber]


class EventServiceStarter(object):
	def __init__(self, identification=None):
		Pyro.core.initServer()
		self.running=1
		self.identification=identification
		self.started = Pyro.util.getEventObject()
	def start(self, *args, **kwargs):			# see _start for allowed arguments
		kwargs["startloop"]=1
		self._start(*args, **kwargs )
	def initialize(self, *args, **kwargs):		# see _start for allowed arguments
		kwargs["startloop"]=0
		self._start( *args, **kwargs )
	def getServerSockets(self):
		return self.daemon.getServerSockets()
	def waitUntilStarted(self,timeout=None):
		self.started.wait(timeout)
		return self.started.isSet()
	def _start(self,hostname='',port=None,startloop=1,useNameServer=1,norange=0):
		daemon = Pyro.core.Daemon(host=hostname,port=port,norange=norange)
		if self.identification:
			daemon.setAllowedIdentifications([self.identification])
			print 'Requiring connection authentication.'

		if useNameServer:
			locator = Pyro.naming.NameServerLocator(identification=self.identification)
			ns = locator.getNS()
	
			# check if ES already running
			try:
				ns.resolve(Pyro.constants.EVENTSERVER_NAME)
				print 'The Event Server appears to be already running.'
				print 'You cannot start multiple Event Servers.'
				ans=raw_input('Start new Event Server anyway (y/n)? ')
				if ans!='y':
					return
				ns.unregister(Pyro.constants.EVENTSERVER_NAME)
			except NamingError:
				pass
	
			daemon.useNameServer(ns)

		es = EventService()

		esURI=daemon.connect(es, Pyro.constants.EVENTSERVER_NAME)
		print 'URI=',esURI

		message = daemon.validateHostnameAndIP()
		if message:
			print "\nWARNING:",message,"\n"

		print 'Event Server started.'

		self.started.set()		# signal that we've started.

		if startloop:
			Log.msg('ES daemon','This is the Pyro Event Server.')
			
			try:
				if os.name!="java":
					# I use a timeout here otherwise you can't break gracefully on Windows
					daemon.setTimeout(20)
				daemon.requestLoop(lambda s=self: s.running)
			except KeyboardInterrupt:
				Log.warn('ES daemon','shutdown on user break signal')
				print 'Shutting down on user break signal.'
				self.shutdown(es)
			except:
				try:
					(exc_type, exc_value, exc_trb) = sys.exc_info()
					out = ''.join(traceback.format_exception(exc_type, exc_value, exc_trb)[-5:])
					Log.error('ES daemon', 'Unexpected exception, type',exc_type,
						'\n--- partial traceback of this exception follows:\n',
						out,'\n--- end of traceback')
					print '*** Exception occured!!! Partial traceback:'
					print out
					print '*** Resuming operations...'
				finally:	
					del exc_type, exc_value, exc_trb    # delete refs to allow proper GC

			Log.msg('ES daemon','Shut down gracefully.')
			print 'Event Server gracefully stopped.'
		else:
			# no loop, store the required objects for getServerSockets()
			self.daemon=daemon
			self.es=es
			if os.name!="java":
				daemon.setTimeout(20)  # XXX fixed timeout

	def mustContinueRunning(self):
		return self.running
	def handleRequests(self, timeout=None):
		# this method must be called from a custom event loop
		self.daemon.handleRequests(timeout=timeout)
	def shutdown(self,es):
		if es:
			# internal shutdown call with specified ES object
			daemon=es.getDaemon()
		else:
			# custom shutdown call w/o specified ES object, use stored instance
			daemon=self.daemon
			es=self.es
			del self.es, self.daemon
		try:
			daemon.disconnect(es) # clean up nicely
		except NamingError,x:
			Log.warn('ES daemon','disconnect error during shutdown:',x)
		except ConnectionClosedError,x:
			Log.warn('ES daemon','lost connection with Name Server, cannot unregister')
		self.running=0
		daemon.shutdown()


def start(argv):
	Args = Pyro.util.ArgParser()
	Args.parse(argv,'hNn:p:i:')
	if Args.hasOpt('h'):
		print 'Usage: pyro-es [-h] [-n hostname] [-p port] [-N] [-i identification]'
		print '  where -p = ES server port (0 for auto)'
		print '        -n = non-default hostname to bind on'
		print '        -N = do not use the name server'
		print '        -i = the required authentication ID for ES clients,'
		print '             also used to connect to other Pyro services'
		print '        -h = print this help'
		raise SystemExit
	hostname = Args.getOpt('n',None)
	port = Args.getOpt('p',None)
	useNameServer = not Args.hasOpt('N')
	ident = Args.getOpt('i',None)
	if port:
		port=int(port)
	norange=(port==0)
	Args.printIgnored()
	if Args.args:
		print 'Ignored arguments:',' '.join(Args.args)

	print '*** Pyro Event Server ***'
	starter=EventServiceStarter(identification=ident)
	starter.start(hostname,port,useNameServer=useNameServer,norange=norange)


# Allow easy starting of the ES by using python -m: when this module is run
# as a script, start() is called with the command line arguments (program
# name stripped).
if __name__=="__main__":
	start(sys.argv[1:])