From 4fc2c19ba4227755e2694a9f0721e084ee5a15e7 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 6 Dec 2021 15:34:06 +0200 Subject: [PATCH 1/5] Modify delegations traversal loop condition Reduce the number of variables in the while loop by using len(visited_role_names) instead of number_of_delegations. Include equality in the comparison with config.max_delegations to account for visiting "targets". Shorten the commit message. Add max number of delegations test case. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 13 +++++++++++++ tuf/ngclient/updater.py | 16 +++++++--------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index f5003e1bb0..4a7d5c5930 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -226,6 +226,17 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None: # 'C' is reached through 'B' since 'A' does not delegate a matching pattern" visited_order=["A", "B", "C"], ), + "max number of delegations": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("targets", "C"), + TestDelegation("C", "D"), + TestDelegation("C", "E"), + ], + # "E" is skipped, max_delegations is 4 + visited_order=["A", "B", "C", "D"], + ), } @utils.run_sub_tests_with_dataset(graphs) @@ -239,6 +250,8 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: sim = self.setup_subtest(test_data.delegations) updater = self._init_updater(sim) + # restrict the max number of delegations to simplify the test + updater.config.max_delegations = 4 # Call explicitly refresh to simplify the expected_calls list updater.refresh() sim.fetch_tracker.metadata.clear() diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 9aa837f287..8a264e736f 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -423,10 +423,12 @@ def _preorder_depth_first_walk( # is needed to load and verify the delegated targets metadata. delegations_to_visit = [(Targets.type, Root.type)] visited_role_names: Set[str] = set() - number_of_delegations = self.config.max_delegations # Preorder depth-first traversal of the graph of target delegations. - while number_of_delegations > 0 and len(delegations_to_visit) > 0: + while ( + len(visited_role_names) <= self.config.max_delegations + and len(delegations_to_visit) > 0 + ): # Pop the role name from the top of the stack. role_name, parent_role = delegations_to_visit.pop(-1) @@ -449,9 +451,6 @@ def _preorder_depth_first_walk( # After preorder check, add current role to set of visited roles. visited_role_names.add(role_name) - # And also decrement number of visited roles. - number_of_delegations -= 1 - if targets.delegations is not None: child_roles_to_visit = [] # NOTE: This may be a slow operation if there are many @@ -464,7 +463,7 @@ def _preorder_depth_first_walk( (child_role.name, role_name) ) if child_role.terminating: - logger.debug("Not backtracking to other roles.") + logger.debug("Not backtracking to other roles") delegations_to_visit = [] break # Push 'child_roles_to_visit' in reverse order of appearance @@ -473,10 +472,9 @@ def _preorder_depth_first_walk( child_roles_to_visit.reverse() delegations_to_visit.extend(child_roles_to_visit) - if number_of_delegations == 0 and len(delegations_to_visit) > 0: + if len(delegations_to_visit) > 0: logger.debug( - "%d roles left to visit, but allowed to " - "visit at most %d delegations.", + "%d roles left to visit, but allowed at most %d delegations", len(delegations_to_visit), self.config.max_delegations, ) From 6fa5d3ddd2fe30d587ea3a8e88e5efe5cd574345 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Tue, 7 Dec 2021 14:15:52 +0200 Subject: [PATCH 2/5] Add TestTargetFileSearch class Extend test_updater_delegation_graphs.py with tests for targets metadata search. - create a new test class TestTargetFileSearch which creates a single repository and pefrorms multiple file searches in subtests. - group the common functionality in a base class TestDelegations. - extend the data classes to accomodate for target_files. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 185 +++++++++++++++++------- 1 file changed, 135 insertions(+), 50 deletions(-) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index 4a7d5c5930..3a983bc135 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -3,15 +3,15 @@ # Copyright 2021, New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 -"""Test updating delegated targets roles with various -delegation hierarchies""" +"""Test updating delegated targets roles and searching for +target files with various delegation graphs""" import os import sys import tempfile import unittest from dataclasses import astuple, dataclass, field -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Tuple from tests import utils from tests.repository_simulator import RepositorySimulator @@ -35,18 +35,25 @@ class TestDelegation: path_hash_prefixes: Optional[List[str]] = None +@dataclass +class TestTarget: + rolename: str + content: bytes + targetpath: str + + @dataclass class DelegationsTestCase: - """Describes a delegations graph as a list of delegations - and the expected order of traversal as 'visited_order'.""" + """A delegations graph as lists of delegations and target files + and the expected order of traversal as a list of role names.""" delegations: List[TestDelegation] - visited_order: List[str] + target_files: List[TestTarget] = field(default_factory=list) + visited_order: List[str] = field(default_factory=list) -class TestDelegationsGraphs(unittest.TestCase): - """Test creating delegations graphs with different complexity - and successfully updating the delegated roles metadata""" +class TestDelegations(unittest.TestCase): + """Base class for delegation tests""" # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None @@ -59,63 +66,61 @@ def setUp(self) -> None: self.targets_dir = os.path.join(self.temp_dir.name, "targets") os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) + self.sim: RepositorySimulator def tearDown(self) -> None: self.temp_dir.cleanup() - def setup_subtest( - self, delegations: List[TestDelegation] - ) -> RepositorySimulator: - sim = self._init_repo(delegations) - + def setup_subtest(self) -> None: self.subtest_count += 1 if self.dump_dir is not None: # create subtest dumpdir name = f"{self.id().split('.')[-1]}-{self.subtest_count}" - sim.dump_dir = os.path.join(self.dump_dir, name) - os.mkdir(sim.dump_dir) + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) # dump the repo simulator metadata - sim.write() - - return sim + self.sim.write() def teardown_subtest(self) -> None: - # clean up after each subtest utils.cleanup_dir(self.metadata_dir) - def _init_updater(self, sim: RepositorySimulator) -> Updater: - """Create a new Updater instance""" - return Updater( - self.metadata_dir, - "https://example.com/metadata/", - self.targets_dir, - "https://example.com/targets/", - sim, - ) + def _init_repo(self, test_case: DelegationsTestCase) -> None: + """Create a new RepositorySimulator instance and + populate it with delegations and target files""" - def _init_repo( - self, delegations: List[TestDelegation] - ) -> RepositorySimulator: - """Create a new RepositorySimulator instance with 'delegations'""" - sim = RepositorySimulator() + self.sim = RepositorySimulator() spec_version = ".".join(SPECIFICATION_VERSION) - - for d in delegations: - if d.rolename in sim.md_delegates: - targets = sim.md_delegates[d.rolename].signed + for d in test_case.delegations: + if d.rolename in self.sim.md_delegates: + targets = self.sim.md_delegates[d.rolename].signed else: - targets = Targets(1, spec_version, sim.safe_expiry, {}, None) - + targets = Targets( + 1, spec_version, self.sim.safe_expiry, {}, None + ) # unpack 'd' but skip "delegator" role = DelegatedRole(*astuple(d)[1:]) - sim.add_delegation(d.delegator, role, targets) - sim.update_snapshot() + self.sim.add_delegation(d.delegator, role, targets) + + for target in test_case.target_files: + self.sim.add_target(*astuple(target)) + if test_case.target_files: + self.sim.targets.version += 1 + self.sim.update_snapshot() + + def _init_updater(self) -> Updater: + """Create a new Updater instance""" # Init trusted root for Updater with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: - f.write(sim.signed_roots[0]) + f.write(self.sim.signed_roots[0]) - return sim + return Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + self.sim, + ) def _assert_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" @@ -123,6 +128,11 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None: local_metadata_files = sorted(os.listdir(self.metadata_dir)) self.assertListEqual(local_metadata_files, expected_files) + +class TestDelegationsGraphs(TestDelegations): + """Test creating delegations graphs with different complexity + and successfully updating the delegated roles metadata""" + graphs: utils.DataSet = { "basic delegation": DelegationsTestCase( delegations=[TestDelegation("targets", "A")], @@ -248,13 +258,15 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] exp_calls = [(role, 1) for role in test_data.visited_order] - sim = self.setup_subtest(test_data.delegations) - updater = self._init_updater(sim) + self._init_repo(test_data) + self.setup_subtest() + + updater = self._init_updater() # restrict the max number of delegations to simplify the test updater.config.max_delegations = 4 # Call explicitly refresh to simplify the expected_calls list updater.refresh() - sim.fetch_tracker.metadata.clear() + self.sim.fetch_tracker.metadata.clear() # Check that metadata dir contains only top-level roles self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) @@ -264,7 +276,80 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: self.assertIsNone(targetfile) # Check that the delegated roles were visited in the expected # order and the corresponding metadata files were persisted - self.assertListEqual(sim.fetch_tracker.metadata, exp_calls) + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) + finally: + self.teardown_subtest() + + +class TestTargetFileSearch(TestDelegations): + """ + Create a single repository with the following delegations: + + targets + *.doc, *md / \ release/*/* + A B + release/x/* / \ release/y/*.zip + C D + + Test that Updater successfully finds the target files metadata, + traversing the delegations as expected. + """ + + delegations_tree = DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A", paths=["*.doc", "*.md"]), + TestDelegation("targets", "B", paths=["releases/*/*"]), + TestDelegation("B", "C", paths=["releases/x/*"]), + TestDelegation("B", "D", paths=["releases/y/*.zip"]), + ], + target_files=[ + TestTarget("targets", b"targetfile content", "targetfile"), + TestTarget("A", b"README by A", "README.md"), + TestTarget("C", b"x release by C", "releases/x/x_v1"), + TestTarget("D", b"y release by D", "releases/y/y_v1.zip"), + TestTarget("D", b"z release by D", "releases/z/z_v1.zip"), + ], + ) + + def setUp(self) -> None: + super().setUp() + self._init_repo(self.delegations_tree) + + # fmt: off + targets: utils.DataSet = { + "target found, no delegations": ("targetfile", True, []), + "targetpath matches wildcard": ("README.md", True, ["A"]), + "targetpath with separators x": ("releases/x/x_v1", True, ["B", "C"]), + "targetpath with separators y": ("releases/y/y_v1.zip", True, ["B", "D"]), + "target exists, path is not delegated": ("releases/z/z_v1.zip", False, ["B"]), + } + # fmt: on + + @utils.run_sub_tests_with_dataset(targets) + def test_targetfile_search( + self, test_data: Tuple[str, bool, List[str]] + ) -> None: + try: + self.setup_subtest() + targetpath, found, visited_order = test_data + exp_files = [*TOP_LEVEL_ROLE_NAMES, *visited_order] + exp_calls = [(role, 1) for role in visited_order] + updater = self._init_updater() + # Call explicitly refresh to simplify the expected_calls list + updater.refresh() + self.sim.fetch_tracker.metadata.clear() + target = updater.get_targetinfo(targetpath) + if target is not None: + # Confirm that the expected TargetFile is found + self.assertTrue(found) + exp_target = self.sim.target_files[targetpath].target_file + self.assertDictEqual(target.to_dict(), exp_target.to_dict()) + else: + self.assertFalse(found) + # Check that the delegated roles were visited in the expected + # order and the corresponding metadata files were persisted + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) self._assert_files_exist(exp_files) finally: self.teardown_subtest() @@ -272,8 +357,8 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: if __name__ == "__main__": if "--dump" in sys.argv: - TestDelegationsGraphs.dump_dir = tempfile.mkdtemp() - print(f"Repository Simulator dumps in {TestDelegationsGraphs.dump_dir}") + TestDelegations.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestDelegations.dump_dir}") sys.argv.remove("--dump") utils.configure_test_logging(sys.argv) From 7eea3f908b1a8b15cb93d01644c8a398e017a9d4 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Tue, 7 Dec 2021 15:21:55 +0200 Subject: [PATCH 3/5] Add tests for invalid delegated role metadata Extend TestDelegationsGraphs with a test case for unsigned metadata. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 40 +++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index 3a983bc135..b1ee7cf5b3 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -21,6 +21,7 @@ DelegatedRole, Targets, ) +from tuf.exceptions import UnsignedMetadataError from tuf.ngclient import Updater @@ -281,6 +282,45 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: finally: self.teardown_subtest() + invalid_metadata: utils.DataSet = { + "unsigned delegated role": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "invalid"), + TestDelegation("targets", "B"), + TestDelegation("invalid", "C"), + ], + # The traversal stops after visiting an invalid role + visited_order=["invalid"], + ) + } + + @utils.run_sub_tests_with_dataset(invalid_metadata) + def test_invalid_metadata(self, test_data: DelegationsTestCase) -> None: + try: + self._init_repo(test_data) + # The invalid role is the last visited + invalid_role = test_data.visited_order[-1] + self.sim.signers[invalid_role].clear() + + self.setup_subtest() + # The invalid role metadata must not be persisted + exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order[:-1]] + exp_calls = [(role, 1) for role in test_data.visited_order] + + updater = self._init_updater() + # Call explicitly refresh to simplify the expected_calls list + updater.refresh() + self.sim.fetch_tracker.metadata.clear() + + with self.assertRaises(UnsignedMetadataError): + updater.get_targetinfo("missingpath") + # Check that there were no visited roles after the invalid one + # and only the valid metadata files were persisted + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) + finally: + self.teardown_subtest() + class TestTargetFileSearch(TestDelegations): """ From d10c8e980d9b37464728402374d7c5828721e699 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Fri, 10 Dec 2021 16:34:23 +0200 Subject: [PATCH 4/5] Use raw string in TestTargetFileSearch docstirng Using a raw string allows the use of backslashes in the docstring comment whithout them being interpreted as an escape character. It also silences pylint W1401: anomalous-backslash-in-string. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index b1ee7cf5b3..ab1edbb9f7 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -323,7 +323,7 @@ def test_invalid_metadata(self, test_data: DelegationsTestCase) -> None: class TestTargetFileSearch(TestDelegations): - """ + r""" Create a single repository with the following delegations: targets From 36eaffaa64920f463d0c075d0ca3069c6028ae4c Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Tue, 21 Dec 2021 12:12:08 +0200 Subject: [PATCH 5/5] Add TestTargetCase dataclass Use a dataclass for a better visual representation of the test case data set. Signed-off-by: Teodora Sechkova --- tests/test_updater_delegation_graphs.py | 43 ++++++++++++++++--------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index ab1edbb9f7..d752be4225 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -11,7 +11,7 @@ import tempfile import unittest from dataclasses import astuple, dataclass, field -from typing import Iterable, List, Optional, Tuple +from typing import Iterable, List, Optional from tests import utils from tests.repository_simulator import RepositorySimulator @@ -53,6 +53,13 @@ class DelegationsTestCase: visited_order: List[str] = field(default_factory=list) +@dataclass +class TargetTestCase: + targetpath: str + found: bool + visited_order: List[str] = field(default_factory=list) + + class TestDelegations(unittest.TestCase): """Base class for delegation tests""" @@ -358,35 +365,39 @@ def setUp(self) -> None: # fmt: off targets: utils.DataSet = { - "target found, no delegations": ("targetfile", True, []), - "targetpath matches wildcard": ("README.md", True, ["A"]), - "targetpath with separators x": ("releases/x/x_v1", True, ["B", "C"]), - "targetpath with separators y": ("releases/y/y_v1.zip", True, ["B", "D"]), - "target exists, path is not delegated": ("releases/z/z_v1.zip", False, ["B"]), + "no delegations": + TargetTestCase("targetfile", True, []), + "targetpath matches wildcard": + TargetTestCase("README.md", True, ["A"]), + "targetpath with separators x": + TargetTestCase("releases/x/x_v1", True, ["B", "C"]), + "targetpath with separators y": + TargetTestCase("releases/y/y_v1.zip", True, ["B", "D"]), + "targetpath is not delegated by all roles in the chain": + TargetTestCase("releases/z/z_v1.zip", False, ["B"]), } # fmt: on @utils.run_sub_tests_with_dataset(targets) - def test_targetfile_search( - self, test_data: Tuple[str, bool, List[str]] - ) -> None: + def test_targetfile_search(self, test_data: TargetTestCase) -> None: try: self.setup_subtest() - targetpath, found, visited_order = test_data - exp_files = [*TOP_LEVEL_ROLE_NAMES, *visited_order] - exp_calls = [(role, 1) for role in visited_order] + # targetpath, found, visited_order = test_data + exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] + exp_calls = [(role, 1) for role in test_data.visited_order] + exp_target = self.sim.target_files[test_data.targetpath].target_file + updater = self._init_updater() # Call explicitly refresh to simplify the expected_calls list updater.refresh() self.sim.fetch_tracker.metadata.clear() - target = updater.get_targetinfo(targetpath) + target = updater.get_targetinfo(test_data.targetpath) if target is not None: # Confirm that the expected TargetFile is found - self.assertTrue(found) - exp_target = self.sim.target_files[targetpath].target_file + self.assertTrue(test_data.found) self.assertDictEqual(target.to_dict(), exp_target.to_dict()) else: - self.assertFalse(found) + self.assertFalse(test_data.found) # Check that the delegated roles were visited in the expected # order and the corresponding metadata files were persisted self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)