/usr/share/pyshared/Scientific/DistributedComputing/MasterSlave.py is in python-scientific 2.8-2build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 | #
# Master-slave process manager for distributed computing
# based on Pyro
#
# Written by Konrad Hinsen <hinsen@cnrs-orleans.fr>
# last revision: 2008-8-18
#
"""
Distributed computing using a master-slave model
The classes in this module provide a simple way to parallelize independent
computations in a program. The communication is handled by the Pyro package,
which must be installed before this module can be used.
Pyro can be obtained from http://pyro.sourceforge.net/.
By default, the Pyro name server is used to initialize communication. Please
read the Pyro documentation for learning how to use the name server.
The principle of the master-slave model is that there is a single master
process that defines computational tasks and any number of slave processes
that execute these tasks. The master defines task requests and then waits
for the results to come in. The slaves wait for a task request, execute it,
return the result, and wait for the next task. There can be any number of
slave processes, which can be started and terminated independently, the
only condition being that no slave process can be started before its master
process. This setup makes it possible to perform a lengthy computation using
a variable number of processors.
Communication between the master and the slave processes passes through
a TaskManager object that is created automatically as part of the master
process. The task manager stores and hands out task requests and results.
The task manager also keeps track of the slave processes. When a slave process
disappears (because it was killed or because of a hardware failure), the
task manager re-schedules its active task(s) to another slave process. This
makes the master-slave system very fault tolerant.
Each task manager has a label that makes it possible to distinguish between
several master-slave groups running at the same time. It is by the label
that slave processes identify the master process for which they work.
The script "task_manager" prints statistics about a currently active task
manager; it takes the label as an argument. It shows the number of currently
active processes (master plus slaves), the number of waiting and running
tasks, and the number of results waiting to be picked up.
The script Examples/master_slave_demo.py illustrates the use of the
master-slave setup in a simple script. Both master and slave processes
are defined in the same script. The scripts Examples/master.py and
Examples/slave.py show a master-slave setup using two distinct scripts.
This is more flexible because task requests and result retrievals
can be made from anywhere in the master code.
"""
from Scientific.DistributedComputing.TaskManager import \
TaskManager, TaskManagerTermination, TaskRaisedException
import Pyro.core
import Pyro.naming
import Pyro.errors
import threading
import time
import copy
debug = False
class MasterProcess(object):
"""
Master process in a master-slave setup
A master process in a program is implemented by subclassing
this class and overriding the method "run", which calls the methods
"requestTask" and "retrieveResult". The process is then
launched by calling the method "start".
"""
def __init__(self, label, use_name_server=True):
"""
@param label: the label that identifies the task manager
@type label: C{str}
@param use_name_server: If C{True} (default), the task manager is
registered with the Pyro name server. If
C{False}, the name server is not used and
slave processes need to know the host
on which the master process is running.
@type use_name_server: C{bool}
"""
self.label = label
self.task_manager = TaskManager()
self.process_id = \
self.task_manager.registerProcess(info = getMachineInfo())
Pyro.core.initServer(banner=False)
self.pyro_ns = None
if use_name_server:
self.pyro_ns=Pyro.naming.NameServerLocator().getNS()
self.manager_thread = threading.Thread(target = self.taskManagerThread)
self.manager_thread.start()
self.global_states = {}
def taskManagerThread(self):
"""
This method represents the code that is executed in a background
thread for remote access to the task manager.
"""
self.pyro_daemon=Pyro.core.Daemon()
if self.pyro_ns is not None:
# Make another name server proxy for this thread
pyro_ns=Pyro.naming.NameServerLocator().getNS()
self.pyro_daemon.useNameServer(pyro_ns)
try:
pyro_ns.createGroup("TaskManager")
except Pyro.errors.NamingError:
pass
uri = self.pyro_daemon.connect(self.task_manager,
"TaskManager.%s" % self.label)
try:
self.pyro_daemon.requestLoop()
finally:
self.pyro_daemon.shutdown(True)
if self.pyro_ns is not None:
try:
pyro_ns.unregister("TaskManager.%s" % self.label)
except Pyro.errors.NamingError:
pass
def requestTask(self, tag, *parameters):
"""
Launches a task request. The task will be executed by a slave
process in a method called "do_"+tag that is called with the
parameters given in the task request. Note that the order of
task executions is not defined.
@param tag: a tag identifying the computational task. It corresponds
to the name of a method in the slave process.
@type tag: C{str}
@param parameters: the parameters passed to the corresponding method
in the slave process. The only restriction on their
types is that all parameters must be picklable.
@return: a unique task id
@rtype: C{str}
"""
return self.task_manager.addTaskRequest(tag, parameters)
def retrieveResult(self, tag=None):
"""
@param tag: a tag identifying the computational task from which a
return value is requested. If C{None}, results from
any task will be accepted.
@type tag: C{str}
@return: a tuple containing three values: the task id to which the
result corresponds, the tag of the computational task,
and the result returned by the slave method that handled
the task
@rtype: C{tuple}
@raises TaskRaisedException: if the slave method raised an exception
"""
try:
if tag is None:
return self.task_manager.getAnyResult()
else:
task_id, result = self.task_manager.getResultWithTag(tag)
return task_id, tag, result
except TaskManagerTermination:
return None, None, None
def setGlobalState(self, **kw):
state_id = min(self.global_states.keys() + [0]) + 1
self.global_states[state_id] = kw.keys()
for name, value in kw.items():
label = "state_%d_%s" % (state_id, name)
if debug:
print "Storing state value ", label
self.task_manager.storeData(**{label: value})
return state_id
def deleteGlobalState(self, state_id):
for name in self.global_states[state_id]:
label = "state_%d_%s" % (state_id, name)
if debug:
print "Deleting state value ", label
self.task_manager.deleteData(label)
def start(self):
"""
Starts the master process.
"""
try:
self.run()
finally:
self.shutdown()
def shutdown(self):
self.task_manager.terminate()
while self.task_manager.numberOfActiveProcesses() > 1:
time.sleep(0.1)
self.pyro_daemon.shutdown()
self.manager_thread.join()
def run(self):
"""
The main routine of the master process. This method must be
overridden in subclasses.
"""
raise NotImplementedError
def launchSlaveJobs(self, n=1):
"""
Launch n slave jobs on the machine that also runs the master job.
@param n: the number of slave jobs to be launched.
@type n: C{int}
"""
import subprocess, sys
slave_script = ('label="%s"\n' % self.label) + '''
import Pyro.core
import Pyro.errors
import sys
Pyro.core.initClient(banner=False)
while True:
try:
task_manager = \
Pyro.core.getProxyForURI("PYROLOC://localhost/TaskManager.%s"
% label)
break
except Pyro.errors.NamingError:
continue
try:
slave_code = task_manager.retrieveData("slave_code")
except KeyError:
print "No slave code available for %s" % label
raise SystemExit
namespace = {}
sys.modules["__main__"].SLAVE_PROCESS_LABEL = label
sys.modules["__main__"].SLAVE_NAMESPACE = namespace
exec slave_code in namespace
'''
directory = self.task_manager.retrieveData("cwd")
for i in range(n):
process = subprocess.Popen([sys.executable],
stdin=subprocess.PIPE,
cwd=directory)
process.stdin.write(slave_script)
process.stdin.close()
class SlaveProcess(object):
"""
Slave process in a master-slave setup
A concrete slave process in a program is implemented by subclassing
this class and adding the methods that handle the computational
tasks. Such a method has the name "do_" followed by the tag of the
computational task. The process is then launched by calling the
method "start".
"""
def __init__(self, label, master_host=None, watchdog_period=120.):
"""
@param label: the label that identifies the task manager
@type label: C{str}
@param master_host: If C{None} (default), the task manager of the
master process is located using the Pyro name
server. If no name server is used, this parameter
must be the hostname of the machine on which the
master process runs, plus the port number if it
is different from the default (7766).
@type master_host: C{str} or C{NoneType}
@param watchdog_period: the interval (in seconds) at which the
slave process sends messages to the
manager to signal that it is still alive.
If None, no messages are sent at all. In that
case, the manager cannot recognize if the slave
job has crashed or been killed.
@type watchdog_period: C{int} or C{NoneType}
"""
Pyro.core.initClient(banner=False)
if master_host is None:
self.task_manager = \
Pyro.core.getProxyForURI("PYRONAME://TaskManager.%s" % label)
else:
# URI defaults to "PYROLOC://localhost:7766/"
uri = "PYROLOC://%s/TaskManager.%s" % (master_host, label)
self.task_manager = Pyro.core.getProxyForURI(uri)
self.watchdog_period = watchdog_period
self.done = False
self.global_state_cache = {}
# Compile a dictionary of methods that implement tasks
import inspect
self.task_methods = {}
if debug:
print "Scanning task handler methods..."
for name, value in inspect.getmembers(self, inspect.isroutine):
if name[:3] == "do_":
self.task_methods[name] = value
if debug:
print " found handler for task ", name[:3]
if debug:
print len(self.task_methods), "task handlers found in class"
print "If the slave is defined by a script or module, it is"
print "normal that none have been found!"
def watchdogThread(self):
"""
This method is run in a separate thread that pings the master process
regularly to signal that it is still alive.
"""
task_manager = copy.copy(self.task_manager)
while True:
task_manager.ping(self.process_id)
if self.done:
break
time.sleep(self.watchdog_period)
def processParameter(self, parameter):
if isinstance(parameter, GlobalStateValue):
try:
if debug:
print "Returning state value", parameter.label
return self.global_state_cache[parameter.label]
except KeyError:
if debug:
print "Retrieving state value", parameter.label
self.global_state_cache[parameter.label] = \
self.task_manager.retrieveData(parameter.label)
return self.global_state_cache[parameter.label]
else:
return parameter
def start(self, namespace=None):
"""
Starts the slave process.
"""
if debug:
print "Starting slave process"
if namespace is None:
namespace = self.task_methods
self.process_id = \
self.task_manager.registerProcess(self.watchdog_period,
info = getMachineInfo())
if self.watchdog_period is not None:
self.background_thread = \
threading.Thread(target=self.watchdogThread)
self.background_thread.setDaemon(True)
self.background_thread.start()
# The slave process main loop
while True:
# Should we terminate for whatever reason?
if self.terminationTest():
break
# Get a task
try:
task_id, tag, parameters = \
self.task_manager.getAnyTask(self.process_id)
if debug:
print "Got task", task_id, "of type", tag
except TaskManagerTermination:
break
# Find the method to call
try:
method = namespace["do_%s" % tag]
except KeyError:
if debug:
print "No suitable handler was found, returning task."
self.task_manager.returnTask(task_id)
continue
# Replace GlobalStateValue objects by the associated values
parameters = tuple(self.processParameter(p) for p in parameters)
# Call the method
try:
if debug:
print "Executing task handler..."
result = method(*parameters)
if debug:
print "...done."
except KeyboardInterrupt:
if debug:
print "Keyboard interrupt"
self.task_manager.returnTask(task_id)
self.task_manager.unregisterProcess(self.process_id)
raise
except Exception, e:
if debug:
print "Exception:"
traceback.print_exc()
import traceback, StringIO
tb_text = StringIO.StringIO()
traceback.print_exc(None, tb_text)
tb_text = tb_text.getvalue()
self.task_manager.storeException(task_id, e, tb_text)
else:
if debug:
print "Storing result..."
self.task_manager.storeResult(task_id, result)
if debug:
print "...done."
self.task_manager.unregisterProcess(self.process_id)
self.done = True
# Subclasses can redefine this method
def terminationTest(self):
return False
def getMachineInfo():
import os
sysname, nodename, release, version, machine = os.uname()
pid = os.getpid()
return "PID %d on %s (%s)" % (pid, nodename, machine)
class GlobalStateValue(object):
def __init__(self, state_id, name):
self.label = "state_%d_%s" % (state_id, name)
#
# Job handling utility
#
def runJob(label, master_class, slave_class, watchdog_period=120.,
launch_slaves=0):
"""
Creates an instance of the master_class and runs it. A copy
of the script and the current working directory are stored in the
TaskManager object to enable the task_manager script to launch
slave processes.
@param label: the label that identifies the task manager
@type label: C{str}
@param master_class: the class implementing the master process
(a subclass of L{MasterProcess})
@param slave_class: the class implementing the slave process
(a subclass of L{SlaveProcess})
@param watchdog_period: the interval (in seconds) at which the
slave process sends messages to the
manager to signal that it is still alive.
If None, no messages are sent at all. In that
case, the manager cannot recognize if the slave
job has crashed or been killed.
@type watchdog_period: C{int} or C{NoneType}
@param launch_slaves: the number of slaves jobs to launch
immediately on the same machine that runs
the master process
@type launch_slaves: C{int}
"""
import inspect
import os
import sys
main_module = sys.modules['__main__']
try:
slave_label = main_module.SLAVE_PROCESS_LABEL
master = label != slave_label
except AttributeError:
master = True
if master:
filename = inspect.getsourcefile(main_module)
source = file(filename).read()
process = master_class(label)
process.task_manager.storeData(slave_code = source,
cwd = os.getcwd())
if launch_slaves > 0:
process.launchSlaveJobs(launch_slaves)
process.start()
else:
slave_class(label, watchdog_period=watchdog_period).start()
#
# Alternate interface for multi-module programs
#
def initializeMasterProcess(label, slave_script=None, slave_module=None,
use_name_server=True):
"""
Initializes a master process.
@param label: the label that identifies the task manager
@type label: C{str}
@param slave_script: the file name of the script that defines
the corresponding slave process
@type slave_script: C{str}
@param slave_module: the name of the module that defines
the corresponding slave process
@type slave_module: C{str}
@param use_name_server: If C{True} (default), the task manager is
registered with the Pyro name server. If
C{False}, the name server is not used and
slave processes need to know the host
on which the master process is running.
@type use_name_server: C{bool}
@returns: a process object on which the methods requestTask()
and retrieveResult() can be called.
@rtype: L{MasterProcess}
"""
import atexit
import os
process = MasterProcess(label, use_name_server)
atexit.register(process.shutdown)
if slave_script is not None or slave_module is not None:
if slave_script is not None:
source = file(slave_script).read()
else:
source = """
import Scientific.DistributedComputing.MasterSlave
from %s import *
""" % slave_module
if debug:
source += "print 'Slave definitions:'\n"
source += "print dir()\n"
source += "Scientific.DistributedComputing.MasterSlave.debug=True\n"
source += """
Scientific.DistributedComputing.MasterSlave.startSlaveProcess()
"""
process.task_manager.storeData(slave_code = source,
cwd = os.getcwd())
if debug:
print "Slave source code:"
print 50*'-'
print source
print 50*'-'
return process
def startSlaveProcess(label=None, master_host=None):
"""
Starts a slave process. Must be called at the end of a script
that defines or imports all task handlers.
@param label: the label that identifies the task manager. May be
omitted if the slave process is started through
the task_manager script.
@type label: C{str} or C{NoneType}
@param master_host: If C{None} (default), the task manager of the
master process is located using the Pyro name
server. If no name server is used, this parameter
must be the hostname of the machine on which the
master process runs, plus the port number if it
is different from the default (7766).
@type master_host: C{str} or C{NoneType}
"""
import sys
main_module = sys.modules['__main__']
if label is None:
label = main_module.SLAVE_PROCESS_LABEL
namespace = main_module.SLAVE_NAMESPACE
else:
namespace = main_module.__dict__
if debug:
print "Initializing slave process", label
process = SlaveProcess(label, master_host=None)
process.start(namespace)
|