Skip to content

Commit 785ce31

Browse files
committed
ngclient TrustedMetadataSet: improve unit testing
The current situation with the TrustedMetadataSet testing is that we don't have a mnimimal amount of unit tests testing the different branches in the various API functionality in the class. This commit proposes simple unit tests covering almost all of the branches in the API functions and increasing the unit test coverage (as reported from the "coverage" tool) from 74 % to 97 %. The code could be complicated at places, because the different branches in the update_* functions depend on other metadata classes as well. Still, I hope we can find a way and simplify the code. Signed-off-by: Martin Vrachev <[email protected]>
1 parent 25c9991 commit 785ce31

File tree

1 file changed

+260
-5
lines changed

1 file changed

+260
-5
lines changed

tests/test_trusted_metadata_set.py

Lines changed: 260 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
import copy
12
import json
23
import logging
34
import os
4-
import shutil
55
import sys
6-
import tempfile
76
import unittest
7+
from typing import Dict, Any
8+
from datetime import datetime
89

910
from tuf import exceptions
10-
from tuf.api.metadata import Metadata
11-
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
11+
from tuf.api.metadata import Metadata, MetaFile
12+
from tuf.ngclient._internal.trusted_metadata_set import(
13+
TrustedMetadataSet,
14+
verify_with_threshold
15+
)
16+
from securesystemslib import hash as sslib_hash
17+
from securesystemslib.signer import SSlibSigner
18+
from securesystemslib.interface import import_ed25519_privatekey_from_file
1219

1320
from tests import utils
1421

@@ -26,6 +33,13 @@ def setUpClass(cls):
2633
with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f:
2734
cls.metadata[md] = f.read()
2835

36+
keystore_dir = os.path.join(os.getcwd(), 'repository_data', 'keystore')
37+
cls.keystore = {}
38+
for role in ['delegation', 'snapshot', 'targets', 'timestamp']:
39+
cls.keystore[role] = import_ed25519_privatekey_from_file(
40+
os.path.join(keystore_dir, role + '_key'),
41+
password="password"
42+
)
2943

3044
def test_update(self):
3145
trusted_set = TrustedMetadataSet(self.metadata["root"])
@@ -41,6 +55,9 @@ def test_update(self):
4155
self.metadata["role2"], "role2", "role1"
4256
)
4357

58+
# the 4 top level metadata objects + 2 additional delegated targets
59+
self.assertTrue(len(trusted_set), 6)
60+
4461
def test_out_of_order_ops(self):
4562
trusted_set = TrustedMetadataSet(self.metadata["root"])
4663

@@ -118,9 +135,247 @@ def test_update_with_invalid_json(self):
118135

119136
update_func(metadata)
120137

138+
def test_verify_with_threshold(self):
139+
# Call verify_with_threshold with non root or targets delegator.
140+
delegated_role = Metadata.from_bytes(self.metadata["role1"])
141+
timestamp = Metadata.from_bytes(self.metadata["timestamp"])
142+
with self.assertRaises(ValueError):
143+
verify_with_threshold(timestamp, "role1", delegated_role)
144+
145+
# Call verify_with_threshold with non existent role_name.
146+
targets = Metadata.from_bytes(self.metadata["targets"])
147+
with self.assertRaises(ValueError):
148+
verify_with_threshold(targets, "foo", delegated_role)
149+
150+
def test_invalid_update_root(self):
151+
trusted_set = TrustedMetadataSet(self.metadata["root"])
152+
# new_root data with invalid snapshot type
153+
invalid_type_data = json.loads(self.metadata["root"])
154+
invalid_type_data["signed"]["_type"] = "snapshot"
155+
invalid_type_data["signed"]["meta"] = {"file1.txt": {"version": 1}}
156+
invalid_type_data = json.dumps(invalid_type_data).encode()
157+
# RepositoryError is thrown during new_root deserialization.
158+
# It's not thrown when checking new_root.signed.type != "root"
159+
with self.assertRaises(exceptions.RepositoryError):
160+
trusted_set.update_root(invalid_type_data)
161+
162+
# new_root data with threshold which cannot be verified.
163+
modified_threshold_data = copy.deepcopy(
164+
json.loads(self.metadata["root"])
165+
)
166+
modified_threshold_data["signed"]["roles"]["root"]["threshold"] = 2
167+
modified_threshold_data = json.dumps(modified_threshold_data).encode()
168+
with self.assertRaises(exceptions.UnsignedMetadataError):
169+
trusted_set.update_root(modified_threshold_data)
170+
171+
# new_root.signed.version has the same version as old root
172+
with self.assertRaises(exceptions.ReplayedMetadataError):
173+
trusted_set.update_root(self.metadata["root"])
174+
175+
# if _root_update_finished, then fail when calling update_root
176+
trusted_set.root_update_finished()
177+
with self.assertRaises(RuntimeError):
178+
trusted_set.update_root(self.metadata["root"])
179+
180+
def test_root_update_finished_expired(self):
181+
trusted_set = TrustedMetadataSet(self.metadata["root"])
182+
# call root_update_finished when trusted root has expired
183+
expired_datetime = datetime.strptime(
184+
"1970-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"
185+
)
186+
trusted_set.root.signed.expires = expired_datetime
187+
with self.assertRaises(exceptions.ExpiredMetadataError):
188+
trusted_set.root_update_finished()
189+
190+
def _sign_modified_obj(
191+
self,
192+
role:str,
193+
metadata_obj: Metadata
194+
) -> Dict[str, Any]:
195+
key_dict = self.keystore[role]
196+
sslib_signer = SSlibSigner(key_dict)
197+
signature = metadata_obj.sign(sslib_signer)
198+
return signature.to_dict()
199+
200+
def test_update_timestamp(self):
201+
trusted_set = TrustedMetadataSet(self.metadata["root"])
202+
trusted_set.root_update_finished()
203+
trusted_set.update_timestamp(self.metadata["timestamp"])
204+
# new_timestamp.version < trusted_timestamp.version
205+
trusted_set.timestamp.signed.version = 2
206+
with self.assertRaises(exceptions.ReplayedMetadataError):
207+
trusted_set.update_timestamp(self.metadata["timestamp"])
208+
trusted_set.timestamp.signed.version = 1
209+
210+
# new_timestamp.snapshot.version < trusted_timestamp.snapshot.version
211+
trusted_set.timestamp.signed.meta["snapshot.json"].version = 2
212+
with self.assertRaises(exceptions.ReplayedMetadataError):
213+
trusted_set.update_timestamp(self.metadata["timestamp"])
214+
trusted_set.timestamp.signed.meta["snapshot.json"].version = 1
215+
216+
# new_timestamp has expired
217+
timestamp = Metadata.from_bytes(self.metadata["timestamp"])
218+
timestamp.signed.expires = datetime.strptime(
219+
"1970-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"
220+
)
221+
self._sign_modified_obj("timestamp", timestamp)
222+
new_timestamp_byte_data = json.dumps(timestamp.to_dict()).encode()
223+
with self.assertRaises(exceptions.ExpiredMetadataError):
224+
trusted_set.update_timestamp(new_timestamp_byte_data)
225+
226+
def _calculate_modified_hashes(
227+
self, true_hashes,
228+
data: bytes
229+
) -> Dict[str, str]:
230+
modified_hashes = {}
231+
# Calculate hashes on modified data in order to pass hashes verification.
232+
for algo in true_hashes.keys():
233+
digest_object = sslib_hash.digest(algo)
234+
digest_object.update(data)
235+
observed_hash = digest_object.hexdigest()
236+
modified_hashes[algo] = observed_hash
237+
return modified_hashes
238+
239+
def test_update_snapshot(self):
240+
trusted_set = TrustedMetadataSet(self.metadata["root"])
241+
trusted_set.root_update_finished()
242+
trusted_set.update_timestamp(self.metadata["timestamp"])
243+
# new_snapshot data with invalid targets type
244+
invalid_type_data = json.loads(self.metadata["snapshot"])
245+
invalid_type_data["signed"]["_type"] = "targets"
246+
invalid_type_data["signed"]["targets"] = {}
247+
invalid_type_data = json.dumps(invalid_type_data).encode()
248+
timestamp_meta = trusted_set.timestamp.signed.meta["snapshot.json"]
249+
true_hashes = timestamp_meta.hashes or {}
250+
modified_hashes = self._calculate_modified_hashes(
251+
true_hashes, invalid_type_data
252+
)
253+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = modified_hashes
254+
255+
with self.assertRaises(exceptions.RepositoryError):
256+
trusted_set.update_snapshot(invalid_type_data)
257+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = true_hashes
258+
# cannot update snapshot after targets update completes or targets != None
259+
targets_obj = Metadata.from_bytes(self.metadata["targets"])
260+
trusted_set._trusted_set["targets"] = targets_obj
261+
with self.assertRaises(RuntimeError):
262+
trusted_set.update_snapshot(self.metadata["snapshot"])
263+
del trusted_set._trusted_set["targets"]
264+
265+
# Deserialization error - failed to decode the new_snapshot JSON.
266+
timestamp_meta = trusted_set.timestamp.signed.meta["snapshot.json"]
267+
true_hashes = timestamp_meta.hashes or {}
268+
269+
modified_hashes = self._calculate_modified_hashes(
270+
true_hashes, b'{""sig": }'
271+
)
272+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = modified_hashes
273+
with self.assertRaises(exceptions.RepositoryError):
274+
trusted_set.update_snapshot(b'{""sig": }')
275+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = true_hashes
276+
277+
# root data with threshold which cannot be verified for new_snapshot
278+
trusted_set.root.signed.roles["snapshot"].threshold = 2
279+
with self.assertRaises(exceptions.UnsignedMetadataError):
280+
trusted_set.update_snapshot(self.metadata["snapshot"])
281+
trusted_set.root.signed.roles["snapshot"].threshold = 1
282+
283+
# new_snapshot.version != trusted timestamp.meta["snapshot"].version
284+
trusted_set.timestamp.signed.meta["snapshot.json"].version = 2
285+
with self.assertRaises(exceptions.BadVersionNumberError):
286+
trusted_set.update_snapshot(self.metadata["snapshot"])
287+
trusted_set.timestamp.signed.meta["snapshot.json"].version = 1
288+
289+
290+
def test_update_snapshot_after_succesfull_update(self):
291+
trusted_set = TrustedMetadataSet(self.metadata["root"])
292+
trusted_set.root_update_finished()
293+
trusted_set.update_timestamp(self.metadata["timestamp"])
294+
trusted_set.update_snapshot(self.metadata["snapshot"])
295+
296+
# Test removing a meta_file in new_snapshot compared to the old snapshot
297+
snapshot_obj = Metadata.from_bytes(self.metadata["snapshot"])
298+
snapshot_obj.signed.meta = {}
299+
# prepare timestamp.meta["snapshot"].hashes
300+
self._sign_modified_obj("snapshot", snapshot_obj)
301+
timestamp_meta = trusted_set.timestamp.signed.meta["snapshot.json"]
302+
true_hashes = timestamp_meta.hashes or {}
303+
modified_snapshot_data = json.dumps(snapshot_obj.to_dict()).encode()
304+
modified_hashes = self._calculate_modified_hashes(
305+
true_hashes, modified_snapshot_data
306+
)
307+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = modified_hashes
308+
309+
with self.assertRaises(exceptions.RepositoryError):
310+
trusted_set.update_snapshot(modified_snapshot_data)
311+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = true_hashes
312+
313+
# snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
314+
for meta_file_path in trusted_set.snapshot.signed.meta.keys():
315+
trusted_set.snapshot.signed.meta[meta_file_path].version = 2
316+
with self.assertRaises(exceptions.BadVersionNumberError):
317+
trusted_set.update_snapshot(self.metadata["snapshot"])
318+
for meta_file_path in trusted_set.snapshot.signed.meta.keys():
319+
trusted_set.snapshot.signed.meta[meta_file_path].version = 1
320+
321+
# new_snapshot has expired
322+
snapshot_obj = Metadata.from_bytes(self.metadata["snapshot"])
323+
snapshot_obj.signed.expires = datetime.strptime(
324+
"1970-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"
325+
)
326+
self._sign_modified_obj("snapshot", snapshot_obj)
327+
modified_snapshot_data = json.dumps(snapshot_obj.to_dict()).encode()
328+
modified_hashes = self._calculate_modified_hashes(
329+
true_hashes, modified_snapshot_data
330+
)
331+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = modified_hashes
332+
with self.assertRaises(exceptions.ExpiredMetadataError):
333+
trusted_set.update_snapshot(modified_snapshot_data)
334+
trusted_set.timestamp.signed.meta["snapshot.json"].hashes = true_hashes
335+
336+
def test_update_targets(self):
337+
trusted_set = TrustedMetadataSet(self.metadata["root"])
338+
trusted_set.root_update_finished()
339+
trusted_set.update_timestamp(self.metadata["timestamp"])
340+
trusted_set.update_snapshot(self.metadata["snapshot"])
341+
342+
# remove meta information with information about targets from snapshot
343+
trusted_set.snapshot.signed.meta = {}
344+
with self.assertRaises(exceptions.RepositoryError):
345+
trusted_set.update_targets(self.metadata["targets"])
346+
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
347+
trusted_set.snapshot.signed.meta = snapshot.signed.meta
348+
349+
# observed_hash != stored hash in snapshot meta for targets
350+
true_hashes = {}
351+
for target_path, meta_file in trusted_set.snapshot.signed.meta.items():
352+
true_hashes[target_path] = meta_file.hashes
353+
trusted_set.snapshot.signed.meta[target_path].hashes = {"sha256": "b"}
354+
with self.assertRaises(exceptions.BadHashError):
355+
trusted_set.update_targets(self.metadata["targets"])
356+
# Return to the original hash values
357+
for target_path in true_hashes.keys():
358+
trusted_set.snapshot.signed.meta[target_path].hashes = \
359+
true_hashes[target_path]
360+
361+
# new_delegate.signed.version != meta.version stored in snapshot
362+
for target_path in trusted_set.snapshot.signed.meta.keys():
363+
trusted_set.snapshot.signed.meta[target_path].version = 2
364+
with self.assertRaises(exceptions.BadVersionNumberError):
365+
trusted_set.update_targets(self.metadata["targets"])
366+
trusted_set.snapshot.signed.meta[target_path].version = 1
367+
368+
# new_delegate has expired
369+
targets_obj = Metadata.from_bytes(self.metadata["targets"])
370+
targets_obj.signed.expires = datetime.strptime(
371+
"1970-01-01T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"
372+
)
373+
self._sign_modified_obj("targets", targets_obj)
374+
modified_targets_data = json.dumps(targets_obj.to_dict()).encode()
375+
with self.assertRaises(exceptions.ExpiredMetadataError):
376+
trusted_set.update_targets(modified_targets_data)
121377

122378
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
123-
# TODO test the actual specification checks
124379

125380

126381
if __name__ == '__main__':

0 commit comments

Comments
 (0)