diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py
index 425fff1d2d..9e07b0be25 100755
--- a/tests/test_repository_lib.py
+++ b/tests/test_repository_lib.py
@@ -512,6 +512,7 @@ def test_generate_timestamp_metadata(self):
     version = 1
     expiration_date = '1985-10-21T13:20:00Z'
+    storage_backend = securesystemslib.storage.FilesystemBackend()
 
     # Load a valid repository so that top-level roles exist in roledb and
     # generate_snapshot_metadata() has roles to specify in snapshot metadata.
     repository = repo_tool.Repository(repository_directory, metadata_directory,
@@ -521,20 +522,20 @@ def test_generate_timestamp_metadata(self):
         repository_name)
 
     timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_filename,
-        version, expiration_date, repository_name)
+        version, expiration_date, storage_backend, repository_name)
     self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata))
 
 
 
     # Test improperly formatted arguments.
     self.assertRaises(securesystemslib.exceptions.FormatError,
         repo_lib.generate_timestamp_metadata, 3, version, expiration_date,
-        repository_name)
+        storage_backend, repository_name)
 
     self.assertRaises(securesystemslib.exceptions.FormatError,
         repo_lib.generate_timestamp_metadata, snapshot_filename, '3',
-        expiration_date, repository_name)
+        expiration_date, storage_backend, repository_name)
 
     self.assertRaises(securesystemslib.exceptions.FormatError,
         repo_lib.generate_timestamp_metadata, snapshot_filename, version, '3',
-        repository_name)
+        storage_backend, repository_name)
diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py
index 9ca2991f3e..8116c245fd 100755
--- a/tests/test_repository_tool.py
+++ b/tests/test_repository_tool.py
@@ -116,6 +116,22 @@ def test_init(self):
 
 
 
+  def create_repository_directory(self):
+    # Create a repository directory and copy in test targets data
+    temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
+    targets_directory = os.path.join(temporary_directory, 'repository',
+        repo_tool.TARGETS_DIRECTORY_NAME)
+    original_targets_directory = os.path.join('repository_data',
+        'repository', 'targets')
+    shutil.copytree(original_targets_directory, targets_directory)
+
+    # In this case, create_new_repository() creates the 'repository/'
+    # sub-directory in 'temporary_directory' if it does not exist.
+    return os.path.join(temporary_directory, 'repository')
+
+
+
   def test_writeall(self):
     # Test creation of a TUF repository.
     #
@@ -129,16 +145,7 @@ def test_writeall(self):
     # Copy the target files from 'tuf/tests/repository_data' so that writeall()
     # has target fileinfo to include in metadata.
     repository_name = 'test_repository'
-    temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
-    targets_directory = os.path.join(temporary_directory, 'repository',
-        repo_tool.TARGETS_DIRECTORY_NAME)
-    original_targets_directory = os.path.join('repository_data',
-        'repository', 'targets')
-    shutil.copytree(original_targets_directory, targets_directory)
-
-    # In this case, create_new_repository() creates the 'repository/'
-    # sub-directory in 'temporary_directory' if it does not exist.
-    repository_directory = os.path.join(temporary_directory, 'repository')
+    repository_directory = self.create_repository_directory()
     metadata_directory = os.path.join(repository_directory,
         repo_tool.METADATA_STAGED_DIRECTORY_NAME)
@@ -550,6 +557,170 @@ def test_get_filepaths_in_directory(self):
 
 
 
+  def test_writeall_abstract_storage(self):
+    # Test creation of a TUF repository with a custom storage backend to ensure
+    # that functions relying on a storage backend being supplied operate
+    # correctly
+
+
+    class TestStorageBackend(securesystemslib.storage.StorageBackendInterface):
+      """
+      An implementation of securesystemslib.storage.StorageBackendInterface
+      which mutates filenames on put()/get(), translating filename in memory
+      to filename + '.tst' on-disk, such that trying to read the
+      expected/canonical file paths from local storage doesn't find the TUF
+      metadata files.
+      """
+
+      from contextlib import contextmanager
+
+
+      @contextmanager
+      def get(self, filepath):
+        file_object = open(filepath + '.tst', 'rb')
+        yield file_object
+        file_object.close()
+
+
+      def put(self, fileobj, filepath):
+        if not fileobj.closed:
+          fileobj.seek(0)
+
+        with open(filepath + '.tst', 'wb') as destination_file:
+          shutil.copyfileobj(fileobj, destination_file)
+          destination_file.flush()
+          os.fsync(destination_file.fileno())
+
+
+      def remove(self, filepath):
+        os.remove(filepath + '.tst')
+
+
+      def getsize(self, filepath):
+        return os.path.getsize(filepath + '.tst')
+
+
+      def create_folder(self, filepath):
+        if not filepath:
+          return
+        try:
+          os.makedirs(filepath)
+        except OSError as err:
+          pass
+
+
+      def list_folder(self, filepath):
+        contents = []
+        files = os.listdir(filepath)
+
+        for fi in files:
+          if fi.endswith('.tst'):
+            contents.append(fi.split('.tst')[0])
+          else:
+            contents.append(fi)
+
+        return contents
+
+
+
+    # Set up the repository directory
+    repository_name = 'test_repository'
+    repository_directory = self.create_repository_directory()
+    metadata_directory = os.path.join(repository_directory,
+        repo_tool.METADATA_STAGED_DIRECTORY_NAME)
+    targets_directory = os.path.join(repository_directory,
+        repo_tool.TARGETS_DIRECTORY_NAME)
+
+    # TestStorageBackend expects all files on disk to have an additional '.tst'
+    # file extension
+    for target in os.listdir(targets_directory):
+      src = os.path.join(targets_directory, target)
+      dst = os.path.join(targets_directory, target + '.tst')
+      os.rename(src, dst)
+
+    # (0) Create a repository with TestStorageBackend()
+    storage_backend = TestStorageBackend()
+    repository = repo_tool.create_new_repository(repository_directory,
+        repository_name,
+        storage_backend)
+
+    # (1) Load the public and private keys of the top-level roles, and one
+    # delegated role.
+    keystore_directory = os.path.join('repository_data', 'keystore')
+
+    # Load the public keys.
+    root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
+    targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
+    snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
+    timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
+
+    root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
+    targets_pubkey = \
+      repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
+    snapshot_pubkey = \
+      repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
+    timestamp_pubkey = \
+      repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)
+
+    # Load the private keys.
+    root_privkey_path = os.path.join(keystore_directory, 'root_key')
+    targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
+    snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
+    timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
+
+    root_privkey = \
+      repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
+    targets_privkey = \
+      repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
+          'password')
+    snapshot_privkey = \
+      repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
+          'password')
+    timestamp_privkey = \
+      repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
+          'password')
+
+
+    # (2) Add top-level verification keys.
+    repository.root.add_verification_key(root_pubkey)
+    repository.targets.add_verification_key(targets_pubkey)
+    repository.snapshot.add_verification_key(snapshot_pubkey)
+    repository.timestamp.add_verification_key(timestamp_pubkey)
+
+
+    # (3) Load top-level signing keys.
+    repository.root.load_signing_key(root_privkey)
+    repository.targets.load_signing_key(targets_privkey)
+    repository.snapshot.load_signing_key(snapshot_privkey)
+    repository.timestamp.load_signing_key(timestamp_privkey)
+
+
+    # (4) Add target files.
+    target1 = 'file1.txt'
+    target2 = 'file2.txt'
+    target3 = 'file3.txt'
+    repository.targets.add_target(target1)
+    repository.targets.add_target(target2)
+    repository.targets.add_target(target3)
+
+    # (6) Write repository.
+    repository.writeall()
+
+
+    # Ensure all of the metadata files exist at the mutated file location and
+    # that those files are valid metadata
+    for role in ['root.json.tst', 'targets.json.tst', 'snapshot.json.tst',
+        'timestamp.json.tst']:
+      role_filepath = os.path.join(metadata_directory, role)
+      self.assertTrue(os.path.exists(role_filepath))
+
+      role_signable = securesystemslib.util.load_json_file(role_filepath)
+      # Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
+      # an invalid signable.
+      tuf.formats.check_signable_object_format(role_signable)
+
+
 
 
 class TestMetadata(unittest.TestCase):
diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py
index c6736947c3..a888ca2a26 100755
--- a/tuf/repository_lib.py
+++ b/tuf/repository_lib.py
@@ -135,7 +135,7 @@ def _generate_and_write_metadata(rolename, metadata_filename,
   elif rolename == 'timestamp':
     snapshot_filename = filenames['snapshot']
     metadata = generate_timestamp_metadata(snapshot_filename, roleinfo['version'],
-        roleinfo['expires'], repository_name)
+        roleinfo['expires'], storage_backend, repository_name)
 
     _log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
         TIMESTAMP_EXPIRES_WARN_SECONDS)
@@ -1173,8 +1173,14 @@ def generate_targets_metadata(targets_directory, target_files, version,
     target_files:
       The target files tracked by 'targets.json'.  'target_files' is a
-      dictionary of target paths that are relative to the targets directory and
-      a fileinfo dict matching tuf.formats.LOOSE_FILEINFO_SCHEMA
+      dictionary mapping target paths (relative to the targets directory) to
+      a dict matching tuf.formats.LOOSE_FILEINFO_SCHEMA.  LOOSE_FILEINFO_SCHEMA
+      can support multiple different value patterns:
+      1) an empty dictionary - for when fileinfo should be generated
+      2) a dictionary matching tuf.formats.CUSTOM_SCHEMA - for when fileinfo
+         should be generated, with the supplied custom metadata attached
+      3) a dictionary matching tuf.formats.FILEINFO_SCHEMA - for when full
+         fileinfo is provided in conjunction with use_existing_fileinfo
 
     version:
       The metadata version number.  Clients use the version number to
@@ -1192,6 +1198,9 @@ def generate_targets_metadata(targets_directory, target_files, version,
     write_consistent_targets:
       Boolean that indicates whether file digests should be prepended to the
      target files.
+      NOTE: it is an error for write_consistent_targets to be True when
+      use_existing_fileinfo is also True. We can not create consistent targets
+      for a target file where the fileinfo isn't generated by tuf.
 
     use_existing_fileinfo:
       Boolean that indicates whether to use the complete fileinfo, including
@@ -1253,6 +1262,8 @@ def generate_targets_metadata(targets_directory, target_files, version,
   filedict = {}
 
   if use_existing_fileinfo:
+    # Use the provided fileinfo dicts, conforming to FILEINFO_SCHEMA, rather than
+    # generating fileinfo
     for target, fileinfo in six.iteritems(target_files):
 
       # Ensure all fileinfo entries in target_files have a non-empty hashes dict
@@ -1260,6 +1271,7 @@ def generate_targets_metadata(targets_directory, target_files, version,
       if not fileinfo.get('hashes', None):
        raise securesystemslib.exceptions.Error('use_existing_hashes option set'
            ' but no hashes exist in roledb for ' + repr(target))
+      # and a non-empty length
      if fileinfo.get('length', -1) < 0:
        raise securesystemslib.exceptions.Error('use_existing_hashes option set'
            ' but fileinfo\'s length is not set')
@@ -1267,6 +1279,8 @@ def generate_targets_metadata(targets_directory, target_files, version,
       filedict[target] = fileinfo
 
   else:
+    # Generate the fileinfo dicts by accessing the target files on storage.
+    # Default to accessing files on local storage.
     if storage_backend is None:
       storage_backend = securesystemslib.storage.FilesystemBackend()
 
@@ -1472,7 +1486,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
 
 
 def generate_timestamp_metadata(snapshot_filename, version, expiration_date,
-    repository_name):
+    storage_backend, repository_name):
   """
   <Purpose>
     Generate the timestamp metadata object.  The 'snapshot.json' file must
@@ -1492,14 +1506,14 @@ def generate_timestamp_metadata(snapshot_filename, version, expiration_date,
       The expiration date of the metadata file, conformant to
       'securesystemslib.formats.ISO8601_DATETIME_SCHEMA'.
 
-    repository_name:
-      The name of the repository.  If not supplied, 'rolename' is added to the
-      'default' repository.
-
     storage_backend:
       An object which implements
       securesystemslib.storage.StorageBackendInterface.
+
+    repository_name:
+      The name of the repository.  If not supplied, 'rolename' is added to the
+      'default' repository.
 
   <Exceptions>
     securesystemslib.exceptions.FormatError, if the generated timestamp
     metadata object cannot be formatted correctly, or one of the arguments is
     improperly formatted.
@@ -1524,7 +1538,7 @@ def generate_timestamp_metadata(snapshot_filename, version, expiration_date,
   # Retrieve the versioninfo of the Snapshot metadata file.
   snapshot_fileinfo = {}
   length, hashes = securesystemslib.util.get_file_details(snapshot_filename,
-      tuf.settings.FILE_HASH_ALGORITHMS)
+      tuf.settings.FILE_HASH_ALGORITHMS, storage_backend)
   snapshot_version = get_metadata_versioninfo('snapshot', repository_name)
   snapshot_fileinfo[SNAPSHOT_FILENAME] = \
       tuf.formats.make_fileinfo(length, hashes, version=snapshot_version['version'])
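
A minimal usage sketch (not part of the patch) of the updated generate_timestamp_metadata() signature introduced above. It assumes the 'repository_data/repository' test fixture referenced by the tests in this patch, and that load_repository() has already populated roledb for 'repository_name'; the version number and expiration date are illustrative values.

    # Sketch: generating timestamp metadata with an explicit storage backend.
    import os

    import securesystemslib.storage
    import tuf.formats
    import tuf.repository_lib as repo_lib
    import tuf.repository_tool as repo_tool

    repository_name = 'test_repository'
    repository_directory = os.path.join('repository_data', 'repository')
    snapshot_filename = os.path.join(repository_directory, 'metadata',
        'snapshot.json')

    # Loading the repository registers the snapshot role in roledb, which
    # generate_timestamp_metadata() consults for the snapshot version number.
    repo_tool.load_repository(repository_directory, repository_name)

    # The storage backend is now passed explicitly; FilesystemBackend() keeps
    # the previous behaviour of reading snapshot.json from local storage.
    storage_backend = securesystemslib.storage.FilesystemBackend()

    timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_filename,
        1, '2030-01-01T00:00:00Z', storage_backend, repository_name)

    assert tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata)

A custom backend (such as the TestStorageBackend in this patch, or a cloud storage implementation of securesystemslib.storage.StorageBackendInterface) can be substituted for FilesystemBackend() without changing the call.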