44
44
updater.refresh()
45
45
"""
46
46
47
- from collections import OrderedDict
48
- from dataclasses import dataclass
49
- from datetime import datetime , timedelta
50
47
import logging
51
48
import os
52
49
import tempfile
50
+ from collections import OrderedDict
51
+ from dataclasses import dataclass
52
+ from datetime import datetime , timedelta
53
+ from typing import Dict , Iterator , List , Optional , Tuple
54
+ from urllib import parse
55
+
53
56
import securesystemslib .hash as sslib_hash
54
57
from securesystemslib .keys import generate_ed25519_key
55
58
from securesystemslib .signer import SSlibSigner
56
- from typing import Dict , Iterator , List , Optional , Tuple
57
- from urllib import parse
58
59
59
- from tuf .api .metadata import TOP_LEVEL_ROLE_NAMES
60
- from tuf .api .serialization .json import JSONSerializer
61
- from tuf .exceptions import FetcherHTTPError
62
60
from tuf .api .metadata import (
61
+ SPECIFICATION_VERSION ,
62
+ TOP_LEVEL_ROLE_NAMES ,
63
63
DelegatedRole ,
64
64
Delegations ,
65
65
Key ,
66
66
Metadata ,
67
67
MetaFile ,
68
68
Role ,
69
69
Root ,
70
- SPECIFICATION_VERSION ,
71
70
Snapshot ,
72
71
TargetFile ,
73
72
Targets ,
74
73
Timestamp ,
75
74
)
75
+ from tuf .api .serialization .json import JSONSerializer
76
+ from tuf .exceptions import FetcherHTTPError , RepositoryError
76
77
from tuf .ngclient .fetcher import FetcherInterface
77
78
78
79
logger = logging .getLogger (__name__ )
79
80
80
81
SPEC_VER = "." .join (SPECIFICATION_VERSION )
81
82
83
+
82
84
@dataclass
83
85
class RepositoryTarget :
84
86
"""Contains actual target data and the related target metadata"""
87
+
85
88
data : bytes
86
89
target_file : TargetFile
87
90
91
+
88
92
class RepositorySimulator (FetcherInterface ):
89
93
def __init__ (self ):
90
94
self .md_root : Metadata [Root ] = None
@@ -141,7 +145,7 @@ def create_key() -> Tuple[Key, SSlibSigner]:
141
145
sslib_key = generate_ed25519_key ()
142
146
return Key .from_securesystemslib_key (sslib_key ), SSlibSigner (sslib_key )
143
147
144
- def add_signer (self , role :str , signer : SSlibSigner ):
148
+ def add_signer (self , role : str , signer : SSlibSigner ):
145
149
if role not in self .signers :
146
150
self .signers [role ] = {}
147
151
self .signers [role ][signer .key_dict ["keyid" ]] = signer
@@ -197,7 +201,7 @@ def fetch(self, url: str) -> Iterator[bytes]:
197
201
elif path .startswith ("/targets/" ):
198
202
# figure out target path and hash prefix
199
203
target_path = path [len ("/targets/" ) :]
200
- dir_parts , sep , prefixed_filename = target_path .rpartition ("/" )
204
+ dir_parts , sep , prefixed_filename = target_path .rpartition ("/" )
201
205
prefix , _ , filename = prefixed_filename .partition ("." )
202
206
target_path = f"{ dir_parts } { sep } { filename } "
203
207
@@ -219,7 +223,9 @@ def _fetch_target(self, target_path: str, hash: Optional[str]) -> bytes:
219
223
logger .debug ("fetched target %s" , target_path )
220
224
return repo_target .data
221
225
222
- def _fetch_metadata (self , role : str , version : Optional [int ] = None ) -> bytes :
226
+ def _fetch_metadata (
227
+ self , role : str , version : Optional [int ] = None
228
+ ) -> bytes :
223
229
"""Return signed metadata for 'role', using 'version' if it is given.
224
230
225
231
If version is None, non-versioned metadata is being requested
@@ -264,7 +270,7 @@ def _compute_hashes_and_length(
264
270
data = self ._fetch_metadata (role )
265
271
digest_object = sslib_hash .digest (sslib_hash .DEFAULT_HASH_ALGORITHM )
266
272
digest_object .update (data )
267
- hashes = {sslib_hash .DEFAULT_HASH_ALGORITHM : digest_object .hexdigest ()}
273
+ hashes = {sslib_hash .DEFAULT_HASH_ALGORITHM : digest_object .hexdigest ()}
268
274
return hashes , len (data )
269
275
270
276
def update_timestamp (self ):
0 commit comments