diff --git a/tests/aggregate_tests.py b/tests/aggregate_tests.py index ad87769e3c..835ffd10ba 100755 --- a/tests/aggregate_tests.py +++ b/tests/aggregate_tests.py @@ -29,13 +29,16 @@ import sys import unittest -if __name__ == '__main__': - suite = unittest.TestLoader().discover(".") - all_tests_passed = unittest.TextTestRunner( - verbosity=1, buffer=True).run(suite).wasSuccessful() - - if not all_tests_passed: - sys.exit(1) - - else: - sys.exit(0) +if __name__ == "__main__": + suite = unittest.TestLoader().discover(".") + all_tests_passed = ( + unittest.TextTestRunner(verbosity=1, buffer=True) + .run(suite) + .wasSuccessful() + ) + + if not all_tests_passed: + sys.exit(1) + + else: + sys.exit(0) diff --git a/tests/simple_server.py b/tests/simple_server.py index 74e84f0d80..45d481bed1 100755 --- a/tests/simple_server.py +++ b/tests/simple_server.py @@ -25,17 +25,21 @@ http://docs.python.org/library/simplehttpserver.html#module-SimpleHTTPServer """ -import sys -import random import socketserver +import sys from http.server import SimpleHTTPRequestHandler +from typing import Type, Union class QuietHTTPRequestHandler(SimpleHTTPRequestHandler): - """A SimpleHTTPRequestHandler that does not write incoming requests to - stderr. """ - def log_request(self, code='-', size='-'): - pass + """A SimpleHTTPRequestHandler that does not write incoming requests to + stderr.""" + + def log_request( + self, code: Union[int, str] = "-", size: Union[int, str] = "-" + ) -> None: + pass + # NOTE: On Windows/Python2 tests that use this simple_server.py in a # subprocesses hang after a certain amount of requests (~68), if a PIPE is @@ -43,22 +47,20 @@ def log_request(self, code='-', size='-'): # we silence the HTTP messages. # If you decide to receive the HTTP messages, then this bug # could reappear. -use_quiet_http_request_handler = True -if len(sys.argv) > 2: - use_quiet_http_request_handler = sys.argv[2] +# pylint: disable=invalid-name +handler: Type[Union[SimpleHTTPRequestHandler, QuietHTTPRequestHandler]] -if use_quiet_http_request_handler: - handler = QuietHTTPRequestHandler +if len(sys.argv) > 2 and sys.argv[2]: + handler = QuietHTTPRequestHandler else: - handler = SimpleHTTPRequestHandler + handler = SimpleHTTPRequestHandler # Allow re-use so you can re-run tests as often as you want even if the # tests re-use ports. Otherwise TCP TIME-WAIT prevents reuse for ~1 minute socketserver.TCPServer.allow_reuse_address = True -httpd = socketserver.TCPServer(('localhost', 0), handler) -port_message = 'bind succeeded, server port is: ' \ - + str(httpd.server_address[1]) +httpd = socketserver.TCPServer(("localhost", 0), handler) +port_message = "bind succeeded, server port is: " + str(httpd.server_address[1]) print(port_message) httpd.serve_forever() diff --git a/tests/test_utils.py b/tests/test_utils.py index 793db8f060..2f1adeac65 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -20,104 +20,108 @@ Provide tests for some of the functions in utils.py module. """ -import os import logging -import unittest +import os import socket import sys - -import tuf.unittest_toolbox as unittest_toolbox +import unittest from tests import utils +from tuf import unittest_toolbox logger = logging.getLogger(__name__) -class TestServerProcess(unittest_toolbox.Modified_TestCase): - - def tearDown(self): - # Make sure we are calling clean on existing attribute. - if hasattr(self, 'server_process_handler'): - self.server_process_handler.clean() - - def can_connect(self): +def can_connect(port: int) -> bool: + """Check if a socket can connect on the given port""" try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(('localhost', self.server_process_handler.port)) - return True - except: - return False + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(("localhost", port)) + return True + # pylint: disable=broad-except + except Exception: + return False finally: - # The process will always enter in finally even we return. - if sock: - sock.close() - - - def test_simple_server_startup(self): - # Test normal case - self.server_process_handler = utils.TestServerProcess(log=logger) - - # Make sure we can connect to the server - self.assertTrue(self.can_connect()) - - - def test_simple_https_server_startup(self): - # Test normal case - good_cert_path = os.path.join('ssl_certs', 'ssl_cert.crt') - self.server_process_handler = utils.TestServerProcess(log=logger, - server='simple_https_server.py', extra_cmd_args=[good_cert_path]) - - # Make sure we can connect to the server - self.assertTrue(self.can_connect()) - self.server_process_handler.clean() - - # Test when no cert file is provided - self.server_process_handler = utils.TestServerProcess(log=logger, - server='simple_https_server.py') + # The process will always enter in finally even after return. + if sock: + sock.close() - # Make sure we can connect to the server - self.assertTrue(self.can_connect()) - self.server_process_handler.clean() - # Test with a non existing cert file. - non_existing_cert_path = os.path.join('ssl_certs', 'non_existing.crt') - self.server_process_handler = utils.TestServerProcess(log=logger, - server='simple_https_server.py', - extra_cmd_args=[non_existing_cert_path]) - - # Make sure we can connect to the server - self.assertTrue(self.can_connect()) - - - def test_slow_retrieval_server_startup(self): - # Test normal case - self.server_process_handler = utils.TestServerProcess(log=logger, - server='slow_retrieval_server.py') - - # Make sure we can connect to the server - self.assertTrue(self.can_connect()) - - - def test_cleanup(self): - # Test normal case - self.server_process_handler = utils.TestServerProcess(log=logger, - server='simple_server.py') - - self.server_process_handler.clean() - - # Check if the process has successfully been killed. - self.assertFalse(self.server_process_handler.is_process_running()) - - - def test_server_exit_before_timeout(self): - with self.assertRaises(utils.TestServerProcessError): - utils.TestServerProcess(logger, server='non_existing_server.py') - - # Test starting a server which immediately exits." - with self.assertRaises(utils.TestServerProcessError): - utils.TestServerProcess(logger, server='fast_server_exit.py') - - -if __name__ == '__main__': - utils.configure_test_logging(sys.argv) - unittest.main() +class TestServerProcess(unittest_toolbox.Modified_TestCase): + """Test functionality provided in TestServerProcess from tests/utils.py.""" + + def test_simple_server_startup(self) -> None: + # Test normal case + server_process_handler = utils.TestServerProcess(log=logger) + + # Make sure we can connect to the server + self.assertTrue(can_connect(server_process_handler.port)) + server_process_handler.clean() + + def test_simple_https_server_startup(self) -> None: + # Test normal case + good_cert_path = os.path.join("ssl_certs", "ssl_cert.crt") + server_process_handler = utils.TestServerProcess( + log=logger, + server="simple_https_server.py", + extra_cmd_args=[good_cert_path], + ) + + # Make sure we can connect to the server + self.assertTrue(can_connect(server_process_handler.port)) + server_process_handler.clean() + + # Test when no cert file is provided + server_process_handler = utils.TestServerProcess( + log=logger, server="simple_https_server.py" + ) + + # Make sure we can connect to the server + self.assertTrue(can_connect(server_process_handler.port)) + server_process_handler.clean() + + # Test with a non existing cert file. + non_existing_cert_path = os.path.join("ssl_certs", "non_existing.crt") + server_process_handler = utils.TestServerProcess( + log=logger, + server="simple_https_server.py", + extra_cmd_args=[non_existing_cert_path], + ) + + # Make sure we can connect to the server + self.assertTrue(can_connect(server_process_handler.port)) + server_process_handler.clean() + + def test_slow_retrieval_server_startup(self) -> None: + # Test normal case + server_process_handler = utils.TestServerProcess( + log=logger, server="slow_retrieval_server.py" + ) + + # Make sure we can connect to the server + self.assertTrue(can_connect(server_process_handler.port)) + server_process_handler.clean() + + def test_cleanup(self) -> None: + # Test normal case + server_process_handler = utils.TestServerProcess( + log=logger, server="simple_server.py" + ) + + server_process_handler.clean() + + # Check if the process has successfully been killed. + self.assertFalse(server_process_handler.is_process_running()) + + def test_server_exit_before_timeout(self) -> None: + with self.assertRaises(utils.TestServerProcessError): + utils.TestServerProcess(logger, server="non_existing_server.py") + + # Test starting a server which immediately exits." + with self.assertRaises(utils.TestServerProcessError): + utils.TestServerProcess(logger, server="fast_server_exit.py") + + +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/utils.py b/tests/utils.py index e5c251d0f7..a1ae886c14 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,26 +20,26 @@ Provide common utilities for TUF tests """ -from contextlib import contextmanager -from typing import Any, Callable, Dict, IO, Optional, Callable, List, Iterator -import unittest import argparse import errno import logging +import queue import socket -import sys -import time import subprocess +import sys import threading +import time +import unittest import warnings -import queue +from contextlib import contextmanager +from typing import IO, Any, Callable, Dict, Iterator, List, Optional import tuf.log logger = logging.getLogger(__name__) # Used when forming URLs on the client side -TEST_HOST_ADDRESS = '127.0.0.1' +TEST_HOST_ADDRESS = "127.0.0.1" # DataSet is only here so type hints can be used. @@ -49,35 +49,40 @@ # (where N is number of items in dataset), feeding the actual test # function one test case at a time def run_sub_tests_with_dataset( - dataset: DataSet + dataset: DataSet, ) -> Callable[[Callable], Callable]: + """Decorator starting a unittest.TestCase.subtest() for each of the + cases in dataset""" + def real_decorator( - function: Callable[[unittest.TestCase, Any], None] + function: Callable[[unittest.TestCase, Any], None] ) -> Callable[[unittest.TestCase], None]: def wrapper(test_cls: unittest.TestCase) -> None: for case, data in dataset.items(): with test_cls.subTest(case=case): function(test_cls, data) + return wrapper + return real_decorator class TestServerProcessError(Exception): + def __init__(self, value: str = "TestServerProcess") -> None: + super().__init__() + self.value = value - def __init__(self, value: str="TestServerProcess") -> None: - self.value = value - - def __str__(self) -> str: - return repr(self.value) + def __str__(self) -> str: + return repr(self.value) @contextmanager def ignore_deprecation_warnings(module: str) -> Iterator[None]: - with warnings.catch_warnings(): - warnings.filterwarnings('ignore', - category=DeprecationWarning, - module=module) - yield + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", category=DeprecationWarning, module=module + ) + yield # Wait until host:port accepts connections. @@ -86,253 +91,265 @@ def ignore_deprecation_warnings(module: str) -> Iterator[None]: # but the current blocking connect() seems to work fast on Linux and seems # to at least work on Windows (ECONNREFUSED unfortunately has a 2 second # timeout on Windows) -def wait_for_server(host: str, server: str, port: int, timeout: int=10) -> None: - start = time.time() - remaining_timeout = timeout - succeeded = False - while not succeeded and remaining_timeout > 0: - try: - sock: Optional[socket.socket] = socket.socket( - socket.AF_INET, socket.SOCK_STREAM - ) - assert sock is not None - sock.settimeout(remaining_timeout) - sock.connect((host, port)) - succeeded = True - except socket.timeout as e: - pass - except IOError as e: - # ECONNREFUSED is expected while the server is not started - if e.errno not in [errno.ECONNREFUSED]: - logger.warning("Unexpected error while waiting for server: " + str(e)) - # Avoid pegging a core just for this - time.sleep(0.01) - finally: - if sock: - sock.close() - sock = None - remaining_timeout = int(timeout - (time.time() - start)) - - if not succeeded: - raise TimeoutError("Could not connect to the " + server \ - + " on port " + str(port) + "!") +def wait_for_server( + host: str, server: str, port: int, timeout: int = 10 +) -> None: + """Wait for server start until timeout is reached or server has started""" + start = time.time() + remaining_timeout = timeout + succeeded = False + while not succeeded and remaining_timeout > 0: + try: + sock: Optional[socket.socket] = socket.socket( + socket.AF_INET, socket.SOCK_STREAM + ) + assert sock is not None + sock.settimeout(remaining_timeout) + sock.connect((host, port)) + succeeded = True + except socket.timeout: + pass + except IOError as e: + # ECONNREFUSED is expected while the server is not started + if e.errno not in [errno.ECONNREFUSED]: + logger.warning( + "Unexpected error while waiting for server: %s", str(e) + ) + # Avoid pegging a core just for this + time.sleep(0.01) + finally: + if sock: + sock.close() + sock = None + remaining_timeout = int(timeout - (time.time() - start)) + + if not succeeded: + raise TimeoutError( + "Could not connect to the " + server + " on port " + str(port) + "!" + ) def configure_test_logging(argv: List[str]) -> None: - # parse arguments but only handle '-v': argv may contain - # other things meant for unittest argument parser - parser = argparse.ArgumentParser(add_help=False) - parser.add_argument('-v', '--verbose', action='count', default=0) - args, _ = parser.parse_known_args(argv) - - if args.verbose <= 1: - # 0 and 1 both mean ERROR: this way '-v' makes unittest print test - # names without increasing log level - loglevel = logging.ERROR - elif args.verbose == 2: - loglevel = logging.WARNING - elif args.verbose == 3: - loglevel = logging.INFO - else: - loglevel = logging.DEBUG - - logging.basicConfig(level=loglevel) - tuf.log.set_log_level(loglevel) - - -class TestServerProcess(): - """ - - Creates a child process with the subprocess.Popen object and - uses a thread-safe Queue structure for logging. - - - log: - Logger which will be used for logging. - - server: - Path to the server to run in the subprocess. - Default is "simpler_server.py". - - timeout: - Time in seconds in which the server should start or otherwise - TimeoutError error will be raised. - Default is 10. - - popen_cwd: - Current working directory used when instancing a - subprocess.Popen object. - Default is "." - - extra_cmd_args: - List of additional arguments for the command - which will start the subprocess. - More precisely "python -u ". - When no list is provided, an empty list ("[]") will be assigned to it. - """ - - - def __init__(self, log: logging.Logger, server: str='simple_server.py', - timeout: int=10, popen_cwd: str=".", extra_cmd_args: Optional[List[str]]=None - ): - - self.server = server - self.__logger = log - # Stores popped messages from the queue. - self.__logged_messages: List[str] = [] - if extra_cmd_args is None: - extra_cmd_args = [] - - try: - self._start_server(timeout, extra_cmd_args, popen_cwd) - wait_for_server('localhost', self.server, self.port, timeout) - except Exception as e: - # Clean the resources and log the server errors if any exists. - self.clean() - raise e - - - - def _start_server( - self, timeout: int, extra_cmd_args: List[str], popen_cwd: str - ) -> None: - """ - Start the server subprocess and a thread - responsible to redirect stdout/stderr to the Queue. - Waits for the port message maximum timeout seconds. - """ - - self._start_process(extra_cmd_args, popen_cwd) - self._start_redirect_thread() - - self._wait_for_port(timeout) - - self.__logger.info(self.server + ' serving on ' + str(self.port)) - - - - def _start_process(self, extra_cmd_args: List[str], popen_cwd: str) -> None: - """Starts the process running the server.""" - - # The "-u" option forces stdin, stdout and stderr to be unbuffered. - command = [sys.executable, '-u', self.server] + extra_cmd_args - - # Reusing one subprocess in multiple tests, but split up the logs for each. - self.__server_process = subprocess.Popen(command, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=popen_cwd) - - - - def _start_redirect_thread(self) -> None: - """Starts a thread responsible to redirect stdout/stderr to the Queue.""" - - # Run log_queue_worker() in a thread. - # The thread will exit when the child process dies. - self._log_queue = queue.Queue() - log_thread = threading.Thread(target=self._log_queue_worker, - args=(self.__server_process.stdout, self._log_queue)) - - # "daemon = True" means the thread won't interfere with the process exit. - log_thread.daemon = True - log_thread.start() - - - @staticmethod - def _log_queue_worker(stream: IO, line_queue: queue.Queue) -> None: - """ - Worker function to run in a seprate thread. - Reads from 'stream', puts lines in a Queue (Queue is thread-safe). - """ - - while True: - # readline() is a blocking operation. - # decode to push a string in the queue instead of 8-bit bytes. - log_line = stream.readline().decode('utf-8') - line_queue.put(log_line) - - if len(log_line) == 0: - # This is the end of the stream meaning the server process has exited. - stream.close() - break - - - - def _wait_for_port(self, timeout: int) -> None: - """ - Validates the first item from the Queue against the port message. - If validation is successful, self.port is set. - Raises TestServerProcessError if the process has exited or - TimeoutError if no message was found within timeout seconds. + """Configure logger level for a certain test file""" + # parse arguments but only handle '-v': argv may contain + # other things meant for unittest argument parser + parser = argparse.ArgumentParser(add_help=False) + parser.add_argument("-v", "--verbose", action="count", default=0) + args, _ = parser.parse_known_args(argv) + + if args.verbose <= 1: + # 0 and 1 both mean ERROR: this way '-v' makes unittest print test + # names without increasing log level + loglevel = logging.ERROR + elif args.verbose == 2: + loglevel = logging.WARNING + elif args.verbose == 3: + loglevel = logging.INFO + else: + loglevel = logging.DEBUG + + logging.basicConfig(level=loglevel) + tuf.log.set_log_level(loglevel) + + +class TestServerProcess: + """Helper class used to create a child process with the subprocess.Popen + object and use a thread-safe Queue structure for logging. + + Args: + log: Logger which will be used for logging. + server: Path to the server to run in the subprocess. + timeout: Time in seconds in which the server should start or otherwise + TimeoutError error will be raised. + popen_cwd: Current working directory used when instancing a + subprocess.Popen object. + extra_cmd_args: Additional arguments for the command which will start + the subprocess. More precisely: + "python -u ". + If no list is provided, an empty list ("[]") will be assigned to it. """ - # We have hardcoded the message we expect on a successful server startup. - # This message should be the first message sent by the server! - expected_msg = 'bind succeeded, server port is: ' - try: - line = self._log_queue.get(timeout=timeout) - if len(line) == 0: - # The process has exited. - raise TestServerProcessError(self.server + ' exited unexpectedly ' \ - + 'with code ' + str(self.__server_process.poll()) + '!') - - elif line.startswith(expected_msg): - self.port = int(line[len(expected_msg):]) - else: - # An exception or some other message is printed from the server. - self.__logged_messages.append(line) - # Check if more lines are logged. + def __init__( + self, + log: logging.Logger, + server: str = "simple_server.py", + timeout: int = 10, + popen_cwd: str = ".", + extra_cmd_args: Optional[List[str]] = None, + ): + + self.server = server + self.__logger = log + # Stores popped messages from the queue. + self.__logged_messages: List[str] = [] + self.__server_process: Optional[subprocess.Popen] = None + self._log_queue: Optional[queue.Queue] = None + self.port = -1 + if extra_cmd_args is None: + extra_cmd_args = [] + + try: + self._start_server(timeout, extra_cmd_args, popen_cwd) + wait_for_server("localhost", self.server, self.port, timeout) + except Exception as e: + # Clean the resources and log the server errors if any exists. + self.clean() + raise e + + def _start_server( + self, timeout: int, extra_cmd_args: List[str], popen_cwd: str + ) -> None: + """ + Start the server subprocess and a thread + responsible to redirect stdout/stderr to the Queue. + Waits for the port message maximum timeout seconds. + """ + + self._start_process(extra_cmd_args, popen_cwd) + self._start_redirect_thread() + + self._wait_for_port(timeout) + + self.__logger.info(self.server + " serving on " + str(self.port)) + + def _start_process(self, extra_cmd_args: List[str], popen_cwd: str) -> None: + """Starts the process running the server.""" + + # The "-u" option forces stdin, stdout and stderr to be unbuffered. + command = [sys.executable, "-u", self.server] + extra_cmd_args + + # Reusing one subprocess in multiple tests, but split up the logs + # for each. + self.__server_process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=popen_cwd, + ) + + def _start_redirect_thread(self) -> None: + """Starts a thread redirecting the stdout/stderr to the Queue.""" + + assert isinstance(self.__server_process, subprocess.Popen) + # Run log_queue_worker() in a thread. + # The thread will exit when the child process dies. + self._log_queue = queue.Queue() + log_thread = threading.Thread( + target=self._log_queue_worker, + args=(self.__server_process.stdout, self._log_queue), + ) + + # "daemon = True" means the thread won't interfere with the + # process exit. + log_thread.daemon = True + log_thread.start() + + @staticmethod + def _log_queue_worker(stream: IO, line_queue: queue.Queue) -> None: + """ + Worker function to run in a seprate thread. + Reads from 'stream', puts lines in a Queue (Queue is thread-safe). + """ + + while True: + # readline() is a blocking operation. + # decode to push a string in the queue instead of 8-bit bytes. + log_line = stream.readline().decode("utf-8") + line_queue.put(log_line) + + if len(log_line) == 0: + # This is the end of the stream meaning the server process + # has exited. + stream.close() + break + + def _wait_for_port(self, timeout: int) -> None: + """ + Validates the first item from the Queue against the port message. + If validation is successful, self.port is set. + Raises TestServerProcessError if the process has exited or + TimeoutError if no message was found within timeout seconds. + """ + + assert isinstance(self.__server_process, subprocess.Popen) + assert isinstance(self._log_queue, queue.Queue) + # We have hardcoded the message we expect on a successful server + # startup. This message should be the first message sent by the server! + expected_msg = "bind succeeded, server port is: " + try: + line = self._log_queue.get(timeout=timeout) + if len(line) == 0: + # The process has exited. + raise TestServerProcessError( + self.server + + " exited unexpectedly " + + "with code " + + str(self.__server_process.poll()) + + "!" + ) + + if line.startswith(expected_msg): + self.port = int(line[len(expected_msg) :]) + else: + # An exception or some other message is printed from the server. + self.__logged_messages.append(line) + # Check if more lines are logged. + self.flush_log() + raise TestServerProcessError( + self.server + + " did not print port " + + "message as first stdout line as expected!" + ) + except queue.Empty as e: + raise TimeoutError( + "Failure during " + self.server + " startup!" + ) from e + + def _kill_server_process(self) -> None: + """Kills the server subprocess if it's running.""" + + assert isinstance(self.__server_process, subprocess.Popen) + if self.is_process_running(): + self.__logger.info( + "Server process " + + str(self.__server_process.pid) + + " terminated." + ) + self.__server_process.kill() + self.__server_process.wait() + + def flush_log(self) -> None: + """Flushes the log lines from the logging queue.""" + + assert isinstance(self._log_queue, queue.Queue) + while True: + # Get lines from log_queue + try: + line = self._log_queue.get(block=False) + if len(line) > 0: + self.__logged_messages.append(line) + except queue.Empty: + # No more lines are logged in the queue. + break + + if len(self.__logged_messages) > 0: + title = "Test server (" + self.server + ") output:\n" + message = [title] + self.__logged_messages + self.__logger.info("| ".join(message)) + self.__logged_messages = [] + + def clean(self) -> None: + """ + Kills the subprocess and closes the TempFile. + Calls flush_log to check for logged information, but not yet flushed. + """ + + # If there is anything logged, flush it before closing the resourses. self.flush_log() - raise TestServerProcessError(self.server + ' did not print port ' \ - + 'message as first stdout line as expected!') - except queue.Empty: - raise TimeoutError('Failure during ' + self.server + ' startup!') - - - - def _kill_server_process(self) -> None: - """Kills the server subprocess if it's running.""" - - if self.is_process_running(): - self.__logger.info('Server process ' + str(self.__server_process.pid) + - ' terminated.') - self.__server_process.kill() - self.__server_process.wait() - - - - def flush_log(self) -> None: - """Flushes the log lines from the logging queue.""" - - while True: - # Get lines from log_queue - try: - line = self._log_queue.get(block=False) - if len(line) > 0: - self.__logged_messages.append(line) - except queue.Empty: - # No more lines are logged in the queue. - break - - if len(self.__logged_messages) > 0: - title = "Test server (" + self.server + ") output:\n" - message = [title] + self.__logged_messages - self.__logger.info('| '.join(message)) - self.__logged_messages = [] - - - - def clean(self) -> None: - """ - Kills the subprocess and closes the TempFile. - Calls flush_log to check for logged information, but not yet flushed. - """ - - # If there is anything logged, flush it before closing the resourses. - self.flush_log() - - self._kill_server_process() - + self._kill_server_process() - def is_process_running(self) -> bool: - return True if self.__server_process.poll() is None else False + def is_process_running(self) -> bool: + assert isinstance(self.__server_process, subprocess.Popen) + # pylint: disable=simplifiable-if-expression + return True if self.__server_process.poll() is None else False