@@ -62,10 +62,6 @@ def setUp(self) -> None:
 
         self.sim = RepositorySimulator()
 
-        # boostrap client with initial root metadata
-        with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
-            f.write(self.sim.signed_roots[0])
-
         if self.dump_dir is not None:
             # create test specific dump directory
             name = self.id().split(".")[-1]
@@ -75,22 +71,13 @@ def setUp(self) -> None:
     def tearDown(self) -> None:
         self.temp_dir.cleanup()
 
-    def _run_refresh(self) -> Updater:
+    def _run_refresh(self, skip_bootstrap: bool = False) -> Updater:
         """Create a new Updater instance and refresh"""
-        if self.dump_dir is not None:
-            self.sim.write()
-
-        updater = Updater(
-            self.metadata_dir,
-            "https://example.com/metadata/",
-            self.targets_dir,
-            "https://example.com/targets/",
-            self.sim,
-        )
+        updater = self._init_updater(skip_bootstrap)
         updater.refresh()
         return updater
 
-    def _init_updater(self) -> Updater:
+    def _init_updater(self, skip_bootstrap: bool = False) -> Updater:
         """Create a new Updater instance"""
         if self.dump_dir is not None:
             self.sim.write()
@@ -101,6 +88,7 @@ def _init_updater(self) -> Updater:
             self.targets_dir,
             "https://example.com/targets/",
             self.sim,
+            bootstrap=None if skip_bootstrap else self.sim.signed_roots[0],
         )
 
     def _assert_files_exist(self, roles: Iterable[str]) -> None:
@@ -126,9 +114,6 @@ def _assert_version_equals(self, role: str, expected_version: int) -> None:
         self.assertEqual(md.signed.version, expected_version)
 
     def test_first_time_refresh(self) -> None:
-        # Metadata dir contains only the mandatory initial root.json
-        self._assert_files_exist([Root.type])
-
         # Add one more root version to repository so that
         # refresh() updates from local trusted root (v1) to
         # remote root (v2)
@@ -142,10 +127,11 @@ def test_first_time_refresh(self) -> None:
             version = 2 if role == Root.type else None
             self._assert_content_equals(role, version)
 
-    def test_trusted_root_missing(self) -> None:
-        os.remove(os.path.join(self.metadata_dir, "root.json"))
+    def test_cached_root_missing_without_bootstrap(self) -> None:
+        # Run update without a bootstrap, with empty cache: this fails since
+        # there is no trusted root
         with self.assertRaises(OSError):
-            self._run_refresh()
+            self._run_refresh(skip_bootstrap=True)
 
         # Metadata dir is empty
         self.assertFalse(os.listdir(self.metadata_dir))
@@ -178,15 +164,15 @@ def test_trusted_root_expired(self) -> None:
         self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
         self._assert_content_equals(Root.type, 3)
 
-    def test_trusted_root_unsigned(self) -> None:
-        # Local trusted root is not signed
+    def test_trusted_root_unsigned_without_bootstrap(self) -> None:
+        # Cached root is not signed, bootstrap root is not used
         root_path = os.path.join(self.metadata_dir, "root.json")
-        md_root = Metadata.from_file(root_path)
+        md_root = Metadata.from_bytes(self.sim.signed_roots[0])
         md_root.signatures.clear()
         md_root.to_file(root_path)
 
         with self.assertRaises(UnsignedMetadataError):
-            self._run_refresh()
+            self._run_refresh(skip_bootstrap=True)
 
         # The update failed, no changes in metadata
         self._assert_files_exist([Root.type])
@@ -204,10 +190,7 @@ def test_max_root_rotations(self) -> None:
             self.sim.root.version += 1
             self.sim.publish_root()
 
-        md_root = Metadata.from_file(
-            os.path.join(self.metadata_dir, "root.json")
-        )
-        initial_root_version = md_root.signed.version
+        initial_root_version = 1
 
         updater.refresh()
 
@@ -712,26 +695,20 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
         updater = self._run_refresh()
         updater.get_targetinfo("non_existent_target")
 
-        # Clean up calls to open during refresh()
+        # Clear statistics for open() calls and metadata requests
         wrapped_open.reset_mock()
-        # Clean up fetch tracker metadata
         self.sim.fetch_tracker.metadata.clear()
 
         # Create a new updater and perform a second update while
         # the metadata is already stored in cache (metadata dir)
-        updater = Updater(
-            self.metadata_dir,
-            "https://example.com/metadata/",
-            self.targets_dir,
-            "https://example.com/targets/",
-            self.sim,
-        )
+        updater = self._init_updater()
         updater.get_targetinfo("non_existent_target")
 
         # Test that metadata is loaded from cache and not downloaded
+        root_dir = os.path.join(self.metadata_dir, "root_history")
         wrapped_open.assert_has_calls(
             [
-                call(os.path.join(self.metadata_dir, "root.json"), "rb"),
+                call(os.path.join(root_dir, "2.root.json"), "rb"),
                 call(os.path.join(self.metadata_dir, "timestamp.json"), "rb"),
                 call(os.path.join(self.metadata_dir, "snapshot.json"), "rb"),
                 call(os.path.join(self.metadata_dir, "targets.json"), "rb"),
@@ -742,6 +719,96 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
         expected_calls = [("root", 2), ("timestamp", None)]
         self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)
 
+    @patch.object(builtins, "open", wraps=builtins.open)
+    def test_intermediate_root_cache(self, wrapped_open: MagicMock) -> None:
+        """Test that refresh uses the intermediate roots from cache"""
+        # Add root versions 2, 3
+        self.sim.root.version += 1
+        self.sim.publish_root()
+        self.sim.root.version += 1
+        self.sim.publish_root()
+
+        # Make a successful update of valid metadata which stores it in cache
+        self._run_refresh()
+
+        # assert that cache lookups happened but data was downloaded from remote
+        root_dir = os.path.join(self.metadata_dir, "root_history")
+        wrapped_open.assert_has_calls(
+            [
+                call(os.path.join(root_dir, "2.root.json"), "rb"),
+                call(os.path.join(root_dir, "3.root.json"), "rb"),
+                call(os.path.join(root_dir, "4.root.json"), "rb"),
+                call(os.path.join(self.metadata_dir, "timestamp.json"), "rb"),
+                call(os.path.join(self.metadata_dir, "snapshot.json"), "rb"),
+                call(os.path.join(self.metadata_dir, "targets.json"), "rb"),
+            ]
+        )
+        expected_calls = [
+            ("root", 2),
+            ("root", 3),
+            ("root", 4),
+            ("timestamp", None),
+            ("snapshot", 1),
+            ("targets", 1),
+        ]
+        self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)
+
+        # Clear statistics for open() calls and metadata requests
+        wrapped_open.reset_mock()
+        self.sim.fetch_tracker.metadata.clear()
+
+        # Run update again, assert that metadata from cache was used
+        # (including intermediate roots)
+        self._run_refresh()
+        wrapped_open.assert_has_calls(
+            [
+                call(os.path.join(root_dir, "2.root.json"), "rb"),
+                call(os.path.join(root_dir, "3.root.json"), "rb"),
+                call(os.path.join(root_dir, "4.root.json"), "rb"),
+                call(os.path.join(self.metadata_dir, "timestamp.json"), "rb"),
+                call(os.path.join(self.metadata_dir, "snapshot.json"), "rb"),
+                call(os.path.join(self.metadata_dir, "targets.json"), "rb"),
+            ]
+        )
+        expected_calls = [("root", 4), ("timestamp", None)]
+        self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)
+
+    def test_intermediate_root_cache_poisoning(self) -> None:
+        """Test that refresh works as expected when intermediate roots in cache are poisoned"""
+        # Add root versions 2, 3
+        self.sim.root.version += 1
+        self.sim.publish_root()
+        self.sim.root.version += 1
+        self.sim.publish_root()
+
+        # Make a successful update of valid metadata which stores it in cache
+        self._run_refresh()
+
+        # Modify cached intermediate root v2 so that it's no longer signed correctly
+        root_path = os.path.join(
+            self.metadata_dir, "root_history", "2.root.json"
+        )
+        md = Metadata.from_file(root_path)
+        md.signatures.clear()
+        md.to_file(root_path)
+
+        # Clear statistics for metadata requests
+        self.sim.fetch_tracker.metadata.clear()
+
+        # Update again, assert that intermediate root v2 was downloaded again
+        self._run_refresh()
+
+        expected_calls = [("root", 2), ("root", 4), ("timestamp", None)]
+        self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)
+
+        # Clear statistics for metadata requests
+        self.sim.fetch_tracker.metadata.clear()
+
+        # Update again, this time assert that intermediate root v2 was used from cache
+        self._run_refresh()
+
+        expected_calls = [("root", 4), ("timestamp", None)]
+        self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)
+
     def test_expired_metadata(self) -> None:
         """Verifies that expired local timestamp/snapshot can be used for
         updating from remote.