/usr/lib/python2.7/dist-packages/framework/patterns/asyncworker.py is in fso-frameworkd 0.10.1-2ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 | #!/usr/bin/env python
"""
Asynchronous Worker
This file is part of MPPL: Mickey's Python Pattern Library
(C) 2008 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
GPLv2 or later
"""
__version__ = "1.0.0"
__author__ = "Michael 'Mickey' Lauer <mickey@vanille-media.de>"
from Queue import Queue
import gobject
# FIXME use parent/child logger hierarchy for subsystems/modules
if __debug__:
class logger():
@staticmethod
def debug( message ):
print message
else:
import logging
logger = logging.getLogger( "mppl.asyncworker" )
#============================================================================#
class AsyncWorker( object ):
#============================================================================#
"""
This class implements an asynchronous worker queue.
You can insert an element into the queue. If there are any elements,
glib idle processing will be started and the elements will be processed
asynchronously. Note that you need a running mainloop.
If the last element has been processed, the idle task will be removed until
you add new elements.
"""
#
# public API
#
def __init__( self ):
"""
Initialize
"""
self._queue = Queue()
self._source = None
logger.debug( "init" )
def __del__( self ):
"""
Cleanup
"""
if self._source is not None:
gobject.source_remove( self._source )
def enqueue( self, *element ):
"""
Enqueue an element, start processing queue if necessary.
"""
restart = self._queue.empty() # should we wrap this in a mutex to play thread-safe?
self._queue.put( element )
if restart:
logger.debug( "no elements in queue: starting idle task." )
self._source = gobject.idle_add( self._processElement )
else:
logger.debug( "queue already filled (%s). idle task should still be running (source=%d)..." % ( self._queue.queue, self._source ) )
def remove( self, *element ):
"""
Remove one element from the queue.
"""
self._queue.queue.remove( element )
if self._queue.empty() and ( self._source is not None ):
gobject.source_remove( self._source )
def removeAll( self, *element ):
while True:
try:
self.remove( *element )
except ValueError:
break
def onProcessElement( self, element ):
"""
Called, when there is an element ready to process.
Override this to implement your element handling.
The default implementation does nothing.
"""
pass
#
# private API
#
def _processElement( self ):
"""
Process an element. Start idle processing, if necessary.
"""
logger.debug( "_processElement()" )
if self._queue.empty():
logger.debug( "no more elements: stopping idle task." )
self._source = None
return False # don't call me again
next = self._queue.get()
logger.debug( "got an element from the queue" )
try:
self.onProcessElement( next )
except:
logger.exception( 'exception while processing element %s:', next )
return True
#============================================================================#
class SynchronizedAsyncWorker( AsyncWorker ):
#============================================================================#
"""
This class implements a synchronized asynchronous worker queue.
"""
#
# public API
#
def trigger( self ):
"""
Call, when you are ready to process the next element.
"""
if not self._queue.empty():
self._source = gobject.idle_add( self._processElement )
#
# private API
#
def _processElement( self ):
"""
Process an element. Stop idle processing.
"""
logger.debug( "_processElement()" )
if self._queue.empty():
logger.warning( "no more elements" )
self._source = None
return False # don't call me again
next = self._queue.get()
logger.debug( "got an element from the queue" )
try:
self.onProcessElement( next )
except:
logger.exception( 'exception while processing element %s:', next )
self._source = None
return False
#============================================================================#
if __name__ == "__main__":
#============================================================================#
class TestAsyncWorker( SynchronizedAsyncWorker ):
def onProcessElement( self, element ):
print ( "processing %s\n>>>" % repr(element) )
self.trigger()
import logging
logging.basicConfig( \
level=logging.DEBUG,
format='%(asctime)s %(levelname)-8s %(message)s',
datefmt='%d.%b.%Y %H:%M:%S',
)
gobject.threads_init()
import thread
a = TestAsyncWorker()
for i in xrange( 10 ):
a.enqueue( i )
for i in xrange( 10 ):
a.enqueue( "yo" )
a.removeAll( "yo" )
a.remove( 9 )
mainloop = gobject.MainLoop()
thread.start_new_thread( mainloop.run, () )
import time
time.sleep( 1 )
for i in xrange( 1000 ):
a.enqueue( i )
del a
import sys
sys.exit( 0 )
|