Skip to content

Commit 37a4d41

Browse files
author
Jussi Kukkonen
committed
Metadata API: Implement threshold verification
The delegating Metadata (root or targets) verifies that the delegated metadata is signed by required threshold of keys for the delegated role. Calling the function on non-delegator-metadata or giving a rolename that is not actually delegated by the delegator is considered a programming error and ValueError is raised. If the threshold is not reached, UnsignedMetadataError is raised. Tweak type annotation of Delegations.keys to match the one for Root.keys (so they can be assigned to same local variable). Signed-off-by: Jussi Kukkonen <[email protected]>
1 parent 745a8f7 commit 37a4d41

File tree

2 files changed

+105
-1
lines changed

2 files changed

+105
-1
lines changed

tests/test_api.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,55 @@ def test_metadata_timestamp(self):
333333
)
334334

335335

336+
def test_metadata_verify_delegate(self):
337+
root_path = os.path.join(self.repo_dir, 'metadata', 'root.json')
338+
root = Metadata.from_file(root_path)
339+
snapshot_path = os.path.join(
340+
self.repo_dir, 'metadata', 'snapshot.json')
341+
snapshot = Metadata.from_file(snapshot_path)
342+
targets_path = os.path.join(
343+
self.repo_dir, 'metadata', 'targets.json')
344+
targets = Metadata.from_file(targets_path)
345+
role1_path = os.path.join(
346+
self.repo_dir, 'metadata', 'role1.json')
347+
role1 = Metadata.from_file(role1_path)
348+
role2_path = os.path.join(
349+
self.repo_dir, 'metadata', 'role2.json')
350+
role2 = Metadata.from_file(role2_path)
351+
352+
# test the expected delegation tree
353+
root.verify_delegate('root', root)
354+
root.verify_delegate('snapshot', snapshot)
355+
root.verify_delegate('targets', targets)
356+
targets.verify_delegate('role1', role1)
357+
role1.verify_delegate('role2', role2)
358+
359+
# only root and targets can verify delegates
360+
with self.assertRaises(ValueError):
361+
snapshot.verify_delegate('snapshot', snapshot)
362+
# verify fails for roles that are not delegated by delegator
363+
with self.assertRaises(ValueError):
364+
root.verify_delegate('role1', role1)
365+
with self.assertRaises(ValueError):
366+
targets.verify_delegate('targets', targets)
367+
368+
# verify fails when delegate content is modified
369+
expires = snapshot.signed.expires
370+
snapshot.signed.bump_expiration()
371+
with self.assertRaises(exceptions.UnsignedMetadataError):
372+
root.verify_delegate('snapshot', snapshot)
373+
snapshot.signed.expires = expires
374+
375+
# verify fails if roles keys do not sign the metadata
376+
with self.assertRaises(exceptions.UnsignedMetadataError):
377+
root.verify_delegate('timestamp', snapshot)
378+
379+
# verify fails if threshold of signatures is not reached
380+
root.signed.roles['snapshot'].threshold = 2
381+
with self.assertRaises(exceptions.UnsignedMetadataError):
382+
root.verify_delegate('snapshot', snapshot)
383+
384+
# TODO test successful verify with higher thresholds
336385

337386
def test_key_class(self):
338387
keys = {

tuf/api/metadata.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,61 @@ def sign(
266266

267267
return signature
268268

269+
def verify_delegate(
270+
self,
271+
role_name: str,
272+
delegate: "Metadata",
273+
signed_serializer: Optional[SignedSerializer] = None,
274+
):
275+
"""Verifies that 'delegate' is signed with the required threshold of
276+
keys for the delegated role 'role_name'.
277+
278+
Args:
279+
role_name: Name of the delegated role to verify
280+
delegate: The Metadata object for the delegated role
281+
signed_serializer: Optional; serializer used for delegate
282+
serialization. Default is CanonicalJSONSerializer.
283+
284+
Raises:
285+
UnsignedMetadataError: 'delegate' was not signed with required
286+
threshold of keys for 'role_name'
287+
"""
288+
289+
# Find the keys and role in our metadata
290+
role = None
291+
if isinstance(self.signed, Root):
292+
keys = self.signed.keys
293+
role = self.signed.roles.get(role_name)
294+
elif isinstance(self.signed, Targets):
295+
if self.signed.delegations:
296+
keys = self.signed.delegations.keys
297+
# Assume role names are unique in delegations.roles: #1426
298+
roles = self.signed.delegations.roles
299+
role = next((r for r in roles if r.name == role_name), None)
300+
else:
301+
raise ValueError("Call is valid only on delegator metadata")
302+
303+
if role is None:
304+
raise ValueError(f"No delegation found for {role_name}")
305+
306+
# verify that delegate is signed by required threshold of unique keys
307+
signing_keys = set()
308+
for keyid in role.keyids:
309+
key = keys[keyid]
310+
try:
311+
key.verify_signature(delegate, signed_serializer)
312+
# keyids are unique. Try to make sure the public keys are too
313+
signing_keys.add(key.keyval["public"])
314+
except exceptions.UnsignedMetadataError:
315+
pass
316+
317+
if len(signing_keys) < role.threshold:
318+
raise exceptions.UnsignedMetadataError(
319+
f"{role_name} was signed by {len(signing_keys)}/"
320+
f"{role.threshold} keys",
321+
delegate.signed,
322+
)
323+
269324

270325
class Signed(metaclass=abc.ABCMeta):
271326
"""A base class for the signed part of TUF metadata.
@@ -966,7 +1021,7 @@ class Delegations:
9661021

9671022
def __init__(
9681023
self,
969-
keys: Mapping[str, Key],
1024+
keys: Dict[str, Key],
9701025
roles: List[DelegatedRole],
9711026
unrecognized_fields: Optional[Mapping[str, Any]] = None,
9721027
) -> None:

0 commit comments

Comments
 (0)