This file is indexed.

/usr/lib/codeaster/asrun/unittest/110_server.py is in code-aster-run 1.13.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import time
import unittest

from common import dict_conf, execcmd, tmpdir
from data   import server_export

from asrun.calcul import parse_submission_result
from asrun.job    import parse_actu_result, parse_tail_result
from asrun.run    import AsRunFactory
from asrun.profil import AsterProfil
from asrun.calcul import AsterCalcul



class TestServerFunctions(unittest.TestCase):
    """Exercise the as_run server functions on one small job.

    The same scenario (submit, query the state, tail the output, wait for
    the end, clean up) is played twice: once through the ``as_run`` command
    line and once through the Python API.
    """

    # Job name written into the export file and passed to the command line
    # queries; kept in one place so both always agree.
    JOB_NAME = "server_cmdline"
    # Delay (seconds) between two state queries while waiting for the job.
    POLL_INTERVAL = 4
    # Upper bound (seconds) on the whole wait, so that a job which never
    # ends makes the test fail instead of hanging the test suite.
    POLL_TIMEOUT = 3600

    def setUp(self):
        """Write the export file used by both tests into ``tmpdir``."""
        self.export = os.path.join(tmpdir, "server.export")
        dico = dict_conf.copy()
        dico['nomjob'] = self.JOB_NAME
        # Close the file explicitly: as_run reads it right after, so its
        # content must not depend on when the handle is garbage collected.
        with open(self.export, "w") as fexp:
            fexp.write(server_export % dico)

    def _actu(self, jobid, label):
        """Run ``as_run --actu`` on `jobid` and return the parsed result.

        `label` is passed to ``execcmd`` as is. The command must exit with
        code 0. The value returned is what ``parse_actu_result`` gives for
        the command output: (etat, diag, node, tcpu, wrk, queue).
        """
        cmd = dict_conf["as_run"] + ["--actu", jobid, self.JOB_NAME,
                                     "interactif"]
        iret, out = execcmd(cmd, label, return_output=True)
        self.assertEqual(iret, 0)
        return parse_actu_result(out)

    def test01_from_cmdline(self):
        """Submit, follow and delete a job using the as_run command line."""
        # the same test from the client side is 200_client.test02_proxy
        cmd = dict_conf["as_run"] + ["--serv", self.export]
        iret, out = execcmd(cmd, "server.1", return_output=True)
        self.assertEqual(iret, 0)
        jobid, queue, studyid = parse_submission_result(out)
        self.assertNotEqual(jobid.strip(), "")
        self.assertNotEqual(studyid.strip(), "")

        # short pause before the first query (presumably to let the job
        # start)
        time.sleep(0.2)
        etat, diag, node, tcpu, wrk, queue = self._actu(jobid, "server.2")
        self.assertEqual(etat, "RUN")

        ftail = os.path.join(tmpdir, "server.3.tail")
        cmd = dict_conf["as_run"] + ["--tail", jobid, self.JOB_NAME,
                                     "interactif", ftail, "10"]
        iret, out = execcmd(cmd, "server.3", return_output=True)
        self.assertEqual(iret, 0)
        job, jobid_, etat, diag = parse_tail_result(out)
        self.assertEqual(etat, "RUN")

        # poll until the job reports 'ENDED' (or '_', which also stops the
        # loop and is then reported as a failure by the assertion below)
        deadline = time.time() + self.POLL_TIMEOUT
        etat = None
        while etat not in ('ENDED', '_'):
            if time.time() > deadline:
                self.fail("job %s still in state %r after %d s"
                          % (jobid, etat, self.POLL_TIMEOUT))
            time.sleep(self.POLL_INTERVAL)
            etat, diag, node, tcpu, wrk, queue = self._actu(jobid,
                                                            "server.4")
        self.assertEqual(etat, "ENDED")
        self.assertEqual(diag, "<A>_ALARM")

        cmd = dict_conf["as_run"] + ["--del", jobid, self.JOB_NAME,
                                     "interactif"]
        # without return_output, execcmd gives back only the exit code
        iret = execcmd(cmd, "server.5")
        self.assertEqual(iret, 0)

    def test02_using_api(self):
        """Play the same scenario through AsterProfil / AsterCalcul."""
        run = AsRunFactory()
        prof = AsterProfil(run=run, filename=self.export)
        # change the job name so that it differs from the command line test;
        # the pid passed below is presumably what sets the job number
        prof['nomjob'] = "server_api"
        calc = AsterCalcul(run, prof=prof, pid=run.get_pid())
        jret, out = calc.start()
        self.assertEqual(jret, 0, "submission problem : %s" % out)

        # short pause before the first query (presumably to let the job
        # start)
        time.sleep(0.2)
        etat, diag, node, tcpu, wrk, queue = calc.get_state()
        self.assertEqual(etat, "RUN")

        etat, diag, s_out = calc.tail(expression='CODE_ASTER')
        self.assertEqual(etat, "RUN")

        calc.wait()

        etat, diag, node, tcpu, wrk, queue = calc.get_state()
        self.assertEqual(etat, "ENDED")
        self.assertEqual(diag, "<A>_ALARM")
        self.assertTrue(run.GetGrav(calc.diag) in (0, 1), calc.diag)
        self.assertTrue(os.path.isfile(calc.flash('output')))
        calc.kill()



# Run the test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()