This file is indexed.

/usr/share/pyshared/gnatpython/mainloop.py is in python-gnatpython 54-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
 ############################################################################
 #                                                                          #
 #                          MAINLOOP.PY                                     #
 #                                                                          #
 #           Copyright (C) 2008 - 2011 Ada Core Technologies, Inc.          #
 #                                                                          #
 # This program is free software: you can redistribute it and/or modify     #
 # it under the terms of the GNU General Public License as published by     #
 # the Free Software Foundation, either version 3 of the License, or        #
 # (at your option) any later version.                                      #
 #                                                                          #
 # This program is distributed in the hope that it will be useful,          #
 # but WITHOUT ANY WARRANTY; without even the implied warranty of           #
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the            #
 # GNU General Public License for more details.                             #
 #                                                                          #
 # You should have received a copy of the GNU General Public License        #
 # along with this program.  If not, see <http://www.gnu.org/licenses/>     #
 #                                                                          #
 ############################################################################

"""Generic loop for testsuites

This package provides a class called MainLoop that provides a generic
implementation of a testsuite main loop. Parallelism, abortion and time
control are the key features.

Each MainLoop instance controls a set of Workers whose number is set
by the user. The list of tasks/tests to be achieved by the workers,
is provided by a list. The mainloop distributes the elements to
the workers when they have nothing to do. Usually, an element is a
string identifying the test to be run. An element can also be a list
in that case the worker will execute sequentially each "subelement".
This case is used to address dependencies between tests (occurs for
example with the ACATS).

When a worker is asked to run a test, the command is executed by
calling run_testcase (testid). Once a test is finished the function
collect_result will be called with test id, and process (a
gnatpython.ex.Run object) and the job_info as parameters. Both
run_testcase and collect_result are user defined functions.

Note also that from the user's point of view there is no parallelism to
handle. The two user-defined functions run_testcase and collect_result
are called sequentially.
"""

from time import sleep, strftime

import logging
import os
import re
import sys

from gnatpython.env import Env
from gnatpython.ex import Run
from gnatpython.fileutils import (echo_to_file, FileUtilsError,
        mkdir, mv, rm, split_file)

logger = logging.getLogger('gnatpython.mainloop')


SKIP_EXECUTION = -1
# Sentinel value that run_testcase may return instead of an ex.Run object
# to tell the mainloop not to spawn a process for this test; the worker
# then calls collect_result immediately (see Worker.poll / MainLoop doc).


class NeedRequeue (Exception):
    """Exception raised by collect_result to ask that a test be requeued."""


class Worker (object):
    """Execute a sequence of jobs and collect their results

    A Worker owns a private queue of jobs. Each job is launched through
    the user supplied run_testcase function and, once finished, handed
    over to collect_result. If collect_result raises NeedRequeue the job
    is pushed back onto the queue so it can be retried.
    """

    def __init__(self, items, run_testcase, collect_result, slot):
        """Worker constructor

        PARAMETERS
          items: item or list of items to be run by the worker
          run_testcase: command builder function (see MainLoop doc)
          collect_result: result processing function (see MainLoop doc)
          slot: worker slot number, forwarded to run_testcase and
            collect_result inside the job_info tuple

        RETURN VALUE
          a Worker instance

        REMARKS
          The first job is launched immediately.
        """
        self.run_testcase = run_testcase
        self.collect_result = collect_result
        self.slot = slot

        # Count the number of retries for the current test
        self.nb_retry = 0

        # Jobs are popped from the end of the list so store them in
        # reverse order. Work on a copy in order not to mutate the
        # caller's list (the previous implementation reversed `items'
        # in place, an observable side effect).
        if isinstance(items, list):
            self.jobs = list(reversed(items))
        else:
            self.jobs = [items]

        logger.debug('Init worker %d with %r', self.slot, self.jobs)
        self.current_process = None
        self.current_job = None
        self.execute_next()

    def execute_next(self):
        """Execute next worker item/test

        PARAMETERS
          None

        RETURN VALUE
          return False if the worker has nothing to do. True if a test is
          launched.

        REMARKS
          None
        """
        if len(self.jobs) == 0:
            return False
        else:
            self.current_job = self.jobs.pop()

            job_info = (self.slot, self.nb_retry)
            self.current_process = self.run_testcase(self.current_job,
                                                     job_info)
            return True

    def poll(self):
        """Test if a test/item is still executing

        PARAMETERS
          None

        RETURN VALUE
          True if busy, False otherwise.

        REMARKS
          When the test was not really spawned (SKIP_EXECUTION) the
          result is collected immediately.
        """
        if self.current_process == SKIP_EXECUTION:
            # Test not run by run_testcase
            # Call directly wait()
            self.wait()
            return False
        else:
            if self.current_process.poll() is not None:
                # Current process has finished
                self.wait()
                return False
            else:
                return True

    def wait(self):
        """Wait for a test/item to finish

        PARAMETERS
          None

        RETURN VALUE
          None

        REMARKS
          The collect_result function is called upon test/item termination
        """
        if self.current_process != SKIP_EXECUTION:
            self.current_process.wait()

        try:
            job_info = (self.slot, self.nb_retry)
            self.collect_result(self.current_job,
                                self.current_process,
                                job_info)
            self.current_job = None
            self.current_process = None

        except NeedRequeue:
            # Reinsert the current job in the job list so that
            # execute_next launches it again.
            self.nb_retry += 1
            self.jobs.append(self.current_job)


class MainLoop (object):
    """Run a list of jobs on a pool of Worker objects"""

    def __init__(self,
                 item_list,
                 run_testcase,
                 collect_result,
                 parallelism=None,
                 abort_file=None,
                 dyn_poll_interval=True):
        """Launch loop

        PARAMETERS
          item_list: a list of jobs
          run_testcase: a function that takes a job for argument and return
            the spawned process (ex.Run object). Its prototype should be
            func (name, job_info) with name the job identifier and job_info
            the related information, passed in a tuple
            (slot_number, job_retry).
            Note that if you want to take advantage of the parallelism the
            spawned process should be launched in background (ie with bg=True
            when using ex.Run)

            If run_testcase returns SKIP_EXECUTION instead of an ex.Run
            object the mainloop will directly call collect_result without
            waiting.
          collect_result: a function called when a job is finished. The
            prototype should be func (name, process, job_info). If
            collect_result raises NeedRequeue then the test will be
            requeued. job_info is a tuple: (slot_number, job_nb_retry)
          parallelism: number of workers
          abort_file: If specified, the loop will abort if the file is
            present
          dyn_poll_interval: If True the interval between each polling
            iteration is automatically updated. Otherwise it's set to 0.1
            seconds.

        RETURN VALUE
          a MainLoop instance

        REMARKS
          The loop runs entirely inside the constructor; user callbacks
          are invoked sequentially so they need not be thread safe.
        """
        e = Env()
        self.parallelism = e.get_attr("main_options.mainloop_jobs",
                                      default_value=1,
                                      forced_value=parallelism)
        self.abort_file = e.get_attr("main_options.mainloop_abort_file",
                                     default_value=None,
                                     forced_value=abort_file)

        logger.debug("start main loop with %d workers (abort on %s)"
                      % (self.parallelism, self.abort_file))
        self.workers = [None] * self.parallelism
        # Use iter()/next() (available since Python 2.6) rather than the
        # Python-2-only iterator.next() method.
        iterator = iter(item_list)
        active_workers = 0
        max_active_workers = self.parallelism
        poll_sleep = 0.1

        try:
            while True:
                # Check for abortion
                if self.abort_file is not None and \
                  os.path.isfile(self.abort_file):
                    logger.info('Aborting: file %s has been found'
                                % self.abort_file)
                    self.abort()
                    return      # Exit the loop

                # Find free workers and feed them with new jobs. next()
                # raises StopIteration once item_list is exhausted.
                for slot, worker in enumerate(self.workers):
                    if worker is None:
                        # a worker slot is free so use it for next job
                        next_job = next(iterator)
                        self.workers[slot] = Worker(next_job,
                                                    run_testcase,
                                                    collect_result,
                                                    slot)
                        active_workers += 1

                poll_counter = 0
                logger.debug('Wait for free worker')
                while active_workers >= max_active_workers:
                    # All workers are occupied so wait for one to finish
                    poll_counter += 1
                    for slot, worker in enumerate(self.workers):
                        # Test if the worker is still active and has more
                        # jobs pending
                        if not (worker.poll() or worker.execute_next()):
                            # If not the case free the worker slot
                            active_workers -= 1
                            self.workers[slot] = None

                    sleep(poll_sleep)

                if dyn_poll_interval:
                    poll_sleep = compute_next_dyn_poll(poll_counter,
                                                       poll_sleep)

        except StopIteration:
            # All items have been distributed; drain the workers that are
            # still running.
            while active_workers > 0:
                for slot, worker in enumerate(self.workers):
                    if not (worker is None
                            or worker.poll()
                            or worker.execute_next()):
                        active_workers -= 1
                        self.workers[slot] = None
                # Sleep once per polling pass, not once per worker (the
                # previous code had the sleep inside the for loop).
                sleep(0.1)

    def abort(self):
        """Abort the loop

        Wait for each busy worker (so collect_result is called for the
        tests already started), but only when the abort file is present.
        """
        if self.abort_file is not None and os.path.isfile(self.abort_file):
            for worker in self.workers:
                if worker is not None:
                    worker.wait()


def generate_collect_result(result_dir=None, results_file=None,
        output_diff=False, use_basename=True, metrics=None, options=None):
    """Generate a collect result function

    The generated collect_result function is known to work with gnatpython
    default test driver: gnatpython.testdriver.TestRunner

    PARAMETERS
      result_dir: directory containing test results,
        if None use options.output_dir
      results_file: file containing the list of test status,
        if None use options.results_file
      output_diff: if True, output the .diff in case of failure (useful when
        debugging)
      use_basename: if True use the test basename to get the test name
        else use the relative path
      metrics: to collect metrics, just pass an empty dictionary or
        a dictionary containing a key named 'total' with an integer
        value equal to the number of test to run
      options: test driver and Main options

    NOTES
      When collecting metrics, a file named status will be created in
      result_dir and will contain some metrics
    """
    # Fall back on values from options only when options is provided
    # (the previous code dereferenced options.results_file even when
    # options was None, raising AttributeError).
    if options is not None:
        if result_dir is None:
            result_dir = options.output_dir
        if results_file is None:
            results_file = options.results_file

    # Save the startup time
    start_time_str = strftime('%Y-%m-%d %H:%M:%S')

    if metrics is not None:
        # Per-run counters, always reset
        for m in ('run', 'failed', 'crashed', 'new_failed', 'new_crashed'):
            metrics[m] = 0
        if 'total' not in metrics:
            metrics['total'] = 0

        # Status families used to classify a test result
        DIFF_STATUS = ('DIFF', 'FAILED', 'PROBLEM')
        CRASH_STATUS = ('CRASH', )
        XFAIL_STATUS = ('XFAIL', )

        # Compute old metrics if needed. Note: this must run BEFORE
        # defaulting old_diffs/old_crashes to [], otherwise the
        # 'not in metrics' tests below can never succeed and the old
        # results are silently discarded (bug in the previous version).
        if hasattr(options, 'old_output_dir') \
                and options.old_output_dir is not None:
            old_results = [k.split(':') for k in split_file(
                os.path.join(options.old_output_dir, 'results'),
                ignore_errors=True)]
            if 'old_diffs' not in metrics:
                metrics['old_diffs'] = [
                        k[0] for k in old_results if k[1] in DIFF_STATUS]
            if 'old_crashes' not in metrics:
                metrics['old_crashes'] = [
                        k[0] for k in old_results if k[1] in CRASH_STATUS]

        # Make sure both keys exist even when no old results are available
        for m in ('old_diffs', 'old_crashes'):
            if m not in metrics:
                metrics[m] = []

    def collect_result(name, process, _job_info):
        """Default collect result function

        Read .result and .note file in {result_dir}/{test_name} dir
        Then append result to {result_file}

        If output_diff is True, print the content of .diff files

        REMARKS:
          name should be the path to the test directory
        """
        if metrics is not None:
            # Increment number of run tests
            metrics['run'] += 1

        if use_basename:
            test_name = os.path.basename(name)
        else:
            test_name = os.path.relpath(name, os.getcwd())
        try:
            test_result = split_file(
                result_dir + '/' + test_name + '.result')[0]
        except FileUtilsError:
            # Missing or unreadable result file is reported as a crash
            test_result = 'CRASH: cannot read result file'

        test_note = split_file(result_dir + '/' + test_name + '.note',
                               ignore_errors=True)

        if not test_note:
            test_note = ""
        else:
            test_note = test_note[0]

        # Append result to results file
        echo_to_file(results_file,
                     "%s:%s %s\n" % (test_name, test_result, test_note),
                     append=True)

        if metrics is not None:

            diffs_format = options.diffs_format if hasattr(
                    options, 'diffs_format') else None

            # Set last test name
            metrics['last'] = test_name

            # Update metrics and diffs or xfail_diffs file
            diffs_file = os.path.join(result_dir, 'diffs')
            xfail_diffs_file = os.path.join(result_dir, 'xfail_diffs')

            test_status = test_result.split(':')[0]
            if test_status in DIFF_STATUS:
                metrics['failed'] += 1
                if test_name not in metrics['old_diffs']:
                    metrics['new_failed'] += 1
                get_test_diff(result_dir, test_name, test_note,
                        test_result, diffs_file, diffs_format)
            elif test_status in CRASH_STATUS:
                metrics['crashed'] += 1
                if test_name not in metrics['old_crashes']:
                    metrics['new_crashed'] += 1
                get_test_diff(result_dir, test_name, test_note,
                        test_result, diffs_file, diffs_format)
            elif test_status in XFAIL_STATUS:
                get_test_diff(result_dir, test_name, test_note,
                        test_result, xfail_diffs_file, diffs_format)

            # Update global status file
            s = []
            if "JOB_ID" in os.environ:
                s.append("%s running tests since %s\n" % (
                    os.environ['JOB_ID'], start_time_str))

            s.append("%(run)s out of %(total)s processed (now at %(last)s)"
                % metrics)
            s.append("%(new_failed)s new potential regression(s)"
                " among %(failed)s" % metrics)
            s.append("%(new_crashed)s new crash(es) among %(crashed)s"
                    % metrics)
            echo_to_file(os.path.join(result_dir, 'status'),
                    '\n'.join(s) + '\n')

        if process != SKIP_EXECUTION:
            # else the test has been skipped. No need to print its status.
            if test_result.startswith(
                    'DIFF') or test_result.startswith('CRASH'):
                logging_func = logging.error
            else:
                logging_func = logging.info

            logging_func("%-30s %s %s" % (test_name, test_result, test_note))

            if output_diff:
                diff_filename = result_dir + '/' + test_name + '.diff'
                if os.path.exists(diff_filename):
                    with open(diff_filename) as diff_file:
                        logging_func(diff_file.read().strip())
    return collect_result


def generate_run_testcase(driver, discs, options, use_basename=True):
    """Generate a basic run_test command

    PARAMETERS
       driver: test script to run
       discs: list of discriminants
       options: test driver and Main options
       use_basename: if True use the test basename to get the test name
         else use the relative path

    RETURN VALUE
       a run_testcase function suitable for MainLoop
    """
    def run_testcase(test, job_info):
        """Run the given test

        See mainloop documentation
        """
        # getattr with a default covers option objects that do not define
        # the skip_* attributes at all.
        skip_if_ok = getattr(options, 'skip_if_ok', False)
        skip_if_run = getattr(options, 'skip_if_already_run', False)
        skip_if_dead = getattr(options, 'skip_if_dead', False)

        result_dir = options.output_dir

        if skip_if_ok or skip_if_run or skip_if_dead:
            try:
                if use_basename:
                    test_name = os.path.basename(test)
                else:
                    test_name = os.path.relpath(test, os.getcwd())

                old_result_file = os.path.join(
                    result_dir, test_name + '.result')
                if os.path.exists(old_result_file):
                    if skip_if_run:
                        return SKIP_EXECUTION
                    old_result = split_file(old_result_file)[0].split(':')[0]
                    if skip_if_ok and old_result in ('OK', 'UOK', 'PASSED'):
                        return SKIP_EXECUTION
                    if skip_if_dead and old_result == 'DEAD':
                        return SKIP_EXECUTION
            except FileUtilsError:
                # Not being able to read the previous result is not
                # fatal: simply run the test again.
                logging.debug("Cannot get old result for %s" % test)

        # VxWorks tests needs WORKER_ID to be set in order to have an id for
        # vxsim that will not collide with other instances.
        os.environ['WORKER_ID'] = str(job_info[0])

        cmd = [sys.executable, driver,
               '-d', ",".join(discs),
               '-o', result_dir,
               '-t', options.tmp,
               test]
        if options.verbose:
            cmd.append('-v')
        if hasattr(options, 'host'):
            if options.host:
                cmd.append('--host=' + options.host)
            if options.target:
                cmd.append('--target=' + options.target)
        if not options.enable_cleanup:
            cmd.append('--disable-cleanup')
        if options.failed_only:
            cmd.append('--failed-only')
        if options.timeout:
            cmd.append('--timeout=' + options.timeout)
        if options.use_basename:
            cmd.append('--use-basename')
        return Run(cmd, bg=True, output=None)
    return run_testcase


def setup_result_dir(options):
    """Save old results and create new result dir

    PARAMETERS
      options: test driver and Main options. This dictionary will be modified
               in place to set: `results_file`, the path to the results file,
               `report_file`, the path to the report file. Note that
               `output_dir` and `old_output_dir` might be modified if
               keep_old_output_dir is True

    NOTES
      Required options are `output_dir`, `keep_old_output_dir`,
      `old_output_dir`, `skip_if_ok` and `skip_if_already_run`.
      Where:

      - output_dir: directory containing test result
      - keep_old_output_dir: if True, move last results in
        old_output_dir
      - old_output_dir:directory where the last results are kept.
        Note that if old_output_dir is None, and keep_old_output_dir
        is True, the last tests results will be moved in
        output_dir/old and the new ones in output_dir/new
      - skip_if_ok, skip_if_already_run: if one of these options is set to
        True, then just remove the results file.
    """
    base_dir = options.output_dir

    # Old results wanted but no explicit location: split the base
    # directory into old/ and new/ subdirectories.
    if options.keep_old_output_dir and options.old_output_dir is None:
        options.old_output_dir = os.path.join(base_dir, 'old')
        options.output_dir = os.path.join(base_dir, 'new')

    options.results_file = os.path.join(options.output_dir, 'results')
    options.report_file = os.path.join(options.output_dir, 'report')

    if options.skip_if_ok or options.skip_if_already_run:
        # Previous outputs are reused by the skip_* logic, so only the
        # results file is reset.
        rm(options.results_file)
    elif not options.keep_old_output_dir:
        # Old results are not wanted: wipe the output directory.
        if os.path.exists(options.output_dir):
            rm(options.output_dir, True)
    else:
        # Rotate: the current output becomes the "old" output.
        if os.path.exists(options.old_output_dir):
            rm(options.old_output_dir, True)
        if os.path.exists(options.output_dir):
            mv(options.output_dir, options.old_output_dir)
        else:
            mkdir(options.old_output_dir)

    mkdir(options.output_dir)


def compute_next_dyn_poll(poll_counter, poll_sleep):
    """Adjust the polling delay"""
    # if two much polling is done, the loop might consume too
    # much resources. In the opposite case, we might wait too
    # much to launch new jobs. Adjust accordingly.
    if poll_counter > 8 and poll_sleep < 1.0:
        poll_sleep *= 1.25
        logger.debug('Increase poll interval to %f' % poll_sleep)
    elif poll_sleep > 0.0001:
        poll_sleep *= 0.75
        logger.debug('Decrease poll interval to %f' % poll_sleep)
    return poll_sleep


def get_test_diff(
    result_dir, name, note, result_str, filename, diffs_format):
    """Append a test failure summary to a diffs or xfail_diffs file

    PARAMETERS
        result_dir: directory containing the test output files
        name: test name
        note: annotation
        result_str: content of the test .result file
        filename: file to update
        diffs_format: if 'diff', dump the .diff file; otherwise dump
            expected/actual outputs depending on the test status

    RETURN VALUE
        None
    """
    result = ["================ Bug %s %s" % (name, note)]
    # Slices cap the number of dumped lines so the summary file stays
    # manageable. str.startswith replaces the previous re.match calls:
    # all patterns were plain literal prefixes (or alternations of them).
    if diffs_format == 'diff':
        result += split_file(result_dir + '/' + name + '.diff',
                             ignore_errors=True)[0:2000]
    elif result_str.startswith("DIFF:unexpected"):
        result.append("---------------- unexpected output")
        result += split_file(result_dir + '/' + name + '.out',
                             ignore_errors=True)[0:100]
    elif result_str.startswith("CRASH:"):
        result.append("---------------- unexpected output")
        result += split_file(result_dir + '/' + name + '.out',
                             ignore_errors=True)[0:30]
    elif result_str.startswith(
            ("DIFF:output", "XFAIL:", "FAILED:", "PROBLEM:")):
        result.append("---------------- expected output")
        result += split_file(result_dir + '/' + name + '.expected',
                             ignore_errors=True)[0:2000]
        result.append("---------------- actual output")
        result += split_file(result_dir + '/' + name + '.out',
                             ignore_errors=True)

    echo_to_file(filename, result, append=True)


def add_mainloop_options(main, extended_options=False):
    """Add command line options to control mainloop default

    PARAMETERS
      main : a gnatpython.main.Main instance
      extended_options: if True, add additional options that require
        using the gnatpython testdriver and the generate_run_testcase,
        generate_collect_result functions.

    RETURN VALUE
      None
    """
    # Note: the previous code had a stray '""' concatenated to the group
    # title ("Mainloop control""") which was harmless but confusing.
    mainloop_opts = main.create_option_group("Mainloop control")

    mainloop_opts.add_option(
        "-j", "--jobs",
        dest="mainloop_jobs",
        type="int",
        metavar="N",
        default=1,
        help="Specify the number of jobs to run simultaneously")
    mainloop_opts.add_option(
        "--abort-file",
        dest="mainloop_abort_file",
        metavar="FILE",
        default="",
        help="Specify a file whose presence cause loop abortion")

    if extended_options:
        mainloop_opts.add_option(
            "--skip-if-ok",
            action="store_true",
            default=False,
            help="If the test result is found and is OK skip the test")
        mainloop_opts.add_option(
            "--skip-if-dead",
            action="store_true",
            default=False,
            help="If the test result is found and is DEAD skip the test")
        mainloop_opts.add_option(
            "--skip-if-already-run",
            action="store_true",
            default=False,
            help="If the test result is found skip the test")
        mainloop_opts.add_option(
            "--old-output-dir",
            dest="old_output_dir",
            metavar="DIR",
            default=None,
            help="Select old output dir")
        mainloop_opts.add_option(
            "--keep-old-output-dir",
            dest="keep_old_output_dir",
            action="store_true",
            help="Keep old output dir. Note that if --old-output-dir is not"
            " set, the old output dir will be stored in OUTPUT_DIR/old and"
            " the new test outputs in OUTPUT_DIR/new")

    main.add_option_group(mainloop_opts)