From 8b426ee595271ea8625af6062cfdbcb07fcf48fc Mon Sep 17 00:00:00 2001 From: Matej Cotman Date: Mon, 2 Aug 2021 07:32:41 +0000 Subject: [PATCH] feat(robotframework): add support for robotframework 4.x --- .github/workflows/test.yml | 6 +- src/oxygen/__init__.py | 8 +- src/oxygen/base_handler.py | 16 +- src/oxygen/robot3_interface.py | 365 +++++++++++++++++ src/oxygen/robot4_interface.py | 366 +++++++++++++++++ src/oxygen/robot_interface.py | 385 ++---------------- src/oxygen/utils.py | 15 +- test.nix | 2 +- tests/atest/oxygen_junit_tests.robot | 2 +- .../test_robot_interface_basic_usage.py | 46 ++- .../robot_interface/test_time_conversions.py | 2 +- 11 files changed, 814 insertions(+), 399 deletions(-) create mode 100644 src/oxygen/robot3_interface.py create mode 100644 src/oxygen/robot4_interface.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b6d8a2c..3380681 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,7 @@ jobs: fail-fast: false matrix: python: [python38, python39] - rfVersion: [3.1.2, 3.2, 3.2.1, 3.2.2] + rfVersion: ['3.1.2', '3.2', '3.2.1', '3.2.2', '4.0', '4.0.1', '4.0.2', '4.0.3', '4.1'] runs-on: windows-latest name: Windows (${{ matrix.python }}, robotframework-${{ matrix.rfVersion }}) defaults: @@ -62,7 +62,7 @@ jobs: fail-fast: false matrix: python: [python38, python39] - rfVersion: [3.1.2, 3.2, 3.2.1, 3.2.2] + rfVersion: ['3.1.2', '3.2', '3.2.1', '3.2.2', '4.0', '4.0.1', '4.0.2', '4.0.3', '4.1'] runs-on: ubuntu-latest name: Linux (${{ matrix.python }}, robotframework-${{ matrix.rfVersion }}) steps: @@ -80,7 +80,7 @@ jobs: fail-fast: false matrix: python: [python38, python39] - rfVersion: [3.1.2, 3.2, 3.2.1, 3.2.2] + rfVersion: ['3.1.2', '3.2', '3.2.1', '3.2.2', '4.0', '4.0.1', '4.0.2', '4.0.3', '4.1'] runs-on: macos-latest name: MacOS (${{ matrix.python }}, robotframework-${{ matrix.rfVersion }}) steps: diff --git a/src/oxygen/__init__.py b/src/oxygen/__init__.py index 55b6711..f7fb3d5 100644 --- a/src/oxygen/__init__.py +++ b/src/oxygen/__init__.py @@ -1,10 +1,6 @@ +from .version import VERSION from .base_handler import BaseHandler from .oxygen import listener, OxygenLibrary -from .robot_interface import RobotInterface -from .version import VERSION -__all__ = ['BaseHandler', 'listener', 'OxygenLibrary', 'RobotInterface'] +__all__ = ['BaseHandler', 'listener', 'OxygenLibrary'] __version__ = VERSION - - - diff --git a/src/oxygen/base_handler.py b/src/oxygen/base_handler.py index eb422c5..71122bf 100644 --- a/src/oxygen/base_handler.py +++ b/src/oxygen/base_handler.py @@ -1,6 +1,8 @@ import re -from .robot_interface import RobotInterface +from .robot_interface import (RobotInterface, get_keywords_from, + set_special_keyword) + class BaseHandler(object): DEFAULT_CLI = {tuple(['resultfile']): {}} @@ -43,7 +45,9 @@ def check_for_keyword(self, test, data): test: A Robot test ''' - for curr, keyword in enumerate(test.keywords): + test_keywords = get_keywords_from(test) + + for curr, keyword in enumerate(test_keywords): keyword_name = self._normalize_keyword_name(keyword.name) if not (keyword_name == self.keyword): continue @@ -52,8 +56,8 @@ def check_for_keyword(self, test, data): # ALL keywords, setup or not, preceding the trigger will be treated # as setup keywords later. Same goes for keywords succeeding the # trigger; they will become teardown keywords. - setup_keywords = test.keywords[:curr] - teardown_keywords = test.keywords[(curr+1):] + setup_keywords = test_keywords[:curr] + teardown_keywords = test_keywords[(curr+1):] self._report_oxygen_run(keyword, setup_keywords, teardown_keywords) @@ -107,10 +111,10 @@ def _build_results(self, keyword, setup_keyword, teardown_keyword): self._set_suite_tags(result_suite, *(self._tags + list(test.tags))) if setup_keyword: - result_suite.keywords.append(setup_keyword) + set_special_keyword(result_suite, 'setup', setup_keyword) if teardown_keyword: - result_suite.keywords.append(teardown_keyword) + set_special_keyword(result_suite, 'teardown', teardown_keyword) self._inject_suite_report(test, result_suite) diff --git a/src/oxygen/robot3_interface.py b/src/oxygen/robot3_interface.py new file mode 100644 index 0000000..22b0385 --- /dev/null +++ b/src/oxygen/robot3_interface.py @@ -0,0 +1,365 @@ +from datetime import datetime +from datetime import timedelta +from datetime import timezone +from robot.result.model import (Keyword as RobotResultKeyword, + Message as RobotResultMessage, + TestCase as RobotResultTest, + TestSuite as RobotResultSuite) + +from robot.running.model import TestSuite as RobotRunningSuite + + +class RobotResultInterface(object): + def build_suites(self, starting_time, *suites): + '''Convert a given `suite` dictionaries into a Robot suite''' + finished_suites = [] + current_time = starting_time + + for suite in suites: + current_time, finished_suite = self.build_suite(current_time, suite) + + if finished_suite: + finished_suites.append(finished_suite) + + return current_time, finished_suites + + + def build_suite(self, starting_time, suite): + '''Convert a set of suite dicts into Robot suites and append them to the + given list-like object + + target: List-like object to add the Robot suites to + suites: Set of suite dicts to build into Robot suites + + Return: The updated list-like object + ''' + + if not suite: + return starting_time, None + + updated_time = starting_time + name = suite.get('name') or 'Unknown Suite Name' + tags = suite.get('tags') or [] + setup_keyword = suite.get('setup') or None + teardown_keyword = suite.get('teardown') or None + child_suites = suite.get('suites') or [] + tests = suite.get('tests') or [] + + updated_time, robot_setup = self.build_keyword(updated_time, setup_keyword, setup=True) + updated_time, robot_teardown = self.build_keyword(updated_time, teardown_keyword, teardown=True) + updated_time, robot_suites = self.build_suites(updated_time, *child_suites) + updated_time, robot_tests = self.build_tests(updated_time, *tests) + + robot_suite = self.spawn_robot_suite(name, + starting_time, + updated_time, + tags, + robot_setup, + robot_teardown, + robot_suites, + robot_tests) + + return updated_time, robot_suite + + + def spawn_robot_suite(self, + name, + start_time, + end_time, + tags, + setup_keyword, + teardown_keyword, + suites, + tests): + start_timestamp = self.ms_to_timestamp(start_time) + end_timestamp = self.ms_to_timestamp(end_time) + + robot_suite = RobotResultSuite(name, + starttime=start_timestamp, + endtime=end_timestamp) + robot_suite.set_tags(add=tags, persist=True) + + if setup_keyword: + robot_suite.keywords.append(setup_keyword) + if teardown_keyword: + robot_suite.keywords.append(teardown_keyword) + + for suite in filter(None, suites): + robot_suite.suites.append(suite) + + for test in filter(None, tests): + robot_suite.tests.append(test) + + return robot_suite + + + def build_tests(self, starting_time, *tests): + '''Convert a set of `tests` dicts and add to a Robot suite `target`''' + updated_time = starting_time + robot_tests = [] + for test in tests: + updated_time, robot_test = self.build_test(updated_time, test) + + if robot_test: + robot_tests.append(robot_test) + + return updated_time, robot_tests + + + def build_test(self, starting_time, test): + '''Convert a set of `tests` dicts and add to a Robot suite `target`''' + if not test: + return starting_time, None + + updated_time = starting_time + test_name = test.get('name') or 'Unknown Test Name' + tags = test.get('tags') or [] + setup_keyword = test.get('setup') or None + keywords = test.get('keywords') or [] + teardown_keyword = test.get('teardown') or None + + updated_time, robot_setup = self.build_keyword(updated_time, + setup_keyword, + setup=True) + updated_time, robot_keywords = self.build_keywords(updated_time, + *keywords) + updated_time, robot_teardown = self.build_keyword(updated_time, + teardown_keyword, + teardown=True) + + robot_test = self.spawn_robot_test(test_name, + starting_time, + updated_time, + tags, + robot_setup, + robot_teardown, + robot_keywords) + + return updated_time, robot_test + + + def spawn_robot_test(self, + name, + start_time, + end_time, + tags, + setup_keyword, + teardown_keyword, + keywords): + start_timestamp = self.ms_to_timestamp(start_time) + end_timestamp = self.ms_to_timestamp(end_time) + status = self.get_keywords_status(setup_keyword, teardown_keyword, *(keywords or [])) + + robot_test = RobotResultTest(name, + tags=tags, + status=status, + starttime=start_timestamp, + endtime=end_timestamp) + + if setup_keyword: + robot_test.keywords.append(setup_keyword) + for keyword in keywords: + if keyword: + robot_test.keywords.append(keyword) + if teardown_keyword: + robot_test.keywords.append(teardown_keyword) + + return robot_test + + + def build_keywords(self, starting_time, *keywords): + '''Convert `keywords` dicts, add them as sub-keywords to a `target`''' + updated_time = starting_time + robot_keywords = [] + for keyword in keywords: + updated_time, robot_keyword = self.build_keyword(updated_time, keyword) + + if robot_keyword: + robot_keywords.append(robot_keyword) + + return updated_time, robot_keywords + + + def build_keyword(self, starting_time, keyword, setup=False, teardown=False): + if not keyword: + return starting_time, None + + updated_time = starting_time + name = keyword.get('name') or 'Unknown Keyword Name' + status = keyword.get('pass') or None + elapsed = keyword.get('elapsed') or 0.0 + tags = keyword.get('tags') or [] + messages = keyword.get('messages') or [] + teardown = keyword.get('teardown') or None + keywords = keyword.get('keywords') or [] + + updated_time, robot_teardown = self.build_keyword(updated_time, teardown) + updated_time, robot_keywords = self.build_keywords(updated_time, keywords) + + final_time = updated_time + elapsed + + robot_keyword = self.spawn_robot_keyword(name, + tags, + status, + updated_time, + final_time, + teardown, + keywords, + messages, + setup, + teardown) + + return final_time, robot_keyword + + + def spawn_robot_keyword(self, + name, + tags, + status, + start_time, + end_time, + teardown_keyword, + keywords, + messages, + setup=False, + teardown=False): + start_timestamp = self.ms_to_timestamp(start_time) + end_timestamp = self.ms_to_timestamp(end_time) + + if setup: + keyword_type = RobotResultKeyword.SETUP_TYPE + elif teardown: + keyword_type = RobotResultKeyword.TEARDOWN_TYPE + else: + keyword_type = RobotResultKeyword.KEYWORD_TYPE + + if status is None: + keyword_status = 'NOT_RUN' + elif status: + keyword_status = 'PASS' + else: + keyword_status = 'FAIL' + + robot_keyword = RobotResultKeyword(name, + tags=tags, + status=keyword_status, + starttime=start_timestamp, + endtime=end_timestamp) + + robot_keyword.type = keyword_type + + for keyword in keywords: + if keyword: + robot_keyword.keywords.append(keyword) + + if teardown_keyword: + robot_keyword.keywords.append(teardown_keyword) + + for message in messages: + if message: + ishtml = message.startswith('*HTML*') + if ishtml: + message = message[6:] + robot_keyword.messages.append(RobotResultMessage(message,html=ishtml)) + + return robot_keyword + + + def get_time_format(self): + '''Convenience to return the general Robot timestamp format.''' + return '%Y%m%d %H:%M:%S.%f' + + + def timestamp_to_ms(self, timestamp): + time_format = self.get_time_format() + time_object = datetime.strptime( + timestamp, + time_format, + ) + + tz_delta = self.get_timezone_delta() + + milliseconds = ((time_object + tz_delta).timestamp() * 1000) + + return milliseconds + + + def ms_to_timestamp(self, milliseconds): + tz_delta = self.get_timezone_delta() + + time_object = datetime.fromtimestamp(int(milliseconds / 1000)) - tz_delta + milliseconds_delta = timedelta(milliseconds=(milliseconds % 1000)) + time_object = (time_object + milliseconds_delta) + + time_format = self.get_time_format() + + return time_object.strftime(time_format) + + + def get_timezone_delta(self): + local_zone = datetime.now(timezone.utc).astimezone().tzinfo + return local_zone.utcoffset(None) + + + def get_keywords_status(self, *keywords): + ''' + keywords: List of Robot keywords + + Return: 'PASS' or 'FAIL' + ''' + if sum(not kw.passed for kw in filter(None, keywords)): + return 'FAIL' + else: + return 'PASS' + + + def create_wrapper_keyword(self, + name, + start_timestamp, + end_timestamp, + setup, + *keywords): + status = self.get_keywords_status(*keywords) + start_time = self.timestamp_to_ms(start_timestamp) + end_time = self.timestamp_to_ms(end_timestamp) + + robot_keyword = self.spawn_robot_keyword(name, + [], + status, + start_time, + end_time, + None, + keywords, + [], + setup, + (not setup)) + + return robot_keyword + + +class RobotRunningInterface(object): + def build_suite(self, parsed_results): + robot_root_suite = RobotRunningSuite(parsed_results['name']) + for parsed_suite in parsed_results['suites']: + robot_suite = robot_root_suite.suites.create(parsed_suite['name']) + for subsuite in parsed_suite.get('suites', []): + robot_subsuite = self.build_suite(subsuite) + robot_suite.suites.append(robot_subsuite) + self.build_tests(parsed_suite, robot_suite) + self.build_tests(parsed_results, robot_root_suite) + return robot_root_suite + + def build_tests(self, oxygen_suite, robot_suite): + for parsed_test in oxygen_suite.get('tests', []): + name = parsed_test['name'] + tags = parsed_test['tags'] + kw = parsed_test['keywords'][0] + msg = '\n'.join(kw['messages']) + test_robot_counterpart = robot_suite.tests.create(name, tags=tags) + if kw['pass']: + args = [msg if msg else 'Test passed :D'] + test_robot_counterpart.keywords.create('Pass execution', + args=args) + else: + args = [msg if msg else 'Test failed D:'] + test_robot_counterpart.keywords.create('Fail', args=args) diff --git a/src/oxygen/robot4_interface.py b/src/oxygen/robot4_interface.py new file mode 100644 index 0000000..d1b0cce --- /dev/null +++ b/src/oxygen/robot4_interface.py @@ -0,0 +1,366 @@ +from datetime import datetime +from datetime import timedelta +from datetime import timezone +from robot.result.model import (Keyword as RobotResultKeyword, + TestCase as RobotResultTest, + TestSuite as RobotResultSuite) + +from robot.model import BodyItem + +from robot.running.model import TestSuite as RobotRunningSuite + + +class RobotResultInterface(object): + def build_suites(self, starting_time, *suites): + '''Convert a given `suite` dictionaries into a Robot suite''' + finished_suites = [] + current_time = starting_time + + for suite in suites: + current_time, finished_suite = self.build_suite(current_time, suite) + + if finished_suite: + finished_suites.append(finished_suite) + + return current_time, finished_suites + + + def build_suite(self, starting_time, suite): + '''Convert a set of suite dicts into Robot suites and append them to the + given list-like object + + target: List-like object to add the Robot suites to + suites: Set of suite dicts to build into Robot suites + + Return: The updated list-like object + ''' + + if not suite: + return starting_time, None + + updated_time = starting_time + name = suite.get('name') or 'Unknown Suite Name' + tags = suite.get('tags') or [] + setup_keyword = suite.get('setup') or None + teardown_keyword = suite.get('teardown') or None + child_suites = suite.get('suites') or [] + tests = suite.get('tests') or [] + + updated_time, robot_setup = self.build_keyword(updated_time, setup_keyword, setup=True) + updated_time, robot_teardown = self.build_keyword(updated_time, teardown_keyword, teardown=True) + updated_time, robot_suites = self.build_suites(updated_time, *child_suites) + updated_time, robot_tests = self.build_tests(updated_time, *tests) + + robot_suite = self.spawn_robot_suite(name, + starting_time, + updated_time, + tags, + robot_setup, + robot_teardown, + robot_suites, + robot_tests) + + return updated_time, robot_suite + + + def spawn_robot_suite(self, + name, + start_time, + end_time, + tags, + setup_keyword, + teardown_keyword, + suites, + tests): + start_timestamp = self.ms_to_timestamp(start_time) + end_timestamp = self.ms_to_timestamp(end_time) + + robot_suite = RobotResultSuite(name, + starttime=start_timestamp, + endtime=end_timestamp) + robot_suite.set_tags(add=tags, persist=True) + + if setup_keyword: + robot_suite.setup = setup_keyword + if teardown_keyword: + robot_suite.teardown = teardown_keyword + + for suite in filter(None, suites): + robot_suite.suites.append(suite) + + for test in filter(None, tests): + robot_suite.tests.append(test) + + return robot_suite + + + def build_tests(self, starting_time, *tests): + '''Convert a set of `tests` dicts and add to a Robot suite `target`''' + updated_time = starting_time + robot_tests = [] + for test in tests: + updated_time, robot_test = self.build_test(updated_time, test) + + if robot_test: + robot_tests.append(robot_test) + + return updated_time, robot_tests + + + def build_test(self, starting_time, test): + '''Convert a set of `tests` dicts and add to a Robot suite `target`''' + if not test: + return starting_time, None + + updated_time = starting_time + test_name = test.get('name') or 'Unknown Test Name' + tags = test.get('tags') or [] + setup_keyword = test.get('setup') or None + keywords = test.get('keywords') or [] + teardown_keyword = test.get('teardown') or None + + updated_time, robot_setup = self.build_keyword(updated_time, + setup_keyword, + setup=True) + updated_time, robot_keywords = self.build_keywords(updated_time, + *keywords) + updated_time, robot_teardown = self.build_keyword(updated_time, + teardown_keyword, + teardown=True) + + robot_test = self.spawn_robot_test(test_name, + starting_time, + updated_time, + tags, + robot_setup, + robot_teardown, + robot_keywords) + + return updated_time, robot_test + + + def spawn_robot_test(self, + name, + start_time, + end_time, + tags, + setup_keyword, + teardown_keyword, + keywords): + start_timestamp = self.ms_to_timestamp(start_time) + end_timestamp = self.ms_to_timestamp(end_time) + status = self.get_keywords_status(setup_keyword, teardown_keyword, *(keywords or [])) + + robot_test = RobotResultTest(name, + tags=tags, + status=status, + starttime=start_timestamp, + endtime=end_timestamp) + + if setup_keyword: + robot_test.setup = setup_keyword + for keyword in keywords: + if keyword: + robot_test.body.append(keyword) + if teardown_keyword: + robot_test.teardown = teardown_keyword + + return robot_test + + + def build_keywords(self, starting_time, *keywords): + '''Convert `keywords` dicts, add them as sub-keywords to a `target`''' + updated_time = starting_time + robot_keywords = [] + for keyword in keywords: + updated_time, robot_keyword = self.build_keyword(updated_time, keyword) + + if robot_keyword: + robot_keywords.append(robot_keyword) + + return updated_time, robot_keywords + + + def build_keyword(self, starting_time, keyword, setup=False, teardown=False): + if not keyword: + return starting_time, None + + updated_time = starting_time + name = keyword.get('name') or 'Unknown Keyword Name' + status = keyword.get('pass') or None + elapsed = keyword.get('elapsed') or 0.0 + tags = keyword.get('tags') or [] + messages = keyword.get('messages') or [] + teardown = keyword.get('teardown') or None + keywords = keyword.get('keywords') or [] + + updated_time, robot_teardown = self.build_keyword(updated_time, teardown) + updated_time, robot_keywords = self.build_keywords(updated_time, keywords) + + final_time = updated_time + elapsed + + robot_keyword = self.spawn_robot_keyword(name, + tags, + status, + updated_time, + final_time, + teardown, + keywords, + messages, + setup, + teardown) + + return final_time, robot_keyword + + + def spawn_robot_keyword(self, + name, + tags, + status, + start_time, + end_time, + teardown_keyword, + keywords, + messages, + setup=False, + teardown=False): + start_timestamp = self.ms_to_timestamp(start_time) + end_timestamp = self.ms_to_timestamp(end_time) + + if setup: + keyword_type = BodyItem.SETUP + elif teardown: + keyword_type = BodyItem.TEARDOWN + else: + keyword_type = BodyItem.KEYWORD + + if status is None: + keyword_status = 'NOT RUN' + elif status: + keyword_status = 'PASS' + else: + keyword_status = 'FAIL' + + robot_keyword = RobotResultKeyword(name, + tags=tags, + status=keyword_status, + starttime=start_timestamp, + endtime=end_timestamp) + + robot_keyword.type = keyword_type + + for keyword in keywords: + if keyword: + robot_keyword.body.append(keyword) + + if teardown_keyword: + robot_keyword.teardown = teardown_keyword + + for message in messages: + if message: + ishtml = message.startswith('*HTML*') + if ishtml: + message = message[6:] + robot_keyword.body.create_message(message, html=ishtml) + + return robot_keyword + + + def get_time_format(self): + '''Convenience to return the general Robot timestamp format.''' + return '%Y%m%d %H:%M:%S.%f' + + + def timestamp_to_ms(self, timestamp): + time_format = self.get_time_format() + time_object = datetime.strptime( + timestamp, + time_format, + ) + + tz_delta = self.get_timezone_delta() + + milliseconds = ((time_object + tz_delta).timestamp() * 1000) + + return milliseconds + + + def ms_to_timestamp(self, milliseconds): + tz_delta = self.get_timezone_delta() + + time_object = datetime.fromtimestamp(int(milliseconds / 1000)) - tz_delta + milliseconds_delta = timedelta(milliseconds=(milliseconds % 1000)) + time_object = (time_object + milliseconds_delta) + + time_format = self.get_time_format() + + return time_object.strftime(time_format) + + + def get_timezone_delta(self): + local_zone = datetime.now(timezone.utc).astimezone().tzinfo + return local_zone.utcoffset(None) + + + def get_keywords_status(self, *keywords): + ''' + keywords: List of Robot keywords + + Return: 'PASS' or 'FAIL' + ''' + if sum(not kw.passed for kw in filter(None, keywords)): + return 'FAIL' + else: + return 'PASS' + + + def create_wrapper_keyword(self, + name, + start_timestamp, + end_timestamp, + setup, + *keywords): + status = self.get_keywords_status(*keywords) + start_time = self.timestamp_to_ms(start_timestamp) + end_time = self.timestamp_to_ms(end_timestamp) + + robot_keyword = self.spawn_robot_keyword(name, + [], + status, + start_time, + end_time, + None, + keywords, + [], + setup, + (not setup)) + + return robot_keyword + + +class RobotRunningInterface(object): + def build_suite(self, parsed_results): + robot_root_suite = RobotRunningSuite(parsed_results['name']) + for parsed_suite in parsed_results['suites']: + robot_suite = robot_root_suite.suites.create(parsed_suite['name']) + for subsuite in parsed_suite.get('suites', []): + robot_subsuite = self.build_suite(subsuite) + robot_suite.suites.append(robot_subsuite) + self.build_tests(parsed_suite, robot_suite) + self.build_tests(parsed_results, robot_root_suite) + return robot_root_suite + + def build_tests(self, oxygen_suite, robot_suite): + for parsed_test in oxygen_suite.get('tests', []): + name = parsed_test['name'] + tags = parsed_test['tags'] + kw = parsed_test['keywords'][0] + msg = '\n'.join(kw['messages']) + test_robot_counterpart = robot_suite.tests.create(name, tags=tags) + if kw['pass']: + args = [msg if msg else 'Test passed :D'] + test_robot_counterpart.body.create_keyword('Pass execution', + args=args) + else: + args = [msg if msg else 'Test failed D:'] + test_robot_counterpart.body.create_keyword('Fail', args=args) diff --git a/src/oxygen/robot_interface.py b/src/oxygen/robot_interface.py index 3722abc..58bb6f2 100644 --- a/src/oxygen/robot_interface.py +++ b/src/oxygen/robot_interface.py @@ -1,371 +1,32 @@ -from datetime import datetime -from datetime import timedelta -from datetime import timezone -from robot.result.model import (Keyword as RobotResultKeyword, - Message as RobotResultMessage, - TestCase as RobotResultTest, - TestSuite as RobotResultSuite) +from robot.version import get_version as robot_version -from robot.running.model import TestSuite as RobotRunningSuite +class RobotInterface(object): + def __init__(self): + major_version = int(robot_version().split('.')[0]) -class RobotResultInterface(object): - def build_suites(self, starting_time, *suites): - '''Convert a given `suite` dictionaries into a Robot suite''' - finished_suites = [] - current_time = starting_time - - for suite in suites: - current_time, finished_suite = self.build_suite(current_time, suite) - - if finished_suite: - finished_suites.append(finished_suite) - - return current_time, finished_suites - - - def build_suite(self, starting_time, suite): - '''Convert a set of suite dicts into Robot suites and append them to the - given list-like object - - target: List-like object to add the Robot suites to - suites: Set of suite dicts to build into Robot suites - - Return: The updated list-like object - ''' - - if not suite: - return starting_time, None - - updated_time = starting_time - name = suite.get('name') or 'Unknown Suite Name' - tags = suite.get('tags') or [] - setup_keyword = suite.get('setup') or None - teardown_keyword = suite.get('teardown') or None - child_suites = suite.get('suites') or [] - tests = suite.get('tests') or [] - - updated_time, robot_setup = self.build_keyword(updated_time, setup_keyword, setup=True) - updated_time, robot_teardown = self.build_keyword(updated_time, teardown_keyword, teardown=True) - updated_time, robot_suites = self.build_suites(updated_time, *child_suites) - updated_time, robot_tests = self.build_tests(updated_time, *tests) - - robot_suite = self.spawn_robot_suite(name, - starting_time, - updated_time, - tags, - robot_setup, - robot_teardown, - robot_suites, - robot_tests) - - return updated_time, robot_suite - - - def spawn_robot_suite(self, - name, - start_time, - end_time, - tags, - setup_keyword, - teardown_keyword, - suites, - tests): - start_timestamp = self.ms_to_timestamp(start_time) - end_timestamp = self.ms_to_timestamp(end_time) - - robot_suite = RobotResultSuite(name, - starttime=start_timestamp, - endtime=end_timestamp) - robot_suite.set_tags(add=tags, persist=True) - - if setup_keyword: - robot_suite.keywords.append(setup_keyword) - if teardown_keyword: - robot_suite.keywords.append(teardown_keyword) - - for suite in filter(None, suites): - robot_suite.suites.append(suite) - - for test in filter(None, tests): - robot_suite.tests.append(test) - - return robot_suite - - - def build_tests(self, starting_time, *tests): - '''Convert a set of `tests` dicts and add to a Robot suite `target`''' - updated_time = starting_time - robot_tests = [] - for test in tests: - updated_time, robot_test = self.build_test(updated_time, test) - - if robot_test: - robot_tests.append(robot_test) - - return updated_time, robot_tests - - - def build_test(self, starting_time, test): - '''Convert a set of `tests` dicts and add to a Robot suite `target`''' - if not test: - return starting_time, None - - updated_time = starting_time - test_name = test.get('name') or 'Unknown Test Name' - tags = test.get('tags') or [] - setup_keyword = test.get('setup') or None - keywords = test.get('keywords') or [] - teardown_keyword = test.get('teardown') or None - - updated_time, robot_setup = self.build_keyword(updated_time, - setup_keyword, - setup=True) - updated_time, robot_keywords = self.build_keywords(updated_time, - *keywords) - updated_time, robot_teardown = self.build_keyword(updated_time, - teardown_keyword, - teardown=True) - - robot_test = self.spawn_robot_test(test_name, - starting_time, - updated_time, - tags, - robot_setup, - robot_teardown, - robot_keywords) - - return updated_time, robot_test - - - def spawn_robot_test(self, - name, - start_time, - end_time, - tags, - setup_keyword, - teardown_keyword, - keywords): - start_timestamp = self.ms_to_timestamp(start_time) - end_timestamp = self.ms_to_timestamp(end_time) - status = self.get_keywords_status(setup_keyword, teardown_keyword, *(keywords or [])) - - robot_test = RobotResultTest(name, - tags=tags, - status=status, - starttime=start_timestamp, - endtime=end_timestamp) - - if setup_keyword: - robot_test.keywords.append(setup_keyword) - for keyword in keywords: - if keyword: - robot_test.keywords.append(keyword) - if teardown_keyword: - robot_test.keywords.append(teardown_keyword) - - return robot_test - - - def build_keywords(self, starting_time, *keywords): - '''Convert `keywords` dicts, add them as sub-keywords to a `target`''' - updated_time = starting_time - robot_keywords = [] - for keyword in keywords: - updated_time, robot_keyword = self.build_keyword(updated_time, keyword) - - if robot_keyword: - robot_keywords.append(robot_keyword) - - return updated_time, robot_keywords - - - def build_keyword(self, starting_time, keyword, setup=False, teardown=False): - if not keyword: - return starting_time, None - - updated_time = starting_time - name = keyword.get('name') or 'Unknown Keyword Name' - status = keyword.get('pass') or None - elapsed = keyword.get('elapsed') or 0.0 - tags = keyword.get('tags') or [] - messages = keyword.get('messages') or [] - teardown = keyword.get('teardown') or None - keywords = keyword.get('keywords') or [] - - updated_time, robot_teardown = self.build_keyword(updated_time, teardown) - updated_time, robot_keywords = self.build_keywords(updated_time, keywords) - - final_time = updated_time + elapsed - - robot_keyword = self.spawn_robot_keyword(name, - tags, - status, - updated_time, - final_time, - teardown, - keywords, - messages, - setup, - teardown) - - return final_time, robot_keyword - - - def spawn_robot_keyword(self, - name, - tags, - status, - start_time, - end_time, - teardown_keyword, - keywords, - messages, - setup=False, - teardown=False): - start_timestamp = self.ms_to_timestamp(start_time) - end_timestamp = self.ms_to_timestamp(end_time) - - if setup: - keyword_type = RobotResultKeyword.SETUP_TYPE - elif teardown: - keyword_type = RobotResultKeyword.TEARDOWN_TYPE - else: - keyword_type = RobotResultKeyword.KEYWORD_TYPE - - if status is None: - keyword_status = 'NOT_RUN' - elif status: - keyword_status = 'PASS' - else: - keyword_status = 'FAIL' - - robot_keyword = RobotResultKeyword(name, - tags=tags, - status=keyword_status, - starttime=start_timestamp, - endtime=end_timestamp) - - robot_keyword.type = keyword_type - - for keyword in keywords: - if keyword: - robot_keyword.keywords.append(keyword) - - if teardown_keyword: - robot_keyword.keywords.append(teardown_keyword) - - for message in messages: - if message: - ishtml = message.startswith('*HTML*') - if ishtml: - message = message[6:] - robot_keyword.messages.append(RobotResultMessage(message,html=ishtml)) - - return robot_keyword - - - def get_time_format(self): - '''Convenience to return the general Robot timestamp format.''' - return '%Y%m%d %H:%M:%S.%f' - - - def timestamp_to_ms(self, timestamp): - time_format = self.get_time_format() - time_object = datetime.strptime( - timestamp, - time_format, - ) - - tz_delta = self.get_timezone_delta() - - milliseconds = ((time_object + tz_delta).timestamp() * 1000) - - return milliseconds - - - def ms_to_timestamp(self, milliseconds): - tz_delta = self.get_timezone_delta() - - time_object = datetime.fromtimestamp(int(milliseconds / 1000)) - tz_delta - milliseconds_delta = timedelta(milliseconds=(milliseconds % 1000)) - time_object = (time_object + milliseconds_delta) - - time_format = self.get_time_format() - - return time_object.strftime(time_format) - - - def get_timezone_delta(self): - local_zone = datetime.now(timezone.utc).astimezone().tzinfo - return local_zone.utcoffset(None) - - - def get_keywords_status(self, *keywords): - ''' - keywords: List of Robot keywords - - Return: 'PASS' or 'FAIL' - ''' - if sum(not kw.passed for kw in filter(None, keywords)): - return 'FAIL' + if major_version > 3: + from .robot4_interface import (RobotResultInterface, + RobotRunningInterface) else: - return 'PASS' - - - def create_wrapper_keyword(self, - name, - start_timestamp, - end_timestamp, - setup, - *keywords): - status = self.get_keywords_status(*keywords) - start_time = self.timestamp_to_ms(start_timestamp) - end_time = self.timestamp_to_ms(end_timestamp) - - robot_keyword = self.spawn_robot_keyword(name, - [], - status, - start_time, - end_time, - None, - keywords, - [], - setup, - (not setup)) - - return robot_keyword + from .robot3_interface import (RobotResultInterface, + RobotRunningInterface) + self.result = RobotResultInterface() + self.running = RobotRunningInterface() -class RobotRunningInterface(object): - def build_suite(self, parsed_results): - robot_root_suite = RobotRunningSuite(parsed_results['name']) - for parsed_suite in parsed_results['suites']: - robot_suite = robot_root_suite.suites.create(parsed_suite['name']) - for subsuite in parsed_suite.get('suites', []): - robot_subsuite = self.build_suite(subsuite) - robot_suite.suites.append(robot_subsuite) - self.build_tests(parsed_suite, robot_suite) - self.build_tests(parsed_results, robot_root_suite) - return robot_root_suite - def build_tests(self, oxygen_suite, robot_suite): - for parsed_test in oxygen_suite.get('tests', []): - name = parsed_test['name'] - tags = parsed_test['tags'] - kw = parsed_test['keywords'][0] - msg = '\n'.join(kw['messages']) - test_robot_counterpart = robot_suite.tests.create(name, tags=tags) - if kw['pass']: - args = [msg if msg else 'Test passed :D'] - test_robot_counterpart.keywords.create('Pass execution', - args=args) - else: - args = [msg if msg else 'Test failed D:'] - test_robot_counterpart.keywords.create('Fail', args=args) +def get_keywords_from(test): + if hasattr(test, 'body'): + return test.body.filter(keywords=True) + return test.keywords -class RobotInterface(object): - def __init__(self): - self.result = RobotResultInterface() - self.running = RobotRunningInterface() +def set_special_keyword(suite, keyword_type, keyword): + if hasattr(suite, keyword_type): + if keyword_type == 'setup': + suite.setup = keyword + elif keyword_type == 'teardown': + suite.teardown = keyword + else: + suite.keywords.append(keyword) diff --git a/src/oxygen/utils.py b/src/oxygen/utils.py index 0a62d5b..982bd79 100644 --- a/src/oxygen/utils.py +++ b/src/oxygen/utils.py @@ -1,5 +1,6 @@ import os import subprocess +import sys from pathlib import Path @@ -7,23 +8,31 @@ ResultFileIsNotAFileException, ResultFileNotFoundException) + def run_command_line(command, check_return_code=True, **env): new_env = os.environ.copy() + # When user uses 'robot --pythonpath' we need to update PYTHONPATH in the + # subprocess + updated_pythonpath = {'PYTHONPATH': os.pathsep.join([new_env.get( + 'PYTHONPATH', '')] + sys.path)} + new_env.update(updated_pythonpath) new_env.update(env) + try: proc = subprocess.run(command, capture_output=True, env=new_env, shell=True) - except IndexError as e: + except IndexError: raise SubprocessException('Command "{}" was empty'.format(command)) if check_return_code and proc.returncode != 0: raise SubprocessException(f'Command "{command}" failed with return ' f'code {proc.returncode}:\n"{proc.stdout.decode("utf-8")}"') return proc.stdout + def validate_path(filepath): path = Path(filepath) if not path.exists(): raise ResultFileNotFoundException(f'File "{path}" does not exits') if path.is_dir(): - raise ResultFileIsNotAFileException(f'File "{path}" is not a file, ' - 'but a directory') + raise ResultFileIsNotAFileException(f'File "{path}" is not a file, ' + 'but a directory') return path diff --git a/test.nix b/test.nix index 1c0599b..b2d59a3 100644 --- a/test.nix +++ b/test.nix @@ -31,7 +31,7 @@ { nixpkgsBranch ? "release-21.05" , nixpkgs ? "https://github.com/NixOS/nixpkgs/archive/refs/heads/${nixpkgsBranch}.tar.gz" , pythons ? "python38 python39" -, rfVersions ? "3.1.2 3.2 3.2.1 3.2.2" +, rfVersions ? "3.1.2 3.2 3.2.1 3.2.2 4.0 4.0.1 4.0.2 4.0.3 4.1" , path ? toString ./. , cmd ? "invoke test --in-nix" }: let diff --git a/tests/atest/oxygen_junit_tests.robot b/tests/atest/oxygen_junit_tests.robot index 61efdc3..2cefb43 100644 --- a/tests/atest/oxygen_junit_tests.robot +++ b/tests/atest/oxygen_junit_tests.robot @@ -19,7 +19,7 @@ Oxygen's unit tests should pass ${green}= Get command green Run JUnit ${JUNIT XML FILE} ... ${green} -j ${JUNIT XML FILE} ${EXECDIR} - ... PYTHONPATH=${EXECDIR}/src + File should exist ${JUNIT XML FILE} *** Keywords *** Get command diff --git a/tests/utest/robot_interface/test_robot_interface_basic_usage.py b/tests/utest/robot_interface/test_robot_interface_basic_usage.py index 2ce5eb1..b98a7ce 100644 --- a/tests/utest/robot_interface/test_robot_interface_basic_usage.py +++ b/tests/utest/robot_interface/test_robot_interface_basic_usage.py @@ -8,7 +8,7 @@ from robot.running.model import TestSuite as RobotRunningSuite -from oxygen import RobotInterface +from oxygen.robot_interface import RobotInterface, get_keywords_from EXAMPLE_SUITES = [{ 'name': 'suite1', @@ -143,11 +143,12 @@ }] -class RobotInterfaceBasicTests(TestCase): +class RobotInterfaceBasicTests(TestCase): ''' This tests only two methods of RobotInterface, but since they internally use all the other functions, pretty much everything is covered. ''' + def setUp(self): self.iface = RobotInterface() @@ -162,21 +163,21 @@ def test_result_build_suites(self): self.assertIsInstance(converted_suite, RobotSuite) for subsuite in converted[0].suites: - self.assertIsInstance(converted_suite, RobotSuite) + self.assertIsInstance(subsuite, RobotSuite) for test in converted[1].tests: self.assertIsInstance(test, RobotTest) - for kw in converted_suite.tests[1].keywords: + for kw in get_keywords_from(converted[0].tests[1]): self.assertIsInstance(kw, RobotKeyword) - for message in converted_suite.tests[1].keywords[0].messages: + for message in get_keywords_from(converted[0].tests[1])[0].messages: self.assertIsInstance(message, RobotMessage) - self.assertEqual(converted[0].tests[3].keywords[0].messages[0].html, True) - self.assertEqual(converted[0].tests[3].keywords[0].messages[0].message, ' Robot Framework') - self.assertEqual(converted[0].tests[2].keywords[0].messages[0].message,'FAIL: Example failure message ' - '(the_failure_type)' ) - self.assertEqual(converted[0].tests[2].keywords[0].messages[0].html, False) + self.assertEqual(get_keywords_from(converted[0].tests[3])[0].messages[0].html, True) + self.assertEqual(get_keywords_from(converted[0].tests[3])[0].messages[0].message, ' Robot Framework') + self.assertEqual(get_keywords_from(converted[0].tests[2])[0].messages[0].message,'FAIL: Example failure message ' + '(the_failure_type)') + self.assertEqual(get_keywords_from(converted[0].tests[2])[0].messages[0].html, False) def test_result_create_wrapper_keyword_for_setup(self): ret = self.iface.result.create_wrapper_keyword('My Wrapper', @@ -188,17 +189,30 @@ def test_result_create_wrapper_keyword_for_setup(self): self.assertIsInstance(ret, RobotKeyword) self.assertEqual(ret.name, 'My Wrapper') - self.assertEqual(len(ret.keywords), 2) - self.assertEqual(ret.type, ret.SETUP_TYPE) - - def test_result_create_wrapper_keyword_for_setup(self): + self.assertEqual(len(get_keywords_from(ret)), 2) + try: + # Robot Framework < 4.0 + from robot.result.model import Keyword + self.assertEqual(ret.type, Keyword.SETUP_TYPE) + except AttributeError: + # Robot Framework >= 4.0 + from robot.model import BodyItem + self.assertEqual(ret.type, BodyItem.SETUP) + + def test_result_create_wrapper_keyword_for_teardown(self): ret = self.iface.result.create_wrapper_keyword('My Wrapper', '20200507 13:42:50.001', '20200507 14:59:01.999', False, RobotKeyword()) - - self.assertEqual(ret.type, ret.TEARDOWN_TYPE) + try: + # Robot Framework < 4.0 + from robot.result.model import Keyword + self.assertEqual(ret.type, Keyword.TEARDOWN_TYPE) + except AttributeError: + # Robot Framework >= 4.0 + from robot.model import BodyItem + self.assertEqual(ret.type, BodyItem.TEARDOWN) def test_running_build_suite(self): ret = self.iface.running.build_suite(EXAMPLE_SUITES[1]) diff --git a/tests/utest/robot_interface/test_time_conversions.py b/tests/utest/robot_interface/test_time_conversions.py index bdae843..2d96e3d 100644 --- a/tests/utest/robot_interface/test_time_conversions.py +++ b/tests/utest/robot_interface/test_time_conversions.py @@ -1,6 +1,6 @@ from unittest import TestCase -from oxygen import RobotInterface +from oxygen.robot_interface import RobotInterface class TestMsToTimestamp(TestCase):