diff --git a/lib/test.py b/lib/test.py index 625a0275..03360de1 100644 --- a/lib/test.py +++ b/lib/test.py @@ -1,14 +1,15 @@ +import difflib +import filecmp +import gevent import os +import pprint +import pytap13 import re +import shutil import sys import time -import filecmp -import difflib import traceback -import gevent -import pytap13 -import pprint -import shutil +from functools import partial try: from cStringIO import StringIO @@ -16,8 +17,9 @@ from StringIO import StringIO import lib -from lib.utils import non_empty_valgrind_logs, print_tail_n from lib.colorer import color_stdout +from lib.utils import non_empty_valgrind_logs +from lib.utils import print_tail_n class TestExecutionError(OSError): @@ -36,7 +38,10 @@ def _run(self, *args, **kwargs): self.callable(*self.callable_args, **self.callable_kwargs) def __repr__(self): - return "" % (hex(id(self)), getattr(self, "info", None)) + return "".format( + hex(id(self)), + getattr(self, "info", None) + ) class FilteredStream: @@ -84,6 +89,16 @@ def flush(self): self.stream.flush() +def get_filename_by_test(postfix, test_name): + rg = re.compile('\.test.*') + return os.path.basename(rg.sub(postfix, test_name)) + + +get_reject = partial(get_filename_by_test, '.reject') +get_result = partial(get_filename_by_test, '.result') +get_skipcond = partial(get_filename_by_test, '.skipcond') + + class Test: """An individual test file. A test object can run itself and remembers completion state of the run. @@ -91,7 +106,6 @@ class Test: If file .skipcond is exists it will be executed before test and if it sets self.skip to True value the test will be skipped. """ - rg = re.compile('\.test.*') def __init__(self, name, args, suite_ini, params={}, conf_name=None): """Initialize test properties: path to test file, path to @@ -99,13 +113,11 @@ def __init__(self, name, args, suite_ini, params={}, conf_name=None): self.name = name self.args = args self.suite_ini = suite_ini - self.result = os.path.join(suite_ini['suite'], - os.path.basename(self.rg.sub('.result', name))) - self.skip_cond = os.path.join(suite_ini['suite'], - os.path.basename(self.rg.sub('.skipcond', name))) + self.result = os.path.join(suite_ini['suite'], get_result(name)) + self.skip_cond = os.path.join(suite_ini['suite'], get_skipcond(name)) self.tmp_result = os.path.join(self.suite_ini['vardir'], os.path.basename(self.result)) - self.reject = self.rg.sub('.reject', name) + self.reject = get_reject(name) self.is_executed = False self.is_executed_ok = None self.is_equal_result = None @@ -126,7 +138,9 @@ def id(self): def passed(self): """Return true if this test was run successfully.""" - return self.is_executed and self.is_executed_ok and self.is_equal_result + return (self.is_executed and + self.is_executed_ok and + self.is_equal_result) def execute(self, server): # Note: don't forget to set 'server.current_test = self' in @@ -172,8 +186,9 @@ def run(self, server): if e.__class__.__name__ == 'TarantoolStartError': # worker should stop raise - color_stdout('\nTest.run() received the following error:\n' + - traceback.format_exc() + '\n', schema='error') + color_stdout('\nTest.run() received the following error:\n' + '{0}\n'.format(traceback.format_exc()), + schema='error') diagnostics = str(e) finally: if sys.stdout and sys.stdout != save_stdout: @@ -185,7 +200,8 @@ def run(self, server): is_tap = False if not self.skip: if self.is_executed_ok and os.path.isfile(self.result): - self.is_equal_result = filecmp.cmp(self.result, self.tmp_result) + self.is_equal_result = filecmp.cmp(self.result, + self.tmp_result) elif self.is_executed_ok: if lib.Options().args.is_verbose: color_stdout('\n') @@ -208,12 +224,15 @@ def run(self, server): color_stdout("[ skip ]\n", schema='test_skip') if os.path.exists(self.tmp_result): os.remove(self.tmp_result) - elif self.is_executed_ok and self.is_equal_result and self.is_valgrind_clean: + elif (self.is_executed_ok and + self.is_equal_result and + self.is_valgrind_clean): short_status = 'pass' color_stdout("[ pass ]\n", schema='test_pass') if os.path.exists(self.tmp_result): os.remove(self.tmp_result) - elif (self.is_executed_ok and not self.is_equal_result and not + elif (self.is_executed_ok and not + self.is_equal_result and not os.path.isfile(self.result)) and not is_tap: shutil.copy(self.tmp_result, self.result) short_status = 'new' @@ -226,9 +245,11 @@ def run(self, server): where = "" if not self.is_crash_reported and not self.is_executed_ok: self.print_diagnostics(self.reject, - "Test failed! Output from reject file {}:\n".format(self.reject)) + "Test failed! Output from reject file " + "{0}:\n".format(self.reject)) server.print_log(15) - where = ": test execution aborted, reason '{0}'".format(diagnostics) + where = ": test execution aborted, reason " \ + "'{0}'".format(diagnostics) elif not self.is_crash_reported and not self.is_equal_result: self.print_unidiff() server.print_log(15) @@ -237,7 +258,8 @@ def run(self, server): os.remove(self.reject) for log_file in non_empty_logs: self.print_diagnostics(log_file, - "Test failed! Output from log file {}:\n".format(log_file)) + "Test failed! Output from log file " + "{0}:\n".format(log_file)) where = ": there were warnings in the valgrind log file(s)" return short_status @@ -253,7 +275,8 @@ def print_unidiff(self): to establish the cause of a failure when .test differs from .result.""" - color_stdout("\nTest failed! Result content mismatch:\n", schema='error') + color_stdout("\nTest failed! Result content mismatch:\n", + schema='error') with open(self.result, "r") as result: with open(self.reject, "r") as reject: result_time = time.ctime(os.stat(self.result).st_mtime) @@ -298,7 +321,8 @@ def tap_parse_print_yaml(self, yml): def check_tap_output(self): """ Returns is_tap, is_ok """ if not os.path.isfile(self.tmp_result): - color_stdout('\nCannot find %s\n' % self.tmp_result, schema='error') + color_stdout('\nCannot find %s\n' % self.tmp_result, + schema='error') self.is_crash_reported = True return False with open(self.tmp_result, 'r') as f: @@ -307,7 +331,8 @@ def check_tap_output(self): try: tap.parse(content) except ValueError as e: - color_stdout('\nTAP13 parse failed: %s\n' % str(e), schema='error') + color_stdout('\nTAP13 parse failed: %s\n' % str(e), + schema='error') self.is_crash_reported = True return False, False is_ok = True @@ -326,6 +351,7 @@ def check_tap_output(self): self.tap_parse_print_yaml(test_case.yaml) is_ok = False if not is_ok: - color_stdout('Rejected result file: %s\n' % self.reject, schema='test_var') + color_stdout('Rejected result file: %s\n' % self.reject, + schema='test_var') self.is_crash_reported = True return True, is_ok diff --git a/lib/worker.py b/lib/worker.py index f0a871ef..c6e8c58f 100644 --- a/lib/worker.py +++ b/lib/worker.py @@ -1,17 +1,18 @@ +import collections +import copy +import functools import os import signal import traceback import yaml -import copy -import functools -import collections import lib -from lib.utils import safe_makedirs -from lib.test_suite import TestSuite - -from lib.colorer import color_stdout, color_log +from lib.colorer import color_log +from lib.colorer import color_stdout from lib.tarantool_server import TarantoolServer +from lib.test import get_result +from lib.test_suite import TestSuite +from lib.utils import safe_makedirs # Utils ####### @@ -157,6 +158,19 @@ def __init__(self, worker_id, worker_name): super(WorkerDone, self).__init__(worker_id, worker_name) +class WorkerCurrentTask(BaseWorkerMessage): + """ Provide information about current task running on worker. + It possible to check the `.result` file of hung tests. + And collect information about current tasks in parallel mode, + to show which parallel tests can affect failed test. + """ + def __init__(self, worker_id, worker_name, + task_name, task_param, task_result_filepath): + super(WorkerCurrentTask, self).__init__(worker_id, worker_name) + self.task_name = task_name + self.task_param = task_param + self.task_result_filepath = task_result_filepath + # Worker ######## @@ -176,6 +190,13 @@ def wrap_output(self, output, log_only): def done_marker(self): return WorkerDone(self.id, self.name) + def current_task(self, task_id): + task_name, task_param = task_id + task_result_filepath = os.path.join(self.suite.ini['vardir'], + get_result(task_name)) + return WorkerCurrentTask(self.id, self.name, + task_name, task_param, task_result_filepath) + def wrap_result(self, task_id, short_status): return WorkerTaskResult(self.id, self.name, task_id, short_status) @@ -267,7 +288,7 @@ def run_task(self, task_id): except KeyboardInterrupt: self.report_keyboard_interrupt() raise - except Exception as e: + except Exception: color_stdout( '\nWorker "%s" received the following error; stopping...\n' % self.name + traceback.format_exc() + '\n', schema='error') @@ -285,6 +306,8 @@ def run_loop(self, task_queue, result_queue): schema='test_var') self.stop_worker(task_queue, result_queue) break + + result_queue.put(self.current_task(task_id)) short_status = self.run_task(task_id) result_queue.put(self.wrap_result(task_id, short_status)) if not lib.Options().args.is_force and short_status == 'fail': @@ -307,7 +330,10 @@ def run_all(self, task_queue, result_queue): try: self.run_loop(task_queue, result_queue) - except (KeyboardInterrupt, Exception): + except (KeyboardInterrupt, Exception) as e: + if not isinstance(e, KeyboardInterrupt) and \ + not isinstance(e, VoluntaryStopException): + color_stdout('Exception: %s\n' % e, schema='error') self.stop_worker(task_queue, result_queue, cleanup=False) result_queue.put(self.done_marker()) diff --git a/listeners.py b/listeners.py index 81a0dc61..5e95325e 100644 --- a/listeners.py +++ b/listeners.py @@ -4,9 +4,12 @@ import yaml import lib -from lib.worker import get_reproduce_file -from lib.worker import WorkerOutput, WorkerDone, WorkerTaskResult from lib.colorer import color_stdout +from lib.worker import WorkerCurrentTask +from lib.worker import WorkerDone +from lib.worker import WorkerOutput +from lib.worker import WorkerTaskResult +from lib.worker import get_reproduce_file class BaseWatcher(object): @@ -170,36 +173,60 @@ class HangError(Exception): class HangWatcher(BaseWatcher): """Terminate all workers if no output received 'no_output_times' time.""" - def __init__(self, get_not_done_worker_ids, kill_all_workers, warn_timeout, - kill_timeout): + def __init__(self, get_not_done_worker_ids, kill_all_workers, + warn_timeout, kill_timeout): self.get_not_done_worker_ids = get_not_done_worker_ids self.kill_all_workers = kill_all_workers self.warn_timeout = warn_timeout self.kill_timeout = kill_timeout self.warned_seconds_ago = 0.0 self.inactivity = 0.0 + self.worker_current_task = dict() def process_result(self, obj): self.warned_seconds_ago = 0.0 self.inactivity = 0.0 + if isinstance(obj, WorkerCurrentTask): + self.worker_current_task[obj.worker_id] = obj + def process_timeout(self, delta_seconds): self.warned_seconds_ago += delta_seconds self.inactivity += delta_seconds worker_ids = self.get_not_done_worker_ids() + if self.warned_seconds_ago < self.warn_timeout: return - color_stdout("No output during %d seconds. " - "List of workers not reporting the status: %s; " - "Will abort after %d seconds without output.\n" % ( - self.inactivity, worker_ids, self.kill_timeout), - schema='test_var') + + color_stdout( + "No output during {0.inactivity:.0f} seconds. " + "Will abort after {0.kill_timeout:.0f} seconds without output. " + "List of workers not reporting the status:\n".format(self), + schema='test_var') + + hung_tasks = [task for worker_id, task + in self.worker_current_task.iteritems() + if worker_id in worker_ids] + for current_task in hung_tasks: + color_stdout("- [{0:03d}, {1}, {2}]\n".format(current_task.worker_id, + current_task.task_name, + current_task.task_param), + schema='test_var') + color_stdout("Last 15 lines of result file " + "[{0}]\n".format(current_task.task_result_filepath), + schema='error') + lib.utils.print_tail_n(current_task.task_result_filepath, + num_lines=15) + self.warned_seconds_ago = 0.0 + if self.inactivity < self.kill_timeout: return + color_stdout('\n[Main process] No output from workers. ' 'It seems that we hang. Send SIGKILL to workers; ' 'exiting...\n', schema='test_var') self.kill_all_workers() + raise HangError()