/usr/lib/python3/dist-packages/plainbox/impl/commands/inv_run.py is in python3-plainbox 0.25-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 | # This file is part of Checkbox.
#
# Copyright 2012-2015 Canonical Ltd.
# Written by:
# Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3,
# as published by the Free Software Foundation.
#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox. If not, see <http://www.gnu.org/licenses/>.
"""
:mod:`plainbox.impl.commands.run` -- run sub-command
====================================================
.. warning::
THIS MODULE DOES NOT HAVE STABLE PUBLIC API
"""
from shutil import copyfileobj
import collections
import datetime
import io
import itertools
import logging
import os
import sys
import time
from plainbox.abc import IJobResult
from plainbox.abc import IJobRunnerUI
from plainbox.i18n import gettext as _
from plainbox.i18n import ngettext
from plainbox.i18n import pgettext as C_
from plainbox.impl.color import Colorizer
from plainbox.impl.commands.inv_checkbox import CheckBoxInvocationMixIn
from plainbox.impl.depmgr import DependencyDuplicateError
from plainbox.impl.exporter import ByteStringStreamTranslator
from plainbox.impl.exporter.text import TextSessionStateExporter
from plainbox.impl.result import JobResultBuilder
from plainbox.impl.result import MemoryJobResult
from plainbox.impl.result import tr_outcome
from plainbox.impl.runner import JobRunner
from plainbox.impl.session import SessionManager
from plainbox.impl.session import SessionMetaData
from plainbox.impl.session import SessionPeekHelper
from plainbox.impl.session import SessionResumeError
from plainbox.impl.session import SessionStorageRepository
from plainbox.impl.transport import get_all_transports
from plainbox.impl.transport import TransportError
logger = logging.getLogger("plainbox.commands.run")
Action = collections.namedtuple("Action", "accel label cmd")
class ActionUI:
"""
A simple user interface to display a list of actions and let the user to
pick one
"""
def __init__(self, action_list, prompt=None, color=None):
"""
:param action_list:
A list of 3-tuples (accel, label, cmd)
:prompt:
An optional prompt string
:returns:
cmd of the selected action or None
"""
if prompt is None:
prompt = _("Pick an action")
self.action_list = action_list
self.prompt = prompt
self.C = Colorizer(color)
def run(self):
long_hint = "\n".join(
" {accel} => {label}".format(
accel=self.C.BLUE(action.accel) if action.accel else ' ',
label=action.label)
for action in self.action_list)
short_hint = ''.join(action.accel for action in self.action_list)
while True:
try:
print(self.C.BLUE(self.prompt))
print(long_hint)
choice = input("[{}]: ".format(self.C.BLUE(short_hint)))
except EOFError:
return None
else:
for action in self.action_list:
if choice == action.accel or choice == action.label:
return action.cmd
class SilentUI(IJobRunnerUI):
def considering_job(self, job, job_state):
pass
def about_to_start_running(self, job, job_state):
pass
def wait_for_interaction_prompt(self, job):
pass
def started_running(self, job, job_state):
pass
def about_to_execute_program(self, args, kwargs):
pass
def finished_executing_program(self, returncode):
pass
def got_program_output(self, stream_name, line):
pass
def finished_running(self, job, job_state, job_result):
pass
def notify_about_description(self, job):
pass
def notify_about_purpose(self, job):
pass
def notify_about_steps(self, job):
pass
def notify_about_verification(self, job):
pass
def job_cannot_start(self, job, job_state, job_result):
pass
def finished(self, job, job_state, job_result):
pass
def pick_action_cmd(self, action_list, prompt=None):
pass
def noreturn_job(self):
pass
class NormalUI(IJobRunnerUI):
STREAM_MAP = {
'stdout': sys.stdout,
'stderr': sys.stderr
}
def __init__(self, color, show_cmd_output=True):
self.show_cmd_output = show_cmd_output
self.C = Colorizer(color)
self._color = color
def considering_job(self, job, job_state):
print(self.C.header(job.tr_summary(), fill='-'))
print(_("ID: {0}").format(job.id))
print(_("Category: {0}").format(job_state.effective_category_id))
def about_to_start_running(self, job, job_state):
pass
def wait_for_interaction_prompt(self, job):
return self.pick_action_cmd([
Action('', _("press ENTER to continue"), 'run'),
Action('c', _('add a comment'), 'comment'),
Action('s', _("skip this job"), 'skip'),
Action('q', _("save the session and quit"), 'quit')
])
def started_running(self, job, job_state):
pass
def about_to_execute_program(self, args, kwargs):
if self.show_cmd_output:
print(self.C.BLACK("... 8< -".ljust(80, '-')))
else:
print(self.C.BLACK("(" + _("Command output hidden") + ")"))
def got_program_output(self, stream_name, line):
if not self.show_cmd_output:
return
stream = self.STREAM_MAP[stream_name]
stream = {
'stdout': sys.stdout,
'stderr': sys.stderr
}[stream_name]
if stream_name == 'stdout':
print(self.C.GREEN(line.decode("UTF-8", "ignore")),
end='', file=stream)
elif stream_name == 'stderr':
print(self.C.RED(line.decode("UTF-8", "ignore")),
end='', file=stream)
stream.flush()
def finished_executing_program(self, returncode):
if self.show_cmd_output:
print(self.C.BLACK("- >8 ---".rjust(80, '-')))
def finished_running(self, job, state, result):
pass
def notify_about_description(self, job):
if job.tr_description() is not None:
print(self.C.CYAN(job.tr_description()))
def notify_about_purpose(self, job):
if job.tr_purpose() is not None:
print(self.C.WHITE(_("Purpose:")))
print()
print(self.C.CYAN(job.tr_purpose()))
print()
else:
self.notify_about_description(job)
def notify_about_steps(self, job):
if job.tr_steps() is not None:
print(self.C.WHITE(_("Steps:")))
print()
print(self.C.CYAN(job.tr_steps()))
print()
def notify_about_verification(self, job):
if job.tr_verification() is not None:
print(self.C.WHITE(_("Verification:")))
print()
print(self.C.CYAN(job.tr_verification()))
print()
def job_cannot_start(self, job, job_state, result):
print(_("Job cannot be started because:"))
for inhibitor in job_state.readiness_inhibitor_list:
print(" - {}".format(self.C.YELLOW(inhibitor)))
def finished(self, job, job_state, result):
self._print_result_outcome(result)
def _print_result_outcome(self, result):
print(_("Outcome") + ": " + self.C.result(result))
def pick_action_cmd(self, action_list, prompt=None):
return ActionUI(action_list, prompt, self._color).run()
def noreturn_job(self):
print(self.C.RED(_("Waiting for the system to shut down or"
" reboot...")))
class ReRunJob(Exception):
"""
Exception raised from _interaction_callback to indicate that a job should
be re-started.
"""
class RunInvocation(CheckBoxInvocationMixIn):
"""
Invocation of the 'plainbox run' command.
attr ns:
The argparse namespace obtained from RunCommand
attr _manager:
The SessionManager object
attr _runner:
The JobRunner object
attr _exporter:
A ISessionStateExporter of some kind
attr _transport:
A ISessionStateTransport of some kind (optional)
attr _backtrack_and_run_missing:
A flag indicating that we should run over all the jobs in the
self.state.run_list again, set every time a job is added. Reset every
time the loop-over-all-jobs is started.
"""
def __init__(self, provider_loader, config_loader, ns, color):
super().__init__(provider_loader, config_loader)
self.ns = ns
self._manager = None
self._runner = None
self._exporter = None
self._transport = None
self._backtrack_and_run_missing = True
self._color = color
self._test_plan = self.find_test_plan()
self.C = Colorizer(color)
@property
def manager(self):
"""
SessionManager object of the current session
"""
return self._manager
@property
def runner(self):
"""
JobRunner object of the current session
"""
return self._runner
@property
def state(self):
"""
SessionState object of the current session
"""
return self.manager.state
@property
def metadata(self):
"""
SessionMetaData object of the current session
"""
return self.state.metadata
@property
def storage(self):
"""
SessionStorage object of the current session
"""
return self.manager.storage
@property
def exporter(self):
"""
The ISessionStateExporter of the current session
"""
return self._exporter
@property
def transport(self):
"""
The ISessionStateTransport of the current session (optional)
"""
return self._transport
@property
def is_interactive(self):
"""
Flag indicating that this is an interactive invocation and we can
interact with the user when we encounter OUTCOME_UNDECIDED
"""
return (sys.stdin.isatty() and sys.stdout.isatty() and not
self.ns.non_interactive)
def run(self):
ns = self.ns
if ns.transport == _('?'):
self._print_transport_list(ns)
return 0
else:
return self.do_normal_sequence()
def do_normal_sequence(self):
"""
Proceed through normal set of steps that are required to runs jobs
"""
# Create transport early so that we can handle bugs before starting the
# session.
self.create_transport()
if self.is_interactive:
resumed = self.maybe_resume_session()
else:
self.create_manager(None)
resumed = False
if self.ns.output_options == _('?'):
self._print_output_option_list(self.ns)
return 0
elif self.ns.output_format == _('?'):
self._print_output_format_list(self.ns)
return 0
if self.ns.output_format not in self.manager.exporter_map:
print(_("invalid choice: '{}'".format(self.ns.output_format)))
self._print_output_format_list(self.ns)
return 1
# Create exporter after we get a session to query the manager and get
# all exporter units
self.create_exporter()
# Create the job runner so that we can do stuff
self.create_runner()
# Set the effective category for each job
self.set_effective_categories()
# If we haven't resumed then do some one-time initialization
if not resumed:
# Store the application-identifying meta-data and checkpoint the
# session.
self.store_application_metadata()
self.metadata.flags.add(SessionMetaData.FLAG_INCOMPLETE)
self.manager.checkpoint()
# Select all the jobs that we are likely to run. This is the
# initial selection as we haven't started any jobs yet. Local jobs
# will cause that to happen again.
self.do_initial_job_selection()
# Print out our estimates
self.print_estimated_duration()
# Maybe ask the secure launcher to prompt for the password now. This is
# imperfect as we are going to run local jobs and we cannot see if they
# might need root or not. This cannot be fixed before template jobs are
# added and local jobs deprecated and removed (at least not being a
# part of the session we want to execute).
self.maybe_warm_up_authentication()
# Iterate through the run list and run jobs if possible. This function
# also implements backtrack to run new jobs that were added (and
# selected) at runtime. When it exits all the jobs on the run list have
# a result.
self.run_all_selected_jobs()
self.metadata.flags.remove(SessionMetaData.FLAG_INCOMPLETE)
self.manager.checkpoint()
# Export the result of the session and pass it to the transport to
# finish the test run.
self.export_and_send_results()
self.metadata.flags.add(SessionMetaData.FLAG_SUBMITTED)
self.manager.checkpoint()
# FIXME: sensible return value
return 0
def maybe_resume_session(self):
# Try to use the first session that can be resumed if the user agrees
resume_storage_list = self.get_resume_candidates()
resume_storage = None
resumed = False
if resume_storage_list:
print(self.C.header(_("Resume Incomplete Session")))
print(ngettext(
"There is {0} incomplete session that might be resumed",
"There are {0} incomplete sessions that might be resumed",
len(resume_storage_list)
).format(len(resume_storage_list)))
for resume_storage in resume_storage_list:
# Skip sessions that the user doesn't want to resume
cmd = self._pick_action_cmd([
Action('r', _("resume this session"), 'resume'),
Action('n', _("next session"), 'next'),
Action('c', _("create new session"), 'create')
], _("Do you want to resume session {0!a}?").format(
resume_storage.id))
if cmd == 'resume':
pass
elif cmd == 'next':
continue
elif cmd == 'create' or cmd is None:
self.create_manager(None)
break
# Skip sessions that cannot be resumed
try:
self.create_manager(resume_storage)
except SessionResumeError:
cmd = self._pick_action_cmd([
Action('i', _("ignore this problem"), 'ignore'),
Action('e', _("erase this session"), 'erase')])
if cmd == 'erase':
resume_storage.remove()
print(_("Session removed"))
continue
else:
resumed = True
# If we resumed maybe not rerun the same, probably broken job
if resume_storage is not None:
self.handle_last_job_after_resume()
# Finally ignore other sessions that can be resumed
break
else:
if resume_storage is not None and not self.ask_for_new_session():
# TRANSLATORS: This is the exit message
raise SystemExit(_("Session not resumed"))
# Create a fresh session if nothing got resumed
self.create_manager(None)
return resumed
def _print_output_format_list(self, ns):
print(_("Available output formats:"))
for id, exporter in self.manager.exporter_map.items():
print("{} - {}".format(id, exporter.summary))
def _print_output_option_list(self, ns):
print(_("Each format may support a different set of options"))
for name, exporter in self.manager.exporter_map.items():
print("{}: {}".format(
name, ", ".join(exporter.exporter_cls.supported_option_list)))
def _print_transport_list(self, ns):
print(_("Available transports: {}").format(
', '.join(get_all_transports())))
def get_resume_candidates(self):
"""
Look at all of the suspended sessions and pick a list of candidates
that could be used to resume the session now.
"""
storage_list = []
for storage in SessionStorageRepository().get_storage_list():
data = storage.load_checkpoint()
if len(data) == 0:
continue
try:
metadata = SessionPeekHelper().peek(data)
except SessionResumeError as exc:
logger.warning(_("Corrupted session %s: %s"), storage.id, exc)
else:
if (metadata.app_id == self.expected_app_id
and metadata.title == self.expected_session_title
and SessionMetaData.FLAG_INCOMPLETE in metadata.flags):
storage_list.append(storage)
return storage_list
def ask_for_confirmation(self, message):
return self._pick_action_cmd([
Action('y', _("yes"), True),
Action('n', _("no"), False)
], message)
def ask_for_new_session(self):
return self.ask_for_confirmation(
_("Do you want to start a new session?"))
def handle_last_job_after_resume(self):
last_job = self.metadata.running_job_name
if last_job is None:
return
print(_("Previous session run tried to execute job: {}").format(
last_job))
cmd = self._pick_action_cmd([
Action('s', _("skip that job"), 'skip'),
Action('p', _("mark it as passed and continue"), 'pass'),
Action('f', _("mark it as failed and continue"), 'fail'),
Action('r', _("run it again"), 'run'),
], _("What do you want to do with that job?"))
if cmd == 'skip' or cmd is None:
result = MemoryJobResult({
'outcome': IJobResult.OUTCOME_SKIP,
'comments': _("Skipped after resuming execution")
})
elif cmd == 'pass':
result = MemoryJobResult({
'outcome': IJobResult.OUTCOME_PASS,
'comments': _("Passed after resuming execution")
})
elif cmd == 'fail':
result = MemoryJobResult({
'outcome': IJobResult.OUTCOME_FAIL,
'comments': _("Failed after resuming execution")
})
elif cmd == 'run':
result = None
if result:
self.state.update_job_result(
self.state.job_state_map[last_job].job, result)
self.metadata.running_job_name = None
self.manager.checkpoint()
def create_exporter(self):
"""
Create the ISessionStateExporter based on the command line options
This sets the attr:`_exporter`.
"""
if self.ns.output_options:
option_list = self.ns.output_options.split(',')
else:
option_list = None
self._exporter = self.manager.create_exporter(
self.ns.output_format, option_list)
def create_transport(self):
"""
Create the ISessionStateTransport based on the command line options
This sets the attr:`_transport`.
"""
if self.ns.transport is None:
return
# XXX: perhaps we should be more vocal about it?
if self.ns.transport not in get_all_transports():
logger.error("The selected transport %r is not available",
self.ns.transport)
return
transport_cls = get_all_transports()[self.ns.transport]
try:
self._transport = transport_cls(
self.ns.transport_where, self.ns.transport_options)
except ValueError as exc:
raise SystemExit(str(exc))
def create_manager(self, storage):
"""
Create or resume a session that handles most of the stuff needed to run
jobs.
This sets the attr:`_manager` which enables :meth:`manager`,
:meth:`state` and :meth:`storage` properties.
The created session state has the on_job_added signal connected to
:meth:`on_job_added()`.
:raises SessionResumeError:
If the session cannot be resumed for any reason.
"""
all_units = list(
itertools.chain(*[p.unit_list for p in self.provider_list]))
try:
if storage is not None:
self._manager = SessionManager.load_session(all_units, storage)
else:
self._manager = SessionManager.create_with_unit_list(all_units)
except DependencyDuplicateError as exc:
# Handle possible DependencyDuplicateError that can happen if
# someone is using plainbox for job development.
print(self.C.RED(
_("The job database you are currently using is broken")))
print(self.C.RED(
_("At least two jobs contend for the id {0}").format(
exc.job.id)))
print(self.C.RED(
_("First job defined in: {0}").format(exc.job.origin)))
print(self.C.RED(
_("Second job defined in: {0}").format(
exc.duplicate_job.origin)))
raise SystemExit(exc)
except SessionResumeError as exc:
print(self.C.RED(exc))
print(self.C.RED(_("This session cannot be resumed")))
raise
else:
# Connect the on_job_added signal. We use it to mark the test loop
# for re-execution and to update the list of desired jobs.
self.state.on_job_added.connect(self.on_job_added)
def create_runner(self):
"""
Create a job runner.
This sets the attr:`_runner` which enables :meth:`runner` property.
Requires the manager to be created (we need the storage object)
"""
self._runner = JobRunner(
self.storage.location, self.provider_list,
# TODO: tie this with well-known-dirs helper
os.path.join(self.storage.location, 'io-logs'),
command_io_delegate=self, dry_run=self.ns.dry_run)
def store_application_metadata(self):
"""
Store application meta-data (app_id, app_blob) and session title
"""
self.metadata.title = self.expected_session_title
self.metadata.app_id = self.expected_app_id
self.metadata.app_blob = b''
@property
def expected_app_id(self):
return 'plainbox'
@property
def expected_session_title(self):
return " ".join([os.path.basename(sys.argv[0])] + sys.argv[1:])
def find_test_plan(self):
# This is using getattr because the code is shared with checkbox-ng
# that doesn't support the same set of command line options.
test_plan_id = getattr(self.ns, "test_plan", None)
if test_plan_id is None:
return
for provider in self.provider_list:
for unit in provider.id_map[test_plan_id]:
if unit.Meta.name == 'test plan':
return unit
def set_effective_categories(self):
if self._test_plan is None:
return
ecm = self._test_plan.get_effective_category_map(self.state.job_list)
for job_id, effective_category_id in ecm.items():
job_state = self.state.job_state_map[job_id]
job_state.effective_category_id = effective_category_id
def do_initial_job_selection(self):
"""
Compute the initial list of desired jobs
"""
# Compute the desired job list, this can give us notification about
# problems in the selected jobs. Currently we just display each problem
desired_job_list = self._get_matching_job_list(
self.ns, self.state.job_list)
print(self.C.header(_("Analyzing Jobs")))
self._update_desired_job_list(desired_job_list)
# Search each provider for the desired test plan
if self.ns.test_plan is not None:
# TODO: add high-level unit lookup functions
for provider in self.provider_list:
for unit in provider.id_map.get(self.ns.test_plan, ()):
if unit.Meta.name == 'test plan':
self.manager.test_plans = (unit,)
break
def maybe_warm_up_authentication(self):
"""
Ask the password before anything else in order to run jobs requiring
privileges
"""
warm_up_list = self.runner.get_warm_up_sequence(self.state.run_list)
if warm_up_list:
print(self.C.header(_("Authentication")))
for warm_up_func in warm_up_list:
warm_up_func()
def run_all_selected_jobs(self):
"""
Run all jobs according to the run list.
"""
print(self.C.header(_("Running Selected Jobs")))
self._backtrack_and_run_missing = True
while self._backtrack_and_run_missing:
self._backtrack_and_run_missing = False
jobs_to_run = []
estimated_time = 0
# gather jobs that we want to run and skip the jobs that already
# have result, this is only needed when we run over the list of
# jobs again, after discovering new jobs via the local job output
for job in self.state.run_list:
job_state = self.state.job_state_map[job.id]
if job_state.result.outcome is None:
jobs_to_run.append(job)
if (job.estimated_duration is not None
and estimated_time is not None):
estimated_time += job.estimated_duration
else:
estimated_time = None
for job_no, job in enumerate(jobs_to_run, start=1):
print(self.C.header(
_('Running job {} / {}. Estimated time left: {}').format(
job_no, len(jobs_to_run),
seconds_to_human_duration(max(0, estimated_time))
if estimated_time is not None else _("unknown")),
fill='-'))
self.run_single_job(job)
if (job.estimated_duration is not None
and estimated_time is not None):
estimated_time -= job.estimated_duration
def run_single_job(self, job):
self.run_single_job_with_ui(job, self.get_ui_for_job(job))
def get_ui_for_job(self, job):
if self.ns.dont_suppress_output is False and job.plugin in (
'local', 'resource', 'attachment'):
return NormalUI(self.C.c, show_cmd_output=False)
else:
return NormalUI(self.C.c, show_cmd_output=True)
def run_single_job_with_ui(self, job, ui):
job_start_time = time.time()
job_state = self.state.job_state_map[job.id]
ui.considering_job(job, job_state)
if job_state.can_start():
ui.about_to_start_running(job, job_state)
self.metadata.running_job_name = job.id
self.manager.checkpoint()
ui.started_running(job, job_state)
result_builder = self._run_single_job_with_ui_loop(
job, job_state, ui)
assert result_builder is not None
result_builder.execution_duration = time.time() - job_start_time
job_result = result_builder.get_result()
self.metadata.running_job_name = None
self.manager.checkpoint()
ui.finished_running(job, job_state, job_result)
else:
result_builder = JobResultBuilder(
outcome=IJobResult.OUTCOME_NOT_SUPPORTED,
comments=job_state.get_readiness_description(),
execution_duration=time.time() - job_start_time)
job_result = result_builder.get_result()
ui.job_cannot_start(job, job_state, job_result)
self.state.update_job_result(job, job_result)
ui.finished(job, job_state, job_result)
def _run_single_job_with_ui_loop(self, job, job_state, ui):
comments = ""
while True:
if job.plugin in ('user-interact', 'user-interact-verify',
'user-verify', 'manual'):
ui.notify_about_purpose(job)
if (self.is_interactive and
job.plugin in ('user-interact',
'user-interact-verify',
'manual')):
ui.notify_about_steps(job)
if job.plugin == 'manual':
cmd = 'run'
else :
cmd = ui.wait_for_interaction_prompt(job)
if cmd == 'run' or cmd is None:
result_builder = self.runner.run_job(
job, job_state, self.config, ui
).get_builder()
elif cmd == 'comment':
new_comment = input(self.C.BLUE(
_('Please enter your comments:') + '\n'))
if new_comment:
comments += new_comment + '\n'
continue
elif cmd == 'skip':
result_builder = JobResultBuilder(
outcome=IJobResult.OUTCOME_SKIP,
comments=_("Explicitly skipped before"
" execution"))
if comments != "":
result_builder.comments = comments
break
elif cmd == 'quit':
raise SystemExit()
else:
result_builder = self.runner.run_job(
job, job_state, self.config, ui
).get_builder()
else:
if 'noreturn' in job.get_flag_set():
ui.noreturn_job()
result_builder = self.runner.run_job(
job, job_state, self.config, ui
).get_builder()
if (self.is_interactive and
result_builder.outcome == IJobResult.OUTCOME_UNDECIDED):
try:
if comments != "":
result_builder.comments = comments
ui.notify_about_verification(job)
self._interaction_callback(
self.runner, job, result_builder, self.config)
except ReRunJob:
continue
break
return result_builder
def export_and_send_results(self):
# Get a stream with exported session data.
exported_stream = io.BytesIO()
self.exporter.dump_from_session_manager(self.manager, exported_stream)
exported_stream.seek(0) # Need to rewind the file, puagh
# Write the stream to file if requested
self._save_results(self.ns.output_file, exported_stream)
# Invoke the transport?
if self.transport is not None:
exported_stream.seek(0)
try:
self._transport.send(
exported_stream.read(), self.config, self.state)
except TransportError as exc:
print(str(exc))
def _save_results(self, output_file, input_stream):
if output_file is sys.stdout:
print(self.C.header(_("Results")))
# This requires a bit more finesse, as exporters output bytes
# and stdout needs a string.
translating_stream = ByteStringStreamTranslator(
output_file, "utf-8")
copyfileobj(input_stream, translating_stream)
else:
print(_("Saving results to {}").format(output_file.name))
copyfileobj(input_stream, output_file)
if output_file is not sys.stdout:
output_file.close()
def _pick_action_cmd(self, action_list, prompt=None):
return ActionUI(action_list, prompt, self._color).run()
def _interaction_callback(self, runner, job, result_builder, config,
prompt=None, allowed_outcome=None):
result = result_builder.get_result()
if prompt is None:
prompt = _("Select an outcome or an action: ")
if allowed_outcome is None:
allowed_outcome = [IJobResult.OUTCOME_PASS,
IJobResult.OUTCOME_FAIL,
IJobResult.OUTCOME_SKIP]
allowed_actions = [
Action('c', _('add a comment'), 'set-comments')
]
if IJobResult.OUTCOME_PASS in allowed_outcome:
allowed_actions.append(
Action('p', _('set outcome to {0}').format(
self.C.GREEN(C_('set outcome to <pass>', 'pass'))),
'set-pass'))
if IJobResult.OUTCOME_FAIL in allowed_outcome:
allowed_actions.append(
Action('f', _('set outcome to {0}').format(
self.C.RED(C_('set outcome to <fail>', 'fail'))),
'set-fail'))
if IJobResult.OUTCOME_SKIP in allowed_outcome:
allowed_actions.append(
Action('s', _('set outcome to {0}').format(
self.C.YELLOW(C_('set outcome to <skip>', 'skip'))),
'set-skip'))
if job.command is not None:
allowed_actions.append(
Action('r', _('re-run this job'), 're-run'))
if result.return_code is not None:
if result.return_code == 0:
suggested_outcome = IJobResult.OUTCOME_PASS
else:
suggested_outcome = IJobResult.OUTCOME_FAIL
allowed_actions.append(
Action('', _('set suggested outcome [{0}]').format(
tr_outcome(suggested_outcome)), 'set-suggested'))
while result.outcome not in allowed_outcome:
print(_("Please decide what to do next:"))
print(" " + _("outcome") + ": {0}".format(
self.C.result(result)))
if result.comments is None:
print(" " + _("comments") + ": {0}".format(
C_("none comment", "none")))
else:
print(" " + _("comments") + ": {0}".format(
self.C.CYAN(result.comments, bright=False)))
cmd = self._pick_action_cmd(allowed_actions)
# let's store new_comment early for verification if comment has
# already been added in the current UI step
new_comment = ''
if cmd == 'set-pass':
result_builder.outcome = IJobResult.OUTCOME_PASS
elif cmd == 'set-fail':
if 'explicit-fail' in job.get_flag_set() and not new_comment:
new_comment = input(self.C.BLUE(
_('Please enter your comments:') + '\n'))
if new_comment:
result_builder.add_comment(new_comment)
result_builder.outcome = IJobResult.OUTCOME_FAIL
elif cmd == 'set-skip' or cmd is None:
result_builder.outcome = IJobResult.OUTCOME_SKIP
elif cmd == 'set-suggested':
result_builder.outcome = suggested_outcome
elif cmd == 'set-comments':
new_comment = input(self.C.BLUE(
_('Please enter your comments:') + '\n'))
if new_comment:
result_builder.add_comment(new_comment)
elif cmd == 're-run':
raise ReRunJob
result = result_builder.get_result()
def _update_desired_job_list(self, desired_job_list):
problem_list = self.state.update_desired_job_list(desired_job_list)
if problem_list:
print(self.C.header(_("Warning"), 'YELLOW'))
print(_("There were some problems with the selected jobs"))
for problem in problem_list:
print(" * {}".format(problem))
print(_("Problematic jobs will not be considered"))
def print_estimated_duration(self):
print(self.C.header(_("Session Statistics")))
print(_("This session is about {0:.2f}{percent} complete").format(
self.get_completion_ratio() * 100, percent='%'))
(estimated_duration_auto,
estimated_duration_manual) = self.state.get_estimated_duration()
if estimated_duration_auto:
print(_("Estimated duration is {:.2f} for automated jobs.").format(
estimated_duration_auto))
else:
print(_(
"Estimated duration cannot be determined for automated jobs."))
if estimated_duration_manual:
print(_("Estimated duration is {:.2f} for manual jobs.").format(
estimated_duration_manual))
else:
print(_(
"Estimated duration cannot be determined for manual jobs."))
print(_("Size of the desired job list: {0}").format(
len(self.state.desired_job_list)))
print(_("Size of the effective execution plan: {0}").format(
len(self.state.run_list)))
def get_completion_ratio(self):
total_cnt = len(self.state.run_list)
total_time = 0
done_cnt = 0
done_time = 0
time_reliable = True
for job in self.state.run_list:
inc = job.estimated_duration
if inc is None:
time_reliable = False
continue
total_time += inc
if self.state.job_state_map[job.id].result.outcome is not None:
done_cnt += 1
done_time += inc
if time_reliable:
if total_time == 0:
return 0
else:
return done_time / total_time
else:
if total_cnt == 0:
return 0
else:
return done_cnt / total_cnt
def on_job_added(self, job):
"""
Handler connected to SessionState.on_job_added()
The goal of this handler is to re-select all desired jobs (based on
original command line arguments and new list of known jobs) and set the
backtrack_and_run_missing flag that is observed by
_run_all_selected_jobs()
"""
new_matching_job_list = self._get_matching_job_list(
self.ns, self.state.job_list)
self._update_desired_job_list(new_matching_job_list)
if self._test_plan is not None:
job_state = self.state.job_state_map[job.id]
job_state.effective_category_id = (
self._test_plan.get_effective_category(job))
self._backtrack_and_run_missing = True
def seconds_to_human_duration(seconds: float) -> str:
""" Convert ammount of seconds to human readable duration string. """
delta = datetime.timedelta(seconds=round(seconds))
return str(delta)
|