X Tutup
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion Lib/test/libregrtest/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@
To enable all resources except one, use '-uall,-<resource>'. For
example, to run all the tests except for the gui tests, give the
option '-uall,-gui'.

--matchfile filters tests using a text file, one pattern per line.
Pattern examples:

- test method: test_stat_attributes
- test class: FileTests
- test identifier: test_os.FileTests.test_stat_attributes
"""


Expand Down Expand Up @@ -189,8 +196,12 @@ def _create_parser():
help='single step through a set of tests.' +
more_details)
group.add_argument('-m', '--match', metavar='PAT',
dest='match_tests',
dest='match_tests', action='append',
help='match test cases and methods with glob pattern PAT')
group.add_argument('--matchfile', metavar='FILENAME',
dest='match_filename',
help='similar to --match but get patterns from a '
'text file, one pattern per line')
group.add_argument('-G', '--failfast', action='store_true',
help='fail as soon as a test fails (only with -v or -W)')
group.add_argument('-u', '--use', metavar='RES1,RES2,...',
Expand Down Expand Up @@ -239,6 +250,9 @@ def _create_parser():
group.add_argument('--list-tests', action='store_true',
help="only write the name of tests that will be run, "
"don't execute them")
group.add_argument('--list-cases', action='store_true',
help='only write the name of test cases that will be run'
' , don\'t execute them')
group.add_argument('-P', '--pgo', dest='pgo', action='store_true',
help='enable Profile Guided Optimization training')

Expand Down Expand Up @@ -343,10 +357,19 @@ def _parse_args(args, **kwargs):
ns.use_resources.append(r)
if ns.random_seed is not None:
ns.randomize = True
if ns.verbose:
ns.header = True
if ns.huntrleaks and ns.verbose3:
ns.verbose3 = False
print("WARNING: Disable --verbose3 because it's incompatible with "
"--huntrleaks: see http://bugs.python.org/issue27103",
file=sys.stderr)
if ns.match_filename:
if ns.match_tests is None:
ns.match_tests = []
filename = os.path.join(support.SAVEDCWD, ns.match_filename)
with open(filename) as fp:
for line in fp:
ns.match_tests.append(line.strip())

return ns
119 changes: 80 additions & 39 deletions Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import tempfile
import textwrap
import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
findtests, runtest,
findtests, runtest, get_abs_module,
STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
INTERRUPTED, CHILD_ERROR,
PROGRESS_MIN_TIME, format_test_result)
Expand All @@ -28,7 +29,13 @@
# to keep the test files in a subfolder. This eases the cleanup of leftover
# files using the "make distclean" command.
if sysconfig.is_python_build():
TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
TEMPDIR = sysconfig.get_config_var('abs_builddir')
if TEMPDIR is None:
# bpo-30284: On Windows, only srcdir is available. Using abs_builddir
# mostly matters on UNIX when building Python out of the source tree,
# especially when the source tree is read only.
TEMPDIR = sysconfig.get_config_var('srcdir')
TEMPDIR = os.path.join(TEMPDIR, 'build')
else:
TEMPDIR = tempfile.gettempdir()
TEMPDIR = os.path.abspath(TEMPDIR)
Expand Down Expand Up @@ -107,7 +114,7 @@ def accumulate_result(self, test, result):
self.test_times.append((test_time, test))
if ok == PASSED:
self.good.append(test)
elif ok == FAILED:
elif ok in (FAILED, CHILD_ERROR):
self.bad.append(test)
elif ok == ENV_CHANGED:
self.environment_changed.append(test)
Expand All @@ -116,22 +123,28 @@ def accumulate_result(self, test, result):
elif ok == RESOURCE_DENIED:
self.skipped.append(test)
self.resource_denieds.append(test)
elif ok != INTERRUPTED:
raise ValueError("invalid test result: %r" % ok)

def display_progress(self, test_index, test):
if self.ns.quiet:
return

# "[ 51/405/1] test_tcl passed"
line = f"{test_index:{self.test_count_width}}{self.test_count}"
if self.bad and not self.ns.pgo:
fmt = "{time} [{test_index:{count_width}}{test_count}/{nbad}] {test_name}"
else:
fmt = "{time} [{test_index:{count_width}}{test_count}] {test_name}"
line = f"{line}/{len(self.bad)}"
line = f"[{line}] {test}"

# add the system load prefix: "load avg: 1.80 "
if hasattr(os, 'getloadavg'):
load_avg_1min = os.getloadavg()[0]
line = f"load avg: {load_avg_1min:.2f} {line}"

# add the timestamp prefix: "0:01:05 "
test_time = time.monotonic() - self.start_time
test_time = datetime.timedelta(seconds=int(test_time))
line = fmt.format(count_width=self.test_count_width,
test_index=test_index,
test_count=self.test_count,
nbad=len(self.bad),
test_name=test,
time=test_time)
line = f"{test_time} {line}"
print(line, flush=True)

def parse_args(self, kwargs):
Expand Down Expand Up @@ -179,19 +192,14 @@ def find_tests(self, tests):
self.tests = []
# regex to match 'test_builtin' in line:
# '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
regex = (r'^(?:[0-9]+:[0-9]+:[0-9]+ *)?'
r'(?:\[[0-9/ ]+\] *)?'
r'(test_[a-zA-Z0-9_]+)')
regex = re.compile(regex)
regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
for line in fp:
line = line.split('#', 1)[0]
line = line.strip()
if line.startswith('#'):
continue
match = regex.match(line)
if match is None:
continue
self.tests.append(match.group(1))
match = regex.search(line)
if match is not None:
self.tests.append(match.group())

removepy(self.tests)

Expand Down Expand Up @@ -241,6 +249,29 @@ def list_tests(self):
for name in self.selected:
print(name)

def _list_cases(self, suite):
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
self._list_cases(test)
elif isinstance(test, unittest.TestCase):
print(test.id())

def list_cases(self):
for test in self.selected:
abstest = get_abs_module(self.ns, test)
try:
suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
self._list_cases(suite)
except unittest.SkipTest:
self.skipped.append(test)

if self.skipped:
print(file=sys.stderr)
print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
printlist(self.skipped, file=sys.stderr)

def rerun_failed_tests(self):
self.ns.verbose = True
self.ns.failfast = False
Expand Down Expand Up @@ -381,23 +412,28 @@ def _test_forever(self, tests):
if self.bad:
return

def display_header(self):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== hash algorithm:", sys.hash_info.algorithm,
"64bit" if sys.maxsize > 2**32 else "32bit")
print("== cwd:", os.getcwd())
cpu_count = os.cpu_count()
if cpu_count:
print("== CPU count:", cpu_count)
print("== encodings: locale=%s, FS=%s"
% (locale.getpreferredencoding(False),
sys.getfilesystemencoding()))
print("Testing with flags:", sys.flags)

def run_tests(self):
# For a partial run, we do not need to clutter the output.
if (self.ns.verbose
or self.ns.header
or not (self.ns.pgo or self.ns.quiet or self.ns.single
or self.tests or self.ns.args)):
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("== ", platform.platform(aliased=True),
"%s-endian" % sys.byteorder)
print("== ", "hash algorithm:", sys.hash_info.algorithm,
"64bit" if sys.maxsize > 2**32 else "32bit")
print("== cwd:", os.getcwd())
print("== encodings: locale=%s, FS=%s"
% (locale.getpreferredencoding(False),
sys.getfilesystemencoding()))
print("Testing with flags:", sys.flags)
if (self.ns.header
or not(self.ns.pgo or self.ns.quiet or self.ns.single
or self.tests or self.ns.args)):
self.display_header()

if self.ns.randomize:
print("Using random seed", self.ns.random_seed)
Expand Down Expand Up @@ -487,6 +523,10 @@ def _main(self, tests, kwargs):
self.list_tests()
sys.exit(0)

if self.ns.list_cases:
self.list_cases()
sys.exit(0)

self.run_tests()
self.display_result()

Expand All @@ -513,7 +553,7 @@ def count(n, word):
return "%d %ss" % (n, word)


def printlist(x, width=70, indent=4):
def printlist(x, width=70, indent=4, file=None):
"""Print the elements of iterable x to stdout.

Optional arg width (default 70) is the maximum line length.
Expand All @@ -524,7 +564,8 @@ def printlist(x, width=70, indent=4):
blanks = ' ' * indent
# Print the sorted list: 'x' may be a '--random' list or a set()
print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
initial_indent=blanks, subsequent_indent=blanks))
initial_indent=blanks, subsequent_indent=blanks),
file=file)


def main(tests=None, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/libregrtest/refleak.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def dash_R(the_module, test, indirect_test, huntrleaks):
indirect_test()
alloc_after, rc_after, fd_after = dash_R_cleanup(fs, ps, pic, zdc,
abcs)
print('.', end='', flush=True)
print('.', end='', file=sys.stderr, flush=True)
if i >= nwarmup:
rc_deltas[i] = rc_after - rc_before
alloc_deltas[i] = alloc_after - alloc_before
Expand Down
14 changes: 9 additions & 5 deletions Lib/test/libregrtest/runtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
return stdtests + sorted(tests)


def get_abs_module(ns, test):
if test.startswith('test.') or ns.testdir:
return test
else:
# Always import it from the test package
return 'test.' + test


def runtest(ns, test):
"""Run a single test.

Expand Down Expand Up @@ -141,11 +149,7 @@ def runtest_inner(ns, test, display_failure=True):
test_time = 0.0
refleak = False # True if the test leaked references.
try:
if test.startswith('test.') or ns.testdir:
abstest = test
else:
# Always import it from the test package
abstest = 'test.' + test
abstest = get_abs_module(ns, test)
clear_caches()
with saved_test_environment(test, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
start_time = time.time()
Expand Down
11 changes: 5 additions & 6 deletions Lib/test/libregrtest/runtest_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def run_test_in_subprocess(testname, ns):
slaveargs = json.dumps(slaveargs)

cmd = [sys.executable, *support.args_from_interpreter_flags(),
'-X', 'faulthandler',
'-u', # Unbuffered stdout and stderr
'-m', 'test.regrtest',
'--slaveargs', slaveargs]
if ns.pgo:
Expand Down Expand Up @@ -124,13 +124,13 @@ def _runtest(self):
finally:
self.current_test = None

stdout, _, result = stdout.strip().rpartition("\n")
if retcode != 0:
result = (CHILD_ERROR, "Exit code %s" % retcode)
self.output.put((test, stdout.rstrip(), stderr.rstrip(),
result))
return True
return False

stdout, _, result = stdout.strip().rpartition("\n")
if not result:
self.output.put((None, None, None, None))
return True
Expand Down Expand Up @@ -203,6 +203,8 @@ def get_running(workers):
and test_time >= PROGRESS_MIN_TIME
and not regrtest.ns.pgo):
text += ' (%.0f sec)' % test_time
elif ok == CHILD_ERROR:
text = '%s (%s)' % (text, test_time)
running = get_running(workers)
if running and not regrtest.ns.pgo:
text += ' -- running: %s' % ', '.join(running)
Expand All @@ -216,9 +218,6 @@ def get_running(workers):

if result[0] == INTERRUPTED:
raise KeyboardInterrupt
if result[0] == CHILD_ERROR:
msg = "Child error on {}: {}".format(test, result[1])
raise Exception(msg)
test_index += 1
except KeyboardInterrupt:
regrtest.interrupted = True
Expand Down
10 changes: 8 additions & 2 deletions Lib/test/support/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1915,9 +1915,15 @@ def run_unittest(*classes):
def case_pred(test):
if match_tests is None:
return True
for name in test.id().split("."):
if fnmatch.fnmatchcase(name, match_tests):
test_id = test.id()

for match_test in match_tests:
if fnmatch.fnmatchcase(test_id, match_test):
return True

for name in test_id.split("."):
if fnmatch.fnmatchcase(name, match_test):
return True
return False
_filter_suite(suite, case_pred)
_run_suite(suite)
Expand Down
Loading
X Tutup