This file is indexed.

/usr/share/pyshared/mdp/parallel/scheduling.py is in python-mdp 3.3-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
"""
This module contains the basic classes for task processing via a scheduler.
"""

import threading
import time
import os
try:
    import multiprocessing
except ImportError:
    # Python version < 2.6, have to use fallbacks
    pass


class ResultContainer(object):
    """Interface for objects that collect the results of scheduled tasks.

    Concrete containers override both methods; the versions here do nothing
    and return None.
    """

    def add_result(self, result_data, task_index):
        """Store a result in the container.

        result_data -- the result produced by one task.
        task_index -- index of the task that produced the result.
        """

    def get_results(self):
        """Return the collected results and reset the container."""


class ListResultContainer(ResultContainer):
    """Result container backed by a plain list (results kept in arrival order)."""

    def __init__(self):
        super(ListResultContainer, self).__init__()
        self._results = []

    def add_result(self, result, task_index):
        """Append the result to the list; the task index is ignored."""
        self._results.append(result)

    def get_results(self):
        """Hand over every stored result and start again with an empty list.

        The results are returned in the order in which they arrived, which
        may differ from the original task order.
        """
        collected, self._results = self._results, []
        return collected


class OrderedResultContainer(ListResultContainer):
    """Default result container with automatic restoring of the result order.

    In general the order of the incoming results in the scheduler can be
    different from the order of the tasks, since some tasks may finish quicker
    than other tasks. This result container restores the original order.
    """

    def __init__(self):
        super(OrderedResultContainer, self).__init__()

    def add_result(self, result, task_index):
        """Store a result in the container.

        The task index is also stored and later used to reconstruct the
        original task order.
        """
        self._results.append((result, task_index))

    def get_results(self):
        """Sort the results into the original task order and return them.

        Returns a tuple of the stored results ordered by task index, or an
        empty tuple if nothing has been stored. Results whose task index is
        None (used for failed tasks) are placed before all indexed results,
        keeping their arrival order. The container is reset afterwards.
        """
        results = self._results
        self._results = []
        if not results:
            # zip(*[]) yields nothing, so indexing it would raise IndexError.
            return ()
        # Comparing None with an int raises TypeError on Python 3. The leading
        # boolean puts None indices first (matching the Python 2 ordering),
        # and two None entries compare equal, so None is never ordered
        # against None. The sort is stable.
        results.sort(key=lambda item: (item[1] is not None, item[1]))
        return tuple(item[0] for item in results)


class TaskCallable(object):
    """Abstract base class for task callables.

    A task callable bundles the task behavior together with the fixed data
    that is shared by many tasks (data that stays the same from task to task).
    """

    def setup_environment(self):
        """Hook that is only called when the callable is first called in a
        different Python process / environment.

        Override it to make whatever changes to the Python environment this
        callable needs. The default implementation does nothing.
        """
        return None

    def __call__(self, data):
        """Perform the computation and return the result.

        The default implementation hands the data back unchanged; concrete
        callables override this method.
        """
        result = data
        return result

    # TODO: is 'fork' really a good name?
    #    A separate CallableFactory class would be an alternative, but it
    #    would complicate simple callables (this mirrors how iterators also
    #    implement the iterable interface).
    def fork(self):
        """Return a fork of this callable, e.g. a copy.

        fork is always called exactly once before a callable is called, and
        the fresh fork is called instead of the original. This keeps the
        original callable intact when caching is used. A callable that is
        not modified by being called can simply return itself, as done here.
        """
        forked = self
        return forked


class SqrTestCallable(TaskCallable):
    """Test callable that squares its input."""

    def __call__(self, data):
        """Return data raised to the power of two."""
        squared = pow(data, 2)
        return squared


class SleepSqrTestCallable(TaskCallable):
    """Test callable that sleeps before squaring a value."""

    def __call__(self, data):
        """Sleep for data[1] seconds, then return data[0] squared."""
        pause = data[1]
        time.sleep(pause)
        return pow(data[0], 2)


class MDPVersionCallable(TaskCallable):
    """Test callable that reports the MDP configuration.

    The returned object should be unique and comparable, and should include
    the version information as well as the installed/used modules.
    """

    def __call__(self, data):
        """Disregard the input data and return mdp.config.info()."""
        # mdp is imported at call time rather than at module import time.
        import mdp
        report = mdp.config.info()
        return report


class TaskCallableWrapper(TaskCallable):
    """Adapt a plain callable (such as a function) to the TaskCallable API.

    Scheduler applies this wrapper to task callables that have no ``fork``
    method. The inherited ``fork`` returns the wrapper itself.
    """

    def __init__(self, task_callable):
        """Remember the callable that does the actual work."""
        self._callable = task_callable

    def __call__(self, data):
        """Forward data to the wrapped callable and return its result."""
        wrapped = self._callable
        return wrapped(data)


# helper function
def cpu_count():
    """Return the number of CPU cores.

    multiprocessing.cpu_count() is used when possible. If the multiprocessing
    module could not be imported (Python < 2.6), or if it cannot determine
    the count (it raises NotImplementedError), platform-specific probes are
    tried instead. Returns 1 if the count cannot be determined at all.
    """
    try:
        return multiprocessing.cpu_count()
    # NameError: multiprocessing was not importable (Python < 2.6).
    # NotImplementedError: multiprocessing could not determine the count.
    except (NameError, NotImplementedError):
        pass
    ## The fallback code is adapted from parallel python.
    # Linux, Unix and MacOS
    if hasattr(os, "sysconf"):
        if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
            # Linux & Unix
            try:
                n_cpus = os.sysconf("SC_NPROCESSORS_ONLN")
            except (OSError, ValueError):
                n_cpus = 0
            if isinstance(n_cpus, int) and n_cpus > 0:
                return n_cpus
        else:
            # OSX; os.popen (unlike the removed os.popen2) exists on both
            # Python 2 and 3. The pipe is always closed.
            try:
                pipe = os.popen("sysctl -n hw.ncpu")
                try:
                    output = pipe.read()
                finally:
                    pipe.close()
                n_cpus = int(output)
            except (OSError, ValueError):
                n_cpus = 0
            if n_cpus > 0:
                return n_cpus
    # Windows
    try:
        n_cpus = int(os.environ.get("NUMBER_OF_PROCESSORS", 0))
    except ValueError:
        # malformed environment variable: fall through to the default
        n_cpus = 0
    if n_cpus > 0:
        return n_cpus
    # Default
    return 1


class Scheduler(object):
    """Base class and trivial implementation for schedulers.

    New tasks are added with add_task(data, callable).
    get_results then returns the results (and blocks if tasks are
    pending).

    In this simple scheduler implementation the tasks are simply executed in the
    add_task method.
    """

    def __init__(self, result_container=None, verbose=False):
        """Initialize the scheduler.

        result_container -- Instance of ResultContainer that is used to store
            the results (default is None, in which case an
            OrderedResultContainer is used).
        verbose -- If True then status messages will be printed to sys.stdout.
        """
        if result_container is None:
            result_container = OrderedResultContainer()
        self.result_container = result_container
        self.verbose = verbose
        self._n_open_tasks = 0  # number of tasks that are currently running
        # count the number of submitted tasks, also used for the task index
        self._task_counter = 0
        self._lock = threading.Lock()
        self._last_callable = None  # last callable is stored
        # task index of the _last_callable, can be *.5 if updated between tasks
        self._last_callable_index = -1.0

    ## public read only properties ##

    @property
    def task_counter(self):
        """This property counts the number of submitted tasks."""
        return self._task_counter

    @property
    def n_open_tasks(self):
        """This property counts the submitted but unfinished tasks."""
        return self._n_open_tasks

    ## main methods ##

    def add_task(self, data, task_callable=None):
        """Add a task to be executed.

        data -- Data for the task.
        task_callable -- A callable, which is called with the data. If it is
            None (default value) then the last provided callable is used.
            If task_callable has no fork method (i.e. it is not a
            TaskCallable) then a TaskCallableWrapper is used.

        The callable together with the data constitutes the task. This method
        blocks if there are no free resources to store or process the task
        (e.g. if no free worker processes are available).

        Raises Exception if no task_callable is given and no previous
        callable is available.
        """
        self._lock.acquire()
        if task_callable is None and self._last_callable is None:
            # Release the lock before raising; otherwise every later call
            # that needs the lock would block forever.
            self._lock.release()
            raise Exception("No task_callable specified and " +
                            "no previous callable available.")
        self._n_open_tasks += 1
        self._task_counter += 1
        task_index = self.task_counter
        if task_callable is None:
            # use the _last_callable_index in _process_task to
            # decide if a cached callable can be used
            task_callable = self._last_callable
        else:
            if not hasattr(task_callable, "fork"):
                # not a TaskCallable (probably a function), so wrap it
                task_callable = TaskCallableWrapper(task_callable)
            self._last_callable = task_callable
            self._last_callable_index = self.task_counter
        # _process_task is entered holding the lock and must release it.
        self._process_task(data, task_callable, task_index)

    def set_task_callable(self, task_callable):
        """Set the callable that will be used if no task_callable is given.

        Normally the callables are provided via add_task, in which case there
        is no need for this method.

        task_callable -- Callable that will be used unless a new task_callable
            is given. As in add_task, a callable without a fork method is
            wrapped in a TaskCallableWrapper.
        """
        if task_callable is not None and not hasattr(task_callable, "fork"):
            # _process_task calls fork() on the stored callable, so plain
            # functions must be wrapped exactly as add_task does.
            task_callable = TaskCallableWrapper(task_callable)
        self._lock.acquire()
        try:
            self._last_callable = task_callable
            # set _last_callable_index to half value since the callable is
            # newer than the last task, but not newer than the next incoming
            # task
            self._last_callable_index = self.task_counter + 0.5
        finally:
            self._lock.release()

    def _store_result(self, result, task_index):
        """Store a result in the internal result container.

        result -- Result data
        task_index -- Task index. Can be None if an error occurred.

        This function holds the lock while storing to avoid any problems
        during result storage. The task is counted as finished even if
        storing fails, so that get_results does not wait forever.
        """
        self._lock.acquire()
        try:
            self.result_container.add_result(result, task_index)
            if self.verbose:
                # print(...) with one argument behaves the same on Python 2/3
                if task_index is not None:
                    print("    finished task no. %d" % task_index)
                else:
                    print("    task failed")
        finally:
            self._n_open_tasks -= 1
            self._lock.release()

    def get_results(self):
        """Get the accumulated results from the result container.

        This method blocks (polling once per second) while there are open
        tasks.
        """
        while True:
            self._lock.acquire()
            try:
                if self._n_open_tasks == 0:
                    return self.result_container.get_results()
            finally:
                self._lock.release()
            time.sleep(1)

    def shutdown(self):
        """Controlled shutdown of the scheduler.

        This method should always be called when the scheduler is no longer
        needed and before the program shuts down! Otherwise one might get
        error messages.
        """
        self._shutdown()

    ## Context Manager interface ##

    def __enter__(self):
        """Return self."""
        return self

    def __exit__(self, type, value, traceback):
        """Shutdown the scheduler.

        It is important that all the calculations have finished
        when this is called, otherwise the shutdown might fail.
        """
        self.shutdown()

    ## override these methods in custom schedulers ##

    def _process_task(self, data, task_callable, task_index):
        """Process the task and store the result.

        You can override this method for custom schedulers.

        Warning: When this method is entered it holds the lock; the lock must
        be released here.

        Warning: Note that fork has not been called yet, so the provided
        task_callable must not be called. Only a forked version can be called.

        If forking or calling the task raises, the task is removed from the
        open-task count, the lock is released and the exception propagates.
        """
        try:
            # IMPORTANT: always call fork, since it must be called at least
            # once!
            task_callable = task_callable.fork()
            result = task_callable(data)
        except BaseException:
            # Undo the bookkeeping done in add_task and give the lock back,
            # otherwise later calls deadlock and get_results never returns.
            self._n_open_tasks -= 1
            self._lock.release()
            raise
        # release lock before store_result
        self._lock.release()
        self._store_result(result, task_index)

    def _shutdown(self):
        """Hook method for shutdown to be used in custom schedulers."""
        pass