This file is indexed.

/usr/lib/codeaster/asrun/unittest/160_parametric.py is in code-aster-run 1.13.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import os.path as osp
import re
import unittest
from math import log10

from common import dict_conf, execcmd, tmpdir
from data   import parametric_export, parametric_poursuite_export

import asrun
from asrun.run          import AsRunFactory
from asrun.profil       import AsterProfil
from asrun.repart       import get_hostrc
from asrun.parametric   import is_list_of_dict
from asrun.thread       import Dispatcher
from asrun.distrib      import DistribParametricTask


class TestParametric(unittest.TestCase):

    def test01_cmdline(self):
        export = osp.join(tmpdir, "parametric.export")
        open(export, "w").write(parametric_export % dict_conf)
        cmd = dict_conf["as_run"] + [export]
        iret, output = execcmd(cmd, "parametric.1", return_output=True)
        assert iret == 0
        assert len(re.findall("DIAGNOSTIC JOB *: *<A>", output)) == 1   # repe != base
        assert len(re.findall("0 err", output)) == 1
        assert osp.exists(osp.join(tmpdir, 'parametric.resu/calc_1/base/glob.1'))
        content = open(osp.join(tmpdir, 'parametric.resu/calc_1/command_0.comm'), 'r').read()
        assert content.find("INCLUDE") > -1


    def test02_cmdline_poursuite(self):
        export = osp.join(tmpdir, "parametric_poursuite.export")
        open(export, "w").write(parametric_poursuite_export % dict_conf)
        cmd = dict_conf["as_run"] + [export]
        iret, output = execcmd(cmd, "parametric.2", return_output=True)
        assert iret == 0
        assert len(re.findall("DIAGNOSTIC JOB *: *OK", output)) == 1


    def test03_using_api(self):
        from asrun.core import magic
        run = AsRunFactory()
        magic.set_stdout(osp.join(tmpdir, "parametric.3" + ".out"))
        prof = AsterProfil()
        prof['actions'] = 'distribution'
        prof['nomjob'] = 'parametric_api'
        prof['mode']    = 'interactif'
        prof['version'] = dict_conf['ASTER_VERSION']
        prof['debug']   = 'nodebug'
        user, host = run.system.getuser_host()
        prof['origine']  = 'as_run %s' % run['version']
        prof['mclient']  = host
        prof['uclient']  = user
        prof['serveur']  = host
        prof['username'] = user
        prof.args['memjeveux'] = 32
        prof.args['tpmax']     = 60
        prof['memjob'] = 65536
        prof['tpsjob'] = 1
        prof.Set('D', {
            'type' : 'comm', 'isrep' : False, 'ul' : 1, 'compr' : False,
            'path' : "%(DATA)s/study.comm" % dict_conf })
        prof.Set('D', {
            'type' : 'hostfile', 'isrep' : False, 'ul' : 0, 'compr' : False,
            'path' : "%(DATA)s/hostfile" % dict_conf })
        # result directories
        resudir  = "%(TMPDIR)s/parametric_api.resu" % dict_conf
        flashdir = "%(TMPDIR)s/parametric_api.flash" % dict_conf
        prof.Set('R', {
            'type' : 'repe', 'isrep' : True, 'ul' : 0, 'compr' : False,
            'path' : resudir })
        # add a result
        prof.Set('R', {
            'type' : 'libr', 'isrep' : False, 'ul' : 6, 'compr' : False,
            'path' : "/unused_directory_name/study.mess" })
        # get hostrc object
        hostrc = get_hostrc(run, prof)
        # timeout before rejected a job
        timeout = prof.get_timeout()
        # list of parameters
        list_val = [
            { 'P1' : 111., }, { 'P1' : 222., }, { 'P1' : 0., }
        ]
        assert is_list_of_dict(list_val)
        nbval = len(list_val)
        assert nbval == 3
        # number of threads to follow execution
        numthread = 1
        # ----- Execute calcutions in parallel using a Dispatcher object
        # elementary task...
        task = DistribParametricTask(run=run, prof=prof, # IN
                             hostrc=hostrc,
                             nbmaxitem=0, timeout=timeout,
                             resudir=resudir, flashdir=flashdir,
                             reptrav=tmpdir,
                             info=1,
                             keywords={ 'POST_CALCUL' : """print 'test USE_POST_CALCUL'""" },
                             nbnook=[0,]*numthread, exec_result=[])            # OUT
        # ... and dispatch task on 'list_val'
        etiq = 'calc_%%0%dd' % (int(log10(nbval)) + 1)
        labels = [etiq % (i+1) for i in range(nbval)]
        couples = zip(labels, list_val)
        execution = Dispatcher(couples, task, numthread=numthread)
        # expect one error
        assert task.nbnook[0] == 1, task.nbnook
        list_output = []
        for result in task.exec_result:
            calc, diag, output_filename = result[0], result[2], result[7]
            gravity = run.GetGrav(diag)
            if gravity > run.GetGrav("<A>"):
                list_output.append((calc, diag, output_filename))
            # check that output files exist
            assert osp.isfile(osp.join(resudir, calc, "study.mess"))
        assert len(list_output) == 1, list_output


if __name__ == "__main__":
    unittest.main()