/usr/lib/python3/dist-packages/mdp/parallel/thread_schedule.py is in python3-mdp 3.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Thread based scheduler for distribution across multiple CPU cores.
"""
from __future__ import print_function
from future import standard_library
standard_library.install_aliases()
import threading
import time
import pickle as pickle
from .scheduling import Scheduler, cpu_count
SLEEP_TIME = 0.1 # time spent sleeping when waiting for a thread to finish
class ThreadScheduler(Scheduler):
    """Thread based scheduler.

    Because of the GIL this only makes sense if most of the time is spent in
    numpy calculations (or some other external non-blocking C code) or for IO,
    but can be more efficient than ProcessScheduler because of the
    shared memory.
    """

    def __init__(self, result_container=None, verbose=False, n_threads=1,
                 copy_callable=True):
        """Initialize the scheduler.

        result_container -- ResultContainer used to store the results.
        verbose -- Set to True to get progress reports from the scheduler
            (default value is False).
        n_threads -- Maximum number of threads used in parallel (default
            value is 1). If None, or any other false value such as 0, the
            number of detected CPU cores is used instead.
        copy_callable -- Use deep copies of the task callable in the threads.
            This is for example required if some nodes are stateful during
            execution (e.g., a BiNode using the coroutine decorator).
        """
        super(ThreadScheduler, self).__init__(
            result_container=result_container,
            verbose=verbose)
        if n_threads:
            self._n_threads = n_threads
        else:
            self._n_threads = cpu_count()
        # Number of task threads that currently occupy a slot. It is only
        # modified while self._lock is held.
        self._n_active_threads = 0
        self.copy_callable = copy_callable

    def _release_thread_slot(self):
        """Give back one thread slot (must be called without the lock)."""
        with self._lock:
            self._n_active_threads -= 1

    def _process_task(self, data, task_callable, task_index):
        """Add a task, if possible without blocking.

        It blocks when the maximum number of threads is reached (given by
        n_threads) or when the system is not able to start a new thread.

        This method is entered with self._lock already held and returns
        with the lock released.

        data -- Data that is handed to the task callable in the thread.
        task_callable -- Callable providing fork(); only the forked (and
            optionally deep-copied) version is executed.
        task_index -- Index passed on to _store_result with the result.
        """
        # Wait for a free slot. The lock is given up while sleeping so that
        # finishing threads are able to update the counter.
        while self._n_active_threads >= self._n_threads:
            self._lock.release()
            time.sleep(SLEEP_TIME)
            self._lock.acquire()
        # Reserve the slot while the lock is still held, otherwise the
        # limit given by n_threads would never be enforced.
        self._n_active_threads += 1
        self._lock.release()
        try:
            task_callable = task_callable.fork()
            if self.copy_callable:
                # create a deep copy of the task_callable,
                # since it might not be thread safe
                # (but the fork is still required)
                as_str = pickle.dumps(task_callable, -1)
                task_callable = pickle.loads(as_str)
        except BaseException:
            # the task will never run, so do not leak the reserved slot
            self._release_thread_slot()
            raise
        # Retry only the thread creation. The lock has already been released
        # and the callable has already been forked, so neither step may be
        # repeated here.
        while True:
            try:
                thread = threading.Thread(target=self._task_thread,
                                          args=(data, task_callable,
                                                task_index))
                thread.start()
            except Exception:
                if self.verbose:
                    print("unable to create new thread,"
                          " waiting 2 seconds...")
                time.sleep(2)
            else:
                break

    def _task_thread(self, data, task_callable, task_index):
        """Thread function which processes a single task.

        The thread slot reserved in _process_task is released when the task
        is done, even if the task callable raises an exception.
        """
        try:
            result = task_callable(data)
            self._store_result(result, task_index)
        finally:
            self._release_thread_slot()
|