/usr/lib/python2.7/dist-packages/pyqtgraph/multiprocess/parallelizer.py is in python-pyqtgraph 0.9.10-5.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

import os, sys, time, multiprocessing, re
from .processes import ForkedProcess
from .remoteproxy import ClosedError

class CanceledError(Exception):
    """Raised when the progress dialog is canceled during a processing operation."""
    pass

class Parallelize(object):
    """
    Class for ultra-simple inline parallelization on multi-core CPUs
    
    Example::
    
        ## Here is the serial (single-process) task:
        
        tasks = [1, 2, 4, 8]
        results = []
        for task in tasks:
            result = processTask(task)
            results.append(result)
        print(results)
        
        
        ## Here is the parallelized version:
        
        tasks = [1, 2, 4, 8]
        results = []
        with Parallelize(tasks, workers=4, results=results) as tasker:
            for task in tasker:
                result = processTask(task)
                tasker.results.append(result)
        print(results)
        
        
    The only major caveat is that *result* in the example above must be picklable,
    since it is automatically sent via pipe back to the parent process.
    """

    def __init__(self, tasks=None, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
        """
        ===============  ===================================================================
        **Arguments:**
        tasks            list of objects to be processed (Parallelize will determine how to 
                         distribute the tasks). If unspecified, then each worker will receive
                         a single task with a unique id number.
        workers          number of worker processes or None to use number of CPUs in the 
                         system
        progressDialog   optional dict of arguments for ProgressDialog
                         to update while tasks are processed
        randomReseed     If True, each forked process will reseed its random number generator
                         to ensure independent results. Works with the built-in random
                         and numpy.random.
        kwds             objects to be shared by proxy with child processes (they will 
                         appear as attributes of the tasker)
        ===============  ===================================================================
        """
        
        ## Generate progress dialog.
        ## Note: forked child processes must never interact with the progress dialog.
        self.showProgress = False
        if progressDialog is not None:
            self.showProgress = True
            if isinstance(progressDialog, basestring):
                progressDialog = {'labelText': progressDialog}
            from ..widgets.ProgressDialog import ProgressDialog
            self.progressDlg = ProgressDialog(**progressDialog)
        
        if workers is None:
            workers = self.suggestedWorkerCount()
        if not hasattr(os, 'fork'):
            workers = 1
        self.workers = workers
        if tasks is None:
            tasks = range(workers)
        self.tasks = list(tasks)
        self.reseed = randomReseed
        self.kwds = kwds.copy()
        self.kwds['_taskStarted'] = self._taskStarted
        
    def __enter__(self):
        self.proc = None
        if self.workers == 1: 
            return self.runSerial()
        else:
            return self.runParallel()
    
    def __exit__(self, *exc_info):
        
        if self.proc is not None:  ## this is a forked worker process
            exceptOccurred = exc_info[0] is not None  ## an exception was raised during processing
            try:
                if exceptOccurred:
                    sys.excepthook(*exc_info)
            finally:
                #print os.getpid(), 'exit'
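                ## Use os._exit() rather than sys.exit() so the forked child
                ## terminates immediately without running the parent's atexit
                ## handlers or unwinding into the parent's stack frames.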
                os._exit(1 if exceptOccurred else 0)
                
        else:  ## parent
            if self.showProgress:
                self.progressDlg.__exit__(None, None, None)

    def runSerial(self):
        if self.showProgress:
            self.progressDlg.__enter__()
            self.progressDlg.setMaximum(len(self.tasks))
        self.progress = {os.getpid(): []}
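        ## Serial mode: no fork occurs, so the Tasker gets process=None and
        ## the shared kwds are passed through directly rather than by proxy.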
        return Tasker(self, None, self.tasks, self.kwds)

    
    def runParallel(self):
        self.childs = []
        
        ## break up tasks into one set per worker
        workers = self.workers
        chunks = [[] for _ in xrange(workers)]
        for i, task in enumerate(self.tasks):
            chunks[i % workers].append(task)
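        ## Round-robin distribution: e.g. 10 tasks over 3 workers yields
        ## chunks [[t0,t3,t6,t9], [t1,t4,t7], [t2,t5,t8]].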
        
        ## fork and assign tasks to each worker
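        ## Each ForkedProcess() call returns in both processes: the child sees
        ## isParent=False, keeps its process handle, and returns its Tasker
        ## immediately, so only the parent ever continues this loop.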
        for i in range(workers):
            proc = ForkedProcess(target=None, preProxy=self.kwds, randomReseed=self.reseed)
            if not proc.isParent:
                self.proc = proc
                return Tasker(self, proc, chunks[i], proc.forkedProxies)
            else:
                self.childs.append(proc)
        
        ## Keep track of each worker's progress independently:
        ## self.progress[pid] is a list of task indexes for that child.
        ## The last index is the task currently being processed; all others
        ## are finished.
        self.progress = dict([(ch.childPid, []) for ch in self.childs])

        try:
            if self.showProgress:
                self.progressDlg.__enter__()
                self.progressDlg.setMaximum(len(self.tasks))
            ## Process events from workers until all have exited.
            activeChilds = self.childs[:]
            self.exitCodes = []
            pollInterval = 0.01
            while len(activeChilds) > 0:
                waitingChildren = 0
                rem = []
                for ch in activeChilds:
                    try:
                        n = ch.processRequests()
                        if n > 0:
                            waitingChildren += 1
                    except ClosedError:
                        #print ch.childPid, 'process finished'
                        rem.append(ch)
                        if self.showProgress:
                            self.progressDlg += 1
                #print "remove:", [ch.childPid for ch in rem]
                for ch in rem:
                    activeChilds.remove(ch)
                    while True:
                        try:
                            pid, exitcode = os.waitpid(ch.childPid, 0)
                            self.exitCodes.append(exitcode)
                            break
                        except OSError as ex:
                            if ex.errno == 4:  ## EINTR: waitpid() was interrupted by a signal; retry
                                continue
                            else:
                                raise
                    
                    #print [ch.childPid for ch in activeChilds]
                    
                if self.showProgress and self.progressDlg.wasCanceled():
                    for ch in activeChilds:
                        ch.kill()
                    raise CanceledError()
                    
                ## adjust polling interval--prefer to get exactly 1 event per poll cycle.
                if waitingChildren > 1:
                    pollInterval *= 0.7
                elif waitingChildren == 0:
                    pollInterval /= 0.7
                pollInterval = max(min(pollInterval, 0.5), 0.0005) ## but keep it within reasonable limits
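                ## e.g. starting at 10 ms, the interval decays toward 0.5 ms
                ## under heavy traffic and grows back toward 0.5 s when idle.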
                
                time.sleep(pollInterval)
        finally:
            if self.showProgress:
                self.progressDlg.__exit__(None, None, None)
        if len(self.exitCodes) < len(self.childs):
            raise Exception("Parallelizer started %d processes but only received exit codes from %d." % (len(self.childs), len(self.exitCodes)))
        for code in self.exitCodes:
            if code != 0:
                raise Exception("Error occurred in parallel-executed subprocess (console output may have more information).")
        return []  ## no tasks for parent process.
    
    
    @staticmethod
    def suggestedWorkerCount():
        if 'linux' in sys.platform:
            ## We can do a little better than multiprocessing.cpu_count() here:
            ## it counts hyperthreaded logical cores, which add little extra
            ## benefit for CPU-bound workers, so count physical cores instead.
            try:
                cores = {}
                pid = None
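                ## Expected /proc/cpuinfo format (one block per logical CPU):
                ##   physical id : 0
                ##   cpu cores   : 4
                ## Recording 'cpu cores' per distinct 'physical id' counts each
                ## physical package once and ignores hyperthreaded siblings.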
                
                for line in open('/proc/cpuinfo'):
                    m = re.match(r'physical id\s+:\s+(\d+)', line)
                    if m is not None:
                        pid = m.groups()[0]
                    m = re.match(r'cpu cores\s+:\s+(\d+)', line)
                    if m is not None:
                        cores[pid] = int(m.groups()[0])
                return sum(cores.values())
            except Exception:  ## fall back if /proc/cpuinfo is missing or unparseable
                return multiprocessing.cpu_count()
                
        else:
            return multiprocessing.cpu_count()
        
    def _taskStarted(self, pid, i, **kwds):
        ## called remotely by tasker to indicate it has started working on task i
        #print pid, 'reported starting task', i
        if self.showProgress:
            if len(self.progress[pid]) > 0:
                self.progressDlg += 1
            if pid == os.getpid():  ## single-worker process
                if self.progressDlg.wasCanceled():
                    raise CanceledError()
        self.progress[pid].append(i)
    
    
class Tasker(object):
    def __init__(self, parallelizer, process, tasks, kwds):
        self.proc = process
        self.par = parallelizer
        self.tasks = tasks
        for k, v in kwds.iteritems():
            setattr(self, k, v)
        
    def __iter__(self):
        ## This could be improved so that tasks are retrieved from the parent process one at a time.
        for i, task in enumerate(self.tasks):
            self.index = i
            #print os.getpid(), 'starting task', i
            self._taskStarted(os.getpid(), i, _callSync='off')
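            ## _callSync='off' makes the proxied call fire-and-forget: the
            ## worker does not block waiting for the parent to acknowledge.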
            yield task
        if self.proc is not None:
            #print os.getpid(), 'no more tasks'
            self.proc.close()
    
    def process(self):
        """
        Process requests from parent.
        Usually it is not necessary to call this unless you would like to 
        receive messages (such as exit requests) during an iteration.
        """
        if self.proc is not None:
            self.proc.processRequests()
    
    def numWorkers(self):
        """
        Return the number of parallel workers
        """
        return self.par.workers
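
## A minimal usage sketch (mirrors the class docstring; assumes processTask is
## a user-supplied, CPU-bound function returning a picklable value):
##
##     tasks = [1, 2, 4, 8]
##     results = []
##     with Parallelize(tasks, workers=4, results=results) as tasker:
##         for task in tasker:
##             tasker.results.append(processTask(task))
##     print(results)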
    
#class Parallelizer:
    #"""
    #Use::
    
        #p = Parallelizer()
        #with p(4) as i:
            #p.finish(do_work(i))
        #print p.results()
    
    #"""
    #def __init__(self):
        #pass

    #def __call__(self, n):
        #self.replies = []
        #self.conn = None  ## indicates this is the parent process
        #return Session(self, n)
            
    #def finish(self, data):
        #if self.conn is None:
            #self.replies.append((self.i, data))
        #else:
            ##print "send", self.i, data
            #self.conn.send((self.i, data))
            #os._exit(0)
            
    #def result(self):
        #print self.replies
        
#class Session:
    #def __init__(self, par, n):
        #self.par = par
        #self.n = n
        
    #def __enter__(self):
        #self.childs = []
        #for i in range(1, self.n):
            #c1, c2 = multiprocessing.Pipe()
            #pid = os.fork()
            #if pid == 0:  ## child
                #self.par.i = i
                #self.par.conn = c2
                #self.childs = None
                #c1.close()
                #return i
            #else:
                #self.childs.append(c1)
                #c2.close()
        #self.par.i = 0
        #return 0
            
        
        
    #def __exit__(self, *exc_info):
        #if exc_info[0] is not None:
            #sys.excepthook(*exc_info)
        #if self.childs is not None:
            #self.par.replies.extend([conn.recv() for conn in self.childs])
        #else:
            #self.par.finish(None)