/usr/share/pyshared/mdp/parallel/process_schedule.py is in python-mdp 3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 | """
Process based scheduler for distribution across multiple CPU cores.
"""
# TODO: use a queue instead of sleep?
# http://docs.python.org/library/queue.html
# TODO: use shared memory for data numpy arrays, but this also requires the
# use of multiprocessing since the ctype objects can't be pickled
# TODO: only return result when get_results is called,
# this sends a special request to the processes to send their data,
# we would have to add support for this to the callable,
# might get too complicated
# TODO: leverage process forks on unix systems,
# might be very efficient due to copy-on-write, see
# http://gael-varoquaux.info/blog/?p=119
# http://www.ibm.com/developerworks/aix/library/au-multiprocessing/
import sys
import os
import cPickle as pickle
import threading
import subprocess
import time
import traceback
import warnings
def _mdp_parent_dir(module_file):
    """Return the directory that contains the ``mdp`` package directory.

    module_file -- Path of a file inside the mdp package (normally this
        module's __file__). It is resolved with os.path.realpath first.

    The last occurrence of "mdp" in the resolved path is taken as the
    package directory and everything from the path separator in front of
    it onwards is stripped. Returns None when "mdp" does not occur in the
    resolved path (or only at its very start), since then no sensible
    parent directory can be derived.
    """
    real_path = os.path.realpath(module_file)
    mdp_index = real_path.rfind("mdp")
    # rfind returns -1 when not found; -1 is truthy, so test explicitly
    if mdp_index <= 0:
        return None
    # drop "mdp..." together with the separator right before it
    return real_path[:mdp_index - 1]


if __name__ == "__main__":
    # try to make sure that mdp can be imported by adding it to sys.path
    _mdp_parent = _mdp_parent_dir(__file__)
    if _mdp_parent is not None:
        # mdp path goes after sys.path
        sys.path.append(_mdp_parent)
    # shut off warnings of any kinds
    warnings.filterwarnings("ignore", ".*")
import mdp
from mdp.parallel import Scheduler, cpu_count
SLEEP_TIME = 0.1  # time (in seconds) spent sleeping while waiting for a free process
class ProcessScheduler(Scheduler):
    """Scheduler that distributes the task to multiple processes.

    The subprocess module is used to start the requested number of processes.
    The execution of each task is internally managed by a dedicated thread.

    This scheduler should work on all platforms (at least on Linux,
    Windows XP and Vista).
    """

    def __init__(self, result_container=None, verbose=False, n_processes=1,
                 source_paths=None, python_executable=None,
                 cache_callable=True):
        """Initialize the scheduler and start the slave processes.

        result_container -- ResultContainer used to store the results.
        verbose -- Set to True to get progress reports from the scheduler
            (default value is False).
        n_processes -- Number of processes used in parallel (default value
            is 1). If None (or 0) then the number of detected CPU cores is
            used.
        source_paths -- List of paths that are added to sys.path in
            the processes to make the task unpickling work. A single path
            instead of a list is also accepted.
            If None (default value) then source_paths is set to sys.path.
            To prevent this you can specify an empty list.
        python_executable -- Python executable that is used for the processes.
            The default value is None, in which case sys.executable will be
            used.
        cache_callable -- Cache the task objects in the processes (default
            is True). Disabling caching can reduce the memory usage, but will
            generally be less efficient since the task_callable has to be
            pickled each time.
        """
        super(ProcessScheduler, self).__init__(
            result_container=result_container,
            verbose=verbose)
        if n_processes:
            self._n_processes = n_processes
        else:
            self._n_processes = cpu_count()
        self._cache_callable = cache_callable
        if python_executable is None:
            python_executable = sys.executable
        # get the location of this module to start the processes
        module_path = os.path.dirname(mdp.__file__)
        module_file = os.path.join(module_path, "parallel",
                                   "process_schedule.py")
        # Note: -u argument is important on Windows to set stdout to binary
        # mode. Otherwise you might get a strange error message for
        # copy_reg.
        process_args = [python_executable, "-u", module_file]
        # the worker script reads this back as sys.argv[1] == "True"
        process_args.append(str(self._cache_callable))
        # accept a single path given as a byte or unicode string; on py2 a
        # unicode path would otherwise be split into single characters
        if isinstance(source_paths, (str, type(u""))):
            source_paths = [source_paths]
        if source_paths is None:
            source_paths = sys.path
        process_args += source_paths
        # list of processes not in use, start the processes now
        self._free_processes = [subprocess.Popen(args=process_args,
                                                 stdout=subprocess.PIPE,
                                                 stdin=subprocess.PIPE)
                                for _ in range(self._n_processes)]
        # tag each process with its cached callable task_index,
        # this is compared with _last_callable_index to check if the cached
        # task_callable is still up to date
        for process in self._free_processes:
            process._callable_index = -1
        if self.verbose:
            print("scheduler initialized with %d processes" %
                  self._n_processes)

    def _shutdown(self):
        """Shut down the slave processes.

        Every free process is sent the pickled string "EXIT", which ends the
        task loop of the worker.

        If a process is still running a task then an exception is raised.
        The scheduler lock is released in either case.
        """
        self._lock.acquire()
        try:
            if len(self._free_processes) < self._n_processes:
                raise Exception("some slave process is still working")
            for process in self._free_processes:
                pickle.dump("EXIT", process.stdin)
                process.stdin.flush()
        finally:
            # never leave the lock held, even when raising above
            self._lock.release()
        if self.verbose:
            print("scheduler shutdown")

    def _process_task(self, data, task_callable, task_index):
        """Add a task, if possible without blocking.

        It blocks when the system is not able to start a new thread
        or when the processes are all in use.

        The scheduler lock is expected to be held on entry; it is released
        once a thread has been started for the task.
        """
        task_started = False
        while not task_started:
            if not self._free_processes:
                # release lock for other threads and wait
                self._lock.release()
                time.sleep(SLEEP_TIME)
                self._lock.acquire()
                continue
            process = self._free_processes.pop()
            self._lock.release()
            try:
                thread = threading.Thread(target=self._task_thread,
                                          args=(process, data,
                                                task_callable, task_index))
                thread.start()
                task_started = True
            except threading.ThreadError:
                # raised by Thread.start when no new thread can be created
                if self.verbose:
                    print("unable to create new task thread,"
                          " waiting 2 seconds...")
                time.sleep(2)
                # the loop runs with the lock held, so re-acquire it and
                # return the unused process to the pool before retrying
                self._lock.acquire()
                self._free_processes.append(process)

    def _task_thread(self, process, data, task_callable, task_index):
        """Thread function which cares for a single task.

        The task is pushed to the process via stdin, then we wait for the
        result on stdout, pass the result to the result container, free
        the process and exit.

        On failure the error is reported on stderr and the process is put
        back into the pool without storing a result.
        NOTE(review): since no result is stored for a failed task, code
        waiting for all results may block — confirm against Scheduler.
        """
        try:
            if self._cache_callable:
                # check if the cached callable is up to date
                if process._callable_index < self._last_callable_index:
                    process._callable_index = self._last_callable_index
                else:
                    # the process already has this callable cached
                    task_callable = None
            # push the task to the process
            pickle.dump((data, task_callable, task_index),
                        process.stdin, protocol=-1)
            process.stdin.flush()
            # wait for result to arrive
            result = pickle.load(process.stdout)
        except Exception:
            # SystemExit raised inside a thread is silently ignored by the
            # threading module, so report the failure explicitly instead
            sys.stderr.write("failed to execute task %d in process:\n" %
                             task_index)
            traceback.print_exc()
            self._free_processes.append(process)
            return
        # store the result and clean up
        self._store_result(result, task_index)
        self._free_processes.append(process)
def _process_run(cache_callable=True):
    """Run this function in a worker process to receive and run tasks.

    It waits for tasks on stdin, and sends the results back via stdout.

    cache_callable -- If True, a task callable sent by the scheduler is
        kept, and tasks that arrive without a callable are run with a fork
        of the cached one.

    Each task is a pickled (data, task_callable, task_index) tuple; the
    pickled string "EXIT" ends the loop. If anything fails, the error is
    printed to stderr and the worker exits without sending a result.
    """
    # use sys.stdout only for pickled objects, everything else goes to stderr
    # NOTE: .buffer is the binary mode interface for stdin and out in py3k
    try:
        pickle_out = sys.stdout.buffer
    except AttributeError:
        pickle_out = sys.stdout
    try:
        pickle_in = sys.stdin.buffer
    except AttributeError:
        pickle_in = sys.stdin
    # route all prints (also those of the tasks) to stderr so that they
    # cannot corrupt the pickle stream on stdout
    sys.stdout = sys.stderr
    exit_loop = False
    last_callable = None  # cached callable
    while not exit_loop:
        task = None
        try:
            # wait for task to arrive
            task = pickle.load(pickle_in)
            if task == "EXIT":
                exit_loop = True
            else:
                data, task_callable, task_index = task
                if task_callable is None:
                    if last_callable is None:
                        err = ("No callable was provided and no cached "
                               "callable is available.")
                        raise Exception(err)
                    task_callable = last_callable.fork()
                elif cache_callable:
                    # store callable in cache
                    last_callable = task_callable
                    task_callable.setup_environment()
                    task_callable = task_callable.fork()
                else:
                    task_callable.setup_environment()
                result = task_callable(data)
                del task_callable  # free memory
                pickle.dump(result, pickle_out, protocol=-1)
                pickle_out.flush()
        except Exception as exception:
            # report the error on stderr and terminate the worker; nothing
            # is sent back on stdout for the failed task
            if task is None:
                print("unpickling a task caused an exception in a process:")
            else:
                print("task %d caused exception in process:" % task[2])
            print(exception)
            traceback.print_exc()
            sys.stdout.flush()
            sys.exit()
if __name__ == "__main__":
    # Command line built by the scheduler:
    #   <python> -u process_schedule.py <str(cache_callable)> [path ...]
    use_cache = (sys.argv[1] == "True")
    if len(sys.argv) > 2:
        # Prepend the paths not already known, so that they take
        # precedence over PYTHONPATH when unpickling the tasks.
        prepend = []
        for candidate in sys.argv[2:]:
            if candidate not in sys.path:
                prepend.append(candidate)
        sys.path = prepend + sys.path
    _process_run(cache_callable=use_cache)
|