3
3
# Copyright 2021, New York University and the TUF contributors
4
4
# SPDX-License-Identifier: MIT OR Apache-2.0
5
5
6
- """Test updating delegated targets roles with various
7
- delegation hierarchies """
6
+ """Test updating delegated targets roles and searching for
7
+ target files with various delegation graphs """
8
8
9
9
import os
10
10
import sys
11
11
import tempfile
12
12
import unittest
13
13
from dataclasses import astuple , dataclass , field
14
- from typing import Iterable , List , Optional
14
+ from typing import Iterable , List , Optional , Tuple
15
15
from unittest .mock import call , patch
16
16
17
17
from tests import utils
@@ -36,18 +36,25 @@ class TestDelegation:
36
36
path_hash_prefixes : Optional [List [str ]] = None
37
37
38
38
39
+ @dataclass
40
+ class TestTarget :
41
+ rolename : str
42
+ content : bytes
43
+ targetpath : str
44
+
45
+
39
46
@dataclass
40
47
class DelegationsTestCase :
41
- """Describes a delegations graph as a list of delegations
42
- and the expected order of traversal as 'visited_order' ."""
48
+ """A delegations graph as lists of delegations and target files
49
+ and the expected order of traversal as a list of role names ."""
43
50
44
51
delegations : List [TestDelegation ]
45
- visited_order : List [str ]
52
+ target_files : List [TestTarget ] = field (default_factory = list )
53
+ visited_order : List [str ] = field (default_factory = list )
46
54
47
55
48
- class TestDelegationsGraphs (unittest .TestCase ):
49
- """Test creating delegations graphs with different complexity
50
- and successfully updating the delegated roles metadata"""
56
+ class TestDelegations (unittest .TestCase ):
57
+ """Base class for delegation tests"""
51
58
52
59
# set dump_dir to trigger repository state dumps
53
60
dump_dir : Optional [str ] = None
@@ -59,73 +66,76 @@ def setUp(self) -> None:
59
66
self .targets_dir = os .path .join (self .temp_dir .name , "targets" )
60
67
os .mkdir (self .metadata_dir )
61
68
os .mkdir (self .targets_dir )
69
+ self .sim : RepositorySimulator
62
70
63
71
def tearDown (self ) -> None :
64
72
self .temp_dir .cleanup ()
65
73
66
- def setup_subtest (
67
- self , delegations : List [TestDelegation ]
68
- ) -> RepositorySimulator :
69
- sim = self ._init_repo (delegations )
70
-
74
+ def setup_subtest (self ) -> None :
71
75
self .subtest_count += 1
72
76
if self .dump_dir is not None :
73
77
# create subtest dumpdir
74
78
name = f"{ self .id ().split ('.' )[- 1 ]} -{ self .subtest_count } "
75
- sim .dump_dir = os .path .join (self .dump_dir , name )
76
- os .mkdir (sim .dump_dir )
79
+ self . sim .dump_dir = os .path .join (self .dump_dir , name )
80
+ os .mkdir (self . sim .dump_dir )
77
81
# dump the repo simulator metadata
78
- sim .write ()
79
-
80
- return sim
82
+ self .sim .write ()
81
83
82
84
def teardown_subtest (self ) -> None :
83
- # clean up after each subtest
84
85
utils .cleanup_dir (self .metadata_dir )
85
86
86
- def _init_updater (self , sim : RepositorySimulator ) -> Updater :
87
- """Create a new Updater instance"""
88
- return Updater (
89
- self .metadata_dir ,
90
- "https://example.com/metadata/" ,
91
- self .targets_dir ,
92
- "https://example.com/targets/" ,
93
- sim ,
94
- )
87
+ def _init_repo (self , test_case : DelegationsTestCase ) -> None :
88
+ """Create a new RepositorySimulator instance and
89
+ populate it with delegations and target files"""
95
90
96
- def _init_repo (
97
- self , delegations : List [TestDelegation ]
98
- ) -> RepositorySimulator :
99
- """Create a new RepositorySimulator instance with 'delegations'"""
100
- sim = RepositorySimulator ()
91
+ self .sim = RepositorySimulator ()
101
92
spec_version = "." .join (SPECIFICATION_VERSION )
102
-
103
- for d in delegations :
104
- if d .rolename in sim .md_delegates :
105
- targets = sim .md_delegates [d .rolename ].signed
93
+ for d in test_case .delegations :
94
+ if d .rolename in self .sim .md_delegates :
95
+ targets = self .sim .md_delegates [d .rolename ].signed
106
96
else :
107
- targets = Targets (1 , spec_version , sim .safe_expiry , {}, None )
108
-
97
+ targets = Targets (
98
+ 1 , spec_version , self .sim .safe_expiry , {}, None
99
+ )
109
100
# unpack 'd' but skip "delegator"
110
101
role = DelegatedRole (* astuple (d )[1 :])
111
- sim .add_delegation (d .delegator , role , targets )
112
- sim .update_snapshot ()
102
+ self .sim .add_delegation (d .delegator , role , targets )
103
+
104
+ for target in test_case .target_files :
105
+ self .sim .add_target (* astuple (target ))
113
106
107
+ if test_case .target_files :
108
+ self .sim .targets .version += 1
109
+ self .sim .update_snapshot ()
110
+
111
+ def _init_updater (self ) -> Updater :
112
+ """Create a new Updater instance"""
114
113
# Init trusted root for Updater
115
114
with open (os .path .join (self .metadata_dir , "root.json" ), "bw" ) as f :
116
- root = sim .download_bytes (
115
+ root = self . sim .download_bytes (
117
116
"https://example.com/metadata/1.root.json" , 100000
118
117
)
119
118
f .write (root )
120
119
121
- return sim
120
+ return Updater (
121
+ self .metadata_dir ,
122
+ "https://example.com/metadata/" ,
123
+ self .targets_dir ,
124
+ "https://example.com/targets/" ,
125
+ self .sim ,
126
+ )
122
127
123
128
def _assert_files_exist (self , roles : Iterable [str ]) -> None :
124
129
"""Assert that local metadata files exist for 'roles'"""
125
130
expected_files = sorted ([f"{ role } .json" for role in roles ])
126
131
local_metadata_files = sorted (os .listdir (self .metadata_dir ))
127
132
self .assertListEqual (local_metadata_files , expected_files )
128
133
134
+
135
+ class TestDelegationsGraphs (TestDelegations ):
136
+ """Test creating delegations graphs with different complexity
137
+ and successfully updating the delegated roles metadata"""
138
+
129
139
graphs : utils .DataSet = {
130
140
"basic delegation" : DelegationsTestCase (
131
141
delegations = [TestDelegation ("targets" , "A" )],
@@ -251,8 +261,10 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
251
261
exp_files = [* TOP_LEVEL_ROLE_NAMES , * test_data .visited_order ]
252
262
exp_calls = [call (role , 1 ) for role in test_data .visited_order ]
253
263
254
- sim = self .setup_subtest (test_data .delegations )
255
- updater = self ._init_updater (sim )
264
+ self ._init_repo (test_data )
265
+ self .setup_subtest ()
266
+
267
+ updater = self ._init_updater ()
256
268
# restrict the max number of delegations to simplify the test
257
269
updater .config .max_delegations = 4
258
270
# Call explicitly refresh to simplify the expected_calls list
@@ -261,7 +273,7 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
261
273
self ._assert_files_exist (TOP_LEVEL_ROLE_NAMES )
262
274
263
275
with patch .object (
264
- sim , "_fetch_metadata" , wraps = sim ._fetch_metadata
276
+ self . sim , "_fetch_metadata" , wraps = self . sim ._fetch_metadata
265
277
) as wrapped_fetch :
266
278
# Looking for a non-existing targetpath forces updater
267
279
# to visit all possible delegated roles
@@ -276,10 +288,76 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
276
288
self .teardown_subtest ()
277
289
278
290
291
+ class TestTargetFileSearch (TestDelegations ):
292
+ """
293
+ Create a single repository with the following delegations:
294
+
295
+ targets
296
+ *py / \*
297
+ A B
298
+ *txt / \*
299
+ C D
300
+
301
+ Test that Updater successfully finds the target files metadata,
302
+ traversing the delegations as expected.
303
+ """
304
+
305
+ delegations_tree = DelegationsTestCase (
306
+ delegations = [
307
+ TestDelegation ("targets" , "A" , paths = ["*.py" ]),
308
+ TestDelegation ("targets" , "B" , paths = ["*" ]),
309
+ TestDelegation ("B" , "C" , paths = ["*.txt" ]),
310
+ TestDelegation ("B" , "D" , paths = ["*" ]),
311
+ ],
312
+ target_files = [
313
+ TestTarget ("targets" , b"content" , "file" ),
314
+ TestTarget ("A" , b"content" , "file.py" ),
315
+ TestTarget ("C" , b"content" , "file.txt" ),
316
+ TestTarget ("D" , b"content" , "file.ext" ),
317
+ ],
318
+ )
319
+
320
+ def setUp (self ) -> None :
321
+ super ().setUp ()
322
+ self ._init_repo (self .delegations_tree )
323
+
324
+ targets : utils .DataSet = {
325
+ "file" : ("file" , []),
326
+ "file.py" : ("file.py" , ["A" ]),
327
+ "file.txt" : ("file.txt" , ["B" , "C" ]),
328
+ "file.ext" : ("file.ext" , ["B" , "D" ]),
329
+ }
330
+
331
+ @utils .run_sub_tests_with_dataset (targets )
332
+ def test_targetfile_search (self , test_data : Tuple [str , List [str ]]) -> None :
333
+ try :
334
+ self .setup_subtest ()
335
+ targetpath , visited_order = test_data
336
+ exp_files = [* TOP_LEVEL_ROLE_NAMES , * visited_order ]
337
+ exp_calls = [call (role , 1 ) for role in visited_order ]
338
+ updater = self ._init_updater ()
339
+ # Call explicitly refresh to simplify the expected_calls list
340
+ updater .refresh ()
341
+ with patch .object (
342
+ self .sim , "_fetch_metadata" , wraps = self .sim ._fetch_metadata
343
+ ) as wrapped_fetch :
344
+ target = updater .get_targetinfo (targetpath )
345
+ # Confirm that the expected TargetFile is found
346
+ assert target is not None
347
+ exp_target = self .sim .target_files [targetpath ].target_file
348
+ self .assertDictEqual (target .to_dict (), exp_target .to_dict ())
349
+ # Check that the delegated roles were visited in the expected
350
+ # order and the corresponding metadata files were persisted
351
+ self .assertListEqual (wrapped_fetch .call_args_list , exp_calls )
352
+ self ._assert_files_exist (exp_files )
353
+ finally :
354
+ self .teardown_subtest ()
355
+
356
+
279
357
if __name__ == "__main__" :
280
358
if "--dump" in sys .argv :
281
- TestDelegationsGraphs .dump_dir = tempfile .mkdtemp ()
282
- print (f"Repository Simulator dumps in { TestDelegationsGraphs .dump_dir } " )
359
+ TestDelegations .dump_dir = tempfile .mkdtemp ()
360
+ print (f"Repository Simulator dumps in { TestDelegations .dump_dir } " )
283
361
sys .argv .remove ("--dump" )
284
362
285
363
utils .configure_test_logging (sys .argv )
0 commit comments