Skip to content

Update ngclient to return loaded metadata #1680

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions tests/test_trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ def test_update(self):

self.assertTrue(count, 6)

def test_update_metadata_output(self):
timestamp = self.trusted_set.update_timestamp(self.metadata["timestamp"])
snapshot = self.trusted_set.update_snapshot(self.metadata["snapshot"])
targets = self.trusted_set.update_targets(self.metadata["targets"])
delegeted_targets_1 = self.trusted_set.update_delegated_targets(
self.metadata["role1"], "role1", "targets"
)
delegeted_targets_2 = self.trusted_set.update_delegated_targets(
self.metadata["role2"], "role2", "role1"
)
self.assertIsInstance(timestamp.signed, Timestamp)
self.assertIsInstance(snapshot.signed, Snapshot)
self.assertIsInstance(targets.signed, Targets)
self.assertIsInstance(delegeted_targets_1.signed, Targets)
self.assertIsInstance(delegeted_targets_2.signed, Targets)

def test_out_of_order_ops(self):
# Update snapshot before timestamp
with self.assertRaises(RuntimeError):
Expand Down
35 changes: 29 additions & 6 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def targets(self) -> Optional[Metadata[Targets]]:
return self._trusted_set.get("targets")

# Methods for updating metadata
def update_root(self, data: bytes) -> None:
def update_root(self, data: bytes) -> Metadata[Root]:
"""Verifies and loads 'data' as new root metadata.

Note that an expired intermediate root is considered valid: expiry is
Expand All @@ -153,6 +153,9 @@ def update_root(self, data: bytes) -> None:
Raises:
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.

Returns:
Deserialized and verified root Metadata object
"""
if self.timestamp is not None:
raise RuntimeError("Cannot update root after timestamp")
Expand Down Expand Up @@ -182,7 +185,9 @@ def update_root(self, data: bytes) -> None:
self._trusted_set["root"] = new_root
logger.info("Updated root v%d", new_root.signed.version)

def update_timestamp(self, data: bytes) -> None:
return new_root

def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
"""Verifies and loads 'data' as new timestamp metadata.

Note that an intermediate timestamp is allowed to be expired:
Expand All @@ -199,6 +204,9 @@ def update_timestamp(self, data: bytes) -> None:
RepositoryError: Metadata failed to load or verify as final
timestamp. The actual error type and content will contain
more details.

Returns:
Deserialized and verified timestamp Metadata object
"""
if self.snapshot is not None:
raise RuntimeError("Cannot update timestamp after snapshot")
Expand Down Expand Up @@ -251,6 +259,8 @@ def update_timestamp(self, data: bytes) -> None:
# timestamp is loaded: raise if it is not valid _final_ timestamp
self._check_final_timestamp()

return new_timestamp

def _check_final_timestamp(self) -> None:
"""Raise if timestamp is expired"""

Expand All @@ -260,7 +270,7 @@ def _check_final_timestamp(self) -> None:

def update_snapshot(
self, data: bytes, trusted: Optional[bool] = False
) -> None:
) -> Metadata[Snapshot]:
"""Verifies and loads 'data' as new snapshot metadata.

Note that an intermediate snapshot is allowed to be expired and version
Expand All @@ -282,6 +292,9 @@ def update_snapshot(
Raises:
RepositoryError: data failed to load or verify as final snapshot.
The actual error type and content will contain more details.

Returns:
Deserialized and verified snapshot Metadata object
"""

if self.timestamp is None:
Expand Down Expand Up @@ -347,6 +360,8 @@ def update_snapshot(
# snapshot is loaded, but we raise if it's not valid _final_ snapshot
self._check_final_snapshot()

return new_snapshot

def _check_final_snapshot(self) -> None:
"""Raise if snapshot is expired or meta version does not match"""

Expand All @@ -361,7 +376,7 @@ def _check_final_snapshot(self) -> None:
f"got {self.snapshot.signed.version}"
)

def update_targets(self, data: bytes) -> None:
def update_targets(self, data: bytes) -> Metadata[Targets]:
"""Verifies and loads 'data' as new top-level targets metadata.

Args:
Expand All @@ -370,12 +385,15 @@ def update_targets(self, data: bytes) -> None:
Raises:
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.

Returns:
Deserialized and verified targets Metadata object
"""
self.update_delegated_targets(data, "targets", "root")
return self.update_delegated_targets(data, "targets", "root")

def update_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str
) -> None:
) -> Metadata[Targets]:
"""Verifies and loads 'data' as new metadata for target 'role_name'.

Args:
Expand All @@ -386,6 +404,9 @@ def update_delegated_targets(
Raises:
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.

Returns:
Deserialized and verified targets Metadata object
"""
if self.snapshot is None:
raise RuntimeError("Cannot load targets before snapshot")
Expand Down Expand Up @@ -438,6 +459,8 @@ def update_delegated_targets(
self._trusted_set[role_name] = new_delegate
logger.info("Updated %s v%d", role_name, version)

return new_delegate

def _load_trusted_root(self, data: bytes) -> None:
"""Verifies and loads 'data' as trusted root metadata.

Expand Down
24 changes: 15 additions & 9 deletions tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from securesystemslib import util as sslib_util

from tuf import exceptions
from tuf.api.metadata import TargetFile, Targets
from tuf.api.metadata import Metadata, TargetFile, Targets
from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set
from tuf.ngclient.config import UpdaterConfig
from tuf.ngclient.fetcher import FetcherInterface
Expand Down Expand Up @@ -368,12 +368,15 @@ def _load_snapshot(self) -> None:
self._trusted_set.update_snapshot(data)
self._persist_metadata("snapshot", data)

def _load_targets(self, role: str, parent_role: str) -> None:
def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]:
"""Load local (and if needed remote) metadata for 'role'."""
try:
data = self._load_local_metadata(role)
self._trusted_set.update_delegated_targets(data, role, parent_role)
delegated_targets = self._trusted_set.update_delegated_targets(
data, role, parent_role
)
logger.debug("Local %s is valid: not downloading new one", role)
return delegated_targets
except (OSError, exceptions.RepositoryError) as e:
# Local 'role' does not exist or is invalid: update from remote
logger.debug("Failed to load local %s: %s", role, e)
Expand All @@ -386,9 +389,13 @@ def _load_targets(self, role: str, parent_role: str) -> None:
version = metainfo.version

data = self._download_metadata(role, length, version)
self._trusted_set.update_delegated_targets(data, role, parent_role)
delegated_targets = self._trusted_set.update_delegated_targets(
data, role, parent_role
)
self._persist_metadata(role, data)

return delegated_targets

def _preorder_depth_first_walk(
self, target_filepath: str
) -> Optional[TargetFile]:
Expand Down Expand Up @@ -417,10 +424,9 @@ def _preorder_depth_first_walk(

# The metadata for 'role_name' must be downloaded/updated before
# its targets, delegations, and child roles can be inspected.
self._load_targets(role_name, parent_role)
targets = self._load_targets(role_name, parent_role).signed

role_metadata: Targets = self._trusted_set[role_name].signed
target = role_metadata.targets.get(target_filepath)
target = targets.targets.get(target_filepath)

if target is not None:
logger.debug("Found target in current role %s", role_name)
Expand All @@ -432,11 +438,11 @@ def _preorder_depth_first_walk(
# And also decrement number of visited roles.
number_of_delegations -= 1

if role_metadata.delegations is not None:
if targets.delegations is not None:
child_roles_to_visit = []
# NOTE: This may be a slow operation if there are many
# delegated roles.
for child_role in role_metadata.delegations.roles.values():
for child_role in targets.delegations.roles.values():
if child_role.is_delegated_path(target_filepath):
logger.debug("Adding child role %s", child_role.name)

Expand Down