This file is indexed.

/usr/lib/python2.7/dist-packages/Scientific/Threading/TaskManager.py is in python-scientific 2.9.4-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# This module provides a simple task manager for running parallel
# calculations on shared-memory machines.
#
# Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
# last revision: 2006-6-12
#

"""
Parallel task manager for shared-memory multiprocessor machines

@undocumented: Task
"""

import threading

class TaskManager:

    """
    Parallel task manager for shared-memory multiprocessor machines

    This class provides a rather simple way to profit from
    shared-memory multiprocessor machines by running several tasks
    in parallel. The calling program decides how many execution threads
    should run at any given time, and then feeds compute tasks to
    the task manager, who runs them as soon as possible without exceeding
    the maximum number of threads.

    The major limitation of this approach lies in Python's Global
    Interpreter Lock. This effectively means that no more than one
    Python thread can run at the same time. Consequently, parallelization
    can only be achieved if the tasks to be parallelized spend
    significant time in C extension modules that release the Global
    Interpreter Lock.
    """

    def __init__(self, nthreads):
        """
        @param nthreads: the maximum number of compute threads that should
                         run in parallel. Note: This does not include the
                         main thread which generated and feeds the task
                         manager!
        @type nthreads: C{int}
        """
        self.nthreads = nthreads
        self.waiting_tasks = []
        self.running_tasks = []
        self.lock = threading.Lock()
        self.data_lock = threading.Lock()
        self.can_run = threading.Condition(self.lock)
        self.can_submit = threading.Condition(self.lock)
        self.task_available = threading.Condition(self.lock)
        self.scheduler = threading.Thread(target=self._scheduler)
        self.scheduler.start()

    def runTask(self, function, args):
        """
        Run a task as soon as processing capacity becomes available

        @param function: the function that will be executed as the body of
                         the task
        @type function: callable
        @param args: the arguments that will be passed to function when it
                     is called. An additional argument will be added at the
                     end: a lock object that the task can use to get
                     temporarily exclusive access to data shared with other
                     tasks.
        @type args: C{tuple}
        """
        self.can_submit.acquire()
        if len(self.waiting_tasks) >= self.nthreads:
            self.can_submit.wait()
        self.can_submit.release()

        task = Task(self, function, args + (self.data_lock,))

        self.task_available.acquire()
        self.waiting_tasks.append(task)
        self.task_available.notify()
        self.task_available.release()

    def terminate(self):
        """
        Wait until all tasks have finished
        """
        self.task_available.acquire()
        self.waiting_tasks.append(None)
        self.task_available.notify()
        self.task_available.release()
        self.scheduler.join()
        done = 0
        while not done:
            self.can_run.acquire()
            if self.running_tasks:
                self.can_run.wait()
            done = len(self.running_tasks) == 0
            self.can_run.release()

    def _removeTask(self, task):
        self.can_run.acquire()
        self.running_tasks.remove(task)
        self.can_run.notifyAll()
        self.can_run.release()
        
    def _scheduler(self):
        while 1:
            self.task_available.acquire()
            if not self.waiting_tasks:
                self.task_available.wait()
            self.task_available.release()

            self.can_run.acquire()
            while len(self.running_tasks) >= self.nthreads:
                self.can_run.wait()
            task = self.waiting_tasks[0]
            del self.waiting_tasks[0]
            if task is not None:
                self.running_tasks.append(task)
                task.start()
            self.can_submit.notify()
            self.can_run.release()
            if task is None:
                break

class Task(threading.Thread):

    def __init__(self, manager, function, args):
        self.__task_manager = manager
        self.__function = function
        self.__args = args
        threading.Thread.__init__(self)

    def run(self):
        apply(self.__function, self.__args)
        self.__task_manager._removeTask(self)


# Test code

if __name__ == '__main__':

    import time
    from random import randint

    def dummy(n, results, lock):
        print n, "running"
        time.sleep(randint(1, 5))
        lock.acquire()
        results.append(n)
        lock.release()
        print n, "finished"

    m = TaskManager(2)
    results = []
    for i in range(5):
        m.runTask(dummy, (i, results))
    m.terminate()
    print "All finished"
    print results