10
10
network IO, which are not handled here.
11
11
12
12
Loaded metadata can be accessed via index access with rolename as key
13
- (trusted_set["root" ]) or, in the case of top-level metadata, using the helper
13
+ (trusted_set[RoleNames.ROOT.value ]) or, in the case of top-level metadata, using the helper
14
14
properties (trusted_set.root).
15
15
16
16
The rules that TrustedMetadataSet follows for top-level metadata are
35
35
>>> trusted_set = TrustedMetadataSet(f.read())
36
36
>>>
37
37
>>> # update root from remote until no more are available
38
- >>> with download("root" , trusted_set.root.signed.version + 1) as f:
38
+ >>> with download(RoleNames.ROOT.value , trusted_set.root.signed.version + 1) as f:
39
39
>>> trusted_set.update_root(f.read())
40
40
>>>
41
41
>>> # load local timestamp, then update from remote
45
45
>>> except (RepositoryError, OSError):
46
46
>>> pass # failure to load a local file is ok
47
47
>>>
48
- >>> with download("timestamp" ) as f:
48
+ >>> with download(RoleNames.TIMESTAMP.value ) as f:
49
49
>>> trusted_set.update_timestamp(f.read())
50
50
>>>
51
51
>>> # load local snapshot, then update from remote if needed
55
55
>>> except (RepositoryError, OSError):
56
56
>>> # local snapshot is not valid, load from remote
57
57
>>> # (RepositoryErrors here stop the update)
58
- >>> with download("snapshot" , version) as f:
58
+ >>> with download(RoleNames.SNAPSHOT.value , version) as f:
59
59
>>> trusted_set.update_snapshot(f.read())
60
60
61
61
TODO:
73
73
from typing import Dict , Iterator , Optional
74
74
75
75
from tuf import exceptions
76
- from tuf .api .metadata import Metadata , Root , Snapshot , Targets , Timestamp
76
+ from tuf .api .metadata import (
77
+ Metadata ,
78
+ RoleNames ,
79
+ Root ,
80
+ Snapshot ,
81
+ Targets ,
82
+ Timestamp ,
83
+ )
77
84
from tuf .api .serialization import DeserializationError
78
85
79
86
logger = logging .getLogger (__name__ )
@@ -123,22 +130,22 @@ def __iter__(self) -> Iterator[Metadata]:
123
130
@property
124
131
def root (self ) -> Metadata [Root ]:
125
132
"""Current root Metadata"""
126
- return self ._trusted_set ["root" ]
133
+ return self ._trusted_set [RoleNames . ROOT . value ]
127
134
128
135
@property
129
136
def timestamp (self ) -> Optional [Metadata [Timestamp ]]:
130
137
"""Current timestamp Metadata or None"""
131
- return self ._trusted_set .get ("timestamp" )
138
+ return self ._trusted_set .get (RoleNames . TIMESTAMP . value )
132
139
133
140
@property
134
141
def snapshot (self ) -> Optional [Metadata [Snapshot ]]:
135
142
"""Current snapshot Metadata or None"""
136
- return self ._trusted_set .get ("snapshot" )
143
+ return self ._trusted_set .get (RoleNames . SNAPSHOT . value )
137
144
138
145
@property
139
146
def targets (self ) -> Optional [Metadata [Targets ]]:
140
147
"""Current targets Metadata or None"""
141
- return self ._trusted_set .get ("targets" )
148
+ return self ._trusted_set .get (RoleNames . TARGETS . value )
142
149
143
150
# Methods for updating metadata
144
151
def update_root (self , data : bytes ) -> None :
@@ -163,23 +170,25 @@ def update_root(self, data: bytes) -> None:
163
170
except DeserializationError as e :
164
171
raise exceptions .RepositoryError ("Failed to load root" ) from e
165
172
166
- if new_root .signed .type != "root" :
173
+ if new_root .signed .type != RoleNames . ROOT . value :
167
174
raise exceptions .RepositoryError (
168
175
f"Expected 'root', got '{ new_root .signed .type } '"
169
176
)
170
177
171
178
# Verify that new root is signed by trusted root
172
- self .root .verify_delegate ("root" , new_root )
179
+ self .root .verify_delegate (RoleNames . ROOT . value , new_root )
173
180
174
181
if new_root .signed .version != self .root .signed .version + 1 :
175
182
raise exceptions .ReplayedMetadataError (
176
- "root" , new_root .signed .version , self .root .signed .version
183
+ RoleNames .ROOT .value ,
184
+ new_root .signed .version ,
185
+ self .root .signed .version ,
177
186
)
178
187
179
188
# Verify that new root is signed by itself
180
- new_root .verify_delegate ("root" , new_root )
189
+ new_root .verify_delegate (RoleNames . ROOT . value , new_root )
181
190
182
- self ._trusted_set ["root" ] = new_root
191
+ self ._trusted_set [RoleNames . ROOT . value ] = new_root
183
192
logger .info ("Updated root v%d" , new_root .signed .version )
184
193
185
194
def update_timestamp (self , data : bytes ) -> None :
@@ -214,20 +223,20 @@ def update_timestamp(self, data: bytes) -> None:
214
223
except DeserializationError as e :
215
224
raise exceptions .RepositoryError ("Failed to load timestamp" ) from e
216
225
217
- if new_timestamp .signed .type != "timestamp" :
226
+ if new_timestamp .signed .type != RoleNames . TIMESTAMP . value :
218
227
raise exceptions .RepositoryError (
219
228
f"Expected 'timestamp', got '{ new_timestamp .signed .type } '"
220
229
)
221
230
222
- self .root .verify_delegate ("timestamp" , new_timestamp )
231
+ self .root .verify_delegate (RoleNames . TIMESTAMP . value , new_timestamp )
223
232
224
233
# If an existing trusted timestamp is updated,
225
234
# check for a rollback attack
226
235
if self .timestamp is not None :
227
236
# Prevent rolling back timestamp version
228
237
if new_timestamp .signed .version < self .timestamp .signed .version :
229
238
raise exceptions .ReplayedMetadataError (
230
- "timestamp" ,
239
+ RoleNames . TIMESTAMP . value ,
231
240
new_timestamp .signed .version ,
232
241
self .timestamp .signed .version ,
233
242
)
@@ -237,15 +246,15 @@ def update_timestamp(self, data: bytes) -> None:
237
246
< self .timestamp .signed .snapshot_meta .version
238
247
):
239
248
raise exceptions .ReplayedMetadataError (
240
- "snapshot" ,
249
+ RoleNames . SNAPSHOT . value ,
241
250
new_timestamp .signed .snapshot_meta .version ,
242
251
self .timestamp .signed .snapshot_meta .version ,
243
252
)
244
253
245
254
# expiry not checked to allow old timestamp to be used for rollback
246
255
# protection of new timestamp: expiry is checked in update_snapshot()
247
256
248
- self ._trusted_set ["timestamp" ] = new_timestamp
257
+ self ._trusted_set [RoleNames . TIMESTAMP . value ] = new_timestamp
249
258
logger .info ("Updated timestamp v%d" , new_timestamp .signed .version )
250
259
251
260
# timestamp is loaded: raise if it is not valid _final_ timestamp
@@ -310,12 +319,12 @@ def update_snapshot(
310
319
except DeserializationError as e :
311
320
raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
312
321
313
- if new_snapshot .signed .type != "snapshot" :
322
+ if new_snapshot .signed .type != RoleNames . SNAPSHOT . value :
314
323
raise exceptions .RepositoryError (
315
324
f"Expected 'snapshot', got '{ new_snapshot .signed .type } '"
316
325
)
317
326
318
- self .root .verify_delegate ("snapshot" , new_snapshot )
327
+ self .root .verify_delegate (RoleNames . SNAPSHOT . value , new_snapshot )
319
328
320
329
# version not checked against meta version to allow old snapshot to be
321
330
# used in rollback protection: it is checked when targets is updated
@@ -341,7 +350,7 @@ def update_snapshot(
341
350
# expiry not checked to allow old snapshot to be used for rollback
342
351
# protection of new snapshot: it is checked when targets is updated
343
352
344
- self ._trusted_set ["snapshot" ] = new_snapshot
353
+ self ._trusted_set [RoleNames . SNAPSHOT . value ] = new_snapshot
345
354
logger .info ("Updated snapshot v%d" , new_snapshot .signed .version )
346
355
347
356
# snapshot is loaded, but we raise if it's not valid _final_ snapshot
@@ -371,7 +380,9 @@ def update_targets(self, data: bytes) -> None:
371
380
RepositoryError: Metadata failed to load or verify. The actual
372
381
error type and content will contain more details.
373
382
"""
374
- self .update_delegated_targets (data , "targets" , "root" )
383
+ self .update_delegated_targets (
384
+ data , RoleNames .TARGETS .value , RoleNames .ROOT .value
385
+ )
375
386
376
387
def update_delegated_targets (
377
388
self , data : bytes , role_name : str , delegator_name : str
@@ -419,7 +430,7 @@ def update_delegated_targets(
419
430
except DeserializationError as e :
420
431
raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
421
432
422
- if new_delegate .signed .type != "targets" :
433
+ if new_delegate .signed .type != RoleNames . TARGETS . value :
423
434
raise exceptions .RepositoryError (
424
435
f"Expected 'targets', got '{ new_delegate .signed .type } '"
425
436
)
@@ -449,12 +460,12 @@ def _load_trusted_root(self, data: bytes) -> None:
449
460
except DeserializationError as e :
450
461
raise exceptions .RepositoryError ("Failed to load root" ) from e
451
462
452
- if new_root .signed .type != "root" :
463
+ if new_root .signed .type != RoleNames . ROOT . value :
453
464
raise exceptions .RepositoryError (
454
465
f"Expected 'root', got '{ new_root .signed .type } '"
455
466
)
456
467
457
- new_root .verify_delegate ("root" , new_root )
468
+ new_root .verify_delegate (RoleNames . ROOT . value , new_root )
458
469
459
- self ._trusted_set ["root" ] = new_root
470
+ self ._trusted_set [RoleNames . ROOT . value ] = new_root
460
471
logger .info ("Loaded trusted root v%d" , new_root .signed .version )
0 commit comments