This file is indexed.

/usr/share/pyshared/pebl/taskcontroller/xgrid.py is in python-pebl 1.0.2-2build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import time
import os.path
import shutil 
import tempfile
import cPickle

# xg is an optional dependency: when it cannot be imported the module still
# loads and `xg` is set to False so that callers can test it for truthiness.
try:
    import xg
except Exception:
    # Catch Exception rather than using a bare except so that
    # KeyboardInterrupt/SystemExit raised during import are not swallowed.
    xg = False
    
from pebl import config, result
from pebl.taskcontroller.base import _BaseSubmittingController, DeferredResult

class XgridDeferredResult(DeferredResult):
    """Deferred result of a pebl task that was submitted as an Xgrid job.

    Keeps the job handle found on the task (``task.job``) so that the job can
    be polled for completion and its result fetched later.

    """

    def __init__(self, grid, task):
        """Remember the grid and the job that is running `task`.

        grid -- the grid object the job was submitted to.
        task -- the submitted task; it must already have a ``job`` attribute
                that exposes ``jobID``.

        """
        self.grid = grid
        self.job = task.job
        self.taskid = self.job.jobID

    @property
    def result(self):
        """Fetch the job's output, delete the job and return the parsed result.

        The job's results are requested into a scratch directory, the job is
        deleted, and 'result.pebl' from that directory is loaded with
        result.fromfile().  The scratch directory is removed whether or not
        any of these steps raises.

        NOTE(review): the job is deleted on every access, so this property
        can presumably be read successfully only once -- confirm against xg.

        """
        # mkdtemp's first positional argument is the *suffix*, so the scratch
        # directory's name ends in 'pebl'.
        tmpdir = tempfile.mkdtemp('pebl')
        try:
            self.job.results(
                stdout=os.path.join(tmpdir, 'stdout'),
                stderr=os.path.join(tmpdir, 'stderr'),
                outdir=tmpdir
            )
            self.job.delete()
            return result.fromfile(os.path.join(tmpdir, 'result.pebl'))
        finally:
            # Previously the directory was only removed on the success path
            # and leaked whenever fetching or parsing the result failed.
            shutil.rmtree(tmpdir)

    @property
    def finished(self):
        """True if the job's freshly updated 'jobStatus' is 'Finished'.

        Any other status (or a missing 'jobStatus' entry) counts as not
        finished.

        """
        return self.job.info(update=1).get('jobStatus') in ('Finished',)


class XgridController(_BaseSubmittingController):
    """Task controller that runs pebl tasks as jobs on an Xgrid grid."""

    #
    # Parameters
    #
    _params = (
        config.StringParameter(
            'xgrid.controller',
            'Hostname or IP of the Xgrid controller.',
            default=''
        ),
        config.StringParameter(
            'xgrid.password',
            'Password for the Xgrid controller.',
            default=''
        ),
        config.StringParameter(
            'xgrid.grid',
            'Id of the grid to use at the Xgrid controller.',
            default='0'
        ),
        config.FloatParameter(
            'xgrid.pollinterval',
            'Time (in secs) to wait between polling the Xgrid controller.',
            default=60.0
        ),
        config.StringParameter(
            'xgrid.peblpath',
            'Full path to the pebl script on Xgrid agents.',
            default='pebl'
        )
    )

    def __init__(self, **options):
        """Create a XGridController.

        Any config param for 'xgrid' can be passed in via options.
        Use just the option part of the parameter name.

        """
        config.setparams(self, options)

    @property
    def _grid(self):
        """Return the configured grid, or None if the xg module is unavailable.

        A new connection and controller object are created on every access,
        so callers should hold on to the returned grid rather than re-reading
        this property.

        """
        if not xg:
            return None

        cn = xg.Connection(self.controller, self.password)
        ct = xg.Controller(cn)
        return ct.grid(self.grid)

    #
    # Public interface
    #
    def submit(self, tasks):
        """Submit every task as an Xgrid job; return their deferred results.

        Each task is pickled to 'task.pebl' inside a fresh temporary
        directory, which is passed to the grid as the job's input directory.
        As a side effect each task gains ``cwd`` (that directory) and ``job``
        (the submitted job) attributes.

        Returns a list of XgridDeferredResult, one per task, in task order.

        If the xg module is unavailable, ``_grid`` is None and submitting a
        non-empty list of tasks fails with AttributeError.

        NOTE(review): the per-task temporary directory is never removed here;
        whether it is safe to delete right after grid.submit() returns
        depends on xg -- confirm before adding cleanup.

        """
        grid = self._grid

        drs = []
        for task in tasks:
            task.cwd = tempfile.mkdtemp()
            taskfile = open(os.path.join(task.cwd, 'task.pebl'), 'w')
            try:
                cPickle.dump(task, taskfile)
            finally:
                # Close explicitly so the pickle is fully written before the
                # directory is handed to the grid, instead of relying on the
                # garbage collector to close the file.
                taskfile.close()
            task.job = grid.submit(self.peblpath, 'runtask task.pebl',
                                   indir=task.cwd)
            drs.append(XgridDeferredResult(grid, task))
        return drs

    def retrieve(self, deferred_results):
        """Block until every deferred result is finished; return the results.

        deferred_results -- iterable of deferred results (as returned by
                            submit).  It is not modified.

        Results are returned in the order in which the jobs were seen to
        finish, which is not necessarily submission order.  Between polling
        rounds in which no job finished, sleeps for ``self.pollinterval``
        seconds.

        """
        # Work on a copy: the caller's list must not be emptied as a side
        # effect of waiting.
        pending = list(deferred_results)

        # poll for job results
        # i'd rather select() or wait() but xgrid doesn't offer that via the
        # xgrid command line app
        done = []
        while pending:
            still_pending = []
            for dr in pending:
                if dr.finished:
                    done.append(dr)
                else:
                    still_pending.append(dr)

            if len(still_pending) == len(pending):
                # nothing finished during this round; wait before re-polling
                time.sleep(self.pollinterval)
            pending = still_pending

        return [dr.result for dr in done]