/usr/share/pyshared/mdp/parallel/scheduling.py is in python-mdp 3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
This module contains the basic classes for task processing via a scheduler.
"""
import threading
import time
import os
try:
import multiprocessing
except ImportError:
# Python version < 2.6, have to use fallbacks
pass
class ResultContainer(object):
    """Interface that result containers must implement.

    Subclasses collect task results via add_result and hand the
    accumulated results back (resetting themselves) via get_results.
    """

    def add_result(self, result_data, task_index):
        """Store a single result; override in subclasses."""
        pass

    def get_results(self):
        """Return all stored results and clear the container; override in
        subclasses."""
        pass
class ListResultContainer(ResultContainer):
    """Result container that simply accumulates results in a list."""

    def __init__(self):
        super(ListResultContainer, self).__init__()
        self._results = []

    def add_result(self, result, task_index):
        """Append the result to the internal list (task_index is unused)."""
        self._results.append(result)

    def get_results(self):
        """Hand out the collected results and start over with a fresh list.

        Results come back in arrival order, which may differ from the
        original task order.
        """
        collected, self._results = self._results, []
        return collected
class OrderedResultContainer(ListResultContainer):
    """Default result container with automatic restoring of the result order.

    In general the order of the incoming results in the scheduler can be
    different from the order of the tasks, since some tasks may finish quicker
    than other tasks. This result container restores the original order.
    """

    def __init__(self):
        super(OrderedResultContainer, self).__init__()

    def add_result(self, result, task_index):
        """Store a result in the container.

        The task index is also stored and later used to reconstruct the
        original task order.
        """
        self._results.append((result, task_index))

    def get_results(self):
        """Sort the results into the original order and return them in list."""
        results = self._results
        self._results = []
        # bug fix: guard the empty case; zip(*[]) has no element [0] and
        # would raise IndexError here
        if not results:
            return []
        results.sort(key=lambda x: x[1])
        # return a list (as documented and as the base class does), not
        # the tuple that list(zip(*results))[0] produced
        return [result for result, task_index in results]
class TaskCallable(object):
    """Abstract base class for task callables.

    This class encapsulates the task behavior and the related fixed data
    (data which stays constant over multiple tasks).
    """

    def setup_environment(self):
        """Hook that is only called when the callable is first called
        in a different Python process / environment.

        Override it to perform any modifications of the Python
        environment that this callable requires.
        """
        pass

    def __call__(self, data):
        """Perform the computation and return the result.

        This default implementation just returns the data unchanged;
        concrete subclasses override it.
        """
        return data

    # TODO: is 'fork' really a good name?
    # An alternative would be a separate CallableFactory class, but that
    # complicates things for simple callables (similar to why iterators
    # implement the iterable interface).
    def fork(self):
        """Return a fork of this callable, e.g. by making a copy.

        Exactly one fork is made before each call, and the fork (never
        the original) is what actually gets called; this keeps the
        original callable intact when caching is used. A callable that
        is not modified by being called may simply return itself, which
        is what this default implementation does.
        """
        return self
class SqrTestCallable(TaskCallable):
    """Simple squaring callable used for testing."""

    def __call__(self, data):
        """Return the square of the given data."""
        return data ** 2
class SleepSqrTestCallable(TaskCallable):
    """Callable for testing which sleeps before returning its result."""

    def __call__(self, data):
        """Sleep for data[1] seconds, then return data[0] squared."""
        value, delay = data[0], data[1]
        time.sleep(delay)
        return value ** 2
class MDPVersionCallable(TaskCallable):
    """Callable for testing the MDP version.

    Should return a unique comparable object which includes version
    information and installed/used modules.
    """

    def __call__(self, data):
        """Ignore the input data and return mdp.config.info()."""
        import mdp
        return mdp.config.info()
class TaskCallableWrapper(TaskCallable):
    """Wrap a plain callable (e.g. a function) so that it offers the
    fork method inherited from TaskCallable.

    This wrapper is applied internally in Scheduler.
    """

    def __init__(self, task_callable):
        """Remember the callable to be wrapped."""
        self._callable = task_callable

    def __call__(self, data):
        """Delegate the call to the wrapped callable and return its result."""
        return self._callable(data)
# helper function
def cpu_count():
    """Return the number of CPU cores.

    Uses multiprocessing.cpu_count when available; otherwise falls back
    to platform-specific heuristics (taken from parallel python) and
    finally to 1.
    """
    try:
        return multiprocessing.cpu_count()
    # TODO: remove except clause once we support only python >= 2.6
    except NameError:
        ## This code part is taken from parallel python.
        # Linux, Unix and MacOS
        if hasattr(os, "sysconf"):
            if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
                # Linux & Unix
                n_cpus = os.sysconf("SC_NPROCESSORS_ONLN")
                if isinstance(n_cpus, int) and n_cpus > 0:
                    return n_cpus
            else:
                # OSX
                # bug fix: os.popen2 is deprecated (removed in Python 3);
                # os.popen gives the same stdout contents
                return int(os.popen("sysctl -n hw.ncpu").read())
        # Windows
        if "NUMBER_OF_PROCESSORS" in os.environ:
            n_cpus = int(os.environ["NUMBER_OF_PROCESSORS"])
            if n_cpus > 0:
                return n_cpus
        # Default
        return 1
class Scheduler(object):
    """Base class and trivial implementation for schedulers.

    New tasks are added with add_task(data, callable).
    get_results then returns the results (and locks if tasks are
    pending).

    In this simple scheduler implementation the tasks are simply executed
    in the add_task method.
    """

    def __init__(self, result_container=None, verbose=False):
        """Initialize the scheduler.

        result_container -- Instance of ResultContainer that is used to
            store the results (default is None, in which case an
            OrderedResultContainer is used; the original docstring
            wrongly said ListResultContainer).
        verbose -- If True then status messages will be printed to
            sys.stdout.
        """
        if result_container is None:
            result_container = OrderedResultContainer()
        self.result_container = result_container
        self.verbose = verbose
        self._n_open_tasks = 0  # number of tasks that are currently running
        # count the number of submitted tasks, also used for the task index
        self._task_counter = 0
        self._lock = threading.Lock()
        self._last_callable = None  # last callable is stored
        # task index of the _last_callable, can be *.5 if updated between
        # tasks
        self._last_callable_index = -1.0

    ## public read only properties ##

    @property
    def task_counter(self):
        """This property counts the number of submitted tasks."""
        return self._task_counter

    @property
    def n_open_tasks(self):
        """This property counts of submitted but unfinished tasks."""
        return self._n_open_tasks

    ## main methods ##

    def add_task(self, data, task_callable=None):
        """Add a task to be executed.

        data -- Data for the task.
        task_callable -- A callable, which is called with the data. If it is
            None (default value) then the last provided callable is used.
            If task_callable is not an instance of TaskCallable then a
            TaskCallableWrapper is used.

        The callable together with the data constitutes the task. This method
        blocks if there are no free resources to store or process the task
        (e.g. if no free worker processes are available).

        Raises an Exception if task_callable is None and no previous
        callable is available.
        """
        self._lock.acquire()
        if task_callable is None and self._last_callable is None:
            # bug fix: release the lock before raising, otherwise the
            # scheduler stays locked forever and every later call
            # deadlocks
            self._lock.release()
            raise Exception("No task_callable specified and " +
                            "no previous callable available.")
        self._n_open_tasks += 1
        self._task_counter += 1
        task_index = self.task_counter
        if task_callable is None:
            # use the _last_callable_index in _process_task to
            # decide if a cached callable can be used
            task_callable = self._last_callable
        else:
            if not hasattr(task_callable, "fork"):
                # not a TaskCallable (probably a function), so wrap it
                task_callable = TaskCallableWrapper(task_callable)
            self._last_callable = task_callable
            self._last_callable_index = self.task_counter
        # note: _process_task is entered holding the lock and is
        # responsible for releasing it
        self._process_task(data, task_callable, task_index)

    def set_task_callable(self, task_callable):
        """Set the callable that will be used if no task_callable is given.

        Normally the callables are provided via add_task, in which case there
        is no need for this method.

        task_callable -- Callable that will be used unless a new task_callable
            is given.
        """
        self._lock.acquire()
        self._last_callable = task_callable
        # set _last_callable_index to half value since the callable is newer
        # than the last task, but not newer than the next incoming task
        self._last_callable_index = self.task_counter + 0.5
        self._lock.release()

    def _store_result(self, result, task_index):
        """Store a result in the internal result container.

        result -- Result data
        task_index -- Task index. Can be None if an error occured.

        This function blocks to avoid any problems during result storage.
        """
        self._lock.acquire()
        self.result_container.add_result(result, task_index)
        if self.verbose:
            # print(...) form works in both Python 2 and 3
            if task_index is not None:
                print(" finished task no. %d" % task_index)
            else:
                print(" task failed")
        self._n_open_tasks -= 1
        self._lock.release()

    def get_results(self):
        """Get the accumulated results from the result container.

        This method blocks (polling once per second) while there are
        open tasks.
        """
        while True:
            self._lock.acquire()
            if self._n_open_tasks == 0:
                results = self.result_container.get_results()
                self._lock.release()
                return results
            else:
                self._lock.release()
                time.sleep(1)

    def shutdown(self):
        """Controlled shutdown of the scheduler.

        This method should always be called when the scheduler is no longer
        needed and before the program shuts down! Otherwise one might get
        error messages.
        """
        self._shutdown()

    ## Context Manager interface ##

    def __enter__(self):
        """Return self."""
        return self

    def __exit__(self, type, value, traceback):
        """Shutdown the scheduler.

        It is important that all the calculations have finished
        when this is called, otherwise the shutdown might fail.
        """
        self.shutdown()

    ## override these methods in custom schedulers ##

    def _process_task(self, data, task_callable, task_index):
        """Process the task and store the result.

        You can override this method for custom schedulers.

        Warning: When this method is entered it has the lock, the lock must be
        released here.

        Warning: Note that fork has not been called yet, so the provided
        task_callable must not be called. Only a forked version can be called.
        """
        # IMPORTANT: always call fork, since it must be called at least once!
        task_callable = task_callable.fork()
        result = task_callable(data)
        # release lock before store_result
        self._lock.release()
        self._store_result(result, task_index)

    def _shutdown(self):
        """Hook method for shutdown to be used in custom schedulers."""
        pass