diff --git a/Lib/test/libregrtest/runtest.py b/Lib/test/libregrtest/runtest.py deleted file mode 100644 index 8b7844cb4b3bae..00000000000000 --- a/Lib/test/libregrtest/runtest.py +++ /dev/null @@ -1,577 +0,0 @@ -import dataclasses -import doctest -import faulthandler -import gc -import importlib -import io -import os -import sys -import time -import traceback -import unittest - -from test import support -from test.support import TestStats -from test.support import os_helper -from test.support import threading_helper -from test.libregrtest.cmdline import Namespace -from test.libregrtest.save_env import saved_test_environment -from test.libregrtest.utils import clear_caches, format_duration, print_warning - - -MatchTests = list[str] -MatchTestsDict = dict[str, MatchTests] - - -# Avoid enum.Enum to reduce the number of imports when tests are run -class State: - PASSED = "PASSED" - FAILED = "FAILED" - SKIPPED = "SKIPPED" - UNCAUGHT_EXC = "UNCAUGHT_EXC" - REFLEAK = "REFLEAK" - ENV_CHANGED = "ENV_CHANGED" - RESOURCE_DENIED = "RESOURCE_DENIED" - INTERRUPTED = "INTERRUPTED" - MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR" - DID_NOT_RUN = "DID_NOT_RUN" - TIMEOUT = "TIMEOUT" - - @staticmethod - def is_failed(state): - return state in { - State.FAILED, - State.UNCAUGHT_EXC, - State.REFLEAK, - State.MULTIPROCESSING_ERROR, - State.TIMEOUT} - - @staticmethod - def has_meaningful_duration(state): - # Consider that the duration is meaningless for these cases. - # For example, if a whole test file is skipped, its duration - # is unlikely to be the duration of executing its tests, - # but just the duration to execute code which skips the test. - return state not in { - State.SKIPPED, - State.RESOURCE_DENIED, - State.INTERRUPTED, - State.MULTIPROCESSING_ERROR, - State.DID_NOT_RUN} - - @staticmethod - def must_stop(state): - return state in { - State.INTERRUPTED, - State.MULTIPROCESSING_ERROR} - - -# gh-90681: When rerunning tests, we might need to rerun the whole -# class or module suite if some its life-cycle hooks fail. -# Test level hooks are not affected. -_TEST_LIFECYCLE_HOOKS = frozenset(( - 'setUpClass', 'tearDownClass', - 'setUpModule', 'tearDownModule', -)) - -def normalize_test_name(test_full_name, *, is_error=False): - short_name = test_full_name.split(" ")[0] - if is_error and short_name in _TEST_LIFECYCLE_HOOKS: - if test_full_name.startswith(('setUpModule (', 'tearDownModule (')): - # if setUpModule() or tearDownModule() failed, don't filter - # tests with the test file name, don't use use filters. - return None - - # This means that we have a failure in a life-cycle hook, - # we need to rerun the whole module or class suite. - # Basically the error looks like this: - # ERROR: setUpClass (test.test_reg_ex.RegTest) - # or - # ERROR: setUpModule (test.test_reg_ex) - # So, we need to parse the class / module name. - lpar = test_full_name.index('(') - rpar = test_full_name.index(')') - return test_full_name[lpar + 1: rpar].split('.')[-1] - return short_name - - -@dataclasses.dataclass(slots=True) -class TestResult: - test_name: str - state: str | None = None - # Test duration in seconds - duration: float | None = None - xml_data: list[str] | None = None - stats: TestStats | None = None - - # errors and failures copied from support.TestFailedWithDetails - errors: list[tuple[str, str]] | None = None - failures: list[tuple[str, str]] | None = None - - def is_failed(self, fail_env_changed: bool) -> bool: - if self.state == State.ENV_CHANGED: - return fail_env_changed - return State.is_failed(self.state) - - def _format_failed(self): - if self.errors and self.failures: - le = len(self.errors) - lf = len(self.failures) - error_s = "error" + ("s" if le > 1 else "") - failure_s = "failure" + ("s" if lf > 1 else "") - return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})" - - if self.errors: - le = len(self.errors) - error_s = "error" + ("s" if le > 1 else "") - return f"{self.test_name} failed ({le} {error_s})" - - if self.failures: - lf = len(self.failures) - failure_s = "failure" + ("s" if lf > 1 else "") - return f"{self.test_name} failed ({lf} {failure_s})" - - return f"{self.test_name} failed" - - def __str__(self) -> str: - match self.state: - case State.PASSED: - return f"{self.test_name} passed" - case State.FAILED: - return self._format_failed() - case State.SKIPPED: - return f"{self.test_name} skipped" - case State.UNCAUGHT_EXC: - return f"{self.test_name} failed (uncaught exception)" - case State.REFLEAK: - return f"{self.test_name} failed (reference leak)" - case State.ENV_CHANGED: - return f"{self.test_name} failed (env changed)" - case State.RESOURCE_DENIED: - return f"{self.test_name} skipped (resource denied)" - case State.INTERRUPTED: - return f"{self.test_name} interrupted" - case State.MULTIPROCESSING_ERROR: - return f"{self.test_name} process crashed" - case State.DID_NOT_RUN: - return f"{self.test_name} ran no tests" - case State.TIMEOUT: - return f"{self.test_name} timed out ({format_duration(self.duration)})" - case _: - raise ValueError("unknown result state: {state!r}") - - def has_meaningful_duration(self): - return State.has_meaningful_duration(self.state) - - def set_env_changed(self): - if self.state is None or self.state == State.PASSED: - self.state = State.ENV_CHANGED - - def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool: - if State.must_stop(self.state): - return True - if fail_fast and self.is_failed(fail_env_changed): - return True - return False - - def get_rerun_match_tests(self): - match_tests = [] - - errors = self.errors or [] - failures = self.failures or [] - for error_list, is_error in ( - (errors, True), - (failures, False), - ): - for full_name, *_ in error_list: - match_name = normalize_test_name(full_name, is_error=is_error) - if match_name is None: - # 'setUpModule (test.test_sys)': don't filter tests - return None - if not match_name: - error_type = "ERROR" if is_error else "FAIL" - print_warning(f"rerun failed to parse {error_type} test name: " - f"{full_name!r}: don't filter tests") - return None - match_tests.append(match_name) - - return match_tests - - -@dataclasses.dataclass(slots=True, frozen=True) -class RunTests: - tests: list[str] - match_tests: MatchTestsDict | None = None - rerun: bool = False - forever: bool = False - - def get_match_tests(self, test_name) -> MatchTests | None: - if self.match_tests is not None: - return self.match_tests.get(test_name, None) - else: - return None - - def iter_tests(self): - tests = tuple(self.tests) - if self.forever: - while True: - yield from tests - else: - yield from tests - - -# Minimum duration of a test to display its duration or to mention that -# the test is running in background -PROGRESS_MIN_TIME = 30.0 # seconds - -#If these test directories are encountered recurse into them and treat each -# test_ .py or dir as a separate test module. This can increase parallelism. -# Beware this can't generally be done for any directory with sub-tests as the -# __init__.py may do things which alter what tests are to be run. - -SPLITTESTDIRS = { - "test_asyncio", - "test_concurrent_futures", - "test_future_stmt", - "test_gdb", - "test_multiprocessing_fork", - "test_multiprocessing_forkserver", - "test_multiprocessing_spawn", -} - - -def findtestdir(path=None): - return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir - - -def findtests(*, testdir=None, exclude=(), - split_test_dirs=SPLITTESTDIRS, base_mod=""): - """Return a list of all applicable test modules.""" - testdir = findtestdir(testdir) - tests = [] - for name in os.listdir(testdir): - mod, ext = os.path.splitext(name) - if (not mod.startswith("test_")) or (mod in exclude): - continue - if mod in split_test_dirs: - subdir = os.path.join(testdir, mod) - mod = f"{base_mod or 'test'}.{mod}" - tests.extend(findtests(testdir=subdir, exclude=exclude, - split_test_dirs=split_test_dirs, base_mod=mod)) - elif ext in (".py", ""): - tests.append(f"{base_mod}.{mod}" if base_mod else mod) - return sorted(tests) - - -def split_test_packages(tests, *, testdir=None, exclude=(), - split_test_dirs=SPLITTESTDIRS): - testdir = findtestdir(testdir) - splitted = [] - for name in tests: - if name in split_test_dirs: - subdir = os.path.join(testdir, name) - splitted.extend(findtests(testdir=subdir, exclude=exclude, - split_test_dirs=split_test_dirs, - base_mod=name)) - else: - splitted.append(name) - return splitted - - -def abs_module_name(test_name: str, test_dir: str | None) -> str: - if test_name.startswith('test.') or test_dir: - return test_name - else: - # Import it from the test package - return 'test.' + test_name - - -def setup_support(ns: Namespace): - support.PGO = ns.pgo - support.PGO_EXTENDED = ns.pgo_extended - support.set_match_tests(ns.match_tests, ns.ignore_tests) - support.failfast = ns.failfast - support.verbose = ns.verbose - if ns.xmlpath: - support.junit_xml_list = [] - else: - support.junit_xml_list = None - - -def _runtest(result: TestResult, ns: Namespace) -> None: - # Capture stdout and stderr, set faulthandler timeout, - # and create JUnit XML report. - verbose = ns.verbose - output_on_failure = ns.verbose3 - timeout = ns.timeout - - use_timeout = ( - timeout is not None and threading_helper.can_start_thread - ) - if use_timeout: - faulthandler.dump_traceback_later(timeout, exit=True) - - try: - setup_support(ns) - - if output_on_failure: - support.verbose = True - - stream = io.StringIO() - orig_stdout = sys.stdout - orig_stderr = sys.stderr - print_warning = support.print_warning - orig_print_warnings_stderr = print_warning.orig_stderr - - output = None - try: - sys.stdout = stream - sys.stderr = stream - # print_warning() writes into the temporary stream to preserve - # messages order. If support.environment_altered becomes true, - # warnings will be written to sys.stderr below. - print_warning.orig_stderr = stream - - _runtest_env_changed_exc(result, ns, display_failure=False) - # Ignore output if the test passed successfully - if result.state != State.PASSED: - output = stream.getvalue() - finally: - sys.stdout = orig_stdout - sys.stderr = orig_stderr - print_warning.orig_stderr = orig_print_warnings_stderr - - if output is not None: - sys.stderr.write(output) - sys.stderr.flush() - else: - # Tell tests to be moderately quiet - support.verbose = verbose - _runtest_env_changed_exc(result, ns, display_failure=not verbose) - - xml_list = support.junit_xml_list - if xml_list: - import xml.etree.ElementTree as ET - result.xml_data = [ET.tostring(x).decode('us-ascii') - for x in xml_list] - finally: - if use_timeout: - faulthandler.cancel_dump_traceback_later() - support.junit_xml_list = None - - -def runtest(ns: Namespace, test_name: str) -> TestResult: - """Run a single test. - - ns -- regrtest namespace of options - test_name -- the name of the test - - Returns a TestResult. - - If ns.xmlpath is not None, xml_data is a list containing each - generated testsuite element. - """ - start_time = time.perf_counter() - result = TestResult(test_name) - try: - _runtest(result, ns) - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - result.state = State.UNCAUGHT_EXC - result.duration = time.perf_counter() - start_time - return result - - -def run_unittest(test_mod): - loader = unittest.TestLoader() - tests = loader.loadTestsFromModule(test_mod) - for error in loader.errors: - print(error, file=sys.stderr) - if loader.errors: - raise Exception("errors while loading tests") - return support.run_unittest(tests) - - -def save_env(ns: Namespace, test_name: str): - return saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) - - -def regrtest_runner(result, test_func, ns) -> None: - # Run test_func(), collect statistics, and detect reference and memory - # leaks. - if ns.huntrleaks: - from test.libregrtest.refleak import dash_R - refleak, test_result = dash_R(ns, result.test_name, test_func) - else: - test_result = test_func() - refleak = False - - if refleak: - result.state = State.REFLEAK - - match test_result: - case TestStats(): - stats = test_result - case unittest.TestResult(): - stats = TestStats.from_unittest(test_result) - case doctest.TestResults(): - stats = TestStats.from_doctest(test_result) - case None: - print_warning(f"{result.test_name} test runner returned None: {test_func}") - stats = None - case _: - print_warning(f"Unknown test result type: {type(test_result)}") - stats = None - - result.stats = stats - - -# Storage of uncollectable objects -FOUND_GARBAGE = [] - - -def _load_run_test(result: TestResult, ns: Namespace) -> None: - # Load the test function, run the test function. - module_name = abs_module_name(result.test_name, ns.testdir) - - # Remove the module from sys.module to reload it if it was already imported - sys.modules.pop(module_name, None) - - test_mod = importlib.import_module(module_name) - - if hasattr(test_mod, "test_main"): - # https://github.com/python/cpython/issues/89392 - raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest") - def test_func(): - return run_unittest(test_mod) - - try: - with save_env(ns, result.test_name): - regrtest_runner(result, test_func, ns) - finally: - # First kill any dangling references to open files etc. - # This can also issue some ResourceWarnings which would otherwise get - # triggered during the following test run, and possibly produce - # failures. - support.gc_collect() - - remove_testfn(result.test_name, ns.verbose) - - if gc.garbage: - support.environment_altered = True - print_warning(f"{result.test_name} created {len(gc.garbage)} " - f"uncollectable object(s)") - - # move the uncollectable objects somewhere, - # so we don't see them again - FOUND_GARBAGE.extend(gc.garbage) - gc.garbage.clear() - - support.reap_children() - - -def _runtest_env_changed_exc(result: TestResult, ns: Namespace, - display_failure: bool = True) -> None: - # Detect environment changes, handle exceptions. - - # Reset the environment_altered flag to detect if a test altered - # the environment - support.environment_altered = False - - if ns.pgo: - display_failure = False - - test_name = result.test_name - try: - clear_caches() - support.gc_collect() - - with save_env(ns, test_name): - _load_run_test(result, ns) - except support.ResourceDenied as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - result.state = State.RESOURCE_DENIED - return - except unittest.SkipTest as msg: - if not ns.quiet and not ns.pgo: - print(f"{test_name} skipped -- {msg}", flush=True) - result.state = State.SKIPPED - return - except support.TestFailedWithDetails as exc: - msg = f"test {test_name} failed" - if display_failure: - msg = f"{msg} -- {exc}" - print(msg, file=sys.stderr, flush=True) - result.state = State.FAILED - result.errors = exc.errors - result.failures = exc.failures - result.stats = exc.stats - return - except support.TestFailed as exc: - msg = f"test {test_name} failed" - if display_failure: - msg = f"{msg} -- {exc}" - print(msg, file=sys.stderr, flush=True) - result.state = State.FAILED - result.stats = exc.stats - return - except support.TestDidNotRun: - result.state = State.DID_NOT_RUN - return - except KeyboardInterrupt: - print() - result.state = State.INTERRUPTED - return - except: - if not ns.pgo: - msg = traceback.format_exc() - print(f"test {test_name} crashed -- {msg}", - file=sys.stderr, flush=True) - result.state = State.UNCAUGHT_EXC - return - - if support.environment_altered: - result.set_env_changed() - # Don't override the state if it was already set (REFLEAK or ENV_CHANGED) - if result.state is None: - result.state = State.PASSED - - -def remove_testfn(test_name: str, verbose: int) -> None: - # Try to clean up os_helper.TESTFN if left behind. - # - # While tests shouldn't leave any files or directories behind, when a test - # fails that can be tedious for it to arrange. The consequences can be - # especially nasty on Windows, since if a test leaves a file open, it - # cannot be deleted by name (while there's nothing we can do about that - # here either, we can display the name of the offending test, which is a - # real help). - name = os_helper.TESTFN - if not os.path.exists(name): - return - - if os.path.isdir(name): - import shutil - kind, nuker = "directory", shutil.rmtree - elif os.path.isfile(name): - kind, nuker = "file", os.unlink - else: - raise RuntimeError(f"os.path says {name!r} exists but is neither " - f"directory nor file") - - if verbose: - print_warning(f"{test_name} left behind {kind} {name!r}") - support.environment_altered = True - - try: - import stat - # fix possible permissions problems that might prevent cleanup - os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - nuker(name) - except Exception as exc: - print_warning(f"{test_name} left behind {kind} {name!r} " - f"and it couldn't be removed: {exc}") diff --git a/Lib/test/libregrtest/runtest_mp.py b/Lib/test/libregrtest/runtest_mp.py deleted file mode 100644 index 60089554cab5dd..00000000000000 --- a/Lib/test/libregrtest/runtest_mp.py +++ /dev/null @@ -1,631 +0,0 @@ -import dataclasses -import faulthandler -import json -import os.path -import queue -import signal -import subprocess -import sys -import tempfile -import threading -import time -import traceback -from typing import NamedTuple, NoReturn, Literal, Any, TextIO - -from test import support -from test.support import os_helper -from test.support import TestStats - -from test.libregrtest.cmdline import Namespace -from test.libregrtest.main import Regrtest -from test.libregrtest.runtest import ( - runtest, TestResult, State, PROGRESS_MIN_TIME, - MatchTests, RunTests) -from test.libregrtest.setup import setup_tests -from test.libregrtest.utils import format_duration, print_warning - -if sys.platform == 'win32': - import locale - - -# Display the running tests if nothing happened last N seconds -PROGRESS_UPDATE = 30.0 # seconds -assert PROGRESS_UPDATE >= PROGRESS_MIN_TIME - -# Kill the main process after 5 minutes. It is supposed to write an update -# every PROGRESS_UPDATE seconds. Tolerate 5 minutes for Python slowest -# buildbot workers. -MAIN_PROCESS_TIMEOUT = 5 * 60.0 -assert MAIN_PROCESS_TIMEOUT >= PROGRESS_UPDATE - -# Time to wait until a worker completes: should be immediate -JOIN_TIMEOUT = 30.0 # seconds - -USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg")) - - -@dataclasses.dataclass(slots=True) -class WorkerJob: - test_name: str - namespace: Namespace - rerun: bool = False - match_tests: MatchTests | None = None - - -class _EncodeWorkerJob(json.JSONEncoder): - def default(self, o: Any) -> dict[str, Any]: - match o: - case WorkerJob(): - result = dataclasses.asdict(o) - result["__worker_job__"] = True - return result - case Namespace(): - result = vars(o) - result["__namespace__"] = True - return result - case _: - return super().default(o) - - -def _decode_worker_job(d: dict[str, Any]) -> WorkerJob | dict[str, Any]: - if "__worker_job__" in d: - d.pop('__worker_job__') - return WorkerJob(**d) - if "__namespace__" in d: - d.pop('__namespace__') - return Namespace(**d) - else: - return d - - -def _parse_worker_args(worker_json: str) -> tuple[Namespace, str]: - return json.loads(worker_json, - object_hook=_decode_worker_job) - - -def run_test_in_subprocess(worker_job: WorkerJob, - output_file: TextIO, - tmp_dir: str | None = None) -> subprocess.Popen: - ns = worker_job.namespace - python = ns.python - worker_args = json.dumps(worker_job, cls=_EncodeWorkerJob) - - if python is not None: - executable = python - else: - executable = [sys.executable] - cmd = [*executable, *support.args_from_interpreter_flags(), - '-u', # Unbuffered stdout and stderr - '-m', 'test.regrtest', - '--worker-args', worker_args] - - env = dict(os.environ) - if tmp_dir is not None: - env['TMPDIR'] = tmp_dir - env['TEMP'] = tmp_dir - env['TMP'] = tmp_dir - - # Running the child from the same working directory as regrtest's original - # invocation ensures that TEMPDIR for the child is the same when - # sysconfig.is_python_build() is true. See issue 15300. - kw = dict( - env=env, - stdout=output_file, - # bpo-45410: Write stderr into stdout to keep messages order - stderr=output_file, - text=True, - close_fds=(os.name != 'nt'), - cwd=os_helper.SAVEDCWD, - ) - if USE_PROCESS_GROUP: - kw['start_new_session'] = True - return subprocess.Popen(cmd, **kw) - - -def run_tests_worker(worker_json: str) -> NoReturn: - worker_job = _parse_worker_args(worker_json) - ns = worker_job.namespace - test_name = worker_job.test_name - rerun = worker_job.rerun - match_tests = worker_job.match_tests - - setup_tests(ns) - - if rerun: - if match_tests: - matching = "matching: " + ", ".join(match_tests) - print(f"Re-running {test_name} in verbose mode ({matching})", flush=True) - else: - print(f"Re-running {test_name} in verbose mode", flush=True) - ns.verbose = True - - if match_tests is not None: - ns.match_tests = match_tests - - result = runtest(ns, test_name) - print() # Force a newline (just in case) - - # Serialize TestResult as dict in JSON - print(json.dumps(result, cls=EncodeTestResult), flush=True) - sys.exit(0) - - -# We do not use a generator so multiple threads can call next(). -class MultiprocessIterator: - - """A thread-safe iterator over tests for multiprocess mode.""" - - def __init__(self, tests_iter): - self.lock = threading.Lock() - self.tests_iter = tests_iter - - def __iter__(self): - return self - - def __next__(self): - with self.lock: - if self.tests_iter is None: - raise StopIteration - return next(self.tests_iter) - - def stop(self): - with self.lock: - self.tests_iter = None - - -class MultiprocessResult(NamedTuple): - result: TestResult - # bpo-45410: stderr is written into stdout to keep messages order - worker_stdout: str | None = None - err_msg: str | None = None - - -ExcStr = str -QueueOutput = tuple[Literal[False], MultiprocessResult] | tuple[Literal[True], ExcStr] - - -class ExitThread(Exception): - pass - - -class TestWorkerProcess(threading.Thread): - def __init__(self, worker_id: int, runner: "MultiprocessTestRunner") -> None: - super().__init__() - self.worker_id = worker_id - self.runtests = runner.runtests - self.pending = runner.pending - self.output = runner.output - self.ns = runner.ns - self.timeout = runner.worker_timeout - self.regrtest = runner.regrtest - self.rerun = runner.rerun - self.current_test_name = None - self.start_time = None - self._popen = None - self._killed = False - self._stopped = False - - def __repr__(self) -> str: - info = [f'TestWorkerProcess #{self.worker_id}'] - if self.is_alive(): - info.append("running") - else: - info.append('stopped') - test = self.current_test_name - if test: - info.append(f'test={test}') - popen = self._popen - if popen is not None: - dt = time.monotonic() - self.start_time - info.extend((f'pid={self._popen.pid}', - f'time={format_duration(dt)}')) - return '<%s>' % ' '.join(info) - - def _kill(self) -> None: - popen = self._popen - if popen is None: - return - - if self._killed: - return - self._killed = True - - if USE_PROCESS_GROUP: - what = f"{self} process group" - else: - what = f"{self}" - - print(f"Kill {what}", file=sys.stderr, flush=True) - try: - if USE_PROCESS_GROUP: - os.killpg(popen.pid, signal.SIGKILL) - else: - popen.kill() - except ProcessLookupError: - # popen.kill(): the process completed, the TestWorkerProcess thread - # read its exit status, but Popen.send_signal() read the returncode - # just before Popen.wait() set returncode. - pass - except OSError as exc: - print_warning(f"Failed to kill {what}: {exc!r}") - - def stop(self) -> None: - # Method called from a different thread to stop this thread - self._stopped = True - self._kill() - - def mp_result_error( - self, - test_result: TestResult, - stdout: str | None = None, - err_msg=None - ) -> MultiprocessResult: - return MultiprocessResult(test_result, stdout, err_msg) - - def _run_process(self, worker_job, output_file: TextIO, - tmp_dir: str | None = None) -> int: - self.current_test_name = worker_job.test_name - try: - popen = run_test_in_subprocess(worker_job, output_file, tmp_dir) - - self._killed = False - self._popen = popen - except: - self.current_test_name = None - raise - - try: - if self._stopped: - # If kill() has been called before self._popen is set, - # self._popen is still running. Call again kill() - # to ensure that the process is killed. - self._kill() - raise ExitThread - - try: - # gh-94026: stdout+stderr are written to tempfile - retcode = popen.wait(timeout=self.timeout) - assert retcode is not None - return retcode - except subprocess.TimeoutExpired: - if self._stopped: - # kill() has been called: communicate() fails on reading - # closed stdout - raise ExitThread - - # On timeout, kill the process - self._kill() - - # None means TIMEOUT for the caller - retcode = None - # bpo-38207: Don't attempt to call communicate() again: on it - # can hang until all child processes using stdout - # pipes completes. - except OSError: - if self._stopped: - # kill() has been called: communicate() fails - # on reading closed stdout - raise ExitThread - raise - except: - self._kill() - raise - finally: - self._wait_completed() - self._popen = None - self.current_test_name = None - - def _runtest(self, test_name: str) -> MultiprocessResult: - if sys.platform == 'win32': - # gh-95027: When stdout is not a TTY, Python uses the ANSI code - # page for the sys.stdout encoding. If the main process runs in a - # terminal, sys.stdout uses WindowsConsoleIO with UTF-8 encoding. - encoding = locale.getencoding() - else: - encoding = sys.stdout.encoding - - match_tests = self.runtests.get_match_tests(test_name) - - # gh-94026: Write stdout+stderr to a tempfile as workaround for - # non-blocking pipes on Emscripten with NodeJS. - with tempfile.TemporaryFile('w+', encoding=encoding) as stdout_file: - worker_job = WorkerJob(test_name, - namespace=self.ns, - rerun=self.rerun, - match_tests=match_tests) - # gh-93353: Check for leaked temporary files in the parent process, - # since the deletion of temporary files can happen late during - # Python finalization: too late for libregrtest. - if not support.is_wasi: - # Don't check for leaked temporary files and directories if Python is - # run on WASI. WASI don't pass environment variables like TMPDIR to - # worker processes. - tmp_dir = tempfile.mkdtemp(prefix="test_python_") - tmp_dir = os.path.abspath(tmp_dir) - try: - retcode = self._run_process(worker_job, stdout_file, tmp_dir) - finally: - tmp_files = os.listdir(tmp_dir) - os_helper.rmtree(tmp_dir) - else: - retcode = self._run_process(worker_job, stdout_file) - tmp_files = () - stdout_file.seek(0) - - try: - stdout = stdout_file.read().strip() - except Exception as exc: - # gh-101634: Catch UnicodeDecodeError if stdout cannot be - # decoded from encoding - err_msg = f"Cannot read process stdout: {exc}" - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, err_msg=err_msg) - - if retcode is None: - result = TestResult(test_name, state=State.TIMEOUT) - return self.mp_result_error(result, stdout) - - err_msg = None - if retcode != 0: - err_msg = "Exit code %s" % retcode - else: - stdout, _, worker_json = stdout.rpartition("\n") - stdout = stdout.rstrip() - if not worker_json: - err_msg = "Failed to parse worker stdout" - else: - try: - # deserialize run_tests_worker() output - result = json.loads(worker_json, - object_hook=decode_test_result) - except Exception as exc: - err_msg = "Failed to parse worker JSON: %s" % exc - - if err_msg: - result = TestResult(test_name, state=State.MULTIPROCESSING_ERROR) - return self.mp_result_error(result, stdout, err_msg) - - if tmp_files: - msg = (f'\n\n' - f'Warning -- {test_name} leaked temporary files ' - f'({len(tmp_files)}): {", ".join(sorted(tmp_files))}') - stdout += msg - result.set_env_changed() - - return MultiprocessResult(result, stdout) - - def run(self) -> None: - fail_fast = self.ns.failfast - fail_env_changed = self.ns.fail_env_changed - while not self._stopped: - try: - try: - test_name = next(self.pending) - except StopIteration: - break - - self.start_time = time.monotonic() - mp_result = self._runtest(test_name) - mp_result.result.duration = time.monotonic() - self.start_time - self.output.put((False, mp_result)) - - if mp_result.result.must_stop(fail_fast, fail_env_changed): - break - except ExitThread: - break - except BaseException: - self.output.put((True, traceback.format_exc())) - break - - def _wait_completed(self) -> None: - popen = self._popen - - try: - popen.wait(JOIN_TIMEOUT) - except (subprocess.TimeoutExpired, OSError) as exc: - print_warning(f"Failed to wait for {self} completion " - f"(timeout={format_duration(JOIN_TIMEOUT)}): " - f"{exc!r}") - - def wait_stopped(self, start_time: float) -> None: - # bpo-38207: MultiprocessTestRunner.stop_workers() called self.stop() - # which killed the process. Sometimes, killing the process from the - # main thread does not interrupt popen.communicate() in - # TestWorkerProcess thread. This loop with a timeout is a workaround - # for that. - # - # Moreover, if this method fails to join the thread, it is likely - # that Python will hang at exit while calling threading._shutdown() - # which tries again to join the blocked thread. Regrtest.main() - # uses EXIT_TIMEOUT to workaround this second bug. - while True: - # Write a message every second - self.join(1.0) - if not self.is_alive(): - break - dt = time.monotonic() - start_time - self.regrtest.log(f"Waiting for {self} thread " - f"for {format_duration(dt)}") - if dt > JOIN_TIMEOUT: - print_warning(f"Failed to join {self} in {format_duration(dt)}") - break - - -def get_running(workers: list[TestWorkerProcess]) -> list[TestWorkerProcess]: - running = [] - for worker in workers: - current_test_name = worker.current_test_name - if not current_test_name: - continue - dt = time.monotonic() - worker.start_time - if dt >= PROGRESS_MIN_TIME: - text = '%s (%s)' % (current_test_name, format_duration(dt)) - running.append(text) - return running - - -class MultiprocessTestRunner: - def __init__(self, regrtest: Regrtest, runtests: RunTests) -> None: - ns = regrtest.ns - timeout = ns.timeout - - self.regrtest = regrtest - self.runtests = runtests - self.rerun = runtests.rerun - self.log = self.regrtest.log - self.ns = ns - self.output: queue.Queue[QueueOutput] = queue.Queue() - tests_iter = runtests.iter_tests() - self.pending = MultiprocessIterator(tests_iter) - if timeout is not None: - # Rely on faulthandler to kill a worker process. This timouet is - # when faulthandler fails to kill a worker process. Give a maximum - # of 5 minutes to faulthandler to kill the worker. - self.worker_timeout = min(timeout * 1.5, timeout + 5 * 60) - else: - self.worker_timeout = None - self.workers = None - - def start_workers(self) -> None: - use_mp = self.ns.use_mp - timeout = self.ns.timeout - self.workers = [TestWorkerProcess(index, self) - for index in range(1, use_mp + 1)] - msg = f"Run tests in parallel using {len(self.workers)} child processes" - if timeout: - msg += (" (timeout: %s, worker timeout: %s)" - % (format_duration(timeout), - format_duration(self.worker_timeout))) - self.log(msg) - for worker in self.workers: - worker.start() - - def stop_workers(self) -> None: - start_time = time.monotonic() - for worker in self.workers: - worker.stop() - for worker in self.workers: - worker.wait_stopped(start_time) - - def _get_result(self) -> QueueOutput | None: - pgo = self.ns.pgo - use_faulthandler = (self.ns.timeout is not None) - timeout = PROGRESS_UPDATE - - # bpo-46205: check the status of workers every iteration to avoid - # waiting forever on an empty queue. - while any(worker.is_alive() for worker in self.workers): - if use_faulthandler: - faulthandler.dump_traceback_later(MAIN_PROCESS_TIMEOUT, - exit=True) - - # wait for a thread - try: - return self.output.get(timeout=timeout) - except queue.Empty: - pass - - # display progress - running = get_running(self.workers) - if running and not pgo: - self.log('running: %s' % ', '.join(running)) - - # all worker threads are done: consume pending results - try: - return self.output.get(timeout=0) - except queue.Empty: - return None - - def display_result(self, mp_result: MultiprocessResult) -> None: - result = mp_result.result - pgo = self.ns.pgo - - text = str(result) - if mp_result.err_msg: - # MULTIPROCESSING_ERROR - text += ' (%s)' % mp_result.err_msg - elif (result.duration >= PROGRESS_MIN_TIME and not pgo): - text += ' (%s)' % format_duration(result.duration) - running = get_running(self.workers) - if running and not pgo: - text += ' -- running: %s' % ', '.join(running) - self.regrtest.display_progress(self.test_index, text) - - def _process_result(self, item: QueueOutput) -> bool: - """Returns True if test runner must stop.""" - rerun = self.runtests.rerun - if item[0]: - # Thread got an exception - format_exc = item[1] - print_warning(f"regrtest worker thread failed: {format_exc}") - result = TestResult("", state=State.MULTIPROCESSING_ERROR) - self.regrtest.accumulate_result(result, rerun=rerun) - return result - - self.test_index += 1 - mp_result = item[1] - result = mp_result.result - self.regrtest.accumulate_result(result, rerun=rerun) - self.display_result(mp_result) - - if mp_result.worker_stdout: - print(mp_result.worker_stdout, flush=True) - - return result - - def run_tests(self) -> None: - fail_fast = self.ns.failfast - fail_env_changed = self.ns.fail_env_changed - timeout = self.ns.timeout - - self.start_workers() - - self.test_index = 0 - try: - while True: - item = self._get_result() - if item is None: - break - - result = self._process_result(item) - if result.must_stop(fail_fast, fail_env_changed): - break - except KeyboardInterrupt: - print() - self.regrtest.interrupted = True - finally: - if timeout is not None: - faulthandler.cancel_dump_traceback_later() - - # Always ensure that all worker processes are no longer - # worker when we exit this function - self.pending.stop() - self.stop_workers() - - -def run_tests_multiprocess(regrtest: Regrtest, runtests: RunTests) -> None: - MultiprocessTestRunner(regrtest, runtests).run_tests() - - -class EncodeTestResult(json.JSONEncoder): - """Encode a TestResult (sub)class object into a JSON dict.""" - - def default(self, o: Any) -> dict[str, Any]: - if isinstance(o, TestResult): - result = dataclasses.asdict(o) - result["__test_result__"] = o.__class__.__name__ - return result - - return super().default(o) - - -def decode_test_result(d: dict[str, Any]) -> TestResult | dict[str, Any]: - """Decode a TestResult (sub)class object from a JSON dict.""" - - if "__test_result__" not in d: - return d - - d.pop('__test_result__') - if d['stats'] is not None: - d['stats'] = TestStats(**d['stats']) - return TestResult(**d) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 37fd9443e18007..ded8ad96606a7c 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -400,19 +400,19 @@ def check_sanitizer(*, address=False, memory=False, ub=False): raise ValueError('At least one of address, memory, or ub must be True') - _cflags = sysconfig.get_config_var('CFLAGS') or '' - _config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' + cflags = sysconfig.get_config_var('CFLAGS') or '' + config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' memory_sanitizer = ( - '-fsanitize=memory' in _cflags or - '--with-memory-sanitizer' in _config_args + '-fsanitize=memory' in cflags or + '--with-memory-sanitizer' in config_args ) address_sanitizer = ( - '-fsanitize=address' in _cflags or - '--with-address-sanitizer' in _config_args + '-fsanitize=address' in cflags or + '--with-address-sanitizer' in config_args ) ub_sanitizer = ( - '-fsanitize=undefined' in _cflags or - '--with-undefined-behavior-sanitizer' in _config_args + '-fsanitize=undefined' in cflags or + '--with-undefined-behavior-sanitizer' in config_args ) return ( (memory and memory_sanitizer) or @@ -916,27 +916,31 @@ def inner(*args, **kwds): MAX_Py_ssize_t = sys.maxsize -def set_memlimit(limit): - global max_memuse - global real_max_memuse +def _parse_memlimit(limit: str) -> int: sizes = { 'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G, } - m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, + m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit, re.IGNORECASE | re.VERBOSE) if m is None: - raise ValueError('Invalid memory limit %r' % (limit,)) - memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) - real_max_memuse = memlimit - if memlimit > MAX_Py_ssize_t: - memlimit = MAX_Py_ssize_t + raise ValueError(f'Invalid memory limit: {limit!r}') + return int(float(m.group(1)) * sizes[m.group(2).lower()]) + +def set_memlimit(limit: str) -> None: + global max_memuse + global real_max_memuse + memlimit = _parse_memlimit(limit) if memlimit < _2G - 1: - raise ValueError('Memory limit %r too low to be useful' % (limit,)) + raise ValueError('Memory limit {limit!r} too low to be useful') + + real_max_memuse = memlimit + memlimit = min(memlimit, MAX_Py_ssize_t) max_memuse = memlimit + class _MemoryWatchdog: """An object which periodically watches the process' memory consumption and prints it out. diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py index b9b05fc4306a31..4a93249af313cf 100644 --- a/Lib/test/test_support.py +++ b/Lib/test/test_support.py @@ -764,7 +764,45 @@ def recursive_function(depth): else: self.fail("RecursionError was not raised") - #self.assertEqual(available, 2) + def test_parse_memlimit(self): + parse = support._parse_memlimit + KiB = 1024 + MiB = KiB * 1024 + GiB = MiB * 1024 + TiB = GiB * 1024 + self.assertEqual(parse('0k'), 0) + self.assertEqual(parse('3k'), 3 * KiB) + self.assertEqual(parse('2.4m'), int(2.4 * MiB)) + self.assertEqual(parse('4g'), int(4 * GiB)) + self.assertEqual(parse('1t'), TiB) + + for limit in ('', '3', '3.5.10k', '10x'): + with self.subTest(limit=limit): + with self.assertRaises(ValueError): + parse(limit) + + def test_set_memlimit(self): + _4GiB = 4 * 1024 ** 3 + TiB = 1024 ** 4 + old_max_memuse = support.max_memuse + old_real_max_memuse = support.real_max_memuse + try: + if sys.maxsize > 2**32: + support.set_memlimit('4g') + self.assertEqual(support.max_memuse, _4GiB) + self.assertEqual(support.real_max_memuse, _4GiB) + + big = 2**100 // TiB + support.set_memlimit(f'{big}t') + self.assertEqual(support.max_memuse, sys.maxsize) + self.assertEqual(support.real_max_memuse, big * TiB) + else: + support.set_memlimit('4g') + self.assertEqual(support.max_memuse, sys.maxsize) + self.assertEqual(support.real_max_memuse, _4GiB) + finally: + support.max_memuse = old_max_memuse + support.real_max_memuse = old_real_max_memuse def test_copy_python_src_ignore(self): # Get source directory @@ -813,7 +851,6 @@ def test_copy_python_src_ignore(self): # EnvironmentVarGuard # transient_internet # run_with_locale - # set_memlimit # bigmemtest # precisionbigmemtest # bigaddrspacetest