10
10
network IO, which are not handled here.
11
11
12
12
Loaded metadata can be accessed via index access with rolename as key
13
- (trusted_set[" root" ]) or, in the case of top-level metadata, using the helper
13
+ (trusted_set[Rolename. root]) or, in the case of top-level metadata, using the helper
14
14
properties (trusted_set.root).
15
15
16
16
The rules that TrustedMetadataSet follows for top-level metadata are
35
35
>>> trusted_set = TrustedMetadataSet(f.read())
36
36
>>>
37
37
>>> # update root from remote until no more are available
38
- >>> with download(" root" , trusted_set.root.signed.version + 1) as f:
38
+ >>> with download(Rolename. root, trusted_set.root.signed.version + 1) as f:
39
39
>>> trusted_set.update_root(f.read())
40
40
>>>
41
41
>>> # load local timestamp, then update from remote
45
45
>>> except (RepositoryError, OSError):
46
46
>>> pass # failure to load a local file is ok
47
47
>>>
48
- >>> with download(" timestamp" ) as f:
48
+ >>> with download(Rolename. timestamp) as f:
49
49
>>> trusted_set.update_timestamp(f.read())
50
50
>>>
51
51
>>> # load local snapshot, then update from remote if needed
55
55
>>> except (RepositoryError, OSError):
56
56
>>> # local snapshot is not valid, load from remote
57
57
>>> # (RepositoryErrors here stop the update)
58
- >>> with download(" snapshot" , version) as f:
58
+ >>> with download(Rolename. snapshot, version) as f:
59
59
>>> trusted_set.update_snapshot(f.read())
60
60
61
61
TODO:
73
73
from typing import Dict , Iterator , Optional
74
74
75
75
from tuf import exceptions
76
- from tuf .api .metadata import Metadata , Root , Snapshot , Targets , Timestamp
76
+ from tuf .api .metadata import Metadata , Root , Snapshot , Targets , Timestamp , Rolename
77
77
from tuf .api .serialization import DeserializationError
78
78
79
79
logger = logging .getLogger (__name__ )
@@ -123,22 +123,22 @@ def __iter__(self) -> Iterator[Metadata]:
123
123
@property
124
124
def root (self ) -> Metadata [Root ]:
125
125
"""Current root Metadata"""
126
- return self ._trusted_set [" root" ]
126
+ return self ._trusted_set [Rolename . root ]
127
127
128
128
@property
129
129
def timestamp (self ) -> Optional [Metadata [Timestamp ]]:
130
130
"""Current timestamp Metadata or None"""
131
- return self ._trusted_set .get (" timestamp" )
131
+ return self ._trusted_set .get (Rolename . timestamp )
132
132
133
133
@property
134
134
def snapshot (self ) -> Optional [Metadata [Snapshot ]]:
135
135
"""Current snapshot Metadata or None"""
136
- return self ._trusted_set .get (" snapshot" )
136
+ return self ._trusted_set .get (Rolename . snapshot )
137
137
138
138
@property
139
139
def targets (self ) -> Optional [Metadata [Targets ]]:
140
140
"""Current targets Metadata or None"""
141
- return self ._trusted_set .get (" targets" )
141
+ return self ._trusted_set .get (Rolename . targets )
142
142
143
143
# Methods for updating metadata
144
144
def update_root (self , data : bytes ) -> None :
@@ -163,23 +163,23 @@ def update_root(self, data: bytes) -> None:
163
163
except DeserializationError as e :
164
164
raise exceptions .RepositoryError ("Failed to load root" ) from e
165
165
166
- if new_root .signed .type != " root" :
166
+ if new_root .signed .type != Rolename . root :
167
167
raise exceptions .RepositoryError (
168
168
f"Expected 'root', got '{ new_root .signed .type } '"
169
169
)
170
170
171
171
# Verify that new root is signed by trusted root
172
- self .root .verify_delegate (" root" , new_root )
172
+ self .root .verify_delegate (Rolename . root , new_root )
173
173
174
174
if new_root .signed .version != self .root .signed .version + 1 :
175
175
raise exceptions .ReplayedMetadataError (
176
- " root" , new_root .signed .version , self .root .signed .version
176
+ Rolename . root , new_root .signed .version , self .root .signed .version
177
177
)
178
178
179
179
# Verify that new root is signed by itself
180
- new_root .verify_delegate (" root" , new_root )
180
+ new_root .verify_delegate (Rolename . root , new_root )
181
181
182
- self ._trusted_set [" root" ] = new_root
182
+ self ._trusted_set [Rolename . root ] = new_root
183
183
logger .info ("Updated root v%d" , new_root .signed .version )
184
184
185
185
def update_timestamp (self , data : bytes ) -> None :
@@ -214,20 +214,20 @@ def update_timestamp(self, data: bytes) -> None:
214
214
except DeserializationError as e :
215
215
raise exceptions .RepositoryError ("Failed to load timestamp" ) from e
216
216
217
- if new_timestamp .signed .type != " timestamp" :
217
+ if new_timestamp .signed .type != Rolename . timestamp :
218
218
raise exceptions .RepositoryError (
219
219
f"Expected 'timestamp', got '{ new_timestamp .signed .type } '"
220
220
)
221
221
222
- self .root .verify_delegate (" timestamp" , new_timestamp )
222
+ self .root .verify_delegate (Rolename . timestamp , new_timestamp )
223
223
224
224
# If an existing trusted timestamp is updated,
225
225
# check for a rollback attack
226
226
if self .timestamp is not None :
227
227
# Prevent rolling back timestamp version
228
228
if new_timestamp .signed .version < self .timestamp .signed .version :
229
229
raise exceptions .ReplayedMetadataError (
230
- " timestamp" ,
230
+ Rolename . timestamp ,
231
231
new_timestamp .signed .version ,
232
232
self .timestamp .signed .version ,
233
233
)
@@ -237,15 +237,15 @@ def update_timestamp(self, data: bytes) -> None:
237
237
< self .timestamp .signed .snapshot_meta .version
238
238
):
239
239
raise exceptions .ReplayedMetadataError (
240
- " snapshot" ,
240
+ Rolename . snapshot ,
241
241
new_timestamp .signed .snapshot_meta .version ,
242
242
self .timestamp .signed .snapshot_meta .version ,
243
243
)
244
244
245
245
# expiry not checked to allow old timestamp to be used for rollback
246
246
# protection of new timestamp: expiry is checked in update_snapshot()
247
247
248
- self ._trusted_set [" timestamp" ] = new_timestamp
248
+ self ._trusted_set [Rolename . timestamp ] = new_timestamp
249
249
logger .info ("Updated timestamp v%d" , new_timestamp .signed .version )
250
250
251
251
# timestamp is loaded: raise if it is not valid _final_ timestamp
@@ -310,12 +310,12 @@ def update_snapshot(
310
310
except DeserializationError as e :
311
311
raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
312
312
313
- if new_snapshot .signed .type != " snapshot" :
313
+ if new_snapshot .signed .type != Rolename . snapshot :
314
314
raise exceptions .RepositoryError (
315
315
f"Expected 'snapshot', got '{ new_snapshot .signed .type } '"
316
316
)
317
317
318
- self .root .verify_delegate (" snapshot" , new_snapshot )
318
+ self .root .verify_delegate (Rolename . snapshot , new_snapshot )
319
319
320
320
# version not checked against meta version to allow old snapshot to be
321
321
# used in rollback protection: it is checked when targets is updated
@@ -341,7 +341,7 @@ def update_snapshot(
341
341
# expiry not checked to allow old snapshot to be used for rollback
342
342
# protection of new snapshot: it is checked when targets is updated
343
343
344
- self ._trusted_set [" snapshot" ] = new_snapshot
344
+ self ._trusted_set [Rolename . snapshot ] = new_snapshot
345
345
logger .info ("Updated snapshot v%d" , new_snapshot .signed .version )
346
346
347
347
# snapshot is loaded, but we raise if it's not valid _final_ snapshot
@@ -371,7 +371,7 @@ def update_targets(self, data: bytes) -> None:
371
371
RepositoryError: Metadata failed to load or verify. The actual
372
372
error type and content will contain more details.
373
373
"""
374
- self .update_delegated_targets (data , " targets" , " root" )
374
+ self .update_delegated_targets (data , Rolename . targets , Rolename . root )
375
375
376
376
def update_delegated_targets (
377
377
self , data : bytes , role_name : str , delegator_name : str
@@ -419,7 +419,7 @@ def update_delegated_targets(
419
419
except DeserializationError as e :
420
420
raise exceptions .RepositoryError ("Failed to load snapshot" ) from e
421
421
422
- if new_delegate .signed .type != " targets" :
422
+ if new_delegate .signed .type != Rolename . targets :
423
423
raise exceptions .RepositoryError (
424
424
f"Expected 'targets', got '{ new_delegate .signed .type } '"
425
425
)
@@ -449,12 +449,12 @@ def _load_trusted_root(self, data: bytes) -> None:
449
449
except DeserializationError as e :
450
450
raise exceptions .RepositoryError ("Failed to load root" ) from e
451
451
452
- if new_root .signed .type != " root" :
452
+ if new_root .signed .type != Rolename . root :
453
453
raise exceptions .RepositoryError (
454
454
f"Expected 'root', got '{ new_root .signed .type } '"
455
455
)
456
456
457
- new_root .verify_delegate (" root" , new_root )
457
+ new_root .verify_delegate (Rolename . root , new_root )
458
458
459
- self ._trusted_set [" root" ] = new_root
459
+ self ._trusted_set [Rolename . root ] = new_root
460
460
logger .info ("Loaded trusted root v%d" , new_root .signed .version )
0 commit comments