This file is indexed.

/usr/lib/python3/dist-packages/mdp/parallel/thread_schedule.py is in python3-mdp 3.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""
Thread based scheduler for distribution across multiple CPU cores.
"""
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()

import threading
import time
import pickle as pickle

from .scheduling import Scheduler, cpu_count

SLEEP_TIME = 0.1  # time spent sleeping when waiting for a thread to finish


class ThreadScheduler(Scheduler):
    """Thread based scheduler.

    Because of the GIL this only makes sense if most of the time is spent in
    numpy calculations (or some other external non-blocking C code) or for IO,
    but can be more efficient than ProcessScheduler because of the
    shared memory.
    """

    def __init__(self, result_container=None, verbose=False, n_threads=1,
                 copy_callable=True):
        """Initialize the scheduler.

        result_container -- ResultContainer used to store the results.
        verbose -- Set to True to get progress reports from the scheduler
            (default value is False).
        n_threads -- Number of threads used in parallel. If None (default)
            then the number of detected CPU cores is used.
        copy_callable -- Use deep copies of the task callable in the threads.
            This is for example required if some nodes are stateful during
            execution (e.g., a BiNode using the coroutine decorator).
        """
        super(ThreadScheduler, self).__init__(
                                            result_container=result_container,
                                            verbose=verbose)
        if n_threads:
            self._n_threads = n_threads
        else:
            self._n_threads = cpu_count()
        # Number of tasks currently running in worker threads.  Incremented
        # under self._lock in _process_task (slot reservation) and
        # decremented under the lock in _task_thread.
        self._n_active_threads = 0
        self.copy_callable = copy_callable

    def _process_task(self, data, task_callable, task_index):
        """Add a task, if possible without blocking.

        It blocks when the maximum number of threads is reached (given by
        n_threads) or when the system is not able to start a new thread.

        The method is entered with self._lock held by the caller (both loop
        branches release it first) and returns with the lock released.
        """
        task_started = False
        while not task_started:
            if self._n_active_threads >= self._n_threads:
                # all worker slots are taken: release the lock so that the
                # worker threads can finish, wait, then check again
                self._lock.release()
                time.sleep(SLEEP_TIME)
                self._lock.acquire()
            else:
                # Reserve a worker slot while still holding the lock.
                # BUG FIX: the counter was never incremented before, so the
                # n_threads limit was not enforced and the counter drifted
                # negative as finished workers decremented it.
                self._n_active_threads += 1
                self._lock.release()
                task_callable = task_callable.fork()
                if self.copy_callable:
                    # create a deep copy of the task_callable,
                    # since it might not be thread safe
                    # (but the fork is still required)
                    as_str = pickle.dumps(task_callable, -1)
                    task_callable = pickle.loads(as_str)
                try:
                    thread = threading.Thread(target=self._task_thread,
                                              args=(data, task_callable,
                                                    task_index))
                    thread.start()
                    task_started = True
                except Exception:
                    if self.verbose:
                        print ("unable to create new thread,"
                               " waiting 2 seconds...")
                    time.sleep(2)
                    # BUG FIX: give the reserved slot back and re-acquire the
                    # lock, restoring the loop invariant (lock held at loop
                    # top); previously the retry re-entered the loop with the
                    # lock released and raised on the next release() call.
                    self._lock.acquire()
                    self._n_active_threads -= 1

    def _task_thread(self, data, task_callable, task_index):
        """Thread function which processes a single task."""
        result = task_callable(data)
        self._store_result(result, task_index)
        # Free the worker slot under the lock: "-=" on an instance attribute
        # is a non-atomic read-modify-write racing with _process_task.
        self._lock.acquire()
        try:
            self._n_active_threads -= 1
        finally:
            self._lock.release()