Skip to content

Extend delegations tests #1711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 198 additions & 49 deletions tests/test_updater_delegation_graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
# Copyright 2021, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0

"""Test updating delegated targets roles with various
delegation hierarchies"""
"""Test updating delegated targets roles and searching for
target files with various delegation graphs"""

import os
import sys
Expand All @@ -21,6 +21,7 @@
DelegatedRole,
Targets,
)
from tuf.exceptions import UnsignedMetadataError
from tuf.ngclient import Updater


Expand All @@ -35,18 +36,32 @@ class TestDelegation:
path_hash_prefixes: Optional[List[str]] = None


@dataclass
class TestTarget:
rolename: str
content: bytes
targetpath: str


@dataclass
class DelegationsTestCase:
"""Describes a delegations graph as a list of delegations
and the expected order of traversal as 'visited_order'."""
"""A delegations graph as lists of delegations and target files
and the expected order of traversal as a list of role names."""

delegations: List[TestDelegation]
visited_order: List[str]
target_files: List[TestTarget] = field(default_factory=list)
visited_order: List[str] = field(default_factory=list)


class TestDelegationsGraphs(unittest.TestCase):
"""Test creating delegations graphs with different complexity
and successfully updating the delegated roles metadata"""
@dataclass
class TargetTestCase:
targetpath: str
found: bool
visited_order: List[str] = field(default_factory=list)


class TestDelegations(unittest.TestCase):
"""Base class for delegation tests"""

# set dump_dir to trigger repository state dumps
dump_dir: Optional[str] = None
Expand All @@ -59,70 +74,73 @@ def setUp(self) -> None:
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
os.mkdir(self.metadata_dir)
os.mkdir(self.targets_dir)
self.sim: RepositorySimulator

def tearDown(self) -> None:
self.temp_dir.cleanup()

def setup_subtest(
self, delegations: List[TestDelegation]
) -> RepositorySimulator:
sim = self._init_repo(delegations)

def setup_subtest(self) -> None:
self.subtest_count += 1
if self.dump_dir is not None:
# create subtest dumpdir
name = f"{self.id().split('.')[-1]}-{self.subtest_count}"
sim.dump_dir = os.path.join(self.dump_dir, name)
os.mkdir(sim.dump_dir)
self.sim.dump_dir = os.path.join(self.dump_dir, name)
os.mkdir(self.sim.dump_dir)
# dump the repo simulator metadata
sim.write()

return sim
self.sim.write()

def teardown_subtest(self) -> None:
# clean up after each subtest
utils.cleanup_dir(self.metadata_dir)

def _init_updater(self, sim: RepositorySimulator) -> Updater:
"""Create a new Updater instance"""
return Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
sim,
)
def _init_repo(self, test_case: DelegationsTestCase) -> None:
"""Create a new RepositorySimulator instance and
populate it with delegations and target files"""

def _init_repo(
self, delegations: List[TestDelegation]
) -> RepositorySimulator:
"""Create a new RepositorySimulator instance with 'delegations'"""
sim = RepositorySimulator()
self.sim = RepositorySimulator()
spec_version = ".".join(SPECIFICATION_VERSION)

for d in delegations:
if d.rolename in sim.md_delegates:
targets = sim.md_delegates[d.rolename].signed
for d in test_case.delegations:
if d.rolename in self.sim.md_delegates:
targets = self.sim.md_delegates[d.rolename].signed
else:
targets = Targets(1, spec_version, sim.safe_expiry, {}, None)

targets = Targets(
1, spec_version, self.sim.safe_expiry, {}, None
)
# unpack 'd' but skip "delegator"
role = DelegatedRole(*astuple(d)[1:])
sim.add_delegation(d.delegator, role, targets)
sim.update_snapshot()
self.sim.add_delegation(d.delegator, role, targets)

for target in test_case.target_files:
self.sim.add_target(*astuple(target))

if test_case.target_files:
self.sim.targets.version += 1
self.sim.update_snapshot()

def _init_updater(self) -> Updater:
"""Create a new Updater instance"""
# Init trusted root for Updater
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
f.write(sim.signed_roots[0])
f.write(self.sim.signed_roots[0])

return sim
return Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
self.sim,
)

def _assert_files_exist(self, roles: Iterable[str]) -> None:
"""Assert that local metadata files exist for 'roles'"""
expected_files = sorted([f"{role}.json" for role in roles])
local_metadata_files = sorted(os.listdir(self.metadata_dir))
self.assertListEqual(local_metadata_files, expected_files)


class TestDelegationsGraphs(TestDelegations):
"""Test creating delegations graphs with different complexity
and successfully updating the delegated roles metadata"""

graphs: utils.DataSet = {
"basic delegation": DelegationsTestCase(
delegations=[TestDelegation("targets", "A")],
Expand Down Expand Up @@ -226,6 +244,17 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None:
# 'C' is reached through 'B' since 'A' does not delegate a matching pattern"
visited_order=["A", "B", "C"],
),
"max number of delegations": DelegationsTestCase(
delegations=[
TestDelegation("targets", "A"),
TestDelegation("targets", "B"),
TestDelegation("targets", "C"),
TestDelegation("C", "D"),
TestDelegation("C", "E"),
],
# "E" is skipped, max_delegations is 4
visited_order=["A", "B", "C", "D"],
),
}

@utils.run_sub_tests_with_dataset(graphs)
Expand All @@ -237,11 +266,15 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order]
exp_calls = [(role, 1) for role in test_data.visited_order]

sim = self.setup_subtest(test_data.delegations)
updater = self._init_updater(sim)
self._init_repo(test_data)
self.setup_subtest()

updater = self._init_updater()
# restrict the max number of delegations to simplify the test
updater.config.max_delegations = 4
# Call explicitly refresh to simplify the expected_calls list
updater.refresh()
sim.fetch_tracker.metadata.clear()
self.sim.fetch_tracker.metadata.clear()
# Check that metadata dir contains only top-level roles
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)

Expand All @@ -251,16 +284,132 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
self.assertIsNone(targetfile)
# Check that the delegated roles were visited in the expected
# order and the corresponding metadata files were persisted
self.assertListEqual(sim.fetch_tracker.metadata, exp_calls)
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
self._assert_files_exist(exp_files)
finally:
self.teardown_subtest()

invalid_metadata: utils.DataSet = {
"unsigned delegated role": DelegationsTestCase(
delegations=[
TestDelegation("targets", "invalid"),
TestDelegation("targets", "B"),
TestDelegation("invalid", "C"),
],
# The traversal stops after visiting an invalid role
visited_order=["invalid"],
)
}

@utils.run_sub_tests_with_dataset(invalid_metadata)
def test_invalid_metadata(self, test_data: DelegationsTestCase) -> None:
try:
self._init_repo(test_data)
# The invalid role is the last visited
invalid_role = test_data.visited_order[-1]
self.sim.signers[invalid_role].clear()

self.setup_subtest()
# The invalid role metadata must not be persisted
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order[:-1]]
exp_calls = [(role, 1) for role in test_data.visited_order]

updater = self._init_updater()
# Call explicitly refresh to simplify the expected_calls list
updater.refresh()
self.sim.fetch_tracker.metadata.clear()

with self.assertRaises(UnsignedMetadataError):
updater.get_targetinfo("missingpath")
# Check that there were no visited roles after the invalid one
# and only the valid metadata files were persisted
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
self._assert_files_exist(exp_files)
finally:
self.teardown_subtest()


class TestTargetFileSearch(TestDelegations):
r"""
Create a single repository with the following delegations:

targets
*.doc, *md / \ release/*/*
A B
release/x/* / \ release/y/*.zip
C D

Test that Updater successfully finds the target files metadata,
traversing the delegations as expected.
"""

delegations_tree = DelegationsTestCase(
delegations=[
TestDelegation("targets", "A", paths=["*.doc", "*.md"]),
TestDelegation("targets", "B", paths=["releases/*/*"]),
TestDelegation("B", "C", paths=["releases/x/*"]),
TestDelegation("B", "D", paths=["releases/y/*.zip"]),
],
target_files=[
TestTarget("targets", b"targetfile content", "targetfile"),
TestTarget("A", b"README by A", "README.md"),
TestTarget("C", b"x release by C", "releases/x/x_v1"),
TestTarget("D", b"y release by D", "releases/y/y_v1.zip"),
TestTarget("D", b"z release by D", "releases/z/z_v1.zip"),
],
)

def setUp(self) -> None:
super().setUp()
self._init_repo(self.delegations_tree)

# fmt: off
targets: utils.DataSet = {
"no delegations":
TargetTestCase("targetfile", True, []),
"targetpath matches wildcard":
TargetTestCase("README.md", True, ["A"]),
"targetpath with separators x":
TargetTestCase("releases/x/x_v1", True, ["B", "C"]),
"targetpath with separators y":
TargetTestCase("releases/y/y_v1.zip", True, ["B", "D"]),
"targetpath is not delegated by all roles in the chain":
TargetTestCase("releases/z/z_v1.zip", False, ["B"]),
}
# fmt: on

@utils.run_sub_tests_with_dataset(targets)
def test_targetfile_search(self, test_data: TargetTestCase) -> None:
try:
self.setup_subtest()
# targetpath, found, visited_order = test_data
exp_files = [*TOP_LEVEL_ROLE_NAMES, *test_data.visited_order]
exp_calls = [(role, 1) for role in test_data.visited_order]
exp_target = self.sim.target_files[test_data.targetpath].target_file

updater = self._init_updater()
# Call explicitly refresh to simplify the expected_calls list
updater.refresh()
self.sim.fetch_tracker.metadata.clear()
target = updater.get_targetinfo(test_data.targetpath)
if target is not None:
# Confirm that the expected TargetFile is found
self.assertTrue(test_data.found)
self.assertDictEqual(target.to_dict(), exp_target.to_dict())
else:
self.assertFalse(test_data.found)
# Check that the delegated roles were visited in the expected
# order and the corresponding metadata files were persisted
self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls)
self._assert_files_exist(exp_files)
finally:
self.teardown_subtest()


if __name__ == "__main__":
if "--dump" in sys.argv:
TestDelegationsGraphs.dump_dir = tempfile.mkdtemp()
print(f"Repository Simulator dumps in {TestDelegationsGraphs.dump_dir}")
TestDelegations.dump_dir = tempfile.mkdtemp()
print(f"Repository Simulator dumps in {TestDelegations.dump_dir}")
sys.argv.remove("--dump")

utils.configure_test_logging(sys.argv)
Expand Down
16 changes: 7 additions & 9 deletions tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,12 @@ def _preorder_depth_first_walk(
# is needed to load and verify the delegated targets metadata.
delegations_to_visit = [(Targets.type, Root.type)]
visited_role_names: Set[str] = set()
number_of_delegations = self.config.max_delegations

# Preorder depth-first traversal of the graph of target delegations.
while number_of_delegations > 0 and len(delegations_to_visit) > 0:
while (
len(visited_role_names) <= self.config.max_delegations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that I look at this again... doesn't this handle one more delegation than it used to?

The new interpretation could be the correct one (with the assumption that only delegated targets are counted so "targets" itself is not a delegation), just wanted to make sure we're not making a mistake. The test you added for max delegations feels correct to me so I guess I'm ok with this interpretation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, in the previous implementation "targets" was counted in the delegations number which I noticed only after adding the test. So I changed a bit the implementation using "<=" here to compensate for targets. I think I mentioned it in the commit message but I can try adding a comment?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah you did, I just did not understand the connection...

and len(delegations_to_visit) > 0
):

# Pop the role name from the top of the stack.
role_name, parent_role = delegations_to_visit.pop(-1)
Expand All @@ -449,9 +451,6 @@ def _preorder_depth_first_walk(
# After preorder check, add current role to set of visited roles.
visited_role_names.add(role_name)

# And also decrement number of visited roles.
number_of_delegations -= 1

if targets.delegations is not None:
child_roles_to_visit = []
# NOTE: This may be a slow operation if there are many
Expand All @@ -464,7 +463,7 @@ def _preorder_depth_first_walk(
(child_role.name, role_name)
)
if child_role.terminating:
logger.debug("Not backtracking to other roles.")
logger.debug("Not backtracking to other roles")
delegations_to_visit = []
break
# Push 'child_roles_to_visit' in reverse order of appearance
Expand All @@ -473,10 +472,9 @@ def _preorder_depth_first_walk(
child_roles_to_visit.reverse()
delegations_to_visit.extend(child_roles_to_visit)

if number_of_delegations == 0 and len(delegations_to_visit) > 0:
if len(delegations_to_visit) > 0:
logger.debug(
"%d roles left to visit, but allowed to "
"visit at most %d delegations.",
"%d roles left to visit, but allowed at most %d delegations",
len(delegations_to_visit),
self.config.max_delegations,
)
Expand Down