This file is indexed.

/usr/lib/codeaster/asrun/unittest/010_misc.py is in code-aster-run 1.13.1-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# -*- coding: utf-8 -*-

import sys
import os
import os.path as osp
import re
import unittest
import socket
import time

from common import dict_conf, execcmd, tmpdir, on_dev_machine
from data import available_hosts

from asrun.installation import aster_root, confdir
from asrun.common.utils import Singleton, check_joker, find_command
from asrun.parametric   import get_distribution_data, is_list_of_dict
from asrun.parser       import get_option_value
from asrun.common.sysutils import is_localhost, is_localdisplay
from asrun.common.rcfile import read_rcfile
from asrun.system       import local_host


class TestMisc(unittest.TestCase):

    def test00_init_hosts(self):
        assert len(available_hosts) > 0, "all tests which need a remote host will be skipped."

    def test01_as_run_info(self):
        cmd = dict_conf["as_run"] + ["--info"]
        iret, output = execcmd(cmd, "misc.1", return_output=True)
        assert iret == 0
        assert output.find("@SERV_VERS@") > -1
        assert output.find("@FINSERV_VERS@") > -1
        assert output.find("@PARAM@") > -1
        assert output.find("@FINPARAM@") > -1


    def test02_deprecation(self):
        ftmp = os.path.join(tmpdir, "misc.2" + ".err")
        f_error = open(ftmp, "w")
        sys.stderr = f_error
        # search old ASTK_SERV
        old_astk_root = aster_root
        if os.system('hg id > %s 2>&1' % os.devnull) == 0 and on_dev_machine():
            old_astk_root = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, "configuration"))
        sys.path.append(os.path.join(old_astk_root, "ASTK", "ASTK_SERV", "lib"))

        nbdepr = 0
        # must print DeprecationWarnings
        nbdepr += 7
        try:
            from as_run    import AsRunFactory
        except ImportError:
            # old "ASTK_SERV/lib" path not in PYTHONPATH, can not be tested
            return
        from as_profil import AsterProfil
        from as_etude  import SetParser
        from as_list   import FILTER
        from as_param  import Parametric
        from astk_maintenance import AstkUpdate
        from as_exec   import RunAster
        # must print a DeprecationWarning
        nbdepr += 1
        run = AsRunFactory('/opt/aster/ASTK/ASTK_SERV', verbose=False)

        # changes 1.7.* > 1.8.0
        nbdepr += 1
        from asrun.utils import read_rcfile
        # changes 1.8.0 > 1.8.1
        from asrun.utils import find_command, search_enclosed, get_timeout
        from asrun.common_func  import get_hostrc

        # changes 1.9.1 > 1.9.2
        nbdepr += 3
        from asrun.client import CLIENT
        client = CLIENT(rcdir='/tmp/unittest.rcdir', run=1)
        client.rcfile('config')
        # only for AsterProfil, AsterRun are instanciated.
        nbdepr += 5
        from asrun.build import ASTER_BUILD
        from asrun.config import ASTER_CONFIG
        from asrun.calcul import ASTER_CALCUL, ASTER_PARAMETRIC, ASTER_TESTCASE
        from asrun.profil import ASTER_PROFIL
        from asrun.run import ASTER_RUN
        from asrun.system import ASTER_SYSTEM, ASTER_SYSTEM_MINIMAL
        from asrun.timer import ASTER_TIMER
        _ = ASTER_PROFIL()
        _ = ASTER_RUN()
        _ = ASTER_SYSTEM(run)
        _ = ASTER_SYSTEM_MINIMAL()
        _ = ASTER_TIMER()

        f_error.close()
        sys.stderr = sys.__stderr__
        content_error = open(os.path.join(tmpdir, "misc.2" + ".err"), "r").read()
        chk = len(re.findall("DeprecationWarning", content_error))
        assert chk == nbdepr, '%d != %d' % (chk, nbdepr)

    def test03_singleton(self):
        class Test(Singleton):
            pass
        class Test2(Test):
            pass
        objA = Test()
        objB = Test()
        objC = Test2()
        objD = Test2()
        assert objA is objB
        assert objC is objD
        assert objA is not objC

    def test04_parametric_data(self):
        # unittest
        content = """VALE=( _F(P1=1., P2=0.3),     _F(P1=2., P2=0.4),)"""
        dicval = get_distribution_data(content)
        assert is_list_of_dict(dicval['VALE'])
        assert dicval['VALE'][0]['P1'] == 1.
        assert dicval['VALE'][1]['P2'] == 0.4

    def test05_args_parsing(self):
        val = get_option_value(["-g"], "-g", action="store_true")
        assert val is True, val
        val = get_option_value(["--verbose"], "-g", action="store_true")
        assert val is False, val
        val = get_option_value(["--rcdir=REFE"], "--rcdir")
        assert val == "REFE", val
        val = get_option_value(["--bad=ERROR"], "--rcdir")
        assert val is None, val
        val = get_option_value(["--bad=ERROR", "--rcdir=REFE", "-g", "-a", "3", "--bad2=ERROR"], "--rcdir")
        assert val == "REFE", val

    def test06_local(self):
        assert is_localhost("localhost.localdomain")
        assert is_localhost(socket.gethostname())
        assert is_localhost(socket.getfqdn())
        assert not is_localhost("remote.domain.org", ignore_domain=False)
        assert is_localdisplay(":0.0")
        assert is_localdisplay("localhost:0.0")
        assert is_localdisplay(socket.gethostname() + ":0")

    def test07_FileName(self):
        from asrun.common.sysutils import FileName
        fname = FileName("/home/user/fname.ext")
        assert fname.user == fname.passwd == fname.host == '', fname.asdict()
        assert fname.is_local(), fname.repr()
        fname = FileName("server:/home/user/fname.ext")
        assert fname.passwd == '' and fname.host == "server", fname.asdict()
        assert fname.is_remote(), fname.repr()
        fname = FileName("username@server:/home/user/fname.ext")
        assert fname.passwd == '' and fname.host == "server" \
            and fname.user == "username", fname.asdict()
        assert fname.is_remote(), fname.repr()
        fname = FileName("username:password@server:/home/user/fname.ext")
        assert fname.host == "server" and fname.user == "username" \
            and fname.passwd == "password", fname.asdict()
        assert fname.is_remote(), fname.repr()
        fname = FileName("%(localuser)s@%(localhost)s:/home/user/fname.ext" % dict_conf)
        assert fname.is_local(), fname.repr()
        #fname = FileName("username:password_server:/home/user/fname.ext")
        fname = FileName("/home/user/fname.ext")
        fname.host = "server"
        assert fname.repr() == "server:/home/user/fname.ext", fname.repr()
        assert fname.is_remote(), fname.repr()
        fname.user = "username"
        assert fname.repr() == "username@server:/home/user/fname.ext", fname.repr()
        fname.passwd = "password"
        assert fname.passwd == 'password' and fname.host == "server" \
            and fname.user == "username", fname.asdict()
        assert fname.repr() == "username:password@server:/home/user/fname.ext", fname.repr()

    def test08_Enum(self):
        from asrun.common.utils import Enum
        color = Enum("Blue", "White", "Red")
        assert (color.Blue, color.White, color.Red) == (1, 2, 4)
        assert color.get_id(2) == "White"
        assert color.exists(4) is True
        assert color.exists(6) is False

    def test09_Configuration(self):
        from asrun.core.configuration import Configuration, Entry, EntryAlias
        from asrun.common.sysutils import CommandLine, get_display
        # without...
        ls = CommandLine("ls")
        assert ls.get_cmd() == "ls", ls.get_cmd()
        assert ls.get_args() == (), ls.get_args()
        assert ls.get_cmdline() == "ls", ls.get_cmdline()
        # ...and with argument
        edit = CommandLine("/usr/bin/editor", "--display=@D")
        disp = os.environ.get("DISPLAY", ":0.0")
        if ':0' in disp:
            disp = get_display()
        refv = "/usr/bin/editor --display=%s" % disp
        assert edit.get_cmdline() == refv, (edit.get_cmdline(), refv)

        simple_entry = Entry("terminal", "xterm", "doc string")
        edit_entry = Entry("editor", edit, "Entry using a CommandLine object")
        alias = EntryAlias("editeur", edit_entry)    # deprecated entry name
        assert simple_entry.value == "xterm", simple_entry.value
        assert alias.name == "editeur", alias.name
        assert edit_entry.name == alias.aliasto == "editor", (edit_entry.name, alias.aliasto)
        assert edit_entry.value == alias.value == refv, (edit_entry.value, alias.value)

        cfg = Configuration()
        assert cfg["node"] == local_host, cfg["node"]
        cfg.add_entry(simple_entry)
        assert cfg["terminal"] == "xterm", cfg["terminal"]

        #asrun_rc = osp.join(confdir, "asrun")
        #cfg = {}
        #read_rcfile(asrun_rc, cfg, mcsimp=['noeud',])
        #print cfg.keys()
        #print cfg['noeud'], type(cfg['noeud'])

    def test10_check_joker(self):
        assert check_joker("sslp100b.comm", ".comm")
        assert check_joker("sslp100b.comm", ".com?")
        assert not check_joker("sslp100b.comm", ".mail")
        assert not check_joker("sslp100b.com4", ".comm")
        assert check_joker("sslp100b.com4", ".com?")
        assert not check_joker("/opt/aster/NEW10/astest/sslp100b.comm", ".[0-9]*")
        assert not check_joker("/opt/aster/STA10.2/astest/sslp100b.comm", ".[0-9]*")
        assert check_joker("sslp100b.22", ".[0-9]*")
        assert not check_joker("sslp100b.22", ".com?")


    def test11_find_command(self):
        content = """DEBUT(PAR_LOT='NON',
      DEBUG='OUI',
)
ma=LIRE_MAILLAGE()

FIN()
"""
        i1, j1 = find_command(content, 'DEBUT')
        #print i1, j1, content[i:j+1]
        assert i1 == 0 and j1 == 40, content
        content = content[:j1+1] + os.linesep \
               + "INCLUDE(UNITE=unit)" + os.linesep \
               + content[j1+1:]
        i2, j2 = find_command(content, 'FIN')
        #print i2, j2, content[i2:j2+1]
        assert i2 == 83 and j2 == 87, content
        content = content[:i2] + os.linesep \
               + "INCLUDE(UNITE=unit)" + os.linesep \
               + content[i2:]
        i3, j3 = find_command(content, "INCLUDE")
        assert i3 == 42 and j3 == 60, content
        i4, j4 = find_command(content[j3+1:], "INCLUDE")
        #print j3 + 1 + i4, j3 + 1 + j4, content[j3+1:]
        assert j3 + 1 + i4 == 84 and j3 + 1 + j4 == 102, content

    def test12_is_newer(self):
        import shutil
        from asrun.common.sysutils import is_newer, on_linux
        # this file should not change every day
        fileold = "/etc/services"
        if not on_linux() or not osp.exists(fileold):
            return
        # same as cp -p : mtime values are equal
        filecp = osp.join(tmpdir, "services_copied")
        shutil.copy2(fileold, filecp)
        info_m = "fileold (%s), filecp (%s)" % (osp.getmtime(fileold), osp.getmtime(filecp))
        # because they have the same mtime
        assert not is_newer(filecp, fileold) and not is_newer(fileold, filecp), info_m
        # simple copy
        shutil.copy(fileold, filecp)
        info_m = "fileold (%s), filecp (%s)" % (osp.getmtime(fileold), osp.getmtime(filecp))
        assert is_newer(filecp, fileold), info_m
        assert not is_newer(fileold, filecp), info_m

    def test13_plt(self):
        # test plt functions
        from asrun.core.configuration import (
            plt_windows, plt_linux,
            get_plt_exec_name, plt_64bits
        )
        assert plt_linux("LINUX64")
        assert not plt_windows("SOLARIS")
        assert plt_windows("WIN64")
        assert plt_64bits("WIN64")
        assert not plt_64bits("LINUX")
        assert get_plt_exec_name("WIN32", "/path/as_run") == "/path/as_run.bat"

    def test14_lock(self):
        from asrun.common.lockfile import LockedFile, LockError
        # check acquire/release
        lof = LockedFile('/tmp/dummy', 'a', info=0)
        lof._acquire()
        lof._release()
        lof._acquire()
        lof._acquire()
        lof._release()

        def message():
            ct =  time.time()
            msecs = (ct - long(ct)) * 1000
            txt = "%s.%03d written by process %s\n" \
                % (time.strftime('%H:%M:%S'), msecs, os.getpid())
            return txt
        fo1 = LockedFile('/tmp/zzz', 'a')
        fo2 = LockedFile('/tmp/zzz', 'a')
        fo1.write(message())
        fo2.write(message())
        fo1.close()
        fo2.close()

    def test15_renametree(self):
        import shutil
        from asrun.common.utils import renametree
        def build_tree(dst):
            os.makedirs(dst)
            open(osp.join(dst, 'file1'), 'w').write('file 1')
            open(osp.join(dst, 'file2'), 'w').write('file 222222')
            dir1 = osp.join(dst, 'dir1')
            os.makedirs(dir1)
            open(osp.join(dir1, 'file3'), 'w').write('file 333333333333')
        def ls(dir):
            #print "### %s ###" % dir
            #os.system('ls -la %s' % dir)
            pass

        prev = os.getcwd()
        dtmp = osp.join(tmpdir, "misc.15")
        if osp.exists(dtmp):
            shutil.rmtree(dtmp)
        os.makedirs(dtmp)
        os.chdir(dtmp)
        tst = "tstdir"
        new = "newname"
        dup = "duplicate"
        build_tree(tst)
        ls(tst)

        renametree(tst, new)
        ls(new)
        assert osp.isdir(new)
        assert osp.isfile(osp.join(new, 'file1'))
        assert osp.isfile(osp.join(new, 'dir1', 'file3'))

        os.makedirs(dup)
        ok = False
        try:
            renametree(osp.join(new, 'dir1', 'file3'), dup)
        except OSError:
            ok = True
        assert ok

        open(osp.join(dup, 'file4'), 'w').write('file 000')
        ok = False
        try:
            renametree(osp.join(new, 'dir1'), osp.join(dup, 'file4'))
        except OSError:
            ok = True
        assert ok

        build_tree(tst)
        ls(tst)
        renametree(tst, dup)
        ls(dup)
        assert osp.isfile(osp.join(dup, 'file1'))
        assert osp.isfile(osp.join(dup, 'dir1', 'file3'))
        assert not osp.exists(tst)

        renametree(dup, osp.join(new, dup))
        ls(osp.join(new, dup))
        assert osp.isfile(osp.join(new, dup, 'file1'))
        assert osp.isfile(osp.join(new, dup, 'file4'))
        assert osp.isfile(osp.join(new, dup, 'dir1', 'file3'))
        assert osp.isdir(new)

        renametree(osp.join(new, dup), new)
        ls(new)
        assert osp.isfile(osp.join(new, 'file1'))
        assert osp.isfile(osp.join(new, 'file4'))
        assert osp.isfile(osp.join(new, 'dir1', 'file3'))
        assert not osp.exists(osp.join(new, dup))
        os.chdir(prev)


if __name__ == "__main__":
   unittest.main()