diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index f5003e1bb0..d752be4225 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -3,8 +3,8 @@ # Copyright 2021, New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 -"""Test updating delegated targets roles with various -delegation hierarchies""" +"""Test updating delegated targets roles and searching for +target files with various delegation graphs""" import os import sys @@ -21,6 +21,7 @@ DelegatedRole, Targets, ) +from tuf.exceptions import UnsignedMetadataError from tuf.ngclient import Updater @@ -35,18 +36,32 @@ class TestDelegation: path_hash_prefixes: Optional[List[str]] = None +@dataclass +class TestTarget: + rolename: str + content: bytes + targetpath: str + + @dataclass class DelegationsTestCase: - """Describes a delegations graph as a list of delegations - and the expected order of traversal as 'visited_order'.""" + """A delegations graph as lists of delegations and target files + and the expected order of traversal as a list of role names.""" delegations: List[TestDelegation] - visited_order: List[str] + target_files: List[TestTarget] = field(default_factory=list) + visited_order: List[str] = field(default_factory=list) -class TestDelegationsGraphs(unittest.TestCase): - """Test creating delegations graphs with different complexity - and successfully updating the delegated roles metadata""" +@dataclass +class TargetTestCase: + targetpath: str + found: bool + visited_order: List[str] = field(default_factory=list) + + +class TestDelegations(unittest.TestCase): + """Base class for delegation tests""" # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None @@ -59,63 +74,61 @@ def setUp(self) -> None: self.targets_dir = os.path.join(self.temp_dir.name, "targets") os.mkdir(self.metadata_dir) os.mkdir(self.targets_dir) + self.sim: RepositorySimulator def tearDown(self) -> None: self.temp_dir.cleanup() - def setup_subtest( - self, delegations: List[TestDelegation] - ) -> RepositorySimulator: - sim = self._init_repo(delegations) - + def setup_subtest(self) -> None: self.subtest_count += 1 if self.dump_dir is not None: # create subtest dumpdir name = f"{self.id().split('.')[-1]}-{self.subtest_count}" - sim.dump_dir = os.path.join(self.dump_dir, name) - os.mkdir(sim.dump_dir) + self.sim.dump_dir = os.path.join(self.dump_dir, name) + os.mkdir(self.sim.dump_dir) # dump the repo simulator metadata - sim.write() - - return sim + self.sim.write() def teardown_subtest(self) -> None: - # clean up after each subtest utils.cleanup_dir(self.metadata_dir) - def _init_updater(self, sim: RepositorySimulator) -> Updater: - """Create a new Updater instance""" - return Updater( - self.metadata_dir, - "https://example.com/metadata/", - self.targets_dir, - "https://example.com/targets/", - sim, - ) + def _init_repo(self, test_case: DelegationsTestCase) -> None: + """Create a new RepositorySimulator instance and + populate it with delegations and target files""" - def _init_repo( - self, delegations: List[TestDelegation] - ) -> RepositorySimulator: - """Create a new RepositorySimulator instance with 'delegations'""" - sim = RepositorySimulator() + self.sim = RepositorySimulator() spec_version = ".".join(SPECIFICATION_VERSION) - - for d in delegations: - if d.rolename in sim.md_delegates: - targets = sim.md_delegates[d.rolename].signed + for d in test_case.delegations: + if d.rolename in self.sim.md_delegates: + targets = self.sim.md_delegates[d.rolename].signed else: - targets = Targets(1, spec_version, sim.safe_expiry, {}, None) - + targets = Targets( + 1, spec_version, self.sim.safe_expiry, {}, None + ) # unpack 'd' but skip "delegator" role = DelegatedRole(*astuple(d)[1:]) - sim.add_delegation(d.delegator, role, targets) - sim.update_snapshot() + self.sim.add_delegation(d.delegator, role, targets) + + for target in test_case.target_files: + self.sim.add_target(*astuple(target)) + + if test_case.target_files: + self.sim.targets.version += 1 + self.sim.update_snapshot() + def _init_updater(self) -> Updater: + """Create a new Updater instance""" # Init trusted root for Updater with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: - f.write(sim.signed_roots[0]) + f.write(self.sim.signed_roots[0]) - return sim + return Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + self.sim, + ) def _assert_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" @@ -123,6 +136,11 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None: local_metadata_files = sorted(os.listdir(self.metadata_dir)) self.assertListEqual(local_metadata_files, expected_files) + +class TestDelegationsGraphs(TestDelegations): + """Test creating delegations graphs with different complexity + and successfully updating the delegated roles metadata""" + graphs: utils.DataSet = { "basic delegation": DelegationsTestCase( delegations=[TestDelegation("targets", "A")], @@ -226,6 +244,17 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None: # 'C' is reached through 'B' since 'A' does not delegate a matching pattern" visited_order=["A", "B", "C"], ), + "max number of delegations": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A"), + TestDelegation("targets", "B"), + TestDelegation("targets", "C"), + TestDelegation("C", "D"), + TestDelegation("C", "E"), + ], + # "E" is skipped, max_delegations is 4 + visited_order=["A", "B", "C", "D"], + ), } @utils.run_sub_tests_with_dataset(graphs) @@ -237,11 +266,15 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] exp_calls = [(role, 1) for role in test_data.visited_order] - sim = self.setup_subtest(test_data.delegations) - updater = self._init_updater(sim) + self._init_repo(test_data) + self.setup_subtest() + + updater = self._init_updater() + # restrict the max number of delegations to simplify the test + updater.config.max_delegations = 4 # Call explicitly refresh to simplify the expected_calls list updater.refresh() - sim.fetch_tracker.metadata.clear() + self.sim.fetch_tracker.metadata.clear() # Check that metadata dir contains only top-level roles self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) @@ -251,7 +284,123 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: self.assertIsNone(targetfile) # Check that the delegated roles were visited in the expected # order and the corresponding metadata files were persisted - self.assertListEqual(sim.fetch_tracker.metadata, exp_calls) + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) + finally: + self.teardown_subtest() + + invalid_metadata: utils.DataSet = { + "unsigned delegated role": DelegationsTestCase( + delegations=[ + TestDelegation("targets", "invalid"), + TestDelegation("targets", "B"), + TestDelegation("invalid", "C"), + ], + # The traversal stops after visiting an invalid role + visited_order=["invalid"], + ) + } + + @utils.run_sub_tests_with_dataset(invalid_metadata) + def test_invalid_metadata(self, test_data: DelegationsTestCase) -> None: + try: + self._init_repo(test_data) + # The invalid role is the last visited + invalid_role = test_data.visited_order[-1] + self.sim.signers[invalid_role].clear() + + self.setup_subtest() + # The invalid role metadata must not be persisted + exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order[:-1]] + exp_calls = [(role, 1) for role in test_data.visited_order] + + updater = self._init_updater() + # Call explicitly refresh to simplify the expected_calls list + updater.refresh() + self.sim.fetch_tracker.metadata.clear() + + with self.assertRaises(UnsignedMetadataError): + updater.get_targetinfo("missingpath") + # Check that there were no visited roles after the invalid one + # and only the valid metadata files were persisted + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) + self._assert_files_exist(exp_files) + finally: + self.teardown_subtest() + + +class TestTargetFileSearch(TestDelegations): + r""" + Create a single repository with the following delegations: + + targets + *.doc, *md / \ release/*/* + A B + release/x/* / \ release/y/*.zip + C D + + Test that Updater successfully finds the target files metadata, + traversing the delegations as expected. + """ + + delegations_tree = DelegationsTestCase( + delegations=[ + TestDelegation("targets", "A", paths=["*.doc", "*.md"]), + TestDelegation("targets", "B", paths=["releases/*/*"]), + TestDelegation("B", "C", paths=["releases/x/*"]), + TestDelegation("B", "D", paths=["releases/y/*.zip"]), + ], + target_files=[ + TestTarget("targets", b"targetfile content", "targetfile"), + TestTarget("A", b"README by A", "README.md"), + TestTarget("C", b"x release by C", "releases/x/x_v1"), + TestTarget("D", b"y release by D", "releases/y/y_v1.zip"), + TestTarget("D", b"z release by D", "releases/z/z_v1.zip"), + ], + ) + + def setUp(self) -> None: + super().setUp() + self._init_repo(self.delegations_tree) + + # fmt: off + targets: utils.DataSet = { + "no delegations": + TargetTestCase("targetfile", True, []), + "targetpath matches wildcard": + TargetTestCase("README.md", True, ["A"]), + "targetpath with separators x": + TargetTestCase("releases/x/x_v1", True, ["B", "C"]), + "targetpath with separators y": + TargetTestCase("releases/y/y_v1.zip", True, ["B", "D"]), + "targetpath is not delegated by all roles in the chain": + TargetTestCase("releases/z/z_v1.zip", False, ["B"]), + } + # fmt: on + + @utils.run_sub_tests_with_dataset(targets) + def test_targetfile_search(self, test_data: TargetTestCase) -> None: + try: + self.setup_subtest() + # targetpath, found, visited_order = test_data + exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order] + exp_calls = [(role, 1) for role in test_data.visited_order] + exp_target = self.sim.target_files[test_data.targetpath].target_file + + updater = self._init_updater() + # Call explicitly refresh to simplify the expected_calls list + updater.refresh() + self.sim.fetch_tracker.metadata.clear() + target = updater.get_targetinfo(test_data.targetpath) + if target is not None: + # Confirm that the expected TargetFile is found + self.assertTrue(test_data.found) + self.assertDictEqual(target.to_dict(), exp_target.to_dict()) + else: + self.assertFalse(test_data.found) + # Check that the delegated roles were visited in the expected + # order and the corresponding metadata files were persisted + self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) self._assert_files_exist(exp_files) finally: self.teardown_subtest() @@ -259,8 +408,8 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: if __name__ == "__main__": if "--dump" in sys.argv: - TestDelegationsGraphs.dump_dir = tempfile.mkdtemp() - print(f"Repository Simulator dumps in {TestDelegationsGraphs.dump_dir}") + TestDelegations.dump_dir = tempfile.mkdtemp() + print(f"Repository Simulator dumps in {TestDelegations.dump_dir}") sys.argv.remove("--dump") utils.configure_test_logging(sys.argv) diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 9aa837f287..8a264e736f 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -423,10 +423,12 @@ def _preorder_depth_first_walk( # is needed to load and verify the delegated targets metadata. delegations_to_visit = [(Targets.type, Root.type)] visited_role_names: Set[str] = set() - number_of_delegations = self.config.max_delegations # Preorder depth-first traversal of the graph of target delegations. - while number_of_delegations > 0 and len(delegations_to_visit) > 0: + while ( + len(visited_role_names) <= self.config.max_delegations + and len(delegations_to_visit) > 0 + ): # Pop the role name from the top of the stack. role_name, parent_role = delegations_to_visit.pop(-1) @@ -449,9 +451,6 @@ def _preorder_depth_first_walk( # After preorder check, add current role to set of visited roles. visited_role_names.add(role_name) - # And also decrement number of visited roles. - number_of_delegations -= 1 - if targets.delegations is not None: child_roles_to_visit = [] # NOTE: This may be a slow operation if there are many @@ -464,7 +463,7 @@ def _preorder_depth_first_walk( (child_role.name, role_name) ) if child_role.terminating: - logger.debug("Not backtracking to other roles.") + logger.debug("Not backtracking to other roles") delegations_to_visit = [] break # Push 'child_roles_to_visit' in reverse order of appearance @@ -473,10 +472,9 @@ def _preorder_depth_first_walk( child_roles_to_visit.reverse() delegations_to_visit.extend(child_roles_to_visit) - if number_of_delegations == 0 and len(delegations_to_visit) > 0: + if len(delegations_to_visit) > 0: logger.debug( - "%d roles left to visit, but allowed to " - "visit at most %d delegations.", + "%d roles left to visit, but allowed at most %d delegations", len(delegations_to_visit), self.config.max_delegations, )