/usr/share/pyshared/dtest/core.py is in python-dtest 0.4.0-0ubuntu2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/python
#
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
============
Test Running
============
This module contains the DTestQueue class, which provides the
functionality for executing tests in a threaded manner while
properly handling the ordering implied by dependencies. Output is
controlled by passing an instance of DTestOutput to the DTestQueue
constructor.
If this file is executed directly, the main() function--which first
calls explore(), then returns the result of run()--is called, and
the negation of its return value is passed to sys.exit(), so a
fully successful run exits with status 0. Command line arguments
are honored, and the module can be executed by passing "-m
dtest.core" to the Python interpreter.
"""
import imp
from optparse import OptionParser
import os
import os.path
import sys
import traceback
from eventlet import spawn_n, monkey_patch
from eventlet.corolocal import local
from eventlet.event import Event
from eventlet.semaphore import Semaphore
from dtest import capture
from dtest.constants import *
from dtest.exceptions import DTestException
from dtest import test
# Default line width
DEF_LINEWIDTH = 78
# Current output for issuing status messages
_output = local()
class _DTestStatus(object):
"""
_DTestStatus
============
The _DTestStatus class is a stream look-alike class, an instance
of which implements the special ``dtest.status`` stream. Data
written to the stream will be passed to the status() method of the
current DTestOutput object. Thread-local data is used to store
the current DTestOutput object, so multiple output objects may be
safely used simultaneously.
"""
def write(self, msg):
"""
Emits ``msg`` as a status message to the current DTestOutput
object. This can be used to notify the user of the status of
a test which takes a long time to complete.
"""
# Write to the registered output
_output.out.status(_output.test, msg)
def flush(self):
"""
Provided for compatibility with normal output streams. Does
nothing; the DTestOutput object's status() method is assumed
to perform a flush after every call.
"""
pass
@property
def output(self):
"""
Retrieve the current DTestOutput object, which is stored in a
per-thread manner. This property is provided to allow the
status stream to be set up in threads started within the
individual tests. See the setup() method.
"""
# This is simple...
return _output.out
@property
def test(self):
"""
Retrieve the current test object, which is stored in a
per-thread manner. This property is provided to allow the
status stream to be set up in threads started within the
individual tests. See the setup() method.
"""
# Also simple...
return _output.test
def setup(self, output, test):
"""
Initializes the status stream within a new thread of control.
This routine should be called as the first action of a new
thread.
"""
# Set up thread-local data
_output.out = output
_output.test = test
# A stream for export
status = _DTestStatus()
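# Illustrative sketch: a long-running test can report progress through
# the ``status`` stream above (the docstring refers to it as the
# ``dtest.status`` stream).  The test body below is hypothetical, and
# do_chunk() is an assumed helper:
#
#     from dtest import status
#
#     def test_slow_conversion():
#         for i in range(10):
#             do_chunk(i)
#             print >>status, "chunk %d of 10 done" % i
#
# Threads started inside a test must first call status.setup() with the
# current output object and test, since the stream state is
# thread-local.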
class DTestOutput(object):
"""
DTestOutput
===========
The DTestOutput class is a utility class for grouping together all
output generation for the test framework. The ``output``
attribute contains a stream-like object to which output may be
sent, and defaults to sys.__stdout__ (note that sys.stdout may be
captured as the output of a test). The notify() method is called
whenever a test or test fixture transitions to an alternate state;
the result() method is called to output the results of a test; and
the summary() method is called to output a summary of the results
of the test run. The default implementations of these methods send
their output to the stream in the ``output`` attribute, but each
may be overridden to perform alternate output. This could, for
instance, be used to display test framework output in a GUI or to
generate a web page.
"""
def __init__(self, output=sys.__stdout__, linewidth=DEF_LINEWIDTH):
"""
Initialize a DTestOutput object with the given ``output``
stream (defaults to sys.__stdout__) and linewidth.
"""
# Save the output and linewidth
self.output = output
self.linewidth = linewidth
def notify(self, test, state):
"""
Called when a test or test fixture, identified by ``test``,
transitions to ``state``. The default implementation ignores
state transitions by test fixtures or transitions to the
RUNNING state.
"""
# Are we interested in this test?
if not test.istest() or state == RUNNING:
return
# Determine the name of the test
name = str(test)
# Determine the width of the test name field
width = self.linewidth - len(state) - 1
# Truncate the name, if necessary
if len(name) > width:
name = name[:width - 3] + '...'
# Emit the status message
print >>self.output, "%-*s %s" % (width, name, state)
# Flush the output
self.output.flush()
def result(self, result, debug=False):
"""
Called at the end of a test run to emit ``result`` information
for a given test. Called once for each result. Should emit
all exception and captured output information, if any. Will
also be called for results from test fixtures, in order to
emit errors encountered while executing them. The default
implementation ignores results containing no messages, and
emits results from successful tests or expected failures only
if debug is True.
"""
# Helper for reporting output
def out_msg(msg, hdr=None):
# Output header information
if hdr:
print >>self.output, (hdr.center(self.linewidth) + "\n" +
('-' * self.linewidth))
# Output the test ID
if hasattr(msg, 'id'):
id_hdr = " (%s) " % msg.id
print >>self.output, id_hdr.center(self.linewidth, ':')
# Output exception information
if msg.exc_type is not None:
exc_hdr = ' Exception %s ' % msg.exc_type.__name__
tb = ''.join(traceback.format_exception(msg.exc_type,
msg.exc_value,
msg.exc_tb))
print >>self.output, exc_hdr.center(self.linewidth, '-')
print >>self.output, tb.rstrip()
# Format output data
for name, desc, value in msg.captured:
print >>self.output, (' %s ' % desc).center(self.linewidth,
'-')
print >>self.output, value.rstrip()
# Emit a closing line
print >>self.output, '-' * self.linewidth
# Skip results with no messages
if len(result) == 0:
return
# If it's successful or an expected failure, only emit
# messages if debug is True
if not debug and (result.state == OK or result.state == XFAIL):
return
# Emit a banner for the result
print >>self.output, ("\n" + ("=" * self.linewidth) + "\n" +
str(result.test).center(self.linewidth) + "\n" +
("=" * self.linewidth))
# Emit the data for each step
if PRE in result:
out_msg(result[PRE], 'Pre-test Fixture')
if TEST in result:
if result.multi:
for m in result[TEST]:
out_msg(m)
else:
out_msg(result[TEST])
if POST in result:
out_msg(result[POST], 'Post-test Fixture')
# Flush the output
self.output.flush()
def summary(self, counts):
"""
Called at the end of a test run to emit summary information
about the run. The ``counts`` argument is a dictionary
containing the following keys:
OK
The number of tests which passed. This includes the count of
unexpected passes (tests marked with the @failing decorator
which passed).
UOK
The number of tests which unexpectedly passed.
SKIPPED
The number of tests which were skipped in this test run.
FAIL
The number of tests which failed. This includes the count of
expected failures (tests marked with the @failing decorator
which failed).
XFAIL
The number of tests which failed, where failure was expected.
ERROR
The number of tests which experienced an error--an unexpected
exception thrown while executing the test.
DEPFAIL
The number of tests which could not be executed because tests
they were dependent on failed.
'total'
The total number of tests considered for execution.
'threads'
The maximum number of simultaneously executing threads
which were utilized while running tests.
Note that test fixtures are not included in these counts. If a
test fixture fails (raises an AssertionError) or raises any other
exception, all tests dependent on that test fixture will fail due
to dependencies.
"""
# Emit summary data
print >>self.output, ("%d tests run in %d max simultaneous threads" %
(counts['total'], counts['threads']))
if counts[OK] > 0:
unexp = ''
if counts[UOK] > 0:
unexp = ' (%d unexpected)' % counts[UOK]
print >>self.output, (" %d tests successful%s" %
(counts[OK], unexp))
if counts[SKIPPED] > 0:
print >>self.output, " %d tests skipped" % counts[SKIPPED]
if counts[FAIL] + counts[ERROR] + counts[DEPFAIL] > 0:
# Set up the breakdown
bd = []
total = 0
if counts[FAIL] > 0:
exp = ''
if counts[XFAIL] > 0:
exp = ' [%d expected]' % counts[XFAIL]
bd.append('%d failed%s' % (counts[FAIL], exp))
total += counts[FAIL]
if counts[ERROR] > 0:
bd.append('%d errors' % counts[ERROR])
total += counts[ERROR]
if counts[DEPFAIL] > 0:
bd.append('%d failed due to dependencies' % counts[DEPFAIL])
total += counts[DEPFAIL]
print >>self.output, (" %d tests failed (%s)" %
(total, ', '.join(bd)))
# Flush the output
self.output.flush()
def caught(self, exc_list):
"""
Called after emitting summary data to report any exceptions
encountered within the dtest framework itself while running
the test. The ``exc_list`` argument is a list of
three-element tuples. For each tuple, the first element is an
exception type; the second element is the exception value; and
the third element is a traceback object. Under most
circumstances, this function will not be called; if it is, the
exception data reported should be sent back to the dtest
framework developers.
"""
# Emit exception data
print >>self.output, "\nThe following exceptions were encountered:"
for exc_type, exc_value, tb in exc_list:
exc_hdr = ' Exception %s ' % exc_type.__name__
tb = ''.join(traceback.format_exception(exc_type, exc_value, tb))
print >>self.output, exc_hdr.center(self.linewidth, '-')
print >>self.output, tb.rstrip()
print >>self.output, '-' * self.linewidth
print >>self.output, ("Please report the above errors to the "
"developers of the dtest framework.")
# Flush the output
self.output.flush()
def imports(self, exc_list):
"""
Called by main() if import errors were encountered while
discovering tests. The ``exc_list`` argument is a list of
tuples containing three elements: the first element is the
full path to the file for which import was attempted; the
second element is the module path for which import was
attempted; and the third is a three-element tuple returned by
sys.exc_info().
"""
# Emit import error data
print >>self.output, "The following import errors were encountered:"
for path, pkgname, (exc_type, exc_value, tb) in exc_list:
exc_hdr = ' %s (%s) ' % (os.path.relpath(path), pkgname)
tb = ''.join(traceback.format_exception(exc_type, exc_value, tb))
print >>self.output, exc_hdr.center(self.linewidth, '-')
print >>self.output, tb.rstrip()
print >>self.output, ('-' * self.linewidth) + "\n"
# Flush the output
self.output.flush()
def info(self, message):
"""
Called to emit other specialized messages not specifically
categorized. Currently only used in the case of dependency
cycle detection. The ``message`` argument will be an
explanatory message.
"""
# Emit the message
print >>self.output, '\n' + message
# Flush the output
self.output.flush()
def status(self, dt, message):
"""
Called to emit status messages written to the dtest.status
stream. The ``dt`` argument will be the test descriptor, and
the ``message`` argument will be a message string or, when the
``print`` statement is used, bare whitespace (the trailing
newline is written separately). The default
implementation ignores messages consisting of whitespace and
writes non-whitespace messages, prefixed with the short name
of ``dt``, to the ``self.output`` stream; the output stream
will be flushed to ensure the message is emitted.
"""
# Ignore messages composed only of whitespace...
if message.isspace():
return
# Get the short name of the test...
shname = str(dt)
if '.' in shname:
dummy, shname = shname.rsplit('.', 1)
# Emit the message
print >>self.output, "%s: %s" % (shname, message)
# Flush the output
self.output.flush()
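# Illustrative sketch: the methods of DTestOutput may be overridden to
# redirect or extend output.  This hypothetical subclass also announces
# tests entering the RUNNING state, which the default notify() ignores.
class _ExampleVerboseOutput(DTestOutput):
    def notify(self, test, state):
        # Announce the start of each regular test...
        if test.istest() and state == RUNNING:
            print >>self.output, "starting %s" % test
            self.output.flush()
            return
        # ...and defer everything else to the default behavior
        DTestOutput.notify(self, test, state)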
class DTestQueue(object):
"""
DTestQueue
==========
The DTestQueue class maintains a queue of tests waiting to be run.
The constructor initializes the queue to an empty state and stores
a maximum simultaneous thread count ``maxth`` (None means
unlimited); a ``skip`` evaluation routine (defaults to testing the
``skip`` attribute of the test); and an instance of DTestOutput.
The list of all tests in the queue is maintained in the ``tests``
attribute; tests may be added to a queue with add_test() (for a
single test) or add_tests() (for a sequence of tests). The tests
in the queue may be run by invoking the run() method.
"""
def __init__(self, maxth=None, skip=lambda dt: dt.skip,
output=DTestOutput()):
"""
Initialize a DTestQueue. The ``maxth`` argument must be
either None or an integer specifying the maximum number of
simultaneous threads permitted. The ``skip`` argument is a
function reference; it should take a test and return True if
the test should be skipped. The ``output`` argument should be
an instance of DTestOutput containing a notify() method, which
takes a test and the state to which it is transitioning, and
may use that information to emit a test result. Note that the
notify() method will receive state transitions to the RUNNING
state, as well as state transitions for test fixtures; callers
may find the DTestBase.istest() method useful for
differentiating between regular tests and test fixtures for
reporting purposes.
"""
# Save our maximum thread count
if maxth is None:
self.sem = None
else:
self.sem = Semaphore(maxth)
# Need to remember the skip routine
self.skip = skip
# Also remember the output
self.output = output
# Initialize the lists of tests
self.tests = set()
self.waiting = None
self.runlist = set()
# Need locks for the waiting and runlist lists
self.waitlock = Semaphore()
self.runlock = Semaphore()
# Set up some statistics...
self.th_count = 0
self.th_event = Event()
self.th_simul = 0
self.th_max = 0
# Place to keep any exceptions we encounter within dtest
# itself
self.caught = []
# We're not yet running
self.running = False
def add_test(self, tst):
"""
Add a test ``tst`` to the queue. Tests can be added multiple
times, but the test will only be run once.
"""
# Can't add a test if the queue is running
if self.running:
raise DTestException("Cannot add tests to a running queue.")
# First we need to get the test object
dt = test._gettest(tst)
# Add it to the set of tests
self.tests.add(dt)
def add_tests(self, tests):
"""
Add a sequence of tests ``tests`` to the queue. Tests can be
added multiple times, but the test will only be run once.
"""
# Can't add a test if the queue is running
if self.running:
raise DTestException("Cannot add tests to a running queue.")
# Run add_test() in a loop
for tst in tests:
self.add_test(tst)
def dot(self, grname='testdeps'):
"""
Constructs a GraphViz-compatible dependency graph with the
given name (``testdeps``, by default). Returns the graph as a
string. The graph can be fed to the ``dot`` tool to generate
a visualization of the dependency graph. Note that red nodes
in the graph indicate failing tests, blue nodes indicate test
fixtures, and blue dashed edges indicate dependencies
associated with test fixtures. If the node outline is dotted,
that indicates that the test was skipped in the most recent
test run.
"""
# Helper to generate node and edge options
def mkopts(opts):
# If there are no options, return an empty string
if not opts:
return ''
# OK, let's do this...
return ' [' + ','.join(['%s="%s"' % (k, opts[k])
for k in opts]) + ']'
# Now, create the graph
nodes = []
edges = []
for dt in sorted(self.tests, key=lambda dt: str(dt)):
# Get the real test function
tfunc = dt.test
# Make the node
opts = dict(label=r'%s\n%s:%d' %
(dt, tfunc.func_code.co_filename,
tfunc.func_code.co_firstlineno))
if dt.state:
opts['label'] += r'\n(Result: %s)' % dt.state
if (dt.state == FAIL or dt.state == XFAIL or dt.state == ERROR or
dt.state == DEPFAIL):
opts['color'] = 'red'
elif isinstance(dt, test.DTestFixture):
opts['color'] = 'blue'
if dt.state == SKIPPED:
opts['style'] = 'dotted'
elif dt.state == DEPFAIL:
opts['style'] = 'dashed'
nodes.append('"%s"%s;' % (dt, mkopts(opts)))
# Make all the edges
for dep in sorted(dt.dependencies, key=lambda dt: str(dt)):
opts = {}
if (isinstance(dt, test.DTestFixture) or
isinstance(dep, test.DTestFixture)):
opts.update(dict(color='blue', style='dashed'))
if dt._partner is not None and dep == dt._partner:
opts['style'] = 'dotted'
edges.append('"%s" -> "%s"%s;' % (dt, dep, mkopts(opts)))
# Return a graph
return (('strict digraph "%s" {\n\t' % grname) +
'\n\t'.join(nodes) + '\n\n\t' + '\n\t'.join(edges) + '\n}')
def run(self, debug=False):
"""
Runs all tests that have been queued up. Does not return
until all tests have been run. Causes test results and
summary data to be emitted using the ``output`` object
registered when the queue was initialized.
"""
# Can't run an already running queue
if self.running:
raise DTestException("Queue is already running.")
# OK, put ourselves into the running state
self.running = True
# Must begin by ensuring we're monkey-patched
monkey_patch()
# OK, let's prepare all the tests...
for dt in self.tests:
dt._prepare()
# Second pass--determine which tests are being skipped
waiting = []
for dt in self.tests:
# Do we skip this one?
willskip = self.skip(dt)
# If not, check if it's a fixture with no dependencies...
if not willskip and not dt.istest():
if dt._partner is None:
if len(dt._revdeps) == 0:
willskip = True
else:
if len(dt._revdeps) == 1:
willskip = True
# OK, mark it skipped if we're skipping
if willskip:
dt._skipped(self.output)
else:
waiting.append(dt)
# OK, last pass: generate list of waiting tests; have to
# filter out SKIPPED tests
self.waiting = set([dt for dt in self.tests if dt.state != SKIPPED])
# Install the capture proxies...
if not debug:
capture.install()
# Spawn waiting tests
self._spawn(self.waiting)
# Wait for all tests to finish
if self.th_count > 0:
self.th_event.wait()
# OK, uninstall the capture proxies
if not debug:
capture.uninstall()
# Walk through the tests and output the results
cnt = {
OK: 0,
UOK: 0,
SKIPPED: 0,
FAIL: 0,
XFAIL: 0,
ERROR: 0,
DEPFAIL: 0,
'total': 0,
'threads': self.th_max,
}
for t in self.tests:
# Get the result object
r = t.result
# Update the counts
cnt[r.state] += int(r.test)
cnt['total'] += int(r.test)
# Special case update for unexpected OKs and expected failures
if r.state == UOK:
cnt[OK] += int(r.test)
elif r.state == XFAIL:
cnt[FAIL] += int(r.test)
try:
# Emit the result messages
self.output.result(r, debug)
except TypeError:
# Maybe the output object is written to the older
# standard?
self.output.result(r)
# Emit summary data
self.output.summary(cnt)
# If we saw exceptions, emit data about them
if self.caught:
self.output.caught(self.caught)
# We're done running; re-running should be legal
self.running = False
# Return False if there were any unexpected OKs, unexpected
# failures, errors, or dependency failures
if (cnt[UOK] > 0 or
(cnt[FAIL] - cnt[XFAIL]) > 0 or
cnt[ERROR] > 0 or cnt[DEPFAIL] > 0):
return False
# All tests passed!
return True
def _spawn(self, tests):
"""
Selects all ready tests from the set or list specified in
``tests`` and spawns threads to execute them. Note that the
maximum thread count restriction is implemented by having the
thread wait on the ``sem`` Semaphore after being spawned.
"""
# Work with a copy of the tests
tests = list(tests)
# Loop through the list
while tests:
# Pop off a test to consider
dt = tests.pop(0)
with self.waitlock:
# Is test waiting?
if dt not in self.waiting:
continue
# OK, check dependencies
elif dt._depcheck(self.output):
# No longer waiting
self.waiting.remove(dt)
# Place test on the run list
with self.runlock:
self.runlist.add(dt)
# Spawn the test
self.th_count += 1
spawn_n(self._run_test, dt)
# Dependencies failed; check if state changed and add
# its dependents if so
elif dt.state is not None:
# No longer waiting
self.waiting.remove(dt)
# Check all its dependents. Note--not trying to
# remove duplicates, because some formerly
# unrunnable tests may now be runnable because of
# the state change
tests.extend(list(dt.dependents))
def _run_test(self, dt):
"""
Execute ``dt``. This method is meant to be run in a new
thread.
Once the test is complete, its dependents will be passed back
to the _spawn() method, in order to pick up and execute any
tests that are now ready for execution.
"""
# Acquire the thread semaphore
if self.sem is not None:
self.sem.acquire()
# Increment the simultaneous thread count
self.th_simul += 1
if self.th_simul > self.th_max:
self.th_max = self.th_simul
# Save the output and test relative to this thread, for the
# status stream
status.setup(self.output, dt)
# Execute the test
try:
dt._run(self.output)
except:
# Add the exception to the caught list
self.caught.append(sys.exc_info())
# Manually transition the test to the ERROR state
dt._result._transition(ERROR, output=self.output)
# OK, done running the test; take it off the run list
with self.runlock:
self.runlist.remove(dt)
# Now, walk through its dependents and check readiness
self._spawn(dt.dependents)
# All right, we're done; release the semaphore
if self.sem is not None:
self.sem.release()
# Decrement the thread count
self.th_simul -= 1
self.th_count -= 1
# If thread count is now 0, signal the event
with self.waitlock:
if len(self.waiting) == 0 and self.th_count == 0:
self.th_event.send()
return
# If the run list is empty, that means we have a cycle
with self.runlock:
if len(self.runlist) == 0:
for dt2 in list(self.waiting):
# Manually transition to DEPFAIL
dt2._result._transition(DEPFAIL, output=self.output)
# Emit an error message to let the user know what
# happened
self.output.info("A dependency cycle was discovered. "
"Please examine the dependency graph "
"and correct the cycle. The --dot "
"option may be useful here.")
# Now, let's signal our event
self.th_event.send()
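# Illustrative sketch: typical programmatic use of DTestQueue.  The
# module ``mysuite`` and its tests are hypothetical, and this function
# is never called by the framework.
def _example_queue_run():
    # Limit the run to four simultaneous tests
    queue = DTestQueue(maxth=4)
    # Tests may be added singly or in bulk; duplicates collapse, since
    # the queue stores tests in a set
    # queue.add_test(mysuite.test_foo)
    # queue.add_tests([mysuite.test_bar, mysuite.test_baz])
    # run() blocks until all tests finish and returns True only if no
    # unexpected passes, failures, errors, or dependency failures
    # occurred
    return queue.run()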
def explore(directory=None, queue=None):
"""
Explore ``directory`` (by default, the current working directory)
for all modules matching the test regular expression and import
them. Each module imported will be further explored for tests.
This function may be used to discover all registered tests prior
to running them. The discovered tests are added to ``queue`` (a
DTestQueue with default settings is allocated if none is
provided), and the queue itself is returned. Any ImportError
exceptions caught during discovery are reported through the
queue's output object, via its imports() method, as a list of
three-element tuples: the path of the file, the module name, and
the exception information returned by sys.exc_info().
"""
# If no queue is provided, allocate one with the default settings
if queue is None:
queue = DTestQueue()
# Set of all discovered tests
tests = set()
# List of all import exceptions
caught = []
# Need the allowable suffixes
suffixes = [sfx[0] for sfx in imp.get_suffixes()]
# Obtain the canonical directory name
if directory is None:
directory = os.getcwd()
else:
directory = os.path.abspath(directory)
# This is the directory we'll be searching
searchdir = directory
# But does it have an __init__.py?
pkgpath = None
for sfx in suffixes:
if os.path.exists(os.path.join(directory, '__init__' + sfx)):
# Refigure the directory
directory, pkgpath = os.path.split(directory)
# Now, let's jigger the import path
tmppath = sys.path
sys.path = [directory] + sys.path
# Import the package, if necessary
if pkgpath is not None:
try:
__import__(pkgpath)
test.visit_mod(sys.modules[pkgpath], tests)
except ImportError:
# Remember the exception we got
caught.append((searchdir, pkgpath, sys.exc_info()))
# Having done that, we now begin walking the directory tree
for root, dirs, files in os.walk(searchdir):
# Let's determine the module's package path
if root == directory:
pkgpath = ''
else:
sep = root[len(directory)]
subdir = root[len(directory) + 1:]
pkgpath = '.'.join(subdir.split(sep)) + '.'
# Start with files...
for f in files:
# Does it match the testRE?
if not test.testRE.match(f):
continue
# Only interested in files we can load
for sfx in suffixes:
if f.endswith(sfx):
modname = f[:-len(sfx)]
break
else:
# Can't load it, so skip it
continue
# Determine the module's full path
fullmodname = pkgpath + modname
# Let's try to import it
try:
__import__(fullmodname)
mod = sys.modules[fullmodname]
except ImportError:
# Remember the exception we got
caught.append((os.path.join(root, f), fullmodname,
sys.exc_info()))
# Can't import it, so move on
continue
test.visit_mod(mod, tests)
# Now we want to determine which subdirectories are packages;
# they'll contain __init__.py
subdirs = []
for d in dirs:
# Only interested in directories which contain __init__.py
for sfx in suffixes:
if os.path.exists(os.path.join(root, d, '__init__' + sfx)):
break
else:
# Not a package, so skip it
continue
# Does it match the testRE?
if not test.testRE.match(d):
# No, but let's continue exploring under it
subdirs.append(d)
continue
# Determine the package's full path
fullpkgname = pkgpath + d
# Let's try to import it
try:
__import__(fullpkgname)
pkg = sys.modules[fullpkgname]
except ImportError:
# Remember the exception we got
caught.append((os.path.join(root, d), fullpkgname,
sys.exc_info()))
# Can't import it, no point exploring under it
continue
test.visit_mod(pkg, tests)
# We also want to explore under it
subdirs.append(d)
# Make sure to set up our pruned subdirectory list
dirs[:] = subdirs
# We have finished loading all tests; restore the original import
# path
sys.path = tmppath
# Add the discovered tests to the queue
queue.add_tests(tests)
# Output the import errors, if any
if caught:
queue.output.imports(caught)
# Return the queue
return queue
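# Illustrative sketch: discovering tests and dumping the dependency
# graph built by DTestQueue.dot().  The 'tests' directory and the
# 'deps.dot' filename are hypothetical.
def _example_explore_and_graph():
    queue = explore('tests')
    with open('deps.dot', 'w') as f:
        f.write(queue.dot('mysuite_deps'))
    # Render the graph with the GraphViz "dot" tool, e.g.:
    #     dot -Tpng deps.dot -o deps.png
    return queue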
def main(directory=None, maxth=None, skip=lambda dt: dt.skip,
output=DTestOutput(), dryrun=False, debug=False, dotpath=None):
"""
Discover tests under ``directory`` (by default, the current
directory), then run the tests under control of ``maxth``,
``skip``, and ``output`` (see the DTestQueue documentation for
more information on these three parameters). Returns True if
all tests (excluding expected failures) passed, or False if an
unexpected OK, a failure, or an error was encountered.
"""
# First, allocate a queue
queue = DTestQueue(maxth, skip, output)
# Next, discover the tests of interest
explore(directory, queue)
# Is this a dry run?
if not dryrun:
# Nope, execute the tests
result = queue.run(debug=debug)
else:
result = True
# Print out the names of the tests
print "Discovered tests:\n"
for dt in queue.tests:
if dt.istest():
print str(dt)
# Are we to dump the dependency graph?
if dotpath is not None:
with open(dotpath, 'w') as f:
print >>f, queue.dot()
# Now, let's return the result of the test run
return result
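# Illustrative sketch: embedding the framework in another script by
# calling main() directly.  The 'tests' directory and the 'slow'
# attribute are hypothetical; mirroring the __main__ block below, the
# negated result is suitable for sys.exit().
def _example_embedded_run():
    ok = main(directory='tests', maxth=8,
              skip=lambda dt: getattr(dt, 'slow', False))
    return not ok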
def optparser(*args, **kwargs):
"""
Builds and returns an option parser with the default options
recognized by the dtest framework. All arguments are passed to
the OptionParser constructor.
"""
# Set up an OptionParser
op = OptionParser(*args, **kwargs)
# Set up our default options
op.add_option("-d", "--directory",
action="store", type="string", dest="directory",
help="The directory to search for tests to run.")
op.add_option("-m", "--max-threads",
action="store", type="int", dest="maxth",
help="The maximum number of tests to run simultaneously; if "
"not specified, an unlimited number of tests may run "
"simultaneously.")
op.add_option("-s", "--skip",
action="store", type="string", dest="skip",
help="Specifies a rule to control which tests are skipped. "
"If value contains '=', tests having an attribute with the "
"given value will be skipped. If value does not contain "
"'=', tests that have the attribute will be skipped.")
op.add_option("--no-skip",
action="store_true", dest="noskip",
help="Specifies that no test should be skipped. Overrides "
"--skip, if specified.")
op.add_option("-n", "--dry-run",
action="store_true", dest="dryrun",
help="Performs a dry run. After discovering all tests, "
"the list of tests is printed to standard output.")
op.add_option("-D", "--debug",
action="store_true", dest="debug",
help="Enables debugging mode. Disables output capturing "
"for running tests, causing all output to be emitted "
"immediately.")
op.add_option("--dot",
action="store", type="string", dest="dotpath",
help="After running tests, a text representation of the "
"dependency graph is placed in the indicated file. This "
"file may then be passed to the \"dot\" tool of the "
"GraphViz package to visualize the dependency graph. "
"This option may be used in combination with \"-n\".")
# Return the OptionParser
return op
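# Illustrative sketch: a wrapper script can extend the default parser
# with its own options before handing the standard ones to
# opts_to_args() and main().  The --tag option is hypothetical.
def _example_custom_parser():
    op = optparser(usage="%prog [options]")
    op.add_option("--tag", action="store", type="string", dest="tag",
                  help="Hypothetical option consumed by the wrapper.")
    return op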
def opts_to_args(options):
"""
Converts an options object--as returned by calling the
parse_args() method of the return value from the optparser()
function--into a dictionary that can be fed to the main() function
to execute the desired test operation.
"""
# Build the arguments dictionary
args = {}
# Start with the skip-related arguments
if options.noskip is True:
args['skip'] = lambda dt: False
elif options.skip is not None:
if '=' in options.skip:
k, v = options.skip.split('=', 1)
args['skip'] = lambda dt: getattr(dt, k, None) == v
else:
args['skip'] = lambda dt: hasattr(dt, options.skip)
# Now look at max threads
if options.maxth is not None:
args['maxth'] = options.maxth
# Are we doing a dry run?
if options.dryrun is True:
args['dryrun'] = True
# Are we in debug mode?
if options.debug is True:
args['debug'] = True
# How about dumping the dependency graph?
if options.dotpath is not None:
args['dotpath'] = options.dotpath
# And, finally, directory
if options.directory is not None:
args['directory'] = options.directory
# Return the built arguments object
return args
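# Illustrative sketch: the skip lambdas built above are equivalent to
# these hand-written predicates.  For "--skip net=slow", tests whose
# 'net' attribute equals 'slow' are skipped; for "--skip net", any test
# having a 'net' attribute is skipped ('net' is a hypothetical name).
def _example_skip_value(dt):
    return getattr(dt, 'net', None) == 'slow'
def _example_skip_attr(dt):
    return hasattr(dt, 'net')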
if __name__ == '__main__':
# Obtain the options
opts = optparser(usage="%prog [options]")
# Process command-line arguments
(options, args) = opts.parse_args()
# Execute the test suite
sys.exit(not main(**opts_to_args(options)))