10
10
network IO, which are not handled here.
11
11
12
12
Loaded metadata can be accessed via index access with rolename as key
13
- (trusted_set["root" ]) or, in the case of top-level metadata, using the helper
13
+ (trusted_set[Root.type ]) or, in the case of top-level metadata, using the helper
14
14
properties (trusted_set.root).
15
15
16
16
The rules that TrustedMetadataSet follows for top-level metadata are
35
35
>>> trusted_set = TrustedMetadataSet(f.read())
36
36
>>>
37
37
>>> # update root from remote until no more are available
38
- >>> with download("root" , trusted_set.root.signed.version + 1) as f:
38
+ >>> with download(Root.type , trusted_set.root.signed.version + 1) as f:
39
39
>>> trusted_set.update_root(f.read())
40
40
>>>
41
41
>>> # load local timestamp, then update from remote
45
45
>>> except (RepositoryError, OSError):
46
46
>>> pass # failure to load a local file is ok
47
47
>>>
48
- >>> with download("timestamp" ) as f:
48
+ >>> with download(Timestamp.type ) as f:
49
49
>>> trusted_set.update_timestamp(f.read())
50
50
>>>
51
51
>>> # load local snapshot, then update from remote if needed
55
55
>>> except (RepositoryError, OSError):
56
56
>>> # local snapshot is not valid, load from remote
57
57
>>> # (RepositoryErrors here stop the update)
58
- >>> with download("snapshot" , version) as f:
58
+ >>> with download(Snapshot.type , version) as f:
59
59
>>> trusted_set.update_snapshot(f.read())
60
60
61
61
TODO:
@@ -123,22 +123,22 @@ def __iter__(self) -> Iterator[Metadata]:
123
123
@property
124
124
def root (self ) -> Metadata [Root ]:
125
125
"""Current root Metadata"""
126
- return self ._trusted_set ["root" ]
126
+ return self ._trusted_set [Root . type ]
127
127
128
128
@property
129
129
def timestamp (self ) -> Optional [Metadata [Timestamp ]]:
130
130
"""Current timestamp Metadata or None"""
131
- return self ._trusted_set .get ("timestamp" )
131
+ return self ._trusted_set .get (Timestamp . type )
132
132
133
133
@property
134
134
def snapshot (self ) -> Optional [Metadata [Snapshot ]]:
135
135
"""Current snapshot Metadata or None"""
136
- return self ._trusted_set .get ("snapshot" )
136
+ return self ._trusted_set .get (Snapshot . type )
137
137
138
138
@property
139
139
def targets (self ) -> Optional [Metadata [Targets ]]:
140
140
"""Current targets Metadata or None"""
141
- return self ._trusted_set .get ("targets" )
141
+ return self ._trusted_set .get (Targets . type )
142
142
143
143
# Methods for updating metadata
144
144
def update_root (self , data : bytes ) -> None :
@@ -163,23 +163,25 @@ def update_root(self, data: bytes) -> None:
163
163
except DeserializationError as e :
164
164
raise exceptions .RepositoryError ("Failed to load root" ) from e
165
165
166
- if new_root .signed .type != "root" :
166
+ if new_root .signed .type != Root . type :
167
167
raise exceptions .RepositoryError (
168
168
f"Expected 'root', got '{ new_root .signed .type } '"
169
169
)
170
170
171
171
# Verify that new root is signed by trusted root
172
- self .root .verify_delegate ("root" , new_root )
172
+ self .root .verify_delegate (Root . type , new_root )
173
173
174
174
if new_root .signed .version != self .root .signed .version + 1 :
175
175
raise exceptions .ReplayedMetadataError (
176
- "root" , new_root .signed .version , self .root .signed .version
176
+ Root .type ,
177
+ new_root .signed .version ,
178
+ self .root .signed .version ,
177
179
)
178
180
179
181
# Verify that new root is signed by itself
180
- new_root .verify_delegate ("root" , new_root )
182
+ new_root .verify_delegate (Root . type , new_root )
181
183
182
- self ._trusted_set ["root" ] = new_root
184
+ self ._trusted_set [Root . type ] = new_root
183
185
logger .info ("Updated root v%d" , new_root .signed .version )
184
186
185
187
def update_timestamp (self , data : bytes ) -> None :
@@ -214,20 +216,20 @@ def update_timestamp(self, data: bytes) -> None:
214
216
except DeserializationError as e :
215
217
raise exceptions .RepositoryError ("Failed to load timestamp" ) from e
216
218
217
- if new_timestamp .signed .type != "timestamp" :
219
+ if new_timestamp .signed .type != Timestamp . type :
218
220
raise exceptions .RepositoryError (
219
221
f"Expected 'timestamp', got '{ new_timestamp .signed .type } '"
220
222
)
221
223
222
- self .root .verify_delegate ("timestamp" , new_timestamp )
224
+ self .root .verify_delegate (Timestamp . type , new_timestamp )
223
225
224
226
# If an existing trusted timestamp is updated,
225
227
# check for a rollback attack
226
228
if self .timestamp is not None :
227
229
# Prevent rolling back timestamp version
228
230
if new_timestamp .signed .version < self .timestamp .signed .version :
229
231
raise exceptions .ReplayedMetadataError (
230
- "timestamp" ,
232
+ Timestamp . type ,
231
233
new_timestamp .signed .version ,
232
234
self .timestamp .signed .version ,
233
235
)
@@ -237,15 +239,15 @@ def update_timestamp(self, data: bytes) -> None:
237
239
< self .timestamp .signed .snapshot_meta .version
238
240
):
239
241
raise exceptions .ReplayedMetadataError (
240
- "snapshot" ,
242
+ Snapshot . type ,
241
243
new_timestamp .signed .snapshot_meta .version ,
242
244
self .timestamp .signed .snapshot_meta .version ,
243
245
)
244
246
245
247
# expiry not checked to allow old timestamp to be used for rollback
246
248
# protection of new timestamp: expiry is checked in update_snapshot()
247
249
248
- self ._trusted_set ["timestamp" ] = new_timestamp
250
+ self ._trusted_set [Timestamp . type ] = new_timestamp
249
251
logger .info ("Updated timestamp v%d" , new_timestamp .signed .version )
250
252
251
253
# timestamp is loaded: raise if it is not valid _final_ timestamp
@@ -310,12 +312,12 @@ def update_snapshot(
310
312
except DeserializationError as e :
311
313
raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
312
314
313
- if new_snapshot .signed .type != "snapshot" :
315
+ if new_snapshot .signed .type != Snapshot . type :
314
316
raise exceptions .RepositoryError (
315
317
f"Expected 'snapshot', got '{ new_snapshot .signed .type } '"
316
318
)
317
319
318
- self .root .verify_delegate ("snapshot" , new_snapshot )
320
+ self .root .verify_delegate (Snapshot . type , new_snapshot )
319
321
320
322
# version not checked against meta version to allow old snapshot to be
321
323
# used in rollback protection: it is checked when targets is updated
@@ -341,7 +343,7 @@ def update_snapshot(
341
343
# expiry not checked to allow old snapshot to be used for rollback
342
344
# protection of new snapshot: it is checked when targets is updated
343
345
344
- self ._trusted_set ["snapshot" ] = new_snapshot
346
+ self ._trusted_set [Snapshot . type ] = new_snapshot
345
347
logger .info ("Updated snapshot v%d" , new_snapshot .signed .version )
346
348
347
349
# snapshot is loaded, but we raise if it's not valid _final_ snapshot
@@ -371,7 +373,7 @@ def update_targets(self, data: bytes) -> None:
371
373
RepositoryError: Metadata failed to load or verify. The actual
372
374
error type and content will contain more details.
373
375
"""
374
- self .update_delegated_targets (data , "targets" , "root" )
376
+ self .update_delegated_targets (data , Targets . type , Root . type )
375
377
376
378
def update_delegated_targets (
377
379
self , data : bytes , role_name : str , delegator_name : str
@@ -419,7 +421,7 @@ def update_delegated_targets(
419
421
except DeserializationError as e :
420
422
raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
421
423
422
- if new_delegate .signed .type != "targets" :
424
+ if new_delegate .signed .type != Targets . type :
423
425
raise exceptions .RepositoryError (
424
426
f"Expected 'targets', got '{ new_delegate .signed .type } '"
425
427
)
@@ -449,12 +451,12 @@ def _load_trusted_root(self, data: bytes) -> None:
449
451
except DeserializationError as e :
450
452
raise exceptions .RepositoryError ("Failed to load root" ) from e
451
453
452
- if new_root .signed .type != "root" :
454
+ if new_root .signed .type != Root . type :
453
455
raise exceptions .RepositoryError (
454
456
f"Expected 'root', got '{ new_root .signed .type } '"
455
457
)
456
458
457
- new_root .verify_delegate ("root" , new_root )
459
+ new_root .verify_delegate (Root . type , new_root )
458
460
459
- self ._trusted_set ["root" ] = new_root
461
+ self ._trusted_set [Root . type ] = new_root
460
462
logger .info ("Loaded trusted root v%d" , new_root .signed .version )
0 commit comments