/usr/share/pyshared/pebl/taskcontroller/multiprocess.py is in python-pebl 1.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Module providing a taskcontroller that runs tasks over multiple processes."""
import os, os.path
import cPickle
import thread, time
import shutil
import tempfile
from copy import copy
from pebl import config, result
from pebl.taskcontroller.base import _BaseController
# Name of the executable spawned for each task; it is passed to os.spawnlp
# both as the program to look up and as the child's argv[0].
PEBL = "pebl"
class MultiProcessController(_BaseController):
    """Task controller that runs every task in a separate child process.

    Each task is pickled into its own temporary directory and handed to a
    ``pebl runtask <taskfile>`` child process. The number of children alive
    at any one time is bounded by ``poolsize``.
    """

    #
    # Parameters
    #
    # NOTE(review): there is no trailing comma, so the outer parentheses are
    # plain grouping and _params is a single IntParameter, not a 1-tuple.
    # Left unchanged because the code consuming _params is not visible here;
    # confirm whether it expects an iterable.
    _params = (
        config.IntParameter(
            'multiprocess.poolsize',
            'Number of processes to run concurrently (0 means no limit)',
            default=0
        )
    )

    def __init__(self, poolsize=None):
        """Creates a task controller that runs tasks on multiple processes.

        This task controller uses a pool of processes rather than spawning all
        processes concurrently. poolsize is the size of this pool and by
        default it is big enough to run all processes concurrently.

        A falsy poolsize (None or 0) falls back to the configured
        'multiprocess.poolsize' value.

        """
        self.poolsize = poolsize or config.get('multiprocess.poolsize')

    def run(self, tasks):
        """Run tasks by creating multiple processes.

        If poolsize was specified when creating this controller, additional
        tasks will be queued.

        Returns a list with one result per task, in the order in which the
        child processes finished. Every result has its ``taskid`` set to 0.
        The temporary working directory of each task is removed before
        returning, including when reading a result file raises.

        """
        tasks = copy(tasks)  # because we do tasks.pop() below..
        numtasks = len(tasks)
        poolsize = self.poolsize or numtasks  # 0 means "no limit"
        running = {}  # pid -> task currently executing in that child
        done = []
        opjoin = os.path.join

        while len(done) < numtasks:
            # submit tasks (if below poolsize and tasks remain)
            for i in xrange(min(poolsize - len(running), len(tasks))):
                task = tasks.pop()
                task.cwd = tempfile.mkdtemp()
                taskfile = opjoin(task.cwd, 'task.pebl')
                self._write_task(task, taskfile)
                pid = os.spawnlp(os.P_NOWAIT, PEBL, PEBL, "runtask", taskfile)
                running[pid] = task

            # wait for any child process to finish
            pid, status = os.wait()
            finished = running.pop(pid, None)
            # os.wait() reaps *any* child of this process. A pid we did not
            # spawn must not count as a completed task, otherwise the loop
            # ends early and a None entry breaks the result collection below.
            if finished is not None:
                done.append(finished)

        # The exit status is not inspected; the only visible check on a
        # child's outcome is reading its result file here.
        try:
            results = [result.fromfile(opjoin(t.cwd, 'result.pebl'))
                       for t in done]
        except Exception:
            # Do not leak the temporary directories when a result cannot be
            # read; cleanup is best-effort so the original error propagates.
            self._remove_workdirs(done, ignore_errors=True)
            raise

        # to make the results look like deferred results
        for r in results:
            r.taskid = 0

        # clean up
        self._remove_workdirs(done)
        return results

    def _write_task(self, task, path):
        """Pickle task to path and close the file before returning."""
        # Explicit close guarantees the data is flushed to disk before the
        # child process is spawned to read it, without relying on
        # reference-counting to finalize the file object.
        taskfile = open(path, 'w')
        try:
            cPickle.dump(task, taskfile)
        finally:
            taskfile.close()

    def _remove_workdirs(self, tasks, ignore_errors=False):
        """Delete the temporary working directory of every task in tasks."""
        for t in tasks:
            shutil.rmtree(t.cwd, ignore_errors)
|