/usr/share/pyshared/pebl/taskcontroller/xgrid.py is in python-pebl 1.0.2-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | import time
import os.path
import shutil
import tempfile
import cPickle
try:
import xg
except:
xg = False
from pebl import config, result
from pebl.taskcontroller.base import _BaseSubmittingController, DeferredResult
class XgridDeferredResult(DeferredResult):
def __init__(self, grid, task):
self.grid = grid
self.job = task.job
self.taskid = self.job.jobID
@property
def result(self):
tmpdir = tempfile.mkdtemp('pebl')
self.job.results(
stdout = os.path.join(tmpdir, 'stdout'),
stderr = os.path.join(tmpdir, 'stderr'),
outdir = tmpdir
)
self.job.delete()
rst = result.fromfile(os.path.join(tmpdir,'result.pebl'))
shutil.rmtree(tmpdir)
return rst
@property
def finished(self):
return self.job.info(update=1).get('jobStatus') in ('Finished',)
class XgridController(_BaseSubmittingController):
#
# Parameters
#
_params = (
config.StringParameter(
'xgrid.controller',
'Hostname or IP of the Xgrid controller.',
default=''
),
config.StringParameter(
'xgrid.password',
'Password for the Xgrid controller.',
default=''
),
config.StringParameter(
'xgrid.grid',
'Id of the grid to use at the Xgrid controller.',
default='0'
),
config.FloatParameter(
'xgrid.pollinterval',
'Time (in secs) to wait between polling the Xgrid controller.',
default=60.0
),
config.StringParameter(
'xgrid.peblpath',
'Full path to the pebl script on Xgrid agents.',
default='pebl'
)
)
def __init__(self, **options):
"""Create a XGridController.
Any config param for 'xgrid' can be passed in via options.
Use just the option part of the parameter name.
"""
config.setparams(self, options)
@property
def _grid(self):
if xg:
cn = xg.Connection(self.controller, self.password)
ct = xg.Controller(cn)
return ct.grid(self.grid)
return None
#
# Public interface
#
def submit(self, tasks):
grid = self._grid
drs = []
for task in tasks:
task.cwd = tempfile.mkdtemp()
cPickle.dump(task, open(os.path.join(task.cwd, 'task.pebl'), 'w'))
task.job = grid.submit(self.peblpath, 'runtask task.pebl',
indir=task.cwd)
drs.append(XgridDeferredResult(grid, task))
return drs
def retrieve(self, deferred_results):
drs = deferred_results
# poll for job results
# i'd rather select() or wait() but xgrid doesn't offer that via the
# xgrid command line app
done = []
while drs:
for i,dr in enumerate(drs):
if dr.finished:
done.append(drs.pop(i))
break # modified drs, so break and re-iterate
else:
time.sleep(self.pollinterval)
return [dr.result for dr in done]
|