/usr/lib/codeaster/asrun/unittest/160_parametric.py is in code-aster-run 1.13.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os.path as osp
import re
import unittest
from math import log10
from common import dict_conf, execcmd, tmpdir
from data import parametric_export, parametric_poursuite_export
import asrun
from asrun.run import AsRunFactory
from asrun.profil import AsterProfil
from asrun.repart import get_hostrc
from asrun.parametric import is_list_of_dict
from asrun.thread import Dispatcher
from asrun.distrib import DistribParametricTask
class TestParametric(unittest.TestCase):
def test01_cmdline(self):
export = osp.join(tmpdir, "parametric.export")
open(export, "w").write(parametric_export % dict_conf)
cmd = dict_conf["as_run"] + [export]
iret, output = execcmd(cmd, "parametric.1", return_output=True)
assert iret == 0
assert len(re.findall("DIAGNOSTIC JOB *: *<A>", output)) == 1 # repe != base
assert len(re.findall("0 err", output)) == 1
assert osp.exists(osp.join(tmpdir, 'parametric.resu/calc_1/base/glob.1'))
content = open(osp.join(tmpdir, 'parametric.resu/calc_1/command_0.comm'), 'r').read()
assert content.find("INCLUDE") > -1
def test02_cmdline_poursuite(self):
export = osp.join(tmpdir, "parametric_poursuite.export")
open(export, "w").write(parametric_poursuite_export % dict_conf)
cmd = dict_conf["as_run"] + [export]
iret, output = execcmd(cmd, "parametric.2", return_output=True)
assert iret == 0
assert len(re.findall("DIAGNOSTIC JOB *: *OK", output)) == 1
def test03_using_api(self):
from asrun.core import magic
run = AsRunFactory()
magic.set_stdout(osp.join(tmpdir, "parametric.3" + ".out"))
prof = AsterProfil()
prof['actions'] = 'distribution'
prof['nomjob'] = 'parametric_api'
prof['mode'] = 'interactif'
prof['version'] = dict_conf['ASTER_VERSION']
prof['debug'] = 'nodebug'
user, host = run.system.getuser_host()
prof['origine'] = 'as_run %s' % run['version']
prof['mclient'] = host
prof['uclient'] = user
prof['serveur'] = host
prof['username'] = user
prof.args['memjeveux'] = 32
prof.args['tpmax'] = 60
prof['memjob'] = 65536
prof['tpsjob'] = 1
prof.Set('D', {
'type' : 'comm', 'isrep' : False, 'ul' : 1, 'compr' : False,
'path' : "%(DATA)s/study.comm" % dict_conf })
prof.Set('D', {
'type' : 'hostfile', 'isrep' : False, 'ul' : 0, 'compr' : False,
'path' : "%(DATA)s/hostfile" % dict_conf })
# result directories
resudir = "%(TMPDIR)s/parametric_api.resu" % dict_conf
flashdir = "%(TMPDIR)s/parametric_api.flash" % dict_conf
prof.Set('R', {
'type' : 'repe', 'isrep' : True, 'ul' : 0, 'compr' : False,
'path' : resudir })
# add a result
prof.Set('R', {
'type' : 'libr', 'isrep' : False, 'ul' : 6, 'compr' : False,
'path' : "/unused_directory_name/study.mess" })
# get hostrc object
hostrc = get_hostrc(run, prof)
# timeout before rejected a job
timeout = prof.get_timeout()
# list of parameters
list_val = [
{ 'P1' : 111., }, { 'P1' : 222., }, { 'P1' : 0., }
]
assert is_list_of_dict(list_val)
nbval = len(list_val)
assert nbval == 3
# number of threads to follow execution
numthread = 1
# ----- Execute calcutions in parallel using a Dispatcher object
# elementary task...
task = DistribParametricTask(run=run, prof=prof, # IN
hostrc=hostrc,
nbmaxitem=0, timeout=timeout,
resudir=resudir, flashdir=flashdir,
reptrav=tmpdir,
info=1,
keywords={ 'POST_CALCUL' : """print 'test USE_POST_CALCUL'""" },
nbnook=[0,]*numthread, exec_result=[]) # OUT
# ... and dispatch task on 'list_val'
etiq = 'calc_%%0%dd' % (int(log10(nbval)) + 1)
labels = [etiq % (i+1) for i in range(nbval)]
couples = zip(labels, list_val)
execution = Dispatcher(couples, task, numthread=numthread)
# expect one error
assert task.nbnook[0] == 1, task.nbnook
list_output = []
for result in task.exec_result:
calc, diag, output_filename = result[0], result[2], result[7]
gravity = run.GetGrav(diag)
if gravity > run.GetGrav("<A>"):
list_output.append((calc, diag, output_filename))
# check that output files exist
assert osp.isfile(osp.join(resudir, calc, "study.mess"))
assert len(list_output) == 1, list_output
if __name__ == "__main__":
unittest.main()
|