1
+ import copy
1
2
import json
2
3
import logging
3
4
import os
4
- import shutil
5
5
import sys
6
- import tempfile
7
6
import unittest
7
+ from typing import Dict , Any
8
+ from datetime import datetime
8
9
9
10
from tuf import exceptions
10
- from tuf .api .metadata import Metadata
11
- from tuf .ngclient ._internal .trusted_metadata_set import TrustedMetadataSet
11
+ from tuf .api .metadata import Metadata , MetaFile
12
+ from tuf .ngclient ._internal .trusted_metadata_set import (
13
+ TrustedMetadataSet ,
14
+ verify_with_threshold
15
+ )
16
+ from securesystemslib import hash as sslib_hash
17
+ from securesystemslib .signer import SSlibSigner
18
+ from securesystemslib .interface import import_ed25519_privatekey_from_file
12
19
13
20
from tests import utils
14
21
@@ -26,6 +33,19 @@ def setUpClass(cls):
26
33
with open (os .path .join (cls .repo_dir , f"{ md } .json" ), "rb" ) as f :
27
34
cls .metadata [md ] = f .read ()
28
35
36
+ cls .root = Metadata .from_bytes (cls .metadata ["root" ])
37
+ cls .timestamp = Metadata .from_bytes (cls .metadata ["timestamp" ])
38
+ cls .snapshot = Metadata .from_bytes (cls .metadata ["snapshot" ])
39
+ cls .targets = Metadata .from_bytes (cls .metadata ["targets" ])
40
+ cls .delegated_role = Metadata .from_bytes (cls .metadata ["role1" ])
41
+
42
+ keystore_dir = os .path .join (os .getcwd (), 'repository_data' , 'keystore' )
43
+ cls .keystore = {}
44
+ for role in ['delegation' , 'snapshot' , 'targets' , 'timestamp' ]:
45
+ cls .keystore [role ] = import_ed25519_privatekey_from_file (
46
+ os .path .join (keystore_dir , role + '_key' ),
47
+ password = "password"
48
+ )
29
49
30
50
def test_update (self ):
31
51
trusted_set = TrustedMetadataSet (self .metadata ["root" ])
@@ -41,6 +61,9 @@ def test_update(self):
41
61
self .metadata ["role2" ], "role2" , "role1"
42
62
)
43
63
64
+ # the 4 top level metadata objects + 2 additional delegated targets
65
+ self .assertTrue (len (trusted_set ), 6 )
66
+
44
67
def test_out_of_order_ops (self ):
45
68
trusted_set = TrustedMetadataSet (self .metadata ["root" ])
46
69
@@ -118,9 +141,228 @@ def test_update_with_invalid_json(self):
118
141
119
142
update_func (metadata )
120
143
144
+ def test_verify_with_threshold (self ):
145
+ # Call verify_with_threshold with non root or targets delegator.
146
+ with self .assertRaises (ValueError ):
147
+ verify_with_threshold (self .timestamp , "role1" , self .delegated_role )
148
+
149
+ # Call verify_with_threshold with non existent role_name.
150
+ with self .assertRaises (ValueError ):
151
+ verify_with_threshold (self .targets , "foo" , self .delegated_role )
152
+
153
+ def test_invalid_update_root (self ):
154
+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
155
+ # new_root data with invalid _type
156
+ invalid_type_data = json .loads (self .metadata ["root" ])
157
+ invalid_type_data ["signed" ]["_type" ] = "foo"
158
+ invalid_type_data = json .dumps (invalid_type_data )
159
+ # RepositoryError is throwned during new_root deserialization.
160
+ # It's not throwned when checking new_root.signed.type != "root"
161
+ with self .assertRaises (exceptions .RepositoryError ):
162
+ trusted_set .update_root (invalid_type_data .encode ())
163
+
164
+ # new_root data with threshold which cannot be verified.
165
+ modified_threshold_data = copy .deepcopy (
166
+ json .loads (self .metadata ["root" ])
167
+ )
168
+ modified_threshold_data ["signed" ]["roles" ]["root" ]["threshold" ] = 2
169
+ modified_threshold_data = json .dumps (modified_threshold_data ).encode ()
170
+ with self .assertRaises (exceptions .UnsignedMetadataError ):
171
+ trusted_set .update_root (modified_threshold_data )
172
+
173
+ # new_root.signed.version has the same version as old root
174
+ with self .assertRaises (exceptions .ReplayedMetadataError ):
175
+ trusted_set .update_root (self .metadata ["root" ])
176
+
177
+ # if _root_update_finished, then fail when calling update_root
178
+ trusted_set .root_update_finished ()
179
+ with self .assertRaises (RuntimeError ):
180
+ trusted_set .update_root (self .metadata ["root" ])
181
+ trusted_set ._root_update_finished = False
182
+
183
+ def test_root_update_finished_expired (self ):
184
+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
185
+ # call root_update_finished when trusted root has expired
186
+ expired_datetime = datetime .strptime (
187
+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
188
+ )
189
+ trusted_set .root .signed .expires = expired_datetime
190
+ with self .assertRaises (exceptions .ExpiredMetadataError ):
191
+ trusted_set .root_update_finished ()
192
+
193
+ def _sign_modified_obj (
194
+ self ,
195
+ role :str ,
196
+ metadata_obj : Metadata
197
+ ) -> Dict [str , Any ]:
198
+ key_dict = self .keystore [role ]
199
+ sslib_signer = SSlibSigner (key_dict )
200
+ signature = metadata_obj .sign (sslib_signer )
201
+ return signature .to_dict ()
202
+
203
+ def test_update_timestamp (self ):
204
+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
205
+ trusted_set .root_update_finished ()
206
+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
207
+ # new_timestamp.version < trusted_timestamp.version
208
+ trusted_set .timestamp .signed .version = 2
209
+ with self .assertRaises (exceptions .ReplayedMetadataError ):
210
+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
211
+ trusted_set .timestamp .signed .version = 1
212
+
213
+ # new_timestamp.snapshot.version < trusted_timestamp.snapshot.version
214
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 2
215
+ with self .assertRaises (exceptions .ReplayedMetadataError ):
216
+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
217
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 1
218
+
219
+ # new_timestamp has expired
220
+ timestamp = Metadata .from_bytes (self .metadata ["timestamp" ])
221
+ timestamp .signed .expires = datetime .strptime (
222
+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
223
+ )
224
+ self ._sign_modified_obj ("timestamp" , timestamp )
225
+ new_timestamp_byte_data = json .dumps (timestamp .to_dict ()).encode ()
226
+ with self .assertRaises (exceptions .ExpiredMetadataError ):
227
+ trusted_set .update_timestamp (new_timestamp_byte_data )
228
+
229
+ def _calculate_modified_hashes (
230
+ self , true_hashes ,
231
+ data : bytes
232
+ ) -> Dict [str , str ]:
233
+ modified_hashes = {}
234
+ # Calculate correct hashes to pass the first check
235
+ for algo in true_hashes .keys ():
236
+ digest_object = sslib_hash .digest (algo )
237
+ digest_object .update (data )
238
+ observed_hash = digest_object .hexdigest ()
239
+ modified_hashes [algo ] = observed_hash
240
+ return modified_hashes
241
+
242
+ def test_update_snapshot (self ):
243
+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
244
+ trusted_set .root_update_finished ()
245
+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
246
+ # cannot update snapshot after targets update completes or targets != None
247
+ targets_obj = Metadata .from_bytes (self .metadata ["targets" ])
248
+ trusted_set ._trusted_set ["targets" ] = targets_obj
249
+ with self .assertRaises (RuntimeError ):
250
+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
251
+ del trusted_set ._trusted_set ["targets" ]
252
+
253
+ # Deserialization error - failed to load new_snapshot.
254
+ timestamp_meta = trusted_set .timestamp .signed .meta ["snapshot.json" ]
255
+ true_hashes = timestamp_meta .hashes or {}
256
+
257
+ modified_hashes = self ._calculate_modified_hashes (
258
+ true_hashes , b'{""sig": }'
259
+ )
260
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = modified_hashes
261
+ with self .assertRaises (exceptions .RepositoryError ):
262
+ trusted_set .update_snapshot (b'{""sig": }' )
263
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = true_hashes
264
+
265
+ # root data with threshold which cannot be verified for new_snapshot
266
+ trusted_set .root .signed .roles ["snapshot" ].threshold = 2
267
+ with self .assertRaises (exceptions .UnsignedMetadataError ):
268
+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
269
+ trusted_set .root .signed .roles ["snapshot" ].threshold = 1
270
+
271
+ # new_snapshot.version != trusted timestamp.meta["snapshot"].version
272
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 2
273
+ with self .assertRaises (exceptions .BadVersionNumberError ):
274
+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
275
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 1
276
+
277
+
278
+ def test_update_snapshot_after_succesfull_update (self ):
279
+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
280
+ trusted_set .root_update_finished ()
281
+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
282
+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
283
+
284
+ # Test removing a meta_file in new_snapshot compared to the old snapshot
285
+ snapshot_obj = Metadata .from_bytes (self .metadata ["snapshot" ])
286
+ snapshot_obj .signed .meta = {}
287
+ # prepare timestamp.meta["snapshot"].hashes
288
+ self ._sign_modified_obj ("snapshot" , snapshot_obj )
289
+ timestamp_meta = trusted_set .timestamp .signed .meta ["snapshot.json" ]
290
+ true_hashes = timestamp_meta .hashes or {}
291
+ modified_snapshot_data = json .dumps (snapshot_obj .to_dict ()).encode ()
292
+ modified_hashes = self ._calculate_modified_hashes (
293
+ true_hashes , json .dumps (snapshot_obj .to_dict ()).encode ()
294
+ )
295
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = modified_hashes
296
+
297
+ with self .assertRaises (exceptions .RepositoryError ):
298
+ trusted_set .update_snapshot (modified_snapshot_data )
299
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = true_hashes
300
+
301
+ # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
302
+ for meta_file_path in trusted_set .snapshot .signed .meta .keys ():
303
+ trusted_set .snapshot .signed .meta [meta_file_path ].version = 2
304
+ with self .assertRaises (exceptions .BadVersionNumberError ):
305
+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
306
+ for meta_file_path in trusted_set .snapshot .signed .meta .keys ():
307
+ trusted_set .snapshot .signed .meta [meta_file_path ].version = 1
308
+
309
+ # new_snapshot has expired
310
+ snapshot_obj = Metadata .from_bytes (self .metadata ["snapshot" ])
311
+ snapshot_obj .signed .expires = datetime .strptime (
312
+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
313
+ )
314
+ self ._sign_modified_obj ("snapshot" , snapshot_obj )
315
+ modified_snapshot_data = json .dumps (snapshot_obj .to_dict ()).encode ()
316
+ modified_hashes = self ._calculate_modified_hashes (
317
+ true_hashes , modified_snapshot_data
318
+ )
319
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = modified_hashes
320
+ with self .assertRaises (exceptions .ExpiredMetadataError ):
321
+ trusted_set .update_snapshot (modified_snapshot_data )
322
+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = true_hashes
323
+
324
+ def test_update_targets (self ):
325
+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
326
+ trusted_set .root_update_finished ()
327
+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
328
+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
329
+
330
+ # remove meta information with information about targets from snapshot
331
+ trusted_set .snapshot .signed .meta = {}
332
+ with self .assertRaises (exceptions .RepositoryError ):
333
+ trusted_set .update_targets (self .metadata ["targets" ])
334
+ trusted_set .snapshot .signed .meta = self .snapshot .signed .meta
335
+
336
+ # observed_hash != stored hash in snapshot for targets
337
+ true_hashes = {}
338
+ for target_path , meta_file in trusted_set .snapshot .signed .meta .items ():
339
+ true_hashes [target_path ] = meta_file .hashes
340
+ trusted_set .snapshot .signed .meta [target_path ].hashes = {"sha256" : "b" }
341
+ with self .assertRaises (exceptions .BadHashError ):
342
+ trusted_set .update_targets (self .metadata ["targets" ])
343
+ # Return to the original hash values
344
+ for target_path in true_hashes .keys ():
345
+ trusted_set .snapshot .signed .meta [target_path ].hashes = \
346
+ true_hashes [target_path ]
347
+
348
+ # new_delegate.signed.version != meta.version stored in snapshot
349
+ for target_path in trusted_set .snapshot .signed .meta .keys ():
350
+ trusted_set .snapshot .signed .meta [target_path ].version = 2
351
+ with self .assertRaises (exceptions .BadVersionNumberError ):
352
+ trusted_set .update_targets (self .metadata ["targets" ])
353
+ trusted_set .snapshot .signed .meta [target_path ].version = 1
354
+
355
+ # new_delegate has expired
356
+ targets_obj = Metadata .from_bytes (self .metadata ["targets" ])
357
+ targets_obj .signed .expires = datetime .strptime (
358
+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
359
+ )
360
+ self ._sign_modified_obj ("targets" , targets_obj )
361
+ modified_targets_data = json .dumps (targets_obj .to_dict ()).encode ()
362
+ with self .assertRaises (exceptions .ExpiredMetadataError ):
363
+ trusted_set .update_targets (modified_targets_data )
121
364
122
365
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
123
- # TODO test the actual specification checks
124
366
125
367
126
368
if __name__ == '__main__' :
0 commit comments