This file is indexed.

/usr/share/pyshared/pebl/taskcontroller/multiprocess.py is in python-pebl 1.0.2-2build1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""Module providing a taskcontroller than runs tasks over multiple processes."""

import os, os.path
import cPickle
import thread, time
import shutil 
import tempfile
from copy import copy

from pebl import config, result
from pebl.taskcontroller.base import _BaseController

PEBL = "pebl"

class MultiProcessController(_BaseController):
    #
    # Parameters
    # 
    _params = (
            config.IntParameter(
            'multiprocess.poolsize',
            'Number of processes to run concurrently (0 means no limit)',
            default=0
        )
    )
        
    def __init__(self, poolsize=None):
        """Creates a task controller that runs taks on multiple processes.

        This task controller uses a pool of processes rather than spawning all
        processes concurrently. poolsize is the size of this pool and by
        default it is big enough to run all processes concurrently.

        """
        self.poolsize = poolsize or config.get('multiprocess.poolsize')

    def run(self, tasks):
        """Run tasks by creating multiple processes.

        If poolsize was specified when creating this controller, additional
        tasks will be queued.

        """
        tasks = copy(tasks) # because we do tasks.pop() below..
        numtasks = len(tasks)
        poolsize = self.poolsize or numtasks
        running = {}
        done = []
        opjoin = os.path.join

        while len(done) < numtasks:
            # submit tasks (if below poolsize and tasks remain)
            for i in xrange(min(poolsize-len(running), len(tasks))):
                task = tasks.pop()
                task.cwd = tempfile.mkdtemp()
                cPickle.dump(task, open(opjoin(task.cwd, 'task.pebl'), 'w'))
                pid = os.spawnlp(os.P_NOWAIT, PEBL, PEBL, "runtask", 
                                 opjoin(task.cwd, "task.pebl"))
                running[pid] = task
            
            # wait for any child process to finish
            pid,status = os.wait() 
            done.append(running.pop(pid, None))

        results = [result.fromfile(opjoin(t.cwd, 'result.pebl')) for t in done]

        # to make the results look like deferred results
        for r in results:
            r.taskid = 0
        
        # clean up 
        for t in done:
            shutil.rmtree(t.cwd)

        return results