This file is indexed.

/usr/share/pyshared/gnatpython/testdriver.py is in python-gnatpython 54-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
 ############################################################################
 #                                                                          #
 #                           TESTDRIVER.PY                                  #
 #                                                                          #
 #           Copyright (C) 2008 - 2010 Ada Core Technologies, Inc.          #
 #                                                                          #
 # This program is free software: you can redistribute it and/or modify     #
 # it under the terms of the GNU General Public License as published by     #
 # the Free Software Foundation, either version 3 of the License, or        #
 # (at your option) any later version.                                      #
 #                                                                          #
 # This program is distributed in the hope that it will be useful,          #
 # but WITHOUT ANY WARRANTY; without even the implied warranty of           #
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the            #
 # GNU General Public License for more details.                             #
 #                                                                          #
 # You should have received a copy of the GNU General Public License        #
 # along with this program.  If not, see <http://www.gnu.org/licenses/>     #
 #                                                                          #
 ############################################################################
"""Run a bugs test located in test_dir

Define a default test driver: TestRunner
"""

from gnatpython.env import Env
from gnatpython.fileutils import (
    split_file, echo_to_file, diff, rm, mkdir, cp, get_rlimit)
from gnatpython.optfileparser import OptFileParse
from gnatpython.stringutils import Filter

import logging
import os
import re
import shutil
import subprocess
import sys

IS_STATUS_FAILURE = {
    'DEAD': False,
    'CRASH': True,
    'INVALID_TEST': True,
    'INVALID_TEST_OPT': True,
    'UNKNOWN': True,
    'OK': False,
    'DIFF': True}
# Dictionary for which keys are the available test status. Associated value
# is a boolean that is True if status should be considered as a failure, False
# otherwise. Note that XFAIL and UOK are handled separately by the script.


class TestRunner(object):
    """Default test driver

    ATTRIBUTES
      test: full path to test location
      discs: a list of discriminants (list of strings)
      cmd_line: the command line to be spawned (list of strings)
      test_name: name of the test
      result_prefix: prefix of files that are written in the result directory
      work_dir: the working directory in which the test will be executed
      output: name of the temporary file that hold the test output
      result: current state of the test. This is a dictionary with 3 keys:
        'result' that contains the test status, 'msg' the associated short
        message and 'is_failure' a boolean that is True if the test should
        be considered as a failure
      opt_results: context dependent variable (dictionnary)
      bareboard_mode: True if in bareboard mode. Default is False

    REMARKS
      For code readability, methods are ordered following the invocation order
      used by the execute 'method'
    """

    def __init__(self,
                 test,
                 discs,
                 result_dir,
                 temp_dir=Env().tmp_dir,
                 enable_cleanup=True,
                 restricted_discs=None,
                 test_args=None,
                 failed_only=False,
                 default_timeout=780,
                 use_basename=True):
        """TestRunner constructor

        PARAMETERS
          test: location of the test
          discs: list of discriminants
          result_dir: directory in which results will be stored
          temp_dir: temporary directory used during test run
          restricted_mode: None or a list of discriminants
          failed_only: run failed only
          use_basename: if True use the test basename to get the test name
            else use the relative path

        RETURN VALUE
          a TestRunner instance
        """
        self.test = test.rstrip('/')
        self.discs = discs
        self.cmd_line = None
        self.test_args = test_args
        self.enable_cleanup = enable_cleanup
        self.restricted_discs = restricted_discs
        self.skip = False  # if True, do not run execute()

        # Test name
        if use_basename:
            self.test_name = os.path.basename(self.test)
        else:
            self.test_name = os.path.relpath(self.test, os.getcwd())

        # Prefix of files holding the test result
        self.result_prefix = result_dir + '/' + self.test_name

        mkdir(os.path.dirname(self.result_prefix))

        # Temp directory in which the test will be run
        self.work_dir = os.path.realpath(os.path.join(temp_dir,
                'tmp-test-%s-%d' % (self.test_name, os.getpid())))
        self.output = self.work_dir + '/tmpout'
        self.diff_output = self.work_dir + '/diff'

        # Initial test status
        self.result = {'result': 'UNKNOWN', 'msg': '', 'is_failure': True}

        # Some tests save the pids of spawned background processes in
        # work_dir/.pids. The TEST_WORK_DIR environment variable is used to
        # pass the working directory location.
        os.environ['TEST_WORK_DIR'] = self.work_dir

        if failed_only:
            # Read old result now
            previous_result = self.read_result()
            if previous_result in IS_STATUS_FAILURE \
               and not IS_STATUS_FAILURE[previous_result]:
                # We don't need to run this test. Return now
                self.skip = True
                return

        # Be sure to be a sane environment
        rm(self.result_prefix + '.result')
        rm(self.result_prefix + '.out')
        rm(self.result_prefix + '.expected')
        rm(self.result_prefix + '.diff')

        # Parse test.opt. By default a test is not DEAD, SKIP nor XFAIL. Its
        # maximum execution time is 780s. Test script is test.cmd and output is
        # compared against test.out.
        self.opt_results = {'RLIMIT': str(default_timeout),
                            'DEAD': None,
                            'XFAIL': False,
                            'SKIP': None,
                            'OUT': 'test.out',
                            'CMD': 'test.cmd',
                            'FILESIZE_LIMIT': None,
                            'NOTE': None}

        # test.cmd have priority, if not found use test.py
        if not os.path.isfile(
            self.test + '/test.cmd') and os.path.isfile(
                self.test + '/test.py'):
            self.opt_results['CMD'] = 'test.py'

    def cleanup(self, force=False):
        """Remove generated files"""
        rm(self.result_prefix + '.result')
        rm(self.result_prefix + '.out')
        rm(self.result_prefix + '.expected')
        rm(self.result_prefix + '.diff')

    def execute(self):
        """Complete test execution

        REMARKS
          Calls all the steps that are needed to run the test.
        """
        if self.skip:
            logging.debug("SKIP %s - failed only mode" % self.test)
            return

        # Adjust test context
        self.adjust_to_context()

        for key in ('CMD', 'OUT'):
            # Read command file and expected output from working directory
            self.opt_results[key] = self.work_dir + '/src/' + \
                    self.opt_results[key]

        # Keep track of the discriminants that activate the test
        if self.opt_results['NOTE']:
            echo_to_file(self.result_prefix + '.note',
                         '(' + self.opt_results['NOTE'] + ')\n')

        # If a test is either DEAD or SKIP then do not execute it. The only
        # difference is that a SKIP test will appear in the report whereas a
        # DEAD test won't.

        for opt_cmd in ('DEAD', 'SKIP'):
            if self.opt_results[opt_cmd] is not None:
                echo_to_file(self.result_prefix + '.result',
                             opt_cmd + ':%s\n' % self.opt_results[opt_cmd])
                return

        if self.result['result'] != 'UNKNOWN':
            self.write_results()
            return

        # Run the test
        self.prepare_working_space()
        self.compute_cmd_line()
        self.run()

        # Analyze the results and write them into result_dir
        self.set_output_filter()
        self.analyze()
        self.write_results()

        # Clean the working space
        if self.enable_cleanup:
            self.clean()

    def adjust_to_context(self):
        """Adjust test environment to context

        REMARKS
          At this stage we parse the test.opt and adjust the opt_results
          attribute value. The driver will check if the test should be run
          (i.e is DEAD) right after this step.
        """
        if self.restricted_discs is not None:
            opt_file_content = ['ALL DEAD disabled by default']
            if os.path.isfile(self.test + '/test.opt'):
                opt_file_content += split_file(self.test + '/test.opt')

            opt = OptFileParse(self.discs, opt_file_content)
            self.opt_results = opt.get_values(self.opt_results)
            if not self.opt_results['DEAD']:
                activating_tags = opt.get_note(sep='')
                for d in self.restricted_discs:
                    if d not in activating_tags:
                        self.opt_results['DEAD'] = \
                          '%s not in activating tags' % d
        else:
            opt = OptFileParse(self.discs, self.test + '/test.opt')
            self.opt_results = opt.get_values(self.opt_results)

        self.opt_results['NOTE'] = opt.get_note()

        if not os.path.isfile(self.test + '/' + self.opt_results['CMD']):
            self.result = {
              'result': 'INVALID_TEST',
              'msg': 'cannot find script file %s' % (self.opt_results['CMD']),
              'is_failure': True}
            return

        if self.opt_results['OUT'][-8:] != 'test.out' and \
          not os.path.isfile(self.test + '/' + self.opt_results['OUT']):
            tmp = os.path.basename(self.opt_results['OUT'])
            self.result = {
              'result': 'INVALID_TEST',
              'msg': 'cannot find output file %s' % (tmp),
              'is_failure': True}
            return

    def prepare_working_space(self):
        """Prepare working space

        REMARKS
          Set the working space in self.work_dir. This resets the working
          directory and copies the test into <work_dir>/src. This directory
          can be used to hold temp files as it will be automatically deleted
          at the end of the test by the clean method
        """
        # At this stage the test should be executed so start copying test
        # sources in a temporary location.
        rm(self.work_dir, True)
        mkdir(self.work_dir)
        try:
            shutil.copytree(self.test, self.work_dir + '/src')
        except shutil.Error:
            print >> sys.stderr, "Error when copying %s in %s" % (
                    self.test, self.work_dir + '/src')

    def compute_cmd_line_py(self, filesize_limit):
        """Compute self.cmd_line and preprocess the test script

        REMARKS
          This function is called by compute_cmd_line
        """
        self.cmd_line += [sys.executable, self.opt_results['CMD']]
        if self.test_args:
            self.cmd_line += self.test_args

    def compute_cmd_line_cmd(self, filesize_limit):
        """Compute self.cmd_line and preprocess the test script

        REMARKS
          This function is called by compute_cmd_line
        """
        cmd = self.opt_results['CMD']
        if Env().host.os.name != 'windows':
            script = split_file(cmd)

            # The test is run on a Unix system but has a 'cmd' syntax.
            # Convert it to Bourne shell syntax.
            cmdfilter = Filter()
            cmdfilter.append([r'-o(.*).exe', r'-o \1'])
            cmdfilter.append([r'%([^ ]*)%', r'"$\1"'])
            cmdfilter.append([r'(\032|\015)', r''])
            cmdfilter.append([r'set *([^ =]+) *= *([^ ]*)',
                              r'\1="\2"; export \1'])
            script = cmdfilter.process(script)

            cmd = self.work_dir + '/__test.sh'
            echo_to_file(cmd, 'PATH=.:$PATH; export PATH\n')

            # Compute effective file size limit on Unix system.
            if filesize_limit > 0:
                # File size limit can be specified either by a default or by
                # mean of FILESIZE_LIMIT command in the test test.opt. When
                # both are specified use the upper limit (note that 0 means
                # unlimited).
                opt_limit = self.opt_results['FILESIZE_LIMIT']
                if opt_limit is not None:
                    try:
                        opt_limit = int(opt_limit)
                    except TypeError:
                        opt_limit = filesize_limit
                else:
                    opt_limit = filesize_limit

                if opt_limit != 0:
                    if filesize_limit < opt_limit:
                        filesize_limit = opt_limit

                    # Limit filesize. Argument to ulimit is a number of blocks
                    # (512 bytes) so multiply by two the argument given by the
                    # user. Filesize limit is not supported on Windows.
                    echo_to_file(cmd,
                                 'ulimit -f %s\n' % (filesize_limit * 2),
                                 True)

            # Source support.sh in TEST_SUPPORT_DIR if set
            if 'TEST_SUPPORT_DIR' in os.environ and \
              os.path.isfile(os.environ['TEST_SUPPORT_DIR'] + '/support.sh'):
                echo_to_file(cmd, '. $TEST_SUPPORT_DIR/support.sh\n', True)

            echo_to_file(cmd, script, True)

            self.cmd_line += ['bash', cmd]
        else:
            # On windows system, use cmd to run the script.
            if cmd[-4:] != '.cmd':
                # We are about to use cmd.exe to run a test. In this case,
                # ensure that the file extension is .cmd otherwise a dialog box
                # will popup asking to choose the program that should be used
                # to run the script.
                cp(cmd, self.work_dir + '/test__.cmd')
                cmd = self.work_dir + '/test__.cmd'

            self.cmd_line += ['cmd.exe', '/q', '/c', cmd]

    def compute_cmd_line(self, filesize_limit=36000):
        """Compute command line

        PARAMETERS
          filesize_limit: if set to something greater than 0 then a
          "ulimit -f" is inserted in the scripts. The unit of filesize_limit
          is Kb.

        REMARKS
          When this step is called we assume that we have all the context set
          and that the working space is in place. The main goal of this step
          is to compute self.cmd_line and do any processing on the test script
          file.

          If the script is in Windows CMD format, convert it to Bourne shell
          syntax on UNIX system and source TEST_SUPPORT_DIR/support.sh if exist
        """
        # Find which script language is used. The default is to consider it
        # in Windows CMD format.
        _, ext = os.path.splitext(self.opt_results['CMD'])
        if ext in ['.cmd', '.py']:
            cmd_type = ext[1:]
        else:
            cmd_type = 'cmd'

        rlimit = get_rlimit()
        assert rlimit, 'rlimit not found'
        self.cmd_line = [rlimit, self.opt_results['RLIMIT']]
        if cmd_type == 'py':
            self.compute_cmd_line_py(filesize_limit)
        elif cmd_type == 'cmd':
            self.compute_cmd_line_cmd(filesize_limit)

    def run(self):
        """Run the test

        REMARKS
          This step should spawn the test using self.cmd_line and save its
          output in self.output.
        """
        # Run the test
        # Here we are calling directly subprocess function as it is a bit
        # faster than using gnatpython.ex.Run
        logging.debug("RUN: %s" % " ".join(self.cmd_line))
        fd = open(self.output, 'w')
        subprocess.call(self.cmd_line,
                        cwd=self.work_dir + '/src',
                        stdout=fd,
                        bufsize=-1,
                        stderr=subprocess.STDOUT)
        fd.close()

    def apply_output_filter(self, str_list):
        """Apply the output filters

        PARAMETERS
          str_list: a list of strings

        RETURN VALUE
          a list of string
        """
        return self.output_filter.process(str_list)

    def set_output_filter(self):
        """Set output filters

        REMARKS
          output filters are applied both to expected output and test output
          before comparing them.
        """
        self.output_filter = Filter()
        # General filters. Filter out CR and '.exe' and work_dir and replace
        # \ by /
        self.output_filter.append([r'\\', r'/'])
        self.output_filter.append([r'(.exe|\015)', r''])
        self.output_filter.append([r'[^ \'"]*%s/src/' %
                                   os.path.basename(self.work_dir), r''])

    def get_status_filter(self):
        """Get the status filters

        RETURN VALUE
          a list. Each element is a list containing two items. The first is a
          regexp, the second a dictionnary used to update self.result.

        REMARKS
          the return value will be used the following way. For each entry, if
          the test output match the regexp then we update self.result with its
          dictionnary. Only the first match is taken into account.
        """
        result = [['Segmentation fault',
                   {'result': 'CRASH', 'msg': 'Segmentation fault'}],
                  ['Bus error',
                   {'result': 'CRASH', 'msg': 'Bus error'}],
                  ['Cputime limit exceeded',
                   {'result': 'CRASH', 'msg': 'Cputime limit exceeded'}],
                  ['Filesize limit exceeded',
                   {'result': 'CRASH', 'msg': 'Filesize limit exceeded'}]]
        return result

    def analyze(self):
        """Compute test status

        REMARKS
          This method should set the final value of 'result' attribute
        """
        # Retrieve the outputs and see if we match some of the CRASH or DEAD
        # patterns
        output = split_file(self.output, ignore_errors=True)
        if output:
            tmp = "\n".join(output)
            for pattern in self.get_status_filter():
                if re.search(pattern[0], tmp):
                    self.result.update(pattern[1])
                    break

        # If the test status has not been updated compare output with the
        # baseline
        if self.result['result'] == 'UNKNOWN':
            # Retrieve expected output
            expected = split_file(self.opt_results['OUT'], ignore_errors=True)

            # Process output and expected output with registered filters
            expected = self.apply_output_filter(expected)
            output = self.apply_output_filter(output)

            d = diff(expected, output)
            if d:
                logging.debug(d)
                self.result['result'] = 'DIFF'
                if len(expected) == 0:
                    self.result['msg'] = 'unexpected output'
                else:
                    self.result['msg'] = 'output'
                diff_file = open(self.diff_output, 'w')
                diff_file.write(d)
                diff_file.close()
            else:
                self.result = {'result': 'OK',
                               'msg': '',
                               'is_failure': False}

        self.result['is_failure'] = IS_STATUS_FAILURE[self.result['result']]

        # self.opt_results['XFAIL'] contains the XFAIL comment or False
        # The status should be set to XFAIL even if the comment is empty
        if self.opt_results['XFAIL'] != False:
            if self.result['result'] in ['DIFF', 'CRASH']:
                self.result.update({'result': 'XFAIL',
                                    'msg': self.opt_results['XFAIL']})
            elif self.result['result'] == 'OK':
                self.result.update({'result': 'UOK',
                                    'msg': self.opt_results['XFAIL']})

    def write_results(self):
        """Write results on disk

        REMARKS
          Write at least .result and maybe .out and .expected files in the
          result directory.
        """
        echo_to_file(self.result_prefix + '.result',
                 self.result['result'] + ':' + self.result['msg'] + '\n')

        if self.result['is_failure']:
            if os.path.isfile(self.opt_results['OUT']):
                cp(self.opt_results['OUT'], self.result_prefix + '.expected')
            if os.path.isfile(self.output):
                cp(self.output, self.result_prefix + '.out')
            if os.path.isfile(self.diff_output):
                cp(self.diff_output, self.result_prefix + '.diff')

    def read_result(self):
        """Read last result"""
        if os.path.exists(self.result_prefix + '.result'):
            with open(self.result_prefix + '.result') as f_res:
                return f_res.read().strip().split(':')[0]

    def clean(self):
        """Clean up working space

        REMARKS
          Clean any temporary files
        """
        # Clean up before exiting
        rm(self.work_dir, True)


def add_run_test_options(m):
    run_test_opts = m.create_option_group("Test driver options")
    run_test_opts.add_option(
        "-o", "--output-dir",
        dest="output_dir",
        metavar="DIR",
        default="./out",
        help="select output dir")
    run_test_opts.add_option(
        "--timeout",
        default='780',
        metavar="SECONDS",
        help="Default timeout")
    run_test_opts.add_option(
        "-d", "--discriminants",
        dest="discs",
        metavar="DISCS",
        default="ALL",
        help="set discriminants")
    run_test_opts.add_option(
        "-t", "--temp-dir",
        dest="tmp",
        metavar="DIR",
        default=Env().tmp_dir)
    run_test_opts.add_option(
        "-e", "--env-file",
        dest="env_file",
        metavar="FILE",
        default="load env file")
    run_test_opts.add_option(
        "--disable-cleanup",
        dest="enable_cleanup",
        action="store_false",
        default=True,
        help="disable cleanup of working space")
    run_test_opts.add_option(
        "-r", "--restricted-mode",
        dest="restricted_discs",
        metavar="DISCS",
        default=None,
        help="enable restricted mode")
    run_test_opts.add_option(
        '-f', '--failed-only',
        action="store_true",
        help="run failed only - skip the test is last result is OK")
    run_test_opts.add_option(
        '--use-basename',
        action='store_true',
        help="Use os.path.basename to get the real name of a test. "
        "Note that this will only work if you don't have two tests with "
        "the same name in your test directories")
    m.add_option_group(run_test_opts)