Port to securesystemslib with abstract files and directories (securesystemslib PR 232) #1024

Merged
2 changes: 1 addition & 1 deletion requirements-pinned.txt
@@ -11,7 +11,7 @@ pycparser==2.20 # via cffi
pynacl==1.3.0 # via securesystemslib
python-dateutil==2.8.1 # via securesystemslib
requests==2.23.0
securesystemslib[colors,crypto,pynacl]==0.14.2
securesystemslib[colors,crypto,pynacl]==0.15.0
six==1.14.0
subprocess32==3.5.4 ; python_version < '3' # via securesystemslib
urllib3==1.25.9 # via requests
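
The pin moves to securesystemslib 0.15.0, the first release expected to ship the storage abstraction from securesystemslib PR 232 that the rest of this PR builds on. A minimal, hedged smoke test for an environment, assuming only that the securesystemslib.storage module first appears in that release:

    # Fails with ImportError on securesystemslib releases that predate the
    # storage abstraction; succeeds on 0.15.0 and later.
    import securesystemslib.storage

    backend = securesystemslib.storage.FilesystemBackend()
    print(type(backend).__name__)  # 'FilesystemBackend'
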
2 changes: 1 addition & 1 deletion setup.py
@@ -116,7 +116,7 @@
'iso8601>=0.1.12',
'requests>=2.19.1',
'six>=1.11.0',
'securesystemslib>=0.12.0'
'securesystemslib>=0.15.0'
],
tests_require = [
'mock; python_version < "3.3"'
4 changes: 0 additions & 4 deletions tests/repository_data/generate_project_data.py
@@ -104,10 +104,6 @@
project.expiration = datetime.datetime(2030, 1, 1, 0, 0)
project('role1').expiration = datetime.datetime(2030, 1, 1, 0, 0)

# Compress the project role metadata so that the unit tests have a pre-generated
# example of compressed metadata.
project.compressions = ['gz']

# Create the actual metadata files, which are saved to 'metadata.staged'.
if not options.dry_run:
project.write()
4 changes: 3 additions & 1 deletion tests/test_developer_tool.py
@@ -38,6 +38,7 @@
import tuf.exceptions

import securesystemslib
import securesystemslib.exceptions

from tuf.developer_tool import METADATA_DIRECTORY_NAME
from tuf.developer_tool import TARGETS_DIRECTORY_NAME
@@ -188,7 +189,8 @@ def test_load_project(self):

# Test non-existent project filepath.
nonexistent_path = os.path.join(local_tmp, 'nonexistent')
self.assertRaises(IOError, developer_tool.load_project, nonexistent_path)
self.assertRaises(securesystemslib.exceptions.StorageError,
developer_tool.load_project, nonexistent_path)

# Copy the pregenerated metadata.
project_data_filepath = os.path.join('repository_data', 'project')
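
The updated test above captures the new error contract: with file access routed through the storage abstraction, a missing project directory surfaces as securesystemslib.exceptions.StorageError rather than a plain IOError. A minimal sketch of how calling code might adapt, using a hypothetical non-existent path:

    import securesystemslib.exceptions
    import tuf.developer_tool as developer_tool

    try:
      # '/tmp/no-such-project' is a hypothetical path that does not exist.
      project = developer_tool.load_project('/tmp/no-such-project')
    except securesystemslib.exceptions.StorageError as error:
      # Missing files and directories are reported uniformly by the backend.
      print('Could not load project metadata:', error)
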
152 changes: 50 additions & 102 deletions tests/test_repository_lib.py
@@ -52,8 +52,10 @@
import tuf.repository_tool as repo_tool

import securesystemslib
import securesystemslib.exceptions
import securesystemslib.rsa_keys
import securesystemslib.interface
import securesystemslib.storage
import six

logger = logging.getLogger(__name__)
@@ -126,8 +128,9 @@ def test_import_rsa_privatekey_from_file(self):
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory,
'nonexistent_keypath')
self.assertRaises(IOError, repo_lib.import_rsa_privatekey_from_file,
nonexistent_keypath, 'pw')
self.assertRaises(securesystemslib.exceptions.StorageError,
repo_lib.import_rsa_privatekey_from_file,
nonexistent_keypath, 'pw')

# Invalid key file argument.
invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
@@ -160,7 +163,8 @@ def test_import_ed25519_privatekey_from_file(self):
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory,
'nonexistent_keypath')
self.assertRaises(IOError, repo_lib.import_ed25519_privatekey_from_file,
self.assertRaises(securesystemslib.exceptions.StorageError,
repo_lib.import_ed25519_privatekey_from_file,
nonexistent_keypath, 'pw')

# Invalid key file argument.
@@ -215,7 +219,7 @@ def test_get_metadata_filenames(self):
'targets.json': os.path.join(metadata_directory, 'targets.json'),
'snapshot.json': os.path.join(metadata_directory, 'snapshot.json'),
'timestamp.json': os.path.join(metadata_directory, 'timestamp.json')}
self.assertEqual(filenames, repo_lib.get_metadata_filenames())
self.assertEqual(filenames, repo_lib.get_metadata_filenames(metadata_directory))


# Test improperly formatted argument.
@@ -241,17 +245,23 @@ def test_get_metadata_fileinfo(self):
fileinfo = {'length': file_length, 'hashes': file_hashes}
self.assertTrue(tuf.formats.FILEINFO_SCHEMA.matches(fileinfo))

self.assertEqual(fileinfo, repo_lib.get_metadata_fileinfo(test_filepath))
storage_backend = securesystemslib.storage.FilesystemBackend()

self.assertEqual(fileinfo, repo_lib.get_metadata_fileinfo(test_filepath,
storage_backend))


# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.get_metadata_fileinfo, 3)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.get_metadata_fileinfo, 3,
storage_backend)


# Test non-existent file.
nonexistent_filepath = os.path.join(temporary_directory, 'oops.txt')
self.assertRaises(securesystemslib.exceptions.Error, repo_lib.get_metadata_fileinfo,
nonexistent_filepath)
self.assertRaises(securesystemslib.exceptions.Error,
repo_lib.get_metadata_fileinfo,
nonexistent_filepath, storage_backend)
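
Both helpers touched above now take explicit location arguments: get_metadata_filenames() requires the metadata directory, and get_metadata_fileinfo() additionally takes a storage backend. A sketch of the new call shapes, assuming a hypothetical local 'metadata.staged' directory that contains root.json:

    import securesystemslib.storage
    import tuf.repository_lib as repo_lib

    # FilesystemBackend preserves the old behaviour of reading from local disk.
    storage_backend = securesystemslib.storage.FilesystemBackend()

    filenames = repo_lib.get_metadata_filenames('metadata.staged')
    fileinfo = repo_lib.get_metadata_fileinfo(filenames['root.json'],
        storage_backend)
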



@@ -440,8 +450,9 @@ def test_generate_snapshot_metadata(self):

# Load a valid repository so that top-level roles exist in roledb and
# generate_snapshot_metadata() has roles to specify in snapshot metadata.
storage_backend = securesystemslib.storage.FilesystemBackend()
repository = repo_tool.Repository(repository_directory, metadata_directory,
targets_directory)
targets_directory, storage_backend)

repository_junk = repo_tool.load_repository(repository_directory)

@@ -458,26 +469,27 @@
repo_lib.generate_snapshot_metadata(metadata_directory, version,
expiration_date,
targets_filename,
storage_backend,
consistent_snapshot=False)
self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata))


# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
3, version, expiration_date,
targets_filename, consistent_snapshot=False)
targets_filename, consistent_snapshot=False, storage_backend=storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, '3', expiration_date,
targets_filename, consistent_snapshot=False)
targets_filename, storage_backend, consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, version, '3',
targets_filename, consistent_snapshot=False)
targets_filename, storage_backend, consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, version, expiration_date,
3, consistent_snapshot=False)
3, storage_backend, consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, version, expiration_date,
targets_filename, 3)
targets_filename, 3, storage_backend)
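
The constructor and generator calls above show where the backend now plugs in: Repository() takes it as a fourth positional argument, and generate_snapshot_metadata() takes it after the targets filename, ahead of the consistent_snapshot keyword. A sketch with hypothetical paths and values:

    import securesystemslib.storage
    import tuf.repository_lib as repo_lib
    import tuf.repository_tool as repo_tool

    storage_backend = securesystemslib.storage.FilesystemBackend()

    repository = repo_tool.Repository('repository', 'repository/metadata.staged',
        'repository/targets', storage_backend)

    snapshot_metadata = repo_lib.generate_snapshot_metadata(
        'repository/metadata.staged',   # metadata directory (hypothetical)
        1,                              # version number
        '2030-01-01T00:00:00Z',         # expiration date
        'targets.json',                 # targets filename
        storage_backend,
        consistent_snapshot=False)
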



@@ -599,85 +611,25 @@ def test_write_metadata_file(self):
version_number = root_signable['signed']['version'] + 1

self.assertFalse(os.path.exists(output_filename))
storage_backend = securesystemslib.storage.FilesystemBackend()
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
consistent_snapshot=False)
consistent_snapshot=False, storage_backend=storage_backend)
self.assertTrue(os.path.exists(output_filename))

# Attempt to over-write the previously written metadata file. An exception
# is not raised in this case, only a debug message is logged.
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
consistent_snapshot=False)

# Try to write a consistent metadate file. An exception is not raised in
# this case. For testing purposes, root.json should be a hard link to the
# consistent metadata file. We should verify that root.json points to
# the latest consistent files.
tuf.settings.CONSISTENT_METHOD = 'hard_link'
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
consistent_snapshot=True)

# Test if the consistent files are properly named
# Filename format of a consistent file: <version number>.rolename.json
version_and_filename = str(version_number) + '.' + 'root.json'
first_version_output_file = os.path.join(temporary_directory, version_and_filename)
self.assertTrue(os.path.exists(first_version_output_file))

# Verify that the consistent file content is equal to 'output_filename'.
self.assertEqual(
securesystemslib.util.get_file_details(output_filename),
securesystemslib.util.get_file_details(first_version_output_file))

# Try to add more consistent metadata files.
version_number += 1
root_signable['signed']['version'] = version_number
repo_lib.write_metadata_file(root_signable, output_filename,
version_number, consistent_snapshot=True)

# Test if the latest root.json points to the expected consistent file
# and consistent metadata do not all point to the same root.json
version_and_filename = str(version_number) + '.' + 'root.json'
second_version_output_file = os.path.join(temporary_directory, version_and_filename)
self.assertTrue(os.path.exists(second_version_output_file))

# Verify that the second version is equal to the second output file, and
# that the second output filename differs from the first.
self.assertEqual(securesystemslib.util.get_file_details(output_filename),
securesystemslib.util.get_file_details(second_version_output_file))
self.assertNotEqual(securesystemslib.util.get_file_details(output_filename),
securesystemslib.util.get_file_details(first_version_output_file))

# Test for an improper settings.CONSISTENT_METHOD string value.
tuf.settings.CONSISTENT_METHOD = 'somebadidea'

# Test for invalid consistent methods on systems other than Windows,
# which always uses the copy method.
if platform.system() == 'Windows':
pass

else:
self.assertRaises(securesystemslib.exceptions.InvalidConfigurationError,
repo_lib.write_metadata_file, root_signable, output_filename,
version_number, consistent_snapshot=True)

# Try to create a link to root.json when root.json doesn't exist locally.
# repository_lib should log a message if this is the case.
tuf.settings.CONSISTENT_METHOD = 'hard_link'
os.remove(output_filename)
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
consistent_snapshot=True)

# Reset CONSISTENT_METHOD so that subsequent tests work as expected.
tuf.settings.CONSISTENT_METHOD = 'copy'
consistent_snapshot=False, storage_backend=storage_backend)

# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
3, output_filename, version_number, False)
3, output_filename, version_number, False, storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
root_signable, 3, version_number, False)
root_signable, 3, version_number, False, storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename, '3', False)
root_signable, output_filename, '3', False, storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename, version_number, 3)
root_signable, output_filename, storage_backend, version_number, 3)
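
With the consistent-snapshot linking checks moved out of this test, the remaining calls pin down the new write_metadata_file() shape: the signed content, an output filename, a version number, the consistent_snapshot flag, and the storage backend. A sketch, assuming a signable loaded from the pregenerated repository data as in the test:

    import securesystemslib.storage
    import securesystemslib.util
    import tuf.repository_lib as repo_lib

    storage_backend = securesystemslib.storage.FilesystemBackend()

    root_signable = securesystemslib.util.load_json_file(
        'repository_data/repository/metadata/root.json')

    # 'root.json' here is a hypothetical output path in the working directory.
    repo_lib.write_metadata_file(root_signable, 'root.json',
        root_signable['signed']['version'], consistent_snapshot=False,
        storage_backend=storage_backend)
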



@@ -731,13 +683,6 @@ def test_create_tuf_client_directory(self):



def test__check_directory(self):
# Test for non-existent directory.
self.assertRaises(securesystemslib.exceptions.Error,
repo_lib._check_directory, 'non-existent')



def test__generate_and_write_metadata(self):
# Test for invalid, or unsupported, rolename.
# Load the root metadata provided in 'tuf/tests/repository_data/'.
@@ -774,9 +719,11 @@ def test__generate_and_write_metadata(self):
tuf.roledb.add_role('obsolete_role', targets_roleinfo,
repository_name=repository_name)

storage_backend = securesystemslib.storage.FilesystemBackend()
repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata,
targets_directory, metadata_directory, consistent_snapshot=False,
filenames=None, repository_name=repository_name)
targets_directory, metadata_directory, storage_backend,
consistent_snapshot=False, filenames=None,
repository_name=repository_name)

snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
@@ -785,7 +732,8 @@
self.assertTrue(os.path.exists(os.path.join(metadata_directory,
'obsolete_role.json')))
tuf.repository_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'], False, repository_name)
snapshot_signable['signed'], False, repository_name,
storage_backend)
self.assertFalse(os.path.exists(metadata_directory + 'obsolete_role.json'))
shutil.copyfile(targets_metadata, obsolete_metadata)

@@ -801,27 +749,29 @@ def test__delete_obsolete_metadata(self):
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = securesystemslib.util.load_json_file(snapshot_filepath)
storage_backend = securesystemslib.storage.FilesystemBackend()

# Create role metadata that should not exist in snapshot.json.
role1_filepath = os.path.join('repository_data', 'repository', 'metadata',
'role1.json')
shutil.copyfile(role1_filepath, os.path.join(metadata_directory, 'role2.json'))

repo_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'], True, repository_name)
snapshot_signable['signed'], True, repository_name, storage_backend)

# _delete_obsolete_metadata should never delete root.json.
root_filepath = os.path.join('repository_data', 'repository', 'metadata',
'root.json')
shutil.copyfile(root_filepath, os.path.join(metadata_directory, 'root.json'))
repo_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'], True, repository_name)
snapshot_signable['signed'], True, repository_name, storage_backend)
self.assertTrue(os.path.exists(os.path.join(metadata_directory, 'root.json')))

# Verify what happens for a non-existent metadata directory (a debug
# message is logged).
repo_lib._delete_obsolete_metadata('non-existent',
snapshot_signable['signed'], True, repository_name)
self.assertRaises(securesystemslib.exceptions.StorageError,
repo_lib._delete_obsolete_metadata, 'non-existent',
snapshot_signable['signed'], True, repository_name, storage_backend)


def test__load_top_level_metadata(self):
@@ -843,12 +793,8 @@
signable = securesystemslib.util.load_json_file(os.path.join(metadata_directory, 'root.json'))
signable['signatures'].append(signable['signatures'][0])

repo_lib.write_metadata_file(signable, root_file, 8, False)

# Attempt to load a repository that contains a compressed Root file.
repository = repo_tool.create_new_repository(repository_directory, repository_name)
filenames = repo_lib.get_metadata_filenames(metadata_directory)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
storage_backend = securesystemslib.storage.FilesystemBackend()
repo_lib.write_metadata_file(signable, root_file, 8, False, storage_backend)

filenames = repo_lib.get_metadata_filenames(metadata_directory)
repository = repo_tool.create_new_repository(repository_directory, repository_name)
@@ -872,7 +818,9 @@
if role_file.endswith('.json') and not role_file.startswith('root'):
role_filename = os.path.join(metadata_directory, role_file)
os.remove(role_filename)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
self.assertRaises(tuf.exceptions.RepositoryError,
repo_lib._load_top_level_metadata, repository, filenames,
repository_name)

# Remove the required Root file and verify that an exception is raised.
os.remove(os.path.join(metadata_directory, 'root.json'))
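
Taken together, these changes route repository file access through securesystemslib.storage, so a deployment can swap local disk for another store by passing its own backend wherever FilesystemBackend appears above. A rough sketch of the shape such a backend might take; the method set below (get, put, remove, getsize, create_folder, list_folder) is assumed from securesystemslib PR 232, so check securesystemslib.storage.StorageBackendInterface in the pinned release for the authoritative signatures:

    import io
    from contextlib import contextmanager

    import securesystemslib.exceptions
    import securesystemslib.storage

    class InMemoryBackend(securesystemslib.storage.StorageBackendInterface):
      """Illustrative backend that keeps 'files' in a dict, keyed by path."""

      def __init__(self):
        self._files = {}

      @contextmanager
      def get(self, filepath):
        if filepath not in self._files:
          raise securesystemslib.exceptions.StorageError(
              'Can not read ' + filepath)
        yield io.BytesIO(self._files[filepath])

      def put(self, fileobj, filepath):
        fileobj.seek(0)
        self._files[filepath] = fileobj.read()

      def remove(self, filepath):
        self._files.pop(filepath, None)

      def getsize(self, filepath):
        if filepath not in self._files:
          raise securesystemslib.exceptions.StorageError(
              'Can not get size of ' + filepath)
        return len(self._files[filepath])

      def create_folder(self, filepath):
        pass  # Folders are implicit when paths are plain dict keys.

      def list_folder(self, filepath):
        # Return entries under 'filepath' (dict-backed approximation).
        prefix = filepath.rstrip('/') + '/'
        return [path[len(prefix):] for path in self._files
            if path.startswith(prefix)]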