/usr/lib/codeaster/asrun/unittest/110_server.py is in code-aster-run 1.13.1-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import unittest
from common import dict_conf, execcmd, tmpdir
from data import server_export
from asrun.calcul import parse_submission_result
from asrun.job import parse_actu_result, parse_tail_result
from asrun.run import AsRunFactory
from asrun.profil import AsterProfil
from asrun.calcul import AsterCalcul
class TestServerFunctions(unittest.TestCase):
    """Check the server-side job life cycle: submit, query, tail, end.

    The same scenario is run twice: once through the ``as_run`` command
    line and once through the ``AsterCalcul`` Python API.
    """

    def setUp(self):
        """Write the export file used by the tests into ``tmpdir``."""
        self.export = os.path.join(tmpdir, "server.export")
        dico = dict_conf.copy()
        dico['nomjob'] = 'server_cmdline'
        # Use a context manager so the handle is closed (and the content
        # flushed to disk) before the export file is read back.
        with open(self.export, "w") as fobj:
            fobj.write(server_export % dico)

    def test01_from_cmdline(self):
        """Submit a job with ``as_run --serv`` and follow it until it ends."""
        # the same test from the client side is 200_client.test02_proxy
        cmd = dict_conf["as_run"] + ["--serv", self.export]
        iret, out = execcmd(cmd, "server.1", return_output=True)
        nomjob = "server_cmdline"
        self.assertEqual(iret, 0)
        jobid, queue, studyid = parse_submission_result(out)
        self.assertNotEqual(jobid.strip(), "")
        self.assertNotEqual(studyid.strip(), "")

        # Short pause before asking for the state of the job.
        time.sleep(0.2)
        cmd = dict_conf["as_run"] + ["--actu", jobid, nomjob, "interactif"]
        iret, out = execcmd(cmd, "server.2", return_output=True)
        self.assertEqual(iret, 0)
        etat, diag, node, tcpu, wrk, queue = parse_actu_result(out)
        self.assertEqual(etat, "RUN", etat)

        ftail = os.path.join(tmpdir, "server.3.tail")
        cmd = dict_conf["as_run"] + ["--tail", jobid, nomjob, "interactif", ftail, "10"]
        iret, out = execcmd(cmd, "server.3", return_output=True)
        self.assertEqual(iret, 0)
        job, jobid_, etat, diag = parse_tail_result(out)
        self.assertEqual(etat, "RUN", etat)

        # Poll the job state every 4 seconds until it is reported as
        # 'ENDED' or '_'; the assertion after the loop rejects '_'.
        etat = None
        while etat not in ('ENDED', '_'):
            time.sleep(4)
            cmd = dict_conf["as_run"] + ["--actu", jobid, nomjob, "interactif"]
            iret, out = execcmd(cmd, "server.4", return_output=True)
            self.assertEqual(iret, 0)
            etat, diag, node, tcpu, wrk, queue = parse_actu_result(out)
        self.assertEqual(etat, "ENDED", etat)
        self.assertEqual(diag, "<A>_ALARM", diag)

        # Finally delete the job.
        cmd = dict_conf["as_run"] + ["--del", jobid, nomjob, "interactif"]
        iret = execcmd(cmd, "server.5")
        self.assertEqual(iret, 0)

    def test02_using_api(self):
        """Run the same scenario through the ``AsterCalcul`` API."""
        run = AsRunFactory()
        prof = AsterProfil(run=run, filename=self.export)
        # change job name and job number
        prof['nomjob'] = "server_api"
        calc = AsterCalcul(run, prof=prof, pid=run.get_pid())
        jret, out = calc.start()
        # Build the message only on failure, as a plain ``assert`` would.
        if jret != 0:
            self.fail("submission problem : %s" % out)

        time.sleep(0.2)
        etat, diag, node, tcpu, wrk, queue = calc.get_state()
        self.assertEqual(etat, "RUN", etat)
        etat, diag, s_out = calc.tail(expression='CODE_ASTER')
        self.assertEqual(etat, "RUN", etat)

        calc.wait()
        etat, diag, node, tcpu, wrk, queue = calc.get_state()
        self.assertEqual(etat, "ENDED", etat)
        self.assertEqual(diag, "<A>_ALARM", diag)
        self.assertTrue(run.GetGrav(calc.diag) in (0, 1), calc.diag)
        self.assertTrue(os.path.isfile(calc.flash('output')))
        calc.kill()
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner.
    unittest.main()
|