-
Notifications
You must be signed in to change notification settings - Fork 278
Fix and better test abstract files and directories support #1034
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -116,6 +116,22 @@ def test_init(self): | |
|
||
|
||
|
||
def create_repository_directory(self): | ||
# Create a repository directory and copy in test targets data | ||
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) | ||
targets_directory = os.path.join(temporary_directory, 'repository', | ||
repo_tool.TARGETS_DIRECTORY_NAME) | ||
original_targets_directory = os.path.join('repository_data', | ||
'repository', 'targets') | ||
shutil.copytree(original_targets_directory, targets_directory) | ||
|
||
# In this case, create_new_repository() creates the 'repository/' | ||
# sub-directory in 'temporary_directory' if it does not exist. | ||
return os.path.join(temporary_directory, 'repository') | ||
|
||
|
||
|
||
|
||
def test_writeall(self): | ||
# Test creation of a TUF repository. | ||
# | ||
|
@@ -129,16 +145,7 @@ def test_writeall(self): | |
# Copy the target files from 'tuf/tests/repository_data' so that writeall() | ||
# has target fileinfo to include in metadata. | ||
repository_name = 'test_repository' | ||
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) | ||
targets_directory = os.path.join(temporary_directory, 'repository', | ||
repo_tool.TARGETS_DIRECTORY_NAME) | ||
original_targets_directory = os.path.join('repository_data', | ||
'repository', 'targets') | ||
shutil.copytree(original_targets_directory, targets_directory) | ||
|
||
# In this case, create_new_repository() creates the 'repository/' | ||
# sub-directory in 'temporary_directory' if it does not exist. | ||
repository_directory = os.path.join(temporary_directory, 'repository') | ||
repository_directory = self.create_repository_directory() | ||
metadata_directory = os.path.join(repository_directory, | ||
repo_tool.METADATA_STAGED_DIRECTORY_NAME) | ||
|
||
|
@@ -550,6 +557,170 @@ def test_get_filepaths_in_directory(self): | |
|
||
|
||
|
||
def test_writeall_abstract_storage(self): | ||
# Test creation of a TUF repository with a custom storage backend to ensure | ||
# that functions relying on a storage backend being supplied operate | ||
# correctly | ||
|
||
|
||
class TestStorageBackend(securesystemslib.storage.StorageBackendInterface): | ||
""" | ||
An implementation of securesystemslib.storage.StorageBackendInterface | ||
which mutates filenames on put()/get(), translating filename in memory | ||
to filename + '.tst' on-disk, such that trying to read the | ||
expected/canonical file paths from local storage doesn't find the TUF | ||
metadata files. | ||
""" | ||
|
||
from contextlib import contextmanager | ||
|
||
|
||
@contextmanager | ||
def get(self, filepath): | ||
file_object = open(filepath + '.tst', 'rb') | ||
yield file_object | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🥇 Finally we have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🙇 |
||
file_object.close() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mind adding a blank line for consistency with the rest of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eagle eyed review. Fixed in a force-pushed update to the patch, thank you. |
||
|
||
def put(self, fileobj, filepath): | ||
if not fileobj.closed: | ||
fileobj.seek(0) | ||
|
||
with open(filepath + '.tst', 'wb') as destination_file: | ||
shutil.copyfileobj(fileobj, destination_file) | ||
destination_file.flush() | ||
os.fsync(destination_file.fileno()) | ||
|
||
|
||
def remove(self, filepath): | ||
os.remove(filepath + '.tst') | ||
|
||
|
||
def getsize(self, filepath): | ||
return os.path.getsize(filepath + '.tst') | ||
|
||
|
||
def create_folder(self, filepath): | ||
if not filepath: | ||
return | ||
try: | ||
os.makedirs(filepath) | ||
except OSError as err: | ||
pass | ||
|
||
|
||
def list_folder(self, filepath): | ||
contents = [] | ||
files = os.listdir(filepath) | ||
|
||
for fi in files: | ||
if fi.endswith('.tst'): | ||
contents.append(fi.split('.tst')[0]) | ||
else: | ||
contents.append(fi) | ||
|
||
return contents | ||
|
||
|
||
|
||
# Set up the repository directory | ||
repository_name = 'test_repository' | ||
repository_directory = self.create_repository_directory() | ||
metadata_directory = os.path.join(repository_directory, | ||
repo_tool.METADATA_STAGED_DIRECTORY_NAME) | ||
targets_directory = os.path.join(repository_directory, | ||
repo_tool.TARGETS_DIRECTORY_NAME) | ||
|
||
# TestStorageBackend expects all files on disk to have an additional '.tst' | ||
# file extension | ||
for target in os.listdir(targets_directory): | ||
src = os.path.join(targets_directory, target) | ||
dst = os.path.join(targets_directory, target + '.tst') | ||
os.rename(src, dst) | ||
|
||
# (0) Create a repository with TestStorageBackend() | ||
storage_backend = TestStorageBackend() | ||
repository = repo_tool.create_new_repository(repository_directory, | ||
repository_name, | ||
storage_backend) | ||
|
||
# (1) Load the public and private keys of the top-level roles, and one | ||
# delegated role. | ||
keystore_directory = os.path.join('repository_data', 'keystore') | ||
|
||
# Load the public keys. | ||
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub') | ||
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub') | ||
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub') | ||
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub') | ||
|
||
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path) | ||
targets_pubkey = \ | ||
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path) | ||
snapshot_pubkey = \ | ||
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path) | ||
timestamp_pubkey = \ | ||
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path) | ||
|
||
# Load the private keys. | ||
root_privkey_path = os.path.join(keystore_directory, 'root_key') | ||
targets_privkey_path = os.path.join(keystore_directory, 'targets_key') | ||
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key') | ||
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key') | ||
|
||
root_privkey = \ | ||
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password') | ||
targets_privkey = \ | ||
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path, | ||
'password') | ||
snapshot_privkey = \ | ||
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path, | ||
'password') | ||
timestamp_privkey = \ | ||
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path, | ||
'password') | ||
|
||
|
||
# (2) Add top-level verification keys. | ||
repository.root.add_verification_key(root_pubkey) | ||
repository.targets.add_verification_key(targets_pubkey) | ||
repository.snapshot.add_verification_key(snapshot_pubkey) | ||
repository.timestamp.add_verification_key(timestamp_pubkey) | ||
|
||
|
||
# (3) Load top-level signing keys. | ||
repository.root.load_signing_key(root_privkey) | ||
repository.targets.load_signing_key(targets_privkey) | ||
repository.snapshot.load_signing_key(snapshot_privkey) | ||
repository.timestamp.load_signing_key(timestamp_privkey) | ||
|
||
|
||
# (4) Add target files. | ||
target1 = 'file1.txt' | ||
target2 = 'file2.txt' | ||
target3 = 'file3.txt' | ||
repository.targets.add_target(target1) | ||
repository.targets.add_target(target2) | ||
repository.targets.add_target(target3) | ||
|
||
# (6) Write repository. | ||
repository.writeall() | ||
|
||
|
||
# Ensure all of the metadata files exist at the mutated file location and | ||
# that those files are valid metadata | ||
for role in ['root.json.tst', 'targets.json.tst', 'snapshot.json.tst', | ||
'timestamp.json.tst']: | ||
role_filepath = os.path.join(metadata_directory, role) | ||
self.assertTrue(os.path.exists(role_filepath)) | ||
|
||
role_signable = securesystemslib.util.load_json_file(role_filepath) | ||
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is | ||
# an invalid signable. | ||
tuf.formats.check_signable_object_format(role_signable) | ||
|
||
|
||
|
||
|
||
|
||
class TestMetadata(unittest.TestCase): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this!