This file is indexed.

/usr/lib/python2.7/dist-packages/framework/patterns/asyncworker.py is in fso-frameworkd 0.10.1-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/env python
"""
Asynchronous Worker

This file is part of MPPL: Mickey's Python Pattern Library

(C) 2008 Michael 'Mickey' Lauer <mlauer@vanille-media.de>
GPLv2 or later
"""

__version__ = "1.0.0"
__author__ = "Michael 'Mickey' Lauer <mickey@vanille-media.de>"

from Queue import Queue
import gobject


# FIXME use parent/child logger hierarchy for subsystems/modules
if __debug__:
    class logger():
        @staticmethod
        def debug( message ):
            print message
else:
    import logging
    logger = logging.getLogger( "mppl.asyncworker" )

#============================================================================#
class AsyncWorker( object ):
#============================================================================#
    """
    This class implements an asynchronous worker queue.

    You can insert an element into the queue. If there are any elements,
    glib idle processing will be started and the elements will be processed
    asynchronously. Note that you need a running mainloop.

    If the last element has been processed, the idle task will be removed until
    you add new elements.
    """

    #
    # public API
    #

    def __init__( self ):
        """
        Initialize
        """
        self._queue = Queue()
        self._source = None
        logger.debug( "init" )

    def __del__( self ):
        """
        Cleanup
        """
        if self._source is not None:
            gobject.source_remove( self._source )

    def enqueue( self, *element ):
        """
        Enqueue an element, start processing queue if necessary.
        """
        restart = self._queue.empty() # should we wrap this in a mutex to play thread-safe?
        self._queue.put( element )
        if restart:
            logger.debug( "no elements in queue: starting idle task." )
            self._source = gobject.idle_add( self._processElement )
        else:
            logger.debug( "queue already filled (%s). idle task should still be running (source=%d)..." % ( self._queue.queue, self._source ) )

    def remove( self, *element ):
        """
        Remove one element from the queue.
        """
        self._queue.queue.remove( element )
        if self._queue.empty() and ( self._source is not None ):
            gobject.source_remove( self._source )

    def removeAll( self, *element ):
        while True:
            try:
                self.remove( *element )
            except ValueError:
                break

    def onProcessElement( self, element ):
        """
        Called, when there is an element ready to process.

        Override this to implement your element handling.
        The default implementation does nothing.
        """
        pass

    #
    # private API
    #
    def _processElement( self ):
        """
        Process an element. Start idle processing, if necessary.
        """
        logger.debug( "_processElement()" )
        if self._queue.empty():
            logger.debug( "no more elements: stopping idle task." )
            self._source = None
            return False # don't call me again
        next = self._queue.get()
        logger.debug( "got an element from the queue" )
        try:
            self.onProcessElement( next )
        except:
            logger.exception( 'exception while processing element %s:', next )
        return True

#============================================================================#
class SynchronizedAsyncWorker( AsyncWorker ):
#============================================================================#
    """
    This class implements a synchronized asynchronous worker queue.
    """

    #
    # public API
    #

    def trigger( self ):
        """
        Call, when you are ready to process the next element.
        """
        if not self._queue.empty():
           self._source = gobject.idle_add( self._processElement )

    #
    # private API
    #
    def _processElement( self ):
        """
        Process an element. Stop idle processing.
        """
        logger.debug( "_processElement()" )

        if self._queue.empty():
            logger.warning( "no more elements" )
            self._source = None
            return False # don't call me again

        next = self._queue.get()
        logger.debug( "got an element from the queue" )
        try:
            self.onProcessElement( next )
        except:
            logger.exception( 'exception while processing element %s:', next )
        self._source = None
        return False

#============================================================================#
if __name__ == "__main__":
#============================================================================#

    class TestAsyncWorker( SynchronizedAsyncWorker ):
        def onProcessElement( self, element ):
            print ( "processing %s\n>>>" % repr(element) )
            self.trigger()

    import logging

    logging.basicConfig( \
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)-8s %(message)s',
        datefmt='%d.%b.%Y %H:%M:%S',
        )

    gobject.threads_init()
    import thread

    a = TestAsyncWorker()
    for i in xrange( 10 ):
        a.enqueue( i )

    for i in xrange( 10 ):
        a.enqueue( "yo" )

    a.removeAll( "yo" )
    a.remove( 9 )


    mainloop = gobject.MainLoop()
    thread.start_new_thread( mainloop.run, () )

    import time
    time.sleep( 1 )

    for i in xrange( 1000 ):
        a.enqueue( i )

    del a
    import sys
    sys.exit( 0 )