This file is indexed.

/usr/lib/python2.7/dist-packages/asrun/unittest/060_thread.py is in code-aster-run 1.13.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Test native multi-threading and the Task/Dispatcher objects of asrun.
"""

import sys
import os
import time
import random
import unittest

from common import tmpdir

from asrun.run    import AsRunFactory
from asrun.thread import Task, Dispatcher


# "native"
from threading import Thread
class SleepThread(Thread):
    """Thread that sleeps for a random time and records how long it slept.

    Attributes:
        item:  value inserted as the thread number in the printed message.
        maxi:  upper bound, in seconds, of the random sleep duration.
        delay: duration slept by `run`; stays at 0. until `run` has finished
               sleeping.
    """

    def __init__(self, item, maxi):
        """Store the thread number and the maximum sleep duration."""
        Thread.__init__(self)
        self.item = item
        self.maxi = maxi
        self.delay = 0.

    def run(self):
        """Sleep for a random duration in [0, maxi] seconds and store it."""
        delay = random.uniform(0., 1.) * self.maxi
        # Parenthesised single-argument form: prints the same text with the
        # Python 2 print statement and with the Python 3 print function.
        print('thread #%d waits for %.2f s' % (self.item, delay))
        time.sleep(delay)
        # Assigned only after the sleep, so a reader that joined the thread
        # sees the duration that was really waited.
        self.delay = delay


class SleepTask(Task):
    """Task that sleeps for a random time for every item it processes.

    The attributes `maxi`, `wait` and `niter` are read or updated here but
    are not defined in this class.
    NOTE(review): presumably they are set from the keyword arguments given
    to the constructor (see test02_task) -- confirm against asrun.thread.Task.
    """

    def execute(self, item, **kwargs):
        """Sleep for a random duration in [0, maxi] seconds.

        `item` and the extra keyword arguments are not used.
        Returns a 1-tuple holding the duration that was slept.
        """
        delay = random.uniform(0., 1.) * self.maxi
        # Parenthesised single-argument form: prints the same text with the
        # Python 2 print statement and with the Python 3 print function.
        print('thread waits for %.2f s' % delay)
        time.sleep(delay)
        return (delay,)

    def result(self, delay, **kwargs):
        """Accumulate one sleep duration and count one more iteration.

        NOTE(review): presumably called with the values returned by
        `execute` -- confirm against the Dispatcher implementation.
        """
        self.wait += delay
        self.niter += 1



# Number of work items started per available thread: the tests below run
# `numthread * mult` items.
mult = 2
# Upper bound, in seconds, of the random sleep performed for each item.
maxi = 0.5

class TestMultithreading(unittest.TestCase):
    """Check that sleeping work items overlap when run in several threads.

    Each test runs `numthread * mult` items that sleep for a random time,
    then verifies that the wallclock time is lower than the sum of the
    individual sleeps and that every item was accounted for.
    The tests return without checking anything when `numthread` is None
    or not greater than 1.
    """

    def setUp(self):
        """Store the value of GetCpuInfo("numthread") as the thread count."""
        run = AsRunFactory()
        self.numthread = run.GetCpuInfo("numthread")

    def test01_native(self):
        """Run SleepThread objects directly and check that they overlap."""
        if self.numthread is None or self.numthread <= 1:
            return
        nb_item = self.numthread * mult
        # Remember the active stream (it may already be a capture installed
        # by the test runner) instead of blindly resetting to sys.__stdout__.
        saved_stdout = sys.stdout
        with open(os.path.join(tmpdir, "thread.1.out"), "w") as f_output:
            try:
                sys.stdout = f_output
                t0 = time.time()
                l_thread = []
                for i in range(nb_item):
                    th = SleepThread(i, maxi)
                    l_thread.append(th)
                    th.start()
                wait = 0.
                niter = 0
                for th in l_thread:
                    th.join()
                    wait += th.delay
                    niter += 1
                dt = time.time() - t0
                print('wallclock time = %.2f s' % dt)
                print('total wait time = %.2f s (%d iter)' % (wait, niter))
            finally:
                # Always undo the redirection, even when something failed,
                # so that later output is not sent to a closed file.
                sys.stdout = saved_stdout
        # unittest assertions are not stripped by "python -O".
        self.assertTrue(dt < wait,
                        'wallclock time (%.2f s) is not lower than '
                        'total wait time (%.2f s)' % (dt, wait))
        self.assertEqual(nb_item, niter)

    def test02_task(self):
        """Run a SleepTask through a Dispatcher and check the overlap."""
        if self.numthread is None or self.numthread <= 1:
            return
        nb_item = self.numthread * mult
        # Remember the active stream (it may already be a capture installed
        # by the test runner) instead of blindly resetting to sys.__stdout__.
        saved_stdout = sys.stdout
        with open(os.path.join(tmpdir, "thread.2.out"), "w") as f_output:
            try:
                sys.stdout = f_output
                t0 = time.time()
                task = SleepTask(maxi=maxi,                     # IN
                                 niter=0, wait=0.)              # OUT
                # list(...) keeps the argument a real list on Python 3 too.
                sleep = Dispatcher(list(range(nb_item)), task, self.numthread)
                dt = time.time() - t0
                print(sleep.report())
                print('wallclock time = %.2f s' % dt)
                print('total wait time = %.2f s (%d iter)'
                      % (task.wait, task.niter))
            finally:
                # Always undo the redirection, even when something failed,
                # so that later output is not sent to a closed file.
                sys.stdout = saved_stdout
        # unittest assertions are not stripped by "python -O".
        self.assertTrue(dt < task.wait,
                        'wallclock time (%.2f s) is not lower than '
                        'total wait time (%.2f s)' % (dt, task.wait))
        self.assertEqual(nb_item, task.niter)



# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()