diff --git a/tests/test_arbitrary_package_attack.py b/tests/test_arbitrary_package_attack.py index dfd66cafaf..97d9e5dcaf 100755 --- a/tests/test_arbitrary_package_attack.py +++ b/tests/test_arbitrary_package_attack.py @@ -180,7 +180,7 @@ def test_without_tuf(self): client_target_path = os.path.join(self.client_directory, 'file1.txt') self.assertFalse(os.path.exists(client_target_path)) length, hashes = securesystemslib.util.get_file_details(target_path) - fileinfo = tuf.formats.make_fileinfo(length, hashes) + fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] url_file = os.path.join(url_prefix, 'targets', 'file1.txt') @@ -190,20 +190,20 @@ def test_without_tuf(self): self.assertTrue(os.path.exists(client_target_path)) length, hashes = securesystemslib.util.get_file_details(client_target_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) # Test: Download a target file that has been modified by an attacker. with open(target_path, 'wt') as file_object: file_object.write('add malicious content.') length, hashes = securesystemslib.util.get_file_details(target_path) - malicious_fileinfo = tuf.formats.make_fileinfo(length, hashes) + malicious_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # On Windows, the URL portion should not contain back slashes. six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_target_path) length, hashes = securesystemslib.util.get_file_details(client_target_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Verify 'download_fileinfo' is unequal to the original trusted version. self.assertNotEqual(download_fileinfo, fileinfo) diff --git a/tests/test_endless_data_attack.py b/tests/test_endless_data_attack.py index 62fdd90453..cf06b73c24 100755 --- a/tests/test_endless_data_attack.py +++ b/tests/test_endless_data_attack.py @@ -184,7 +184,7 @@ def test_without_tuf(self): client_target_path = os.path.join(self.client_directory, 'file1.txt') self.assertFalse(os.path.exists(client_target_path)) length, hashes = securesystemslib.util.get_file_details(target_path) - fileinfo = tuf.formats.make_fileinfo(length, hashes) + fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] url_file = os.path.join(url_prefix, 'targets', 'file1.txt') @@ -194,7 +194,7 @@ def test_without_tuf(self): self.assertTrue(os.path.exists(client_target_path)) length, hashes = securesystemslib.util.get_file_details(client_target_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) # Test: Download a target file that has been modified by an attacker with @@ -202,7 +202,7 @@ def test_without_tuf(self): with open(target_path, 'a') as file_object: file_object.write('append large amount of data' * 100000) large_length, hashes = securesystemslib.util.get_file_details(target_path) - malicious_fileinfo = tuf.formats.make_fileinfo(large_length, hashes) + malicious_fileinfo = tuf.formats.make_targets_fileinfo(large_length, hashes) # Is the modified file actually larger? 
self.assertTrue(large_length > length) @@ -211,7 +211,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_target_path) length, hashes = securesystemslib.util.get_file_details(client_target_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Verify 'download_fileinfo' is unequal to the original trusted version. self.assertNotEqual(download_fileinfo, fileinfo) @@ -235,10 +235,10 @@ def test_with_tuf(self): # Verify the client's downloaded file matches the repository's. target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') length, hashes = securesystemslib.util.get_file_details(client_target_path) - fileinfo = tuf.formats.make_fileinfo(length, hashes) + fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) length, hashes = securesystemslib.util.get_file_details(client_target_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) # Modify 'file1.txt' and confirm that the TUF client only downloads up to @@ -257,7 +257,7 @@ def test_with_tuf(self): # extra data appended should be discarded by the client, so the downloaded # file size and hash should not have changed. length, hashes = securesystemslib.util.get_file_details(client_target_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) # Test that the TUF client does not download large metadata files, as well. diff --git a/tests/test_formats.py b/tests/test_formats.py index a439030358..7b3116e7f4 100755 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -119,10 +119,15 @@ def test_schemas(self): 'keyval': {'public': 'pubkey', 'private': 'privkey'}}), - 'FILEINFO_SCHEMA': (tuf.formats.FILEINFO_SCHEMA, - {'length': 1024, - 'hashes': {'sha256': 'A4582BCF323BCEF'}, - 'custom': {'type': 'paintjob'}}), + 'TARGETS_FILEINFO_SCHEMA': (tuf.formats.TARGETS_FILEINFO_SCHEMA, + {'length': 1024, + 'hashes': {'sha256': 'A4582BCF323BCEF'}, + 'custom': {'type': 'paintjob'}}), + + 'METADATA_FILEINFO_SCHEMA': (tuf.formats.METADATA_FILEINFO_SCHEMA, + {'length': 1024, + 'hashes': {'sha256': 'A4582BCF323BCEF'}, + 'version': 1}), 'FILEDICT_SCHEMA': (tuf.formats.FILEDICT_SCHEMA, {'metadata/root.json': {'length': 1024, @@ -241,7 +246,8 @@ def test_schemas(self): 'version': 8, 'expires': '1985-10-21T13:20:00Z', 'meta': {'metadattimestamp.json': {'length': 1024, - 'hashes': {'sha256': 'AB1245'}}}}), + 'hashes': {'sha256': 'AB1245'}, + 'version': 1}}}), 'MIRROR_SCHEMA': (tuf.formats.MIRROR_SCHEMA, {'url_prefix': 'http://localhost:8001', @@ -394,7 +400,7 @@ def test_build_dict_conforming_to_schema(self): length = 88 hashes = {'sha256': '3c7fe3eeded4a34'} expires = '1985-10-21T13:20:00Z' - filedict = {'snapshot.json': {'length': length, 'hashes': hashes}} + filedict = {'snapshot.json': {'length': length, 'hashes': hashes, 'version': 1}} # Try with and without _type and spec_version, both of which are @@ -797,28 +803,60 @@ def test_make_signable(self): - def test_make_fileinfo(self): + def test_make_targets_fileinfo(self): # Test conditions for valid arguments. 
length = 1024 hashes = {'sha256': 'A4582BCF323BCEF', 'sha512': 'A4582BCF323BFEF'} - version = 8 custom = {'type': 'paintjob'} - FILEINFO_SCHEMA = tuf.formats.FILEINFO_SCHEMA - make_fileinfo = tuf.formats.make_fileinfo - self.assertTrue(FILEINFO_SCHEMA.matches(make_fileinfo(length, hashes, version, custom))) - self.assertTrue(FILEINFO_SCHEMA.matches(make_fileinfo(length, hashes))) + TARGETS_FILEINFO_SCHEMA = tuf.formats.TARGETS_FILEINFO_SCHEMA + make_targets_fileinfo = tuf.formats.make_targets_fileinfo + self.assertTrue(TARGETS_FILEINFO_SCHEMA.matches(make_targets_fileinfo(length, hashes, custom))) + self.assertTrue(TARGETS_FILEINFO_SCHEMA.matches(make_targets_fileinfo(length, hashes))) # Test conditions for invalid arguments. bad_length = 'bad' bad_hashes = 'bad' bad_custom = 'bad' - self.assertRaises(securesystemslib.exceptions.FormatError, make_fileinfo, bad_length, hashes, custom) - self.assertRaises(securesystemslib.exceptions.FormatError, make_fileinfo, length, bad_hashes, custom) - self.assertRaises(securesystemslib.exceptions.FormatError, make_fileinfo, length, hashes, bad_custom) - self.assertRaises(securesystemslib.exceptions.FormatError, make_fileinfo, bad_length, hashes) - self.assertRaises(securesystemslib.exceptions.FormatError, make_fileinfo, length, bad_hashes) + self.assertRaises(securesystemslib.exceptions.FormatError, make_targets_fileinfo, + bad_length, hashes, custom) + self.assertRaises(securesystemslib.exceptions.FormatError, make_targets_fileinfo, + length, bad_hashes, custom) + self.assertRaises(securesystemslib.exceptions.FormatError, make_targets_fileinfo, + length, hashes, bad_custom) + self.assertRaises(securesystemslib.exceptions.FormatError, make_targets_fileinfo, + bad_length, hashes) + self.assertRaises(securesystemslib.exceptions.FormatError, make_targets_fileinfo, + length, bad_hashes) + + + + def test_make_metadata_fileinfo(self): + # Test conditions for valid arguments. + length = 1024 + hashes = {'sha256': 'A4582BCF323BCEF', 'sha512': 'A4582BCF323BFEF'} + version = 8 + + METADATA_FILEINFO_SCHEMA = tuf.formats.METADATA_FILEINFO_SCHEMA + make_metadata_fileinfo = tuf.formats.make_metadata_fileinfo + self.assertTrue(METADATA_FILEINFO_SCHEMA.matches(make_metadata_fileinfo( + version, length, hashes))) + self.assertTrue(METADATA_FILEINFO_SCHEMA.matches(make_metadata_fileinfo(version))) + + # Test conditions for invalid arguments. 
+ bad_version = 'bad' + bad_length = 'bad' + bad_hashes = 'bad' + + self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata_fileinfo, + bad_version, length, hashes) + self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata_fileinfo, + version, bad_length, hashes) + self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata_fileinfo, + version, length, bad_hashes) + self.assertRaises(securesystemslib.exceptions.FormatError, make_metadata_fileinfo, + bad_version) diff --git a/tests/test_indefinite_freeze_attack.py b/tests/test_indefinite_freeze_attack.py index feb64c5332..6adb864638 100755 --- a/tests/test_indefinite_freeze_attack.py +++ b/tests/test_indefinite_freeze_attack.py @@ -230,7 +230,7 @@ def test_without_tuf(self): shutil.copy(timestamp_path, client_timestamp_path) length, hashes = securesystemslib.util.get_file_details(timestamp_path) - fileinfo = tuf.formats.make_fileinfo(length, hashes) + fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json') @@ -238,7 +238,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path) length, hashes = securesystemslib.util.get_file_details(client_timestamp_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the current local file. self.assertEqual(download_fileinfo, fileinfo) diff --git a/tests/test_replay_attack.py b/tests/test_replay_attack.py index 0c50f9e616..9c4620eaf2 100755 --- a/tests/test_replay_attack.py +++ b/tests/test_replay_attack.py @@ -205,7 +205,7 @@ def test_without_tuf(self): # The fileinfo of the previous version is saved to verify that it is indeed # accepted by the non-TUF client. length, hashes = securesystemslib.util.get_file_details(backup_timestamp) - previous_fileinfo = tuf.formats.make_fileinfo(length, hashes) + previous_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Modify the timestamp file on the remote repository. repository = repo_tool.load_repository(self.repository_directory) @@ -227,7 +227,7 @@ def test_without_tuf(self): # Save the fileinfo of the new version generated to verify that it is # saved by the client. length, hashes = securesystemslib.util.get_file_details(timestamp_path) - new_fileinfo = tuf.formats.make_fileinfo(length, hashes) + new_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json') @@ -238,7 +238,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path) length, hashes = securesystemslib.util.get_file_details(client_timestamp_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the new version. 
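For orientation, a minimal sketch of how the two helpers split out above are intended to be used; the target path, digests and version number are illustrative placeholders rather than values taken from the test fixtures:

import securesystemslib.util
import tuf.formats

# Target files: 'length' and 'hashes' are mandatory, 'custom' stays optional.
length, hashes = securesystemslib.util.get_file_details('targets/file1.txt')
target_fileinfo = tuf.formats.make_targets_fileinfo(
    length, hashes, custom={'type': 'paintjob'})
assert tuf.formats.TARGETS_FILEINFO_SCHEMA.matches(target_fileinfo)

# Snapshot/timestamp entries: only 'version' is mandatory; 'length' and
# 'hashes' are included when the caller opts in.
full_entry = tuf.formats.make_metadata_fileinfo(8, length, hashes)
minimal_entry = tuf.formats.make_metadata_fileinfo(8)   # {'version': 8}
assert tuf.formats.METADATA_FILEINFO_SCHEMA.matches(full_entry)
assert tuf.formats.METADATA_FILEINFO_SCHEMA.matches(minimal_entry)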
self.assertEqual(download_fileinfo, new_fileinfo) @@ -251,7 +251,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file.replace('\\', '/'), client_timestamp_path) length, hashes = securesystemslib.util.get_file_details(client_timestamp_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the previous version. self.assertEqual(download_fileinfo, previous_fileinfo) @@ -278,7 +278,7 @@ def test_with_tuf(self): # The fileinfo of the previous version is saved to verify that it is indeed # accepted by the non-TUF client. length, hashes = securesystemslib.util.get_file_details(backup_timestamp) - previous_fileinfo = tuf.formats.make_fileinfo(length, hashes) + previous_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Modify the timestamp file on the remote repository. repository = repo_tool.load_repository(self.repository_directory) @@ -300,7 +300,7 @@ def test_with_tuf(self): # Save the fileinfo of the new version generated to verify that it is # saved by the client. length, hashes = securesystemslib.util.get_file_details(timestamp_path) - new_fileinfo = tuf.formats.make_fileinfo(length, hashes) + new_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Refresh top-level metadata, including 'timestamp.json'. Installation of # new version of 'timestamp.json' is expected. @@ -309,7 +309,7 @@ def test_with_tuf(self): client_timestamp_path = os.path.join(self.client_directory, self.repository_name, 'metadata', 'current', 'timestamp.json') length, hashes = securesystemslib.util.get_file_details(client_timestamp_path) - download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + download_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the new version. self.assertEqual(download_fileinfo, new_fileinfo) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 53ca0000d9..11fbf4aa55 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -62,6 +62,8 @@ repo_lib.disable_console_log_messages() +TOP_LEVEL_METADATA_FILES = ['root.json', 'targets.json', 'timestamp.json', + 'snapshot.json'] class TestRepositoryToolFunctions(unittest.TestCase): @@ -230,7 +232,7 @@ def test_get_top_level_metadata_filenames(self): - def test_get_metadata_fileinfo(self): + def test_get_targets_metadata_fileinfo(self): # Test normal case. temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) test_filepath = os.path.join(temporary_directory, 'file.txt') @@ -239,31 +241,31 @@ def test_get_metadata_fileinfo(self): file_object.write('test file') # Generate test fileinfo object. It is assumed SHA256 and SHA512 hashes - # are computed by get_metadata_fileinfo(). + # are computed by get_targets_metadata_fileinfo(). 
file_length = os.path.getsize(test_filepath) sha256_digest_object = securesystemslib.hash.digest_filename(test_filepath) sha512_digest_object = securesystemslib.hash.digest_filename(test_filepath, algorithm='sha512') file_hashes = {'sha256': sha256_digest_object.hexdigest(), 'sha512': sha512_digest_object.hexdigest()} fileinfo = {'length': file_length, 'hashes': file_hashes} - self.assertTrue(tuf.formats.FILEINFO_SCHEMA.matches(fileinfo)) + self.assertTrue(tuf.formats.TARGETS_FILEINFO_SCHEMA.matches(fileinfo)) storage_backend = securesystemslib.storage.FilesystemBackend() - self.assertEqual(fileinfo, repo_lib.get_metadata_fileinfo(test_filepath, - storage_backend)) + self.assertEqual(fileinfo, repo_lib.get_targets_metadata_fileinfo(test_filepath, + storage_backend)) # Test improperly formatted argument. self.assertRaises(securesystemslib.exceptions.FormatError, - repo_lib.get_metadata_fileinfo, 3, + repo_lib.get_targets_metadata_fileinfo, 3, storage_backend) # Test non-existent file. nonexistent_filepath = os.path.join(temporary_directory, 'oops.txt') self.assertRaises(securesystemslib.exceptions.Error, - repo_lib.get_metadata_fileinfo, + repo_lib.get_targets_metadata_fileinfo, nonexistent_filepath, storage_backend) @@ -436,7 +438,7 @@ def test_generate_targets_metadata(self): - def test_generate_snapshot_metadata(self): + def _setup_generate_snapshot_metadata_test(self): # Test normal case. temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) original_repository_path = os.path.join('repository_data', @@ -445,9 +447,9 @@ def test_generate_snapshot_metadata(self): shutil.copytree(original_repository_path, repository_directory) metadata_directory = os.path.join(repository_directory, repo_lib.METADATA_STAGED_DIRECTORY_NAME) + targets_directory = os.path.join(repository_directory, repo_lib.TARGETS_DIRECTORY_NAME) - targets_filename = os.path.join(metadata_directory, - repo_lib.TARGETS_FILENAME) + version = 1 expiration_date = '1985-10-21T13:20:00Z' @@ -456,9 +458,12 @@ def test_generate_snapshot_metadata(self): storage_backend = securesystemslib.storage.FilesystemBackend() repository = repo_tool.Repository(repository_directory, metadata_directory, targets_directory, storage_backend) - repository_junk = repo_tool.load_repository(repository_directory) + # Load a valid repository so that top-level roles exist in roledb and + # generate_snapshot_metadata() has roles to specify in snapshot metadata. + storage_backend = securesystemslib.storage.FilesystemBackend() + # For testing purposes, store an invalid metadata file in the metadata directory # to verify that it isn't loaded by generate_snapshot_metadata(). Unknown # metadata file extensions should be ignored. @@ -466,12 +471,17 @@ def test_generate_snapshot_metadata(self): with open(invalid_metadata_file, 'w') as file_object: file_object.write('bad extension on metadata file') - targets_filename = 'targets' + return metadata_directory, version, expiration_date, \ + storage_backend + + + def test_generate_snapshot_metadata(self): + metadata_directory, version, expiration_date, storage_backend = \ + self._setup_generate_snapshot_metadata_test() snapshot_metadata = \ repo_lib.generate_snapshot_metadata(metadata_directory, version, expiration_date, - targets_filename, storage_backend, consistent_snapshot=False) self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata)) @@ -479,24 +489,115 @@ def test_generate_snapshot_metadata(self): # Test improperly formatted arguments. 
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata, - 3, version, expiration_date, - targets_filename, consistent_snapshot=False, storage_backend=storage_backend) - self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata, - metadata_directory, '3', expiration_date, - targets_filename, storage_backend, consistent_snapshot=False) + 3, version, expiration_date, consistent_snapshot=False, + storage_backend=storage_backend) self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata, - metadata_directory, version, '3', - targets_filename, storage_backend, consistent_snapshot=False) + metadata_directory, '3', expiration_date, storage_backend, + consistent_snapshot=False) self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata, - metadata_directory, version, expiration_date, - 3, storage_backend, consistent_snapshot=False) + metadata_directory, version, '3', storage_backend, + consistent_snapshot=False) self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata, - metadata_directory, version, expiration_date, - targets_filename, 3, storage_backend) + metadata_directory, version, expiration_date, 3, + storage_backend) - def test_generate_timestamp_metadata(self): + def test_generate_snapshot_metadata_with_length(self): + metadata_directory, version, expiration_date, storage_backend = \ + self._setup_generate_snapshot_metadata_test() + + snapshot_metadata = \ + repo_lib.generate_snapshot_metadata(metadata_directory, version, + expiration_date, + storage_backend, + consistent_snapshot=False, + use_length=True) + self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata)) + + metadata_files_info_dict = snapshot_metadata['meta'] + for metadata_filename in sorted(os.listdir(metadata_directory), reverse=True): + + # In the metadata_directory, there are files with format: + # 1.root.json. The prefix number should be removed. + stripped_filename, version = \ + repo_lib._strip_version_number(metadata_filename, + consistent_snapshot=True) + + # In the repository, the file "role_file.xml" has been added to make + # sure that non-json files aren't loaded. This file should be filtered. + if stripped_filename.endswith('.json'): + if stripped_filename not in TOP_LEVEL_METADATA_FILES: + # Check that length is calculated but hashes are not + self.assertIn('length', metadata_files_info_dict[stripped_filename]) + self.assertNotIn('hashes', metadata_files_info_dict[stripped_filename]) + + + + def test_generate_snapshot_metadata_with_hashes(self): + metadata_directory, version, expiration_date, storage_backend = \ + self._setup_generate_snapshot_metadata_test() + + snapshot_metadata = \ + repo_lib.generate_snapshot_metadata(metadata_directory, version, + expiration_date, + storage_backend, + consistent_snapshot=False, + use_hashes=True) + self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata)) + + metadata_files_info_dict = snapshot_metadata['meta'] + for metadata_filename in sorted(os.listdir(metadata_directory), reverse=True): + + # In the metadata_directory, there are files with format: + # 1.root.json. The prefix number should be removed. + stripped_filename, version = \ + repo_lib._strip_version_number(metadata_filename, + consistent_snapshot=True) + + # In the repository, the file "role_file.xml" has been added to make + # sure that non-json files aren't loaded. This file should be filtered.
+ if stripped_filename.endswith('.json'): + if stripped_filename not in TOP_LEVEL_METADATA_FILES: + # Check that hashes are calculated but length is not + self.assertNotIn('length', metadata_files_info_dict[stripped_filename]) + self.assertIn('hashes', metadata_files_info_dict[stripped_filename]) + + + + def test_generate_snapshot_metadata_with_hashes_and_length(self): + metadata_directory, version, expiration_date, storage_backend = \ + self._setup_generate_snapshot_metadata_test() + + snapshot_metadata = \ + repo_lib.generate_snapshot_metadata(metadata_directory, version, + expiration_date, + storage_backend, + consistent_snapshot=False, + use_length=True, + use_hashes=True) + self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata)) + + metadata_files_info_dict = snapshot_metadata['meta'] + for metadata_filename in sorted(os.listdir(metadata_directory), reverse=True): + + # In the metadata_directory, there are files with format: + # 1.root.json. The prefix number should be removed. + stripped_filename, version = \ + repo_lib._strip_version_number(metadata_filename, + consistent_snapshot=True) + + # In the repository, the file "role_file.xml" has been added to make + # sure that non-json files aren't loaded. This file should be filtered. + if stripped_filename.endswith('.json'): + if stripped_filename not in TOP_LEVEL_METADATA_FILES: + # Check that both length and hashes are calculated + self.assertIn('length', metadata_files_info_dict[stripped_filename]) + self.assertIn('hashes', metadata_files_info_dict[stripped_filename]) + + + + def _setup_generate_timestamp_metadata_test(self): # Test normal case. repository_name = 'test_repository' temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) @@ -508,8 +609,8 @@ def test_generate_timestamp_metadata(self): repo_lib.METADATA_STAGED_DIRECTORY_NAME) targets_directory = os.path.join(repository_directory, repo_lib.TARGETS_DIRECTORY_NAME) - snapshot_filename = os.path.join(metadata_directory, - repo_lib.SNAPSHOT_FILENAME) + snapshot_file_path = os.path.join(metadata_directory, + repo_lib.SNAPSHOT_FILENAME) # Set valid generate_timestamp_metadata() arguments.
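The timestamp tests below follow the same pattern as the snapshot tests above. Roughly, and assuming a repository has already been loaded into the role database (as the shared setup does via repo_tool.load_repository()), the flags shape the generated entry like this; the path and versions shown are illustrative:

import securesystemslib.storage
import tuf.repository_lib as repo_lib

storage_backend = securesystemslib.storage.FilesystemBackend()
snapshot_file_path = 'metadata.staged/snapshot.json'   # illustrative path

timestamp_metadata = repo_lib.generate_timestamp_metadata(
    snapshot_file_path, 1, '1985-10-21T13:20:00Z', storage_backend,
    'test_repository', use_length=False, use_hashes=False)

# With both flags disabled only the snapshot version number is recorded:
#   timestamp_metadata['meta']['snapshot.json'] == {'version': 1}
# With the defaults (use_length=True, use_hashes=True) the entry also
# carries 'length' and a 'hashes' dict for snapshot.json.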
version = 1 @@ -524,7 +625,15 @@ def test_generate_timestamp_metadata(self): repository_junk = repo_tool.load_repository(repository_directory, repository_name) - timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_filename, + return snapshot_file_path, version, expiration_date, storage_backend, \ + repository_name + + + def test_generate_timestamp_metadata(self): + snapshot_file_path, version, expiration_date, storage_backend, \ + repository_name = self._setup_generate_timestamp_metadata_test() + + timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_file_path, version, expiration_date, storage_backend, repository_name) self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata)) @@ -534,14 +643,62 @@ def test_generate_timestamp_metadata(self): repo_lib.generate_timestamp_metadata, 3, version, expiration_date, storage_backend, repository_name) self.assertRaises(securesystemslib.exceptions.FormatError, - repo_lib.generate_timestamp_metadata, snapshot_filename, '3', + repo_lib.generate_timestamp_metadata, snapshot_file_path, '3', expiration_date, storage_backend, repository_name) self.assertRaises(securesystemslib.exceptions.FormatError, - repo_lib.generate_timestamp_metadata, snapshot_filename, version, '3', + repo_lib.generate_timestamp_metadata, snapshot_file_path, version, '3', storage_backend, repository_name) + def test_generate_timestamp_metadata_without_length(self): + snapshot_file_path, version, expiration_date, storage_backend, \ + repository_name = self._setup_generate_timestamp_metadata_test() + + timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_file_path, + version, expiration_date, storage_backend, repository_name, + use_length=False) + self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata)) + + # Check that length is not calculated but hashes is + timestamp_file_info = timestamp_metadata['meta'] + + self.assertNotIn('length', timestamp_file_info['snapshot.json']) + self.assertIn('hashes', timestamp_file_info['snapshot.json']) + + + + def test_generate_timestamp_metadata_without_hashes(self): + snapshot_file_path, version, expiration_date, storage_backend, \ + repository_name = self._setup_generate_timestamp_metadata_test() + + timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_file_path, + version, expiration_date, storage_backend, repository_name, + use_hashes=False) + self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata)) + + # Check that hashes is not calculated but length is + timestamp_file_info = timestamp_metadata['meta'] + + self.assertIn('length', timestamp_file_info['snapshot.json']) + self.assertNotIn('hashes', timestamp_file_info['snapshot.json']) + + + + def test_generate_timestamp_metadata_without_length_and_hashes(self): + snapshot_file_path, version, expiration_date, storage_backend, \ + repository_name = self._setup_generate_timestamp_metadata_test() + + timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_file_path, + version, expiration_date, storage_backend, repository_name, + use_hashes=False, use_length=False) + self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata)) + + # Check that length and hashes attributes are not added + timestamp_file_info = timestamp_metadata['meta'] + self.assertNotIn('length', timestamp_file_info['snapshot.json']) + self.assertNotIn('hashes', timestamp_file_info['snapshot.json']) + def test_sign_metadata(self): diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py 
index 7b94027803..e27919a6e9 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -110,6 +110,13 @@ def test_init(self): 'repository_directory', storage_backend, 3, 'targets_directory') self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository, 'repository_directory', 'metadata_directory', 3, storage_backend) + self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository, + 'repository_directory/', 'metadata_directory/', 'targets_directory/', + storage_backend, repository_name, use_timestamp_length=3) + self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository, + 'repository_directory/', 'metadata_directory/', 'targets_directory/', + storage_backend, repository_name, use_timestamp_length=False, + use_timestamp_hashes=3) @@ -457,8 +464,8 @@ def test_writeall_no_files(self): # Add target fileinfo target1_hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'} - target1_fileinfo = tuf.formats.make_fileinfo(555, target1_hashes) - target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes) + target1_fileinfo = tuf.formats.make_targets_fileinfo(555, target1_hashes) + target2_fileinfo = tuf.formats.make_targets_fileinfo(37, target2_hashes) target1 = 'file1.txt' target2 = 'file2.txt' repository.targets.add_target(target1, fileinfo=target1_fileinfo) @@ -1688,7 +1695,7 @@ def test_add_target_to_bin(self): # Test adding a target with fileinfo target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'} - target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes) + target2_fileinfo = tuf.formats.make_targets_fileinfo(37, target2_hashes) target2_filepath = 'file2.txt' rolename = self.targets_object.add_target_to_bin(target2_filepath, 16, @@ -1986,6 +1993,19 @@ def test_create_new_repository(self): self.assertTrue(os.path.exists(metadata_directory)) self.assertTrue(os.path.exists(targets_directory)) + # Test passing custom arguments to control the computation + # of length and hashes for timestamp and snapshot roles. + repository = repo_tool.create_new_repository(repository_directory, + repository_name, use_timestamp_length=True, use_timestamp_hashes=True, + use_snapshot_length=True, use_snapshot_hashes=True) + + # Verify that the argument for optional hashes and length for + # snapshot and timestamp are properly set. + self.assertTrue(repository._use_timestamp_length) + self.assertTrue(repository._use_timestamp_hashes) + self.assertTrue(repository._use_snapshot_length) + self.assertTrue(repository._use_snapshot_hashes) + # Test for a repository name that doesn't exist yet. Note: # The 'test_repository' repository name is created in setup() before this # test case is run. @@ -2098,6 +2118,19 @@ def test_load_repository(self): repo_tool.load_repository, 3) + # Test passing custom arguments to control the computation + # of length and hashes for timestamp and snapshot roles. + repository = repo_tool.load_repository(repository_directory, + 'my-repo', use_timestamp_length=True, use_timestamp_hashes=True, + use_snapshot_length=True, use_snapshot_hashes=True) + + # Verify that the argument for optional hashes and length for + # snapshot and timestamp are properly set. 
+ self.assertTrue(repository._use_timestamp_length) + self.assertTrue(repository._use_timestamp_hashes) + self.assertTrue(repository._use_snapshot_length) + self.assertTrue(repository._use_snapshot_hashes) + # Test for invalid 'repository_directory' (i.e., does not contain the # minimum required metadata. root_filepath = os.path.join(repository_directory, diff --git a/tests/test_updater.py b/tests/test_updater.py index 24c0f236be..9a55d3bc2f 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -494,7 +494,7 @@ def test_1__update_fileinfo(self): self.assertTrue(tuf.formats.FILEDICT_SCHEMA.matches(fileinfo_dict)) root_filepath = os.path.join(self.client_metadata_current, 'root.json') length, hashes = securesystemslib.util.get_file_details(root_filepath) - root_fileinfo = tuf.formats.make_fileinfo(length, hashes) + root_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) self.assertTrue('root.json' in fileinfo_dict) self.assertEqual(fileinfo_dict['root.json'], root_fileinfo) @@ -515,18 +515,18 @@ def test_2__fileinfo_has_changed(self): # Verify that the method returns 'False' if file info was not changed. root_filepath = os.path.join(self.client_metadata_current, 'root.json') length, hashes = securesystemslib.util.get_file_details(root_filepath) - root_fileinfo = tuf.formats.make_fileinfo(length, hashes) + root_fileinfo = tuf.formats.make_targets_fileinfo(length, hashes) self.assertFalse(self.repository_updater._fileinfo_has_changed('root.json', root_fileinfo)) # Verify that the method returns 'True' if length or hashes were changed. new_length = 8 - new_root_fileinfo = tuf.formats.make_fileinfo(new_length, hashes) + new_root_fileinfo = tuf.formats.make_targets_fileinfo(new_length, hashes) self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json', new_root_fileinfo)) # Hashes were changed. new_hashes = {'sha256': self.random_string()} - new_root_fileinfo = tuf.formats.make_fileinfo(length, new_hashes) + new_root_fileinfo = tuf.formats.make_targets_fileinfo(length, new_hashes) self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json', new_root_fileinfo)) @@ -1260,7 +1260,7 @@ def test_6_download_target(self): length, hashes = \ securesystemslib.util.get_file_details(download_filepath, securesystemslib.settings.HASH_ALGORITHMS) - download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes) + download_targetfileinfo = tuf.formats.make_targets_fileinfo(length, hashes) # Add any 'custom' data from the repository's target fileinfo to the # 'download_targetfileinfo' object being tested. diff --git a/tuf/client/updater.py b/tuf/client/updater.py index 2515ef66fb..23a6b4c37d 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -2142,7 +2142,7 @@ def _fileinfo_has_changed(self, metadata_filename, new_fileinfo): A dict object representing the new file information for 'metadata_filename'. 'new_fileinfo' may be 'None' when updating 'root' without having 'snapshot' available. This - dict conforms to 'tuf.formats.FILEINFO_SCHEMA' and has + dict conforms to 'tuf.formats.TARGETS_FILEINFO_SCHEMA' and has the form: {'length': 23423 @@ -2236,7 +2236,7 @@ def _update_fileinfo(self, metadata_filename): # to the fileinfo store. 
file_length, hashes = securesystemslib.util.get_file_details( current_filepath) - metadata_fileinfo = tuf.formats.make_fileinfo(file_length, hashes) + metadata_fileinfo = tuf.formats.make_targets_fileinfo(file_length, hashes) self.fileinfo[metadata_filename] = metadata_fileinfo diff --git a/tuf/formats.py b/tuf/formats.py index 4dfb6afa4b..caa1490256 100755 --- a/tuf/formats.py +++ b/tuf/formats.py @@ -230,33 +230,40 @@ # Information about target files, like file length and file hash(es). This # schema allows the storage of multiple hashes for the same file (e.g., sha256 # and sha512 may be computed for the same file and stored). -FILEINFO_SCHEMA = SCHEMA.Object( - object_name = 'FILEINFO_SCHEMA', +TARGETS_FILEINFO_SCHEMA = SCHEMA.Object( + object_name = 'TARGETS_FILEINFO_SCHEMA', length = LENGTH_SCHEMA, hashes = HASHDICT_SCHEMA, - version = SCHEMA.Optional(METADATAVERSION_SCHEMA), custom = SCHEMA.Optional(SCHEMA.Object())) +# Information about snapshot and timestamp files. This schema allows for optional +# length and hashes, but version is mandatory. +METADATA_FILEINFO_SCHEMA = SCHEMA.Object( + object_name = 'METADATA_FILEINFO_SCHEMA', + length = SCHEMA.Optional(LENGTH_SCHEMA), + hashes = SCHEMA.Optional(HASHDICT_SCHEMA), + version = METADATAVERSION_SCHEMA) + # A dict holding the version or file information for a particular metadata # role. The dict keys hold the relative file paths, and the dict values the # corresponding version numbers and/or file information. FILEINFODICT_SCHEMA = SCHEMA.DictOf( key_schema = RELPATH_SCHEMA, value_schema = SCHEMA.OneOf([VERSIONINFO_SCHEMA, - FILEINFO_SCHEMA])) + METADATA_FILEINFO_SCHEMA])) # A dict holding the information for a particular target / file. The dict keys # hold the relative file paths, and the dict values the corresponding file # information. FILEDICT_SCHEMA = SCHEMA.DictOf( key_schema = RELPATH_SCHEMA, - value_schema = FILEINFO_SCHEMA) + value_schema = TARGETS_FILEINFO_SCHEMA) # A dict holding a target info. TARGETINFO_SCHEMA = SCHEMA.Object( object_name = 'TARGETINFO_SCHEMA', filepath = RELPATH_SCHEMA, - fileinfo = FILEINFO_SCHEMA) + fileinfo = TARGETS_FILEINFO_SCHEMA) # A list of TARGETINFO_SCHEMA. TARGETINFOS_SCHEMA = SCHEMA.ListOf(TARGETINFO_SCHEMA) @@ -298,14 +305,14 @@ NUMBINS_SCHEMA = SCHEMA.Integer(lo=1) # The fileinfo format of targets specified in the repository and -# developer tools. The fields match that of FILEINFO_SCHEMA, only all +# developer tools. The fields match that of TARGETS_FILEINFO_SCHEMA, only all # fields are optional. 
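To make the schema split above concrete, the dictionaries below mirror the vectors used in test_formats.py; the first matches only the targets schema, the other two only the new metadata schema (digests are illustrative):

import tuf.formats

targets_entry = {'length': 1024,
                 'hashes': {'sha256': 'A4582BCF323BCEF'},
                 'custom': {'type': 'paintjob'}}
snapshot_entry = {'version': 1}                        # minimal: version only
timestamp_entry = {'version': 1, 'length': 515,
                   'hashes': {'sha256': 'AB1245'}}     # opted-in fields

assert tuf.formats.TARGETS_FILEINFO_SCHEMA.matches(targets_entry)
assert tuf.formats.METADATA_FILEINFO_SCHEMA.matches(snapshot_entry)
assert tuf.formats.METADATA_FILEINFO_SCHEMA.matches(timestamp_entry)
assert not tuf.formats.TARGETS_FILEINFO_SCHEMA.matches(snapshot_entry)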
CUSTOM_SCHEMA = SCHEMA.DictOf( key_schema = SCHEMA.AnyString(), value_schema = SCHEMA.Any() ) -LOOSE_FILEINFO_SCHEMA = SCHEMA.Object( - object_name = "LOOSE_FILEINFO_SCHEMA", +LOOSE_TARGETS_FILEINFO_SCHEMA = SCHEMA.Object( + object_name = "LOOSE_TARGETS_FILEINFO_SCHEMA", length = SCHEMA.Optional(LENGTH_SCHEMA), hashes = SCHEMA.Optional(HASHDICT_SCHEMA), version = SCHEMA.Optional(METADATAVERSION_SCHEMA), @@ -314,7 +321,7 @@ PATH_FILEINFO_SCHEMA = SCHEMA.DictOf( key_schema = RELPATH_SCHEMA, - value_schema = LOOSE_FILEINFO_SCHEMA) + value_schema = LOOSE_TARGETS_FILEINFO_SCHEMA) # TUF roledb ROLEDB_SCHEMA = SCHEMA.Object( @@ -376,7 +383,7 @@ spec_version = SPECIFICATION_VERSION_SCHEMA, version = METADATAVERSION_SCHEMA, expires = securesystemslib.formats.ISO8601_DATETIME_SCHEMA, - meta = FILEDICT_SCHEMA) + meta = FILEINFODICT_SCHEMA) # project.cfg file: stores information about the project in a json dictionary @@ -508,7 +515,7 @@ def build_dict_conforming_to_schema(schema, **kwargs): schema - A schema.Object, like TIMESTAMP_SCHEMA, FILEINFO_SCHEMA, + A schema.Object, like TIMESTAMP_SCHEMA, TARGETS_FILEINFO_SCHEMA, securesystemslib.formats.SIGNATURE_SCHEMA, etc. **kwargs @@ -759,11 +766,11 @@ def parse_base64(base64_string): -def make_fileinfo(length, hashes, version=None, custom=None): +def make_targets_fileinfo(length, hashes, custom=None): """ - Create a dictionary conformant to 'FILEINFO_SCHEMA'. - This dict describes both metadata and target files. + Create a dictionary conformant to 'TARGETS_FILEINFO_SCHEMA'. + This dict describes a target file. length: @@ -773,36 +780,68 @@ def make_fileinfo(length, hashes, version=None, custom=None): A dict of hashes in 'HASHDICT_SCHEMA' format, which has the form: {'sha256': 123df8a9b12, 'sha512': 324324dfc121, ...} - version: - An optional integer representing the version of the file. - custom: An optional object providing additional information about the file. - securesystemslib.exceptions.FormatError, if the 'FILEINFO_SCHEMA' to be + securesystemslib.exceptions.FormatError, if the 'TARGETS_FILEINFO_SCHEMA' to be returned does not have the correct format. - - If any of the arguments are incorrectly formatted, the dict - returned will be checked for formatting errors, and if found, - will raise a 'securesystemslib.exceptions.FormatError' exception. - - A dictionary conformant to 'FILEINFO_SCHEMA', representing the file - information of a metadata or target file. + A dictionary conformant to 'TARGETS_FILEINFO_SCHEMA', representing the file + information of a target file. """ fileinfo = {'length' : length, 'hashes' : hashes} - if version is not None: - fileinfo['version'] = version - if custom is not None: fileinfo['custom'] = custom # Raise 'securesystemslib.exceptions.FormatError' if the check fails. - FILEINFO_SCHEMA.check_match(fileinfo) + TARGETS_FILEINFO_SCHEMA.check_match(fileinfo) + + return fileinfo + + + +def make_metadata_fileinfo(version, length=None, hashes=None): + """ + + Create a dictionary conformant to 'METADATA_FILEINFO_SCHEMA'. + This dict describes one of the metadata files used for timestamp and + snapshot roles. + + + version: + An integer representing the version of the file. + + length: + An optional integer representing the size of the file. + + hashes: + An optional dict of hashes in 'HASHDICT_SCHEMA' format, which has the form: + {'sha256': 123df8a9b12, 'sha512': 324324dfc121, ...} + + + + securesystemslib.exceptions.FormatError, if the 'METADATA_FILEINFO_SCHEMA' to be + returned does not have the correct format. 
+ + + A dictionary conformant to 'METADATA_FILEINFO_SCHEMA', representing the file + information of a metadata file. + """ + + fileinfo = {'version' : version} + + if length: + fileinfo['length'] = length + + if hashes: + fileinfo['hashes'] = hashes + + # Raise 'securesystemslib.exceptions.FormatError' if the check fails. + METADATA_FILEINFO_SCHEMA.check_match(fileinfo) return fileinfo diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 545edf82b0..fe77ad8413 100644 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -96,11 +96,15 @@ HASH_FUNCTION = tuf.settings.DEFAULT_HASH_ALGORITHM + + def _generate_and_write_metadata(rolename, metadata_filename, targets_directory, metadata_directory, storage_backend, consistent_snapshot=False, filenames=None, allow_partially_signed=False, increment_version_number=True, repository_name='default', - use_existing_fileinfo=False): + use_existing_fileinfo=False, use_timestamp_length=True, + use_timestamp_hashes=True, use_snapshot_length=False, + use_snapshot_hashes=False): """ Non-public function that can generate and write the metadata for the specified 'rolename'. It also increments the version number of 'rolename' if @@ -127,19 +131,23 @@ def _generate_and_write_metadata(rolename, metadata_filename, elif rolename == 'snapshot': - targets_filename = TARGETS_FILENAME[:-len(METADATA_EXTENSION)] metadata = generate_snapshot_metadata(metadata_directory, - roleinfo['version'], roleinfo['expires'], targets_filename, - storage_backend, consistent_snapshot, repository_name) + roleinfo['version'], roleinfo['expires'], + storage_backend, consistent_snapshot, repository_name, + use_length=use_snapshot_length, use_hashes=use_snapshot_hashes) _log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'], SNAPSHOT_EXPIRES_WARN_SECONDS) elif rolename == 'timestamp': - snapshot_filename = filenames['snapshot'] - metadata = generate_timestamp_metadata(snapshot_filename, roleinfo['version'], - roleinfo['expires'], storage_backend, repository_name) + # If filenames don't have "snapshot_filename" key, defaults to "snapshot.json" + snapshot_file_path = (filenames and filenames['snapshot']) \ + or SNAPSHOT_FILENAME + + metadata = generate_timestamp_metadata(snapshot_file_path, roleinfo['version'], + roleinfo['expires'], storage_backend, repository_name, + use_length=use_timestamp_length, use_hashes=use_timestamp_hashes) _log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'], TIMESTAMP_EXPIRES_WARN_SECONDS) @@ -155,9 +163,10 @@ def _generate_and_write_metadata(rolename, metadata_filename, # Don't hash-prefix consistent target files if they are handled out of band consistent_targets = consistent_snapshot and not use_existing_fileinfo - metadata = generate_targets_metadata(targets_directory, roleinfo['paths'], - roleinfo['version'], roleinfo['expires'], roleinfo['delegations'], - consistent_targets, use_existing_fileinfo, storage_backend) + metadata = generate_targets_metadata(targets_directory, + roleinfo['paths'], roleinfo['version'], roleinfo['expires'], + roleinfo['delegations'], consistent_targets, use_existing_fileinfo, + storage_backend) # Before writing 'rolename' to disk, automatically increment its version # number (if 'increment_version_number' is True) so that the caller does not @@ -921,11 +930,11 @@ def get_top_level_metadata_filenames(metadata_directory): -def get_metadata_fileinfo(filename, storage_backend, custom=None): +def get_targets_metadata_fileinfo(filename, storage_backend, custom=None): """ Retrieve the file 
information of 'filename'. The object returned - conforms to 'tuf.formats.FILEINFO_SCHEMA'. The information + conforms to 'tuf.formats.TARGETS_FILEINFO_SCHEMA'. The information generated for 'filename' is stored in metadata files like 'targets.json'. The fileinfo object returned has the form: @@ -953,7 +962,7 @@ def get_metadata_fileinfo(filename, storage_backend, custom=None): such as file size and its hash. - A dictionary conformant to 'tuf.formats.FILEINFO_SCHEMA'. This + A dictionary conformant to 'tuf.formats.TARGETS_FILEINFO_SCHEMA'. This dictionary contains the length, hashes, and custom data about the 'filename' metadata file. SHA256 hashes are generated by default. """ @@ -974,7 +983,7 @@ def get_metadata_fileinfo(filename, storage_backend, custom=None): filesize, filehashes = securesystemslib.util.get_file_details(filename, tuf.settings.FILE_HASH_ALGORITHMS, storage_backend) - return tuf.formats.make_fileinfo(filesize, filehashes, custom=custom) + return tuf.formats.make_targets_fileinfo(filesize, filehashes, custom=custom) @@ -1498,7 +1507,7 @@ def _generate_targets_fileinfo(target_files, targets_directory, custom_data = fileinfo.get('custom', None) filedict[relative_targetpath] = \ - get_metadata_fileinfo(target_path, storage_backend, custom_data) + get_targets_metadata_fileinfo(target_path, storage_backend, custom_data) # Copy 'target_path' to 'digest_target' if consistent hashing is enabled. if write_consistent_targets: @@ -1512,10 +1521,9 @@ def _generate_targets_fileinfo(target_files, targets_directory, - def generate_snapshot_metadata(metadata_directory, version, expiration_date, - targets_filename, storage_backend, consistent_snapshot=False, - repository_name='default'): + storage_backend, consistent_snapshot=False, + repository_name='default', use_length=False, use_hashes=False): """ Create the snapshot metadata. The minimum metadata must exist (i.e., @@ -1537,10 +1545,6 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, The expiration date of the metadata file. Conformant to 'securesystemslib.formats.ISO8601_DATETIME_SCHEMA'. - targets_filename: - The filename of the top-level targets role. The hash and file size of - this file is listed in the snapshot role. - storage_backend: An object which implements securesystemslib.storage.StorageBackendInterface. @@ -1554,6 +1558,22 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, The name of the repository. If not supplied, 'rolename' is added to the 'default' repository. + use_length: + Whether to include the optional length attribute for targets + metadata files in the snapshot metadata. + Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + + use_hashes: + Whether to include the optional hashes attribute for targets + metadata files in the snapshot metadata. + Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + securesystemslib.exceptions.FormatError, if the arguments are improperly formatted. 
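In practice the two new flags decide whether a snapshot 'meta' entry carries anything beyond the version number. A rough sketch, assuming the roles are already loaded in the role database; the paths, versions and sizes are illustrative:

import securesystemslib.storage
import tuf.repository_lib as repo_lib

storage_backend = securesystemslib.storage.FilesystemBackend()
metadata_directory = 'metadata.staged'   # illustrative path

snapshot_metadata = repo_lib.generate_snapshot_metadata(
    metadata_directory, 1, '1985-10-21T13:20:00Z', storage_backend,
    consistent_snapshot=False, use_length=True, use_hashes=True)

# Each 'meta' entry then records the version plus the opted-in fields, e.g.
#   snapshot_metadata['meta']['targets.json'] ==
#       {'version': 3, 'length': 515, 'hashes': {'sha256': '1f812e37...'}}
# With the defaults (use_length=False, use_hashes=False) the same entry
# collapses to {'version': 3}, the bandwidth-saving layout motivated by the
# Mercury paper cited in the docstring above.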
@@ -1575,16 +1595,33 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, securesystemslib.formats.PATH_SCHEMA.check_match(metadata_directory) tuf.formats.METADATAVERSION_SCHEMA.check_match(version) securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date) - securesystemslib.formats.PATH_SCHEMA.check_match(targets_filename) securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot) securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_length) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_hashes) # Snapshot's 'fileinfodict' shall contain the version number of Root, # Targets, and all delegated roles of the repository. fileinfodict = {} - fileinfodict[TARGETS_FILENAME] = get_metadata_versioninfo(targets_filename, + + length, hashes = securesystemslib.util.get_file_details( + os.path.join(metadata_directory, TARGETS_FILENAME), + tuf.settings.FILE_HASH_ALGORITHMS, storage_backend) + + length = (use_length and length) or None + hashes = (use_hashes and hashes) or None + + targets_role = TARGETS_FILENAME[:-len(METADATA_EXTENSION)] + + targets_file_version = get_metadata_versioninfo(targets_role, repository_name) + # Make file info dictionary with make_metadata_fileinfo because + # in the tuf spec length and hashes are optional for all + # METAFILES in snapshot.json including the top-level targets file. + fileinfodict[TARGETS_FILENAME] = tuf.formats.make_metadata_fileinfo( + targets_file_version['version'], length, hashes) + # Search the metadata directory and generate the versioninfo of all the role # files found there. This information is stored in the 'meta' field of # 'snapshot.json'. @@ -1607,9 +1644,26 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, # list these roles found in the metadata directory. if tuf.roledb.role_exists(rolename, repository_name) and \ rolename not in tuf.roledb.TOP_LEVEL_ROLES: - fileinfodict[metadata_name] = get_metadata_versioninfo(rolename, + + length = None + hashes = None + # We want to make sure we are calculating length and hashes only when + # at least one of them is needed. Otherwise, for adoptors of tuf with + # lots of delegations, this will cause unnecessary overhead. + if use_length or use_hashes: + length, hashes = securesystemslib.util.get_file_details( + os.path.join(metadata_directory, metadata_filename), + tuf.settings.FILE_HASH_ALGORITHMS) + + length = (use_length and length) or None + hashes = (use_hashes and hashes) or None + + file_version = get_metadata_versioninfo(rolename, repository_name) + fileinfodict[metadata_name] = tuf.formats.make_metadata_fileinfo( + file_version['version'], length, hashes) + else: logger.debug('Metadata file has an unsupported file' ' extension: ' + metadata_filename) @@ -1633,16 +1687,17 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, -def generate_timestamp_metadata(snapshot_filename, version, expiration_date, - storage_backend, repository_name): + +def generate_timestamp_metadata(snapshot_file_path, version, expiration_date, + storage_backend, repository_name, use_length=True, use_hashes=True): """ Generate the timestamp metadata object. The 'snapshot.json' file must exist. - snapshot_filename: - The required filename of the snapshot metadata file. The timestamp role + snapshot_file_path: + Path to the required snapshot metadata file. 
The timestamp role needs to the calculate the file size and hash of this file. version: @@ -1662,6 +1717,16 @@ def generate_timestamp_metadata(snapshot_filename, version, expiration_date, The name of the repository. If not supplied, 'rolename' is added to the 'default' repository. + use_length: + Whether to include the optional length attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + + use_hashes: + Whether to include the optional hashes attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + securesystemslib.exceptions.FormatError, if the generated timestamp metadata object cannot be formatted correctly, or one of the arguments is improperly @@ -1678,18 +1743,27 @@ def generate_timestamp_metadata(snapshot_filename, version, expiration_date, # This check ensures arguments have the appropriate number of objects and # object types, and that all dict keys are properly named. # Raise 'securesystemslib.exceptions.FormatError' if the check fails. - securesystemslib.formats.PATH_SCHEMA.check_match(snapshot_filename) + securesystemslib.formats.PATH_SCHEMA.check_match(snapshot_file_path) tuf.formats.METADATAVERSION_SCHEMA.check_match(version) securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date) securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_length) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_hashes) - # Retrieve the versioninfo of the Snapshot metadata file. snapshot_fileinfo = {} - length, hashes = securesystemslib.util.get_file_details(snapshot_filename, + + length, hashes = securesystemslib.util.get_file_details(snapshot_file_path, tuf.settings.FILE_HASH_ALGORITHMS, storage_backend) + + length = (use_length and length) or None + hashes = (use_hashes and hashes) or None + + snapshot_filename = os.path.basename(snapshot_file_path) + # Retrieve the versioninfo of the Snapshot metadata file. snapshot_version = get_metadata_versioninfo('snapshot', repository_name) - snapshot_fileinfo[SNAPSHOT_FILENAME] = \ - tuf.formats.make_fileinfo(length, hashes, version=snapshot_version['version']) + snapshot_fileinfo[snapshot_filename] = \ + tuf.formats.make_metadata_fileinfo(snapshot_version['version'], + length, hashes) # Generate the timestamp metadata object. # Use generalized build_dict_conforming_to_schema func to produce a dict that diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 55e960153d..3a2f4dce28 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -178,6 +178,32 @@ class Repository(object): The name of the repository. If not supplied, 'rolename' is added to the 'default' repository. + use_timestamp_length: + Whether to include the optional length attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + + use_timestamp_hashes: + Whether to include the optional hashes attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + + use_snapshot_length: + Whether to include the optional length attribute for targets + metadata files in the snapshot metadata. + Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + + use_snapshot_hashes: + Whether to include the optional hashes attribute for targets + metadata files in the snapshot metadata. 
+ Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + securesystemslib.exceptions.FormatError, if the arguments are improperly formatted. @@ -191,7 +217,9 @@ class Repository(object): """ def __init__(self, repository_directory, metadata_directory, - targets_directory, storage_backend, repository_name='default'): + targets_directory, storage_backend, repository_name='default', + use_timestamp_length=True, use_timestamp_hashes=True, + use_snapshot_length=False, use_snapshot_hashes=False): # Do the arguments have the correct format? # Ensure the arguments have the appropriate number of objects and object @@ -201,12 +229,20 @@ def __init__(self, repository_directory, metadata_directory, securesystemslib.formats.PATH_SCHEMA.check_match(metadata_directory) securesystemslib.formats.PATH_SCHEMA.check_match(targets_directory) securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_timestamp_length) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_timestamp_hashes) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_snapshot_length) + securesystemslib.formats.BOOLEAN_SCHEMA.check_match(use_snapshot_hashes) self._repository_directory = repository_directory self._metadata_directory = metadata_directory self._targets_directory = targets_directory self._repository_name = repository_name self._storage_backend = storage_backend + self._use_timestamp_length = use_timestamp_length + self._use_timestamp_hashes = use_timestamp_hashes + self._use_snapshot_length = use_snapshot_length + self._use_snapshot_hashes = use_snapshot_hashes try: tuf.roledb.create_roledb(repository_name) @@ -330,14 +366,18 @@ def writeall(self, consistent_snapshot=False, use_existing_fileinfo=False): filenames['snapshot'], self._targets_directory, self._metadata_directory, self._storage_backend, consistent_snapshot, filenames, - repository_name=self._repository_name) + repository_name=self._repository_name, + use_snapshot_length=self._use_snapshot_length, + use_snapshot_hashes=self._use_snapshot_hashes) # Generate the 'timestamp.json' metadata file. if 'timestamp' in dirty_rolenames: repo_lib._generate_and_write_metadata('timestamp', filenames['timestamp'], self._targets_directory, self._metadata_directory, self._storage_backend, consistent_snapshot, - filenames, repository_name=self._repository_name) + filenames, repository_name=self._repository_name, + use_timestamp_length=self._use_timestamp_length, + use_timestamp_hashes=self._use_timestamp_hashes) tuf.roledb.unmark_dirty(dirty_rolenames, self._repository_name) @@ -1958,11 +1998,10 @@ def add_target(self, filepath, custom=None, fileinfo=None): fileinfo: An optional fileinfo dictionary, conforming to - tuf.formats.FILEINFO_SCHEMA, providing full information about the + tuf.formats.TARGETS_FILEINFO_SCHEMA, providing full information about the file, i.e: { 'length': 101, 'hashes': { 'sha256': '123EDF...' }, - 'version': 2, # optional 'custom': { 'permissions': '600'} # optional } NOTE: if a custom value is passed, the fileinfo parameter must be None. 
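As a usage note for the renamed schema in add_target(): callers either hand over a complete TARGETS_FILEINFO_SCHEMA dict or let the file be hashed at write time, but not both. A small sketch, with an illustrative repository path and the fixture values used earlier in this patch:

import tuf.formats
import tuf.repository_tool as repo_tool

repository = repo_tool.load_repository('/path/to/repository')  # illustrative

# Either supply pre-computed fileinfo...
target1_fileinfo = tuf.formats.make_targets_fileinfo(
    555,
    {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'},
    custom={'permissions': '600'})
repository.targets.add_target('file1.txt', fileinfo=target1_fileinfo)

# ...or pass only custom data and let the fileinfo be generated on write;
# passing both at once raises an error.
repository.targets.add_target('file2.txt', custom={'permissions': '600'})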
@@ -1992,7 +2031,7 @@ def add_target(self, filepath, custom=None, fileinfo=None): " custom or fileinfo, not both.") if fileinfo: - tuf.formats.FILEINFO_SCHEMA.check_match(fileinfo) + tuf.formats.TARGETS_FILEINFO_SCHEMA.check_match(fileinfo) if custom is None: custom = {} @@ -2649,7 +2688,7 @@ def add_target_to_bin(self, target_filepath, number_of_bins=DEFAULT_NUM_BINS, Note: 'number_of_bins' must be a power of 2. fileinfo: - An optional fileinfo object, conforming to tuf.formats.FILEINFO_SCHEMA, + An optional fileinfo object, conforming to tuf.formats.TARGETS_FILEINFO_SCHEMA, providing full information about the file. @@ -2843,7 +2882,8 @@ def _keys_to_keydict(keys): def create_new_repository(repository_directory, repository_name='default', - storage_backend=None): + storage_backend=None, use_timestamp_length=True, use_timestamp_hashes=True, + use_snapshot_length=False, use_snapshot_hashes=False): """ Create a new repository, instantiate barebones metadata for the top-level @@ -2867,6 +2907,32 @@ def create_new_repository(repository_directory, repository_name='default', securesystemslib.storage.StorageBackendInterface. When no object is passed a FilesystemBackend will be instantiated and used. + use_timestamp_length: + Whether to include the optional length attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + + use_timestamp_hashes: + Whether to include the optional hashes attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + + use_snapshot_length: + Whether to include the optional length attribute for targets + metadata files in the snapshot metadata. + Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + + use_snapshot_hashes: + Whether to include the optional hashes attribute for targets + metadata files in the snapshot metadata. + Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + securesystemslib.exceptions.FormatError, if the arguments are improperly formatted. @@ -2919,7 +2985,8 @@ def create_new_repository(repository_directory, repository_name='default', # have been set and contain default values (e.g., Root roles has a threshold # of 1, expires 1 year into the future, etc.) repository = Repository(repository_directory, metadata_directory, - targets_directory, storage_backend, repository_name) + targets_directory, storage_backend, repository_name, use_timestamp_length, + use_timestamp_hashes, use_snapshot_length, use_snapshot_hashes) return repository @@ -2928,7 +2995,8 @@ def create_new_repository(repository_directory, repository_name='default', def load_repository(repository_directory, repository_name='default', - storage_backend=None): + storage_backend=None, use_timestamp_length=True, use_timestamp_hashes=True, + use_snapshot_length=False, use_snapshot_hashes=False): """ Return a repository object containing the contents of metadata files loaded @@ -2948,6 +3016,32 @@ def load_repository(repository_directory, repository_name='default', securesystemslib.storage.StorageBackendInterface. When no object is passed a FilesystemBackend will be instantiated and used. 
+ use_timestamp_length: + Whether to include the optional length attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + + use_timestamp_hashes: + Whether to include the optional hashes attribute of the snapshot + metadata file in the timestamp metadata. + Default is True. + + use_snapshot_length: + Whether to include the optional length attribute for targets + metadata files in the snapshot metadata. + Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + + use_snapshot_hashes: + Whether to include the optional hashes attribute for targets + metadata files in the snapshot metadata. + Default is False to save bandwidth but without losing security + from rollback attacks. + Read more at section 5.6 from the Mercury paper: + https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy + securesystemslib.exceptions.FormatError, if 'repository_directory' or any of the metadata files are improperly formatted. @@ -2979,7 +3073,8 @@ def load_repository(repository_directory, repository_name='default', # The Repository() object loaded (i.e., containing all the metadata roles # found) and returned. repository = Repository(repository_directory, metadata_directory, - targets_directory, storage_backend, repository_name) + targets_directory, storage_backend, repository_name, use_timestamp_length, + use_timestamp_hashes, use_snapshot_length, use_snapshot_hashes) filenames = repo_lib.get_top_level_metadata_filenames(metadata_directory)
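Taken together, the new keyword arguments let a repository maintainer steer the optional length/hashes fields end to end. A rough sketch of the intended workflow; the directory path is illustrative and key/target management is elided:

import tuf.repository_tool as repo_tool

# Record lengths and hashes everywhere (by default they are written only to
# timestamp metadata, while snapshot metadata stays version-only).
repository = repo_tool.create_new_repository(
    '/path/to/repository', 'my-repo',
    use_timestamp_length=True, use_timestamp_hashes=True,
    use_snapshot_length=True, use_snapshot_hashes=True)

# ... import keys, add targets, mark roles dirty ...

# writeall() forwards the stored flags to generate_snapshot_metadata() and
# generate_timestamp_metadata() through _generate_and_write_metadata().
repository.writeall()

# The same keywords are accepted when re-loading an existing repository:
repository = repo_tool.load_repository(
    '/path/to/repository', 'my-repo',
    use_snapshot_length=True, use_snapshot_hashes=True)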