Skip to content

Commit 8bb704b

Browse files
author
Jussi Kukkonen
authored
Merge pull request #1355 from jku/experimental-metadata-bundle
experimental client: Add MetadataBundle
2 parents 6dbcb73 + 377eac1 commit 8bb704b

File tree

5 files changed

+632
-36
lines changed

5 files changed

+632
-36
lines changed

tests/test_api.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -450,21 +450,23 @@ def test_delegated_role_class(self):
450450
with self.assertRaises(ValueError):
451451
DelegatedRole.from_dict(role.copy())
452452

453-
# Test creating DelegatedRole only with "path_hash_prefixes"
453+
# Test creating DelegatedRole only with "path_hash_prefixes" (an empty one)
454454
del role["paths"]
455-
DelegatedRole.from_dict(role.copy())
456-
role["paths"] = "foo"
455+
role["path_hash_prefixes"] = []
456+
role_obj = DelegatedRole.from_dict(role.copy())
457+
self.assertEqual(role_obj.to_dict(), role)
457458

458-
# Test creating DelegatedRole only with "paths"
459+
# Test creating DelegatedRole only with "paths" (now an empty one)
459460
del role["path_hash_prefixes"]
460-
DelegatedRole.from_dict(role.copy())
461-
role["path_hash_prefixes"] = "foo"
461+
role["paths"] = []
462+
role_obj = DelegatedRole.from_dict(role.copy())
463+
self.assertEqual(role_obj.to_dict(), role)
462464

463465
# Test creating DelegatedRole without "paths" and
464466
# "path_hash_prefixes" set
465467
del role["paths"]
466-
del role["path_hash_prefixes"]
467-
DelegatedRole.from_dict(role)
468+
role_obj = DelegatedRole.from_dict(role.copy())
469+
self.assertEqual(role_obj.to_dict(), role)
468470

469471

470472
def test_delegation_class(self):
@@ -494,6 +496,21 @@ def test_delegation_class(self):
494496
delegations = Delegations.from_dict(copy.deepcopy(delegations_dict))
495497
self.assertEqual(delegations_dict, delegations.to_dict())
496498

499+
# empty keys and roles
500+
delegations_dict = {"keys":{}, "roles":[]}
501+
delegations = Delegations.from_dict(delegations_dict.copy())
502+
self.assertEqual(delegations_dict, delegations.to_dict())
503+
504+
# Test some basic missing or broken input
505+
invalid_delegations_dicts = [
506+
{},
507+
{"keys":None, "roles":None},
508+
{"keys":{"foo":0}, "roles":[]},
509+
{"keys":{}, "roles":["foo"]},
510+
]
511+
for d in invalid_delegations_dicts:
512+
with self.assertRaises((KeyError, AttributeError)):
513+
Delegations.from_dict(d)
497514

498515
def test_metadata_targets(self):
499516
targets_path = os.path.join(

tests/test_metadata_bundle.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import json
2+
import logging
3+
import os
4+
import shutil
5+
import sys
6+
import tempfile
7+
import unittest
8+
9+
from tuf import exceptions
10+
from tuf.api.metadata import Metadata
11+
from tuf.client_rework.metadata_bundle import MetadataBundle
12+
13+
from tests import utils
14+
15+
logger = logging.getLogger(__name__)
16+
17+
class TestMetadataBundle(unittest.TestCase):
18+
19+
def test_update(self):
20+
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
21+
22+
with open(os.path.join(repo_dir, "root.json"), "rb") as f:
23+
bundle = MetadataBundle(f.read())
24+
bundle.root_update_finished()
25+
26+
with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f:
27+
bundle.update_timestamp(f.read())
28+
with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f:
29+
bundle.update_snapshot(f.read())
30+
with open(os.path.join(repo_dir, "targets.json"), "rb") as f:
31+
bundle.update_targets(f.read())
32+
with open(os.path.join(repo_dir, "role1.json"), "rb") as f:
33+
bundle.update_delegated_targets(f.read(), "role1", "targets")
34+
with open(os.path.join(repo_dir, "role2.json"), "rb") as f:
35+
bundle.update_delegated_targets(f.read(), "role2", "role1")
36+
37+
def test_out_of_order_ops(self):
38+
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
39+
data={}
40+
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
41+
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
42+
data[md] = f.read()
43+
44+
bundle = MetadataBundle(data["root"])
45+
46+
# Update timestamp before root is finished
47+
with self.assertRaises(RuntimeError):
48+
bundle.update_timestamp(data["timestamp"])
49+
50+
bundle.root_update_finished()
51+
with self.assertRaises(RuntimeError):
52+
bundle.root_update_finished()
53+
54+
# Update snapshot before timestamp
55+
with self.assertRaises(RuntimeError):
56+
bundle.update_snapshot(data["snapshot"])
57+
58+
bundle.update_timestamp(data["timestamp"])
59+
60+
# Update targets before snapshot
61+
with self.assertRaises(RuntimeError):
62+
bundle.update_targets(data["targets"])
63+
64+
bundle.update_snapshot(data["snapshot"])
65+
66+
#update timestamp after snapshot
67+
with self.assertRaises(RuntimeError):
68+
bundle.update_timestamp(data["timestamp"])
69+
70+
# Update delegated targets before targets
71+
with self.assertRaises(RuntimeError):
72+
bundle.update_delegated_targets(data["role1"], "role1", "targets")
73+
74+
bundle.update_targets(data["targets"])
75+
bundle.update_delegated_targets(data["role1"], "role1", "targets")
76+
77+
def test_update_with_invalid_json(self):
78+
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
79+
data={}
80+
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
81+
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
82+
data[md] = f.read()
83+
84+
# root.json not a json file at all
85+
with self.assertRaises(exceptions.RepositoryError):
86+
MetadataBundle(b"")
87+
# root.json is invalid
88+
root = Metadata.from_bytes(data["root"])
89+
root.signed.version += 1
90+
with self.assertRaises(exceptions.RepositoryError):
91+
MetadataBundle(json.dumps(root.to_dict()).encode())
92+
93+
bundle = MetadataBundle(data["root"])
94+
bundle.root_update_finished()
95+
96+
top_level_md = [
97+
(data["timestamp"], bundle.update_timestamp),
98+
(data["snapshot"], bundle.update_snapshot),
99+
(data["targets"], bundle.update_targets),
100+
]
101+
for metadata, update_func in top_level_md:
102+
# metadata is not json
103+
with self.assertRaises(exceptions.RepositoryError):
104+
update_func(b"")
105+
# metadata is invalid
106+
md = Metadata.from_bytes(metadata)
107+
md.signed.version += 1
108+
with self.assertRaises(exceptions.RepositoryError):
109+
update_func(json.dumps(md.to_dict()).encode())
110+
111+
# metadata is of wrong type
112+
with self.assertRaises(exceptions.RepositoryError):
113+
update_func(data["root"])
114+
115+
update_func(metadata)
116+
117+
118+
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
119+
# TODO test the actual specification checks
120+
121+
122+
if __name__ == '__main__':
123+
utils.configure_test_logging(sys.argv)
124+
unittest.main()

tuf/api/metadata.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ def __init__(
770770
super().__init__(keyids, threshold, unrecognized_fields)
771771
self.name = name
772772
self.terminating = terminating
773-
if paths and path_hash_prefixes:
773+
if paths is not None and path_hash_prefixes is not None:
774774
raise ValueError(
775775
"Only one of the attributes 'paths' and"
776776
"'path_hash_prefixes' can be set!"
@@ -806,9 +806,9 @@ def to_dict(self) -> Dict[str, Any]:
806806
"terminating": self.terminating,
807807
**base_role_dict,
808808
}
809-
if self.paths:
809+
if self.paths is not None:
810810
res_dict["paths"] = self.paths
811-
elif self.path_hash_prefixes:
811+
elif self.path_hash_prefixes is not None:
812812
res_dict["path_hash_prefixes"] = self.path_hash_prefixes
813813
return res_dict
814814

@@ -911,17 +911,20 @@ def from_dict(cls, targets_dict: Dict[str, Any]) -> "Targets":
911911
"""Creates Targets object from its dict representation."""
912912
common_args = cls._common_fields_from_dict(targets_dict)
913913
targets = targets_dict.pop("targets")
914-
delegations = targets_dict.pop("delegations", None)
915-
if delegations:
916-
delegations = Delegations.from_dict(delegations)
914+
try:
915+
delegations_dict = targets_dict.pop("delegations")
916+
except KeyError:
917+
delegations = None
918+
else:
919+
delegations = Delegations.from_dict(delegations_dict)
917920
# All fields left in the targets_dict are unrecognized.
918921
return cls(*common_args, targets, delegations, targets_dict)
919922

920923
def to_dict(self) -> Dict[str, Any]:
921924
"""Returns the dict representation of self."""
922925
targets_dict = self._common_fields_to_dict()
923926
targets_dict["targets"] = self.targets
924-
if self.delegations:
927+
if self.delegations is not None:
925928
targets_dict["delegations"] = self.delegations.to_dict()
926929
return targets_dict
927930

0 commit comments

Comments
 (0)