diff --git a/docs/TUTORIAL.md b/docs/TUTORIAL.md index 8efea6e3fb..313efd142a 100644 --- a/docs/TUTORIAL.md +++ b/docs/TUTORIAL.md @@ -352,15 +352,8 @@ the target filepaths to metadata. # in targets metadata. >>> repository = load_repository('repository') -# get_filepaths_in_directory() returns a list of file paths in a directory. It -# can also return files in sub-directories if 'recursive_walk' is True. ->>> list_of_targets = repository.get_filepaths_in_directory( -... "repository/targets/", recursive_walk=False, followlinks=True) - -# Note: Since we set the 'recursive_walk' argument to false, the 'myproject' -# sub-directory is excluded from 'list_of_targets'. ->>> list_of_targets -['/path/to/repository/targets/file2.txt', '/path/to/repository/targets/file1.txt', '/path/to/repository/targets/file3.txt'] +# Create a list of all targets in the directory. +>>> list_of_targets = ['file1.txt', 'file2.txt', 'file3.txt'] # Add the list of target paths to the metadata of the top-level Targets role. # Any target file paths that might already exist are NOT replaced, and @@ -376,8 +369,11 @@ the target filepaths to metadata. # (octal number specifying file access for owner, group, others e.g., 0755) is # added alongside the default fileinfo. All target objects in metadata include # the target's filepath, hash, and length. ->>> target4_filepath = os.path.abspath("repository/targets/myproject/file4.txt") ->>> octal_file_permissions = oct(os.stat(target4_filepath).st_mode)[4:] +# Note: target path passed to add_target() method has to be relative +# to the targets directory or an exception is raised. +>>> target4_filepath = 'myproject/file4.txt' +>>> target4_abspath = os.path.abspath(os.path.join('repository', 'targets', target4_filepath)) +>>> octal_file_permissions = oct(os.stat(target4_abspath).st_mode)[4:] >>> custom_file_permissions = {'file_permissions': octal_file_permissions} >>> repository.targets.add_target(target4_filepath, custom_file_permissions) ``` @@ -498,7 +494,6 @@ targets and generate signed metadata. # Make a delegation (delegate trust of 'myproject/*.txt' files) from "targets" # to "unclaimed", where "unclaimed" initially contains zero targets. -# NOTE: Please ignore the warning about the path pattern's location (see #963) >>> repository.targets.delegate('unclaimed', [public_unclaimed_key], ['myproject/*.txt']) # Thereafter, we can access the delegated role by its name to e.g. add target @@ -635,8 +630,7 @@ to some role. >>> repository.targets('unclaimed').remove_target("myproject/file4.txt") # Get a list of target paths for the hashed bins. ->>> targets = repository.get_filepaths_in_directory( -... 'repository/targets/myproject', recursive_walk=True) +>>> targets = ['myproject/file4.txt'] # Delegate trust to 32 hashed bin roles. Each role is responsible for the set # of target files, determined by the path hash prefix. TUF evenly distributes diff --git a/tests/test_mix_and_match_attack.py b/tests/test_mix_and_match_attack.py index d0af8d1da6..a35c8854a2 100755 --- a/tests/test_mix_and_match_attack.py +++ b/tests/test_mix_and_match_attack.py @@ -218,7 +218,7 @@ def test_with_tuf(self): # new version is generated. with open(file3_path, 'wt') as file_object: file_object.write('This is role2\'s target file.') - repository.targets('role1').add_target(file3_path) + repository.targets('role1').add_target(os.path.basename(file3_path)) repository.writeall() diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index b0ded5095f..e4ac72bdf1 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -213,14 +213,14 @@ def test_writeall(self): # (4) Add target files. - target1 = os.path.join(targets_directory, 'file1.txt') - target2 = os.path.join(targets_directory, 'file2.txt') - target3 = os.path.join(targets_directory, 'file3.txt') + target1 = 'file1.txt' + target2 = 'file2.txt' + target3 = 'file3.txt' repository.targets.add_target(target1) repository.targets.add_target(target2) # (5) Perform delegation. - repository.targets.delegate('role1', [role1_pubkey], [os.path.basename(target3)]) + repository.targets.delegate('role1', [role1_pubkey], [target3]) repository.targets('role1').load_signing_key(role1_privkey) # (6) Write repository. @@ -449,8 +449,8 @@ def test_writeall_no_files(self): target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'} target1_fileinfo = tuf.formats.make_fileinfo(555, target1_hashes) target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes) - target1 = os.path.join(targets_directory, 'file1.txt') - target2 = os.path.join(targets_directory, 'file2.txt') + target1 = 'file1.txt' + target2 = 'file2.txt' repository.targets.add_target(target1, fileinfo=target1_fileinfo) repository.targets.add_target(target2, fileinfo=target2_fileinfo) @@ -1064,11 +1064,10 @@ def test_call(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') # Create Targets() object to be tested. targets_object = repo_tool.Targets(self.targets_directory) - targets_object.delegate('role1', [public_key], [os.path.basename(target1_filepath)]) + targets_object.delegate('role1', [public_key], ['file1.txt']) self.assertTrue(isinstance(targets_object('role1'), repo_tool.Targets)) @@ -1087,18 +1086,16 @@ def test_get_delegated_rolenames(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') - target2_filepath = os.path.join(self.targets_directory, 'file2.txt') # Set needed arguments by delegate(). public_keys = [public_key] threshold = 1 self.targets_object.delegate('tuf', public_keys, [], threshold, False, - [target1_filepath], path_hash_prefixes=None) + ['file1.txt'], path_hash_prefixes=None) self.targets_object.delegate('warehouse', public_keys, [], threshold, False, - [target2_filepath], path_hash_prefixes=None) + ['file2.txt'], path_hash_prefixes=None) # Test that get_delegated_rolenames returns the expected delegations. expected_delegated_rolenames = ['targets/tuf/', 'targets/warehouse'] @@ -1112,11 +1109,11 @@ def test_target_files(self): # Verify the targets object initially contains zero target files. self.assertEqual(self.targets_object.target_files, {}) - target_filepath = os.path.join(self.targets_directory, 'file1.txt') + target_filepath = 'file1.txt' self.targets_object.add_target(target_filepath) self.assertEqual(len(self.targets_object.target_files), 1) - self.assertTrue('file1.txt' in self.targets_object.target_files) + self.assertTrue(target_filepath in self.targets_object.target_files) @@ -1127,12 +1124,11 @@ def test_delegations(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') # Set needed arguments by delegate(). public_keys = [public_key] rolename = 'tuf' - paths = [os.path.basename(target1_filepath)] + paths = ['file1.txt'] threshold = 1 self.targets_object.delegate(rolename, public_keys, paths, threshold, @@ -1160,42 +1156,53 @@ def test_add_target(self): # Verify the targets object initially contains zero target files. self.assertEqual(self.targets_object.target_files, {}) - target_filepath = os.path.join(self.targets_directory, 'file1.txt') + target_filepath = 'file1.txt' self.targets_object.add_target(target_filepath) self.assertEqual(len(self.targets_object.target_files), 1) - self.assertTrue('file1.txt' in self.targets_object.target_files) + self.assertTrue(target_filepath in self.targets_object.target_files) # Test the 'custom' parameter of add_target(), where additional information # may be specified for the target. - target2_filepath = os.path.join(self.targets_directory, 'file2.txt') + target2_filepath = 'file2.txt' + target2_fullpath = os.path.join(self.targets_directory, target2_filepath) # The file permission of the target (octal number specifying file access # for owner, group, others (e.g., 0755). - octal_file_permissions = oct(os.stat(target2_filepath).st_mode)[4:] + octal_file_permissions = oct(os.stat(target2_fullpath).st_mode)[4:] custom_file_permissions = {'file_permissions': octal_file_permissions} self.targets_object.add_target(target2_filepath, custom_file_permissions) self.assertEqual(len(self.targets_object.target_files), 2) - self.assertTrue('file2.txt' in self.targets_object.target_files) + self.assertTrue(target2_filepath in self.targets_object.target_files) self.assertEqual(self.targets_object.target_files['file2.txt']['custom'], custom_file_permissions) # Attempt to replace target that has already been added. - octal_file_permissions2 = oct(os.stat(target_filepath).st_mode)[4:] + octal_file_permissions2 = oct(os.stat(target2_fullpath).st_mode)[4:] custom_file_permissions2 = {'file_permissions': octal_file_permissions} self.targets_object.add_target(target2_filepath, custom_file_permissions2) - self.assertEqual(self.targets_object.target_files['file2.txt']['custom'], - custom_file_permissions2) + self.assertEqual(self.targets_object.target_files[target2_filepath]['custom'], + custom_file_permissions2) # Test improperly formatted arguments. - self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.add_target, 3) - self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.add_target, 3, - custom_file_permissions) - self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.add_target, - target_filepath, 3) + self.assertRaises(securesystemslib.exceptions.FormatError, + self.targets_object.add_target, 3) + self.assertRaises(securesystemslib.exceptions.FormatError, + self.targets_object.add_target, 3, custom_file_permissions) + self.assertRaises(securesystemslib.exceptions.FormatError, + self.targets_object.add_target, target_filepath, 3) + + # A target path starting with a directory separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.add_target, '/file1.txt') + # A target path using a backward slash as a separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.add_target, 'subdir\\file1.txt') + # Should not access the file system to check for non-existent files + self.targets_object.add_target('non-existent') @@ -1204,27 +1211,44 @@ def test_add_targets(self): # Verify the targets object initially contains zero target files. self.assertEqual(self.targets_object.target_files, {}) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') - target2_filepath = os.path.join(self.targets_directory, 'file2.txt') - target3_filepath = os.path.join(self.targets_directory, 'file3.txt') + target1_filepath = 'file1.txt' + target2_filepath = 'file2.txt' + target3_filepath = 'file3.txt' # Add a 'target1_filepath' duplicate for testing purposes # ('target1_filepath' should not be added twice.) target_files = \ - ['file1.txt', 'file2.txt', 'file3.txt', 'file1.txt'] + [target1_filepath, target2_filepath, 'file3.txt', target1_filepath] self.targets_object.add_targets(target_files) self.assertEqual(len(self.targets_object.target_files), 3) self.assertEqual(self.targets_object.target_files, - {'file1.txt': {}, 'file2.txt': {}, 'file3.txt': {}}) + {target1_filepath: {}, target2_filepath: {}, target3_filepath: {}}) # Attempt to replace targets that has already been added. self.targets_object.add_targets(target_files) # Test improperly formatted arguments. - self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.add_targets, 3) + self.assertRaises(securesystemslib.exceptions.FormatError, + self.targets_object.add_targets, 3) + + # A target path starting with a directory separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.add_targets, ['/file1.txt']) + + # A target path using a backward slash as a separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.add_targets, ['subdir\\file1.txt']) + # Check if the addition of the whole list is rolled back in case of + # wrong target path + target_files = self.targets_object.target_files + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.add_targets, ['file4.txt', '/file5.txt']) + self.assertEqual(self.targets_object.target_files, target_files) + # Should not access the file system to check for non-existent files + self.targets_object.add_targets(['non-existent']) def test_remove_target(self): @@ -1233,25 +1257,19 @@ def test_remove_target(self): self.assertEqual(self.targets_object.target_files, {}) # Add a target so that remove_target() has something to remove. - target_filepath = os.path.join(self.targets_directory, 'file1.txt') + target_filepath = 'file1.txt' self.targets_object.add_target(target_filepath) # Test remove_target()'s behavior. - self.targets_object.remove_target(os.path.basename(target_filepath)) + self.targets_object.remove_target(target_filepath) self.assertEqual(self.targets_object.target_files, {}) # Test improperly formatted arguments. self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.remove_target, 3) - - # Test invalid filepath argument (i.e., non-existent or invalid file.) - self.assertRaises(securesystemslib.exceptions.Error, - self.targets_object.remove_target, - '/non-existent.txt') - # Test for filepath that hasn't been added yet. - target5_filepath = os.path.join(self.targets_directory, 'file5.txt') + target5_filepath = 'file5.txt' self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.remove_target, target5_filepath) @@ -1264,8 +1282,8 @@ def test_clear_targets(self): self.assertEqual(self.targets_object.target_files, {}) # Add targets, to be tested by clear_targets(). - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') - target2_filepath = os.path.join(self.targets_directory, 'file2.txt') + target1_filepath = 'file1.txt' + target2_filepath = 'file2.txt' self.targets_object.add_targets([target1_filepath, target2_filepath]) self.targets_object.clear_targets() @@ -1280,14 +1298,11 @@ def test_delegate(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') - target2_filepath = os.path.join(self.targets_directory, 'file2.txt') - # Set needed arguments by delegate(). public_keys = [public_key] rolename = 'tuf' - list_of_targets = [target1_filepath, target2_filepath] + list_of_targets = ['file1.txt', 'file2.txt'] threshold = 1 paths = ['*'] path_hash_prefixes = ['e3a3', '8fae', 'd543'] @@ -1299,22 +1314,20 @@ def test_delegate(self): self.assertEqual(self.targets_object.get_delegated_rolenames(), ['tuf']) - # Test for targets that do not exist under the targets directory. - self.targets_object.revoke(rolename) - self.targets_object.delegate(rolename, public_keys, paths, threshold, - terminating=False, list_of_targets=['non-existent.txt'], - path_hash_prefixes=path_hash_prefixes) - for delegation in self.targets_object.delegations: - self.assertFalse('non-existent.txt' in delegation.target_files) - # Test for delegated paths that do not exist. # An exception should not be raised for non-existent delegated paths, since # these paths may not necessarily exist when the delegation is done, # and also because the delegated paths can be glob patterns. - self.targets_object.delegate(rolename, public_keys, ['non-existent.txt'], + self.targets_object.delegate(rolename, public_keys, ['non-existent'], threshold, terminating=False, list_of_targets=list_of_targets, path_hash_prefixes=path_hash_prefixes) + # Test for delegated targets that do not exist. + # An exception should not be raised for non-existent delegated targets, + # since at this point the file system should not be accessed yet + self.targets_object.delegate(rolename, public_keys, [], threshold, + terminating=False, list_of_targets=['non-existent.txt'], + path_hash_prefixes=path_hash_prefixes) # Test improperly formatted arguments. self.assertRaises(securesystemslib.exceptions.FormatError, @@ -1349,10 +1362,20 @@ def test_delegate(self): self.targets_object.delegate, rolename, public_keys, paths, threshold, list_of_targets, path_hash_prefixes) - # Test non-existent target paths. - self.assertRaises(securesystemslib.exceptions.Error, - self.targets_object.delegate, rolename, public_keys, [], threshold, - ['/non-existent'], path_hash_prefixes) + # A path or target starting with a directory separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.delegate, rolename, public_keys, ['/*']) + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.delegate, rolename, public_keys, [], + list_of_targets=['/file1.txt']) + + # A path or target using '\' as a directory separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.delegate, rolename, public_keys, ['subpath\\*']) + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.delegate, rolename, public_keys, [], + list_of_targets=['subpath\\file1.txt']) + @@ -1361,11 +1384,10 @@ def test_delegate_hashed_bins(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') # Set needed arguments by delegate_hashed_bins(). public_keys = [public_key] - list_of_targets = [target1_filepath] + list_of_targets = ['file1.txt'] # A helper function to check that the range of prefixes the role is @@ -1430,13 +1452,18 @@ def check_prefixes_match_range(): list_of_targets, public_keys, number_of_bins=3) # Invalid 'list_of_targets'. - # TODO - """ - invalid_targets = ['/non-existent'] - self.assertRaises(securesystemslib.exceptions.Error, + # A path or target starting with a directory separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.delegate_hashed_bins, + ['/file1.txt'], public_keys, + number_of_bins=2) + + # A path or target using '\' as a directory separator + self.assertRaises(tuf.exceptions.InvalidNameError, self.targets_object.delegate_hashed_bins, - invalid_targets, public_keys, number_of_bins=16) - """ + ['subpath\\file1.txt'], public_keys, + number_of_bins=2) + def test_add_target_to_bin(self): # Test normal case. @@ -1445,7 +1472,7 @@ def test_add_target_to_bin(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'targets_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + target1_filepath = 'file1.txt' # Set needed arguments by delegate_hashed_bins(). public_keys = [public_key] @@ -1462,7 +1489,7 @@ def test_add_target_to_bin(self): # Add 'target1_filepath' and verify that the relative path of # 'target1_filepath' is added to the correct bin. - self.targets_object.add_target_to_bin(os.path.basename(target1_filepath), 16) + self.targets_object.add_target_to_bin(target1_filepath, 16) for delegation in self.targets_object.delegations: if delegation.rolename == '5': @@ -1477,27 +1504,27 @@ def test_add_target_to_bin(self): self.assertRaises(securesystemslib.exceptions.Error, empty_targets_role.add_target_to_bin, - os.path.basename(target1_filepath), 16) + target1_filepath, 16) # Test for a required hashed bin that does not exist. self.targets_object.revoke('5') self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_target_to_bin, - os.path.basename(target1_filepath), 16) + target1_filepath, 16) # Test adding a target with fileinfo target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'} target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes) - target2_filepath = os.path.join(self.targets_directory, 'file2.txt') + target2_filepath = 'file2.txt' - self.targets_object.add_target_to_bin(os.path.basename(target2_filepath), 16, fileinfo=target2_fileinfo) + self.targets_object.add_target_to_bin(target2_filepath, 16, fileinfo=target2_fileinfo) for delegation in self.targets_object.delegations: if delegation.rolename == '0': - self.assertTrue('file2.txt' in delegation.target_files) + self.assertTrue(target2_filepath in delegation.target_files) else: - self.assertFalse('file2.txt' in delegation.target_files) + self.assertFalse(target2_filepath in delegation.target_files) # Test improperly formatted argument. self.assertRaises(securesystemslib.exceptions.FormatError, @@ -1511,7 +1538,7 @@ def test_remove_target_from_bin(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'targets_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + target1_filepath = 'file1.txt' # Set needed arguments by delegate_hashed_bins(). public_keys = [public_key] @@ -1528,7 +1555,7 @@ def test_remove_target_from_bin(self): # Add 'target1_filepath' and verify that the relative path of # 'target1_filepath' is added to the correct bin. - self.targets_object.add_target_to_bin(os.path.basename(target1_filepath), 16) + self.targets_object.add_target_to_bin(target1_filepath, 16) for delegation in self.targets_object.delegations: if delegation.rolename == '5': @@ -1539,10 +1566,10 @@ def test_remove_target_from_bin(self): # Test the remove_target_from_bin() method. Verify that 'target1_filepath' # has been removed. - self.targets_object.remove_target_from_bin(os.path.basename(target1_filepath), 16) + self.targets_object.remove_target_from_bin(target1_filepath, 16) for delegation in self.targets_object.delegations: - self.assertTrue('file1.txt' not in delegation.target_files) + self.assertTrue(target1_filepath not in delegation.target_files) # Test improperly formatted argument. @@ -1551,7 +1578,7 @@ def test_remove_target_from_bin(self): # Invalid target file path argument. self.assertRaises(securesystemslib.exceptions.Error, - self.targets_object.remove_target_from_bin, '/non-existent', 16) + self.targets_object.remove_target_from_bin, 'non-existent', 16) @@ -1611,7 +1638,7 @@ def test_add_paths(self): # delegated paths are not added to a child role that was not requested). self.targets_object.delegate('junk_role', public_keys, []) - paths = ['/tuf_files/*'] + paths = ['tuf_files/*'] self.targets_object.add_paths(paths, 'tuf') # Retrieve 'targets_object' roleinfo, and verify the roleinfo contains the @@ -1620,7 +1647,7 @@ def test_add_paths(self): 'test_repository') delegated_role = targets_object_roleinfo['delegations']['roles'][0] - self.assertEqual(['/tuf_files/*'], delegated_role['paths']) + self.assertEqual(['tuf_files/*'], delegated_role['paths']) # Try to add a delegated path that has already been set. # add_paths() should simply log a message in this case. @@ -1638,14 +1665,18 @@ def test_add_paths(self): self.assertRaises(securesystemslib.exceptions.Error, self.targets_object.add_paths, paths, 'non_delegated_rolename') + # A path starting with a directory separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.add_paths, ['/tuf_files/*'], 'tuf') + + # A path using a backward slash as a separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object.add_paths, ['tuf_files\\*'], 'tuf') + # add_paths() should not raise an exception for non-existent # paths, which it previously did. - self.targets_object.add_paths(['/non-existent'], 'tuf') + self.targets_object.add_paths(['non-existent'], 'tuf') - # add_paths() should not raise an exception for paths that - # are not located in the repository's targets directory. - repository_directory = os.path.join('repository_data', 'repository') - self.targets_object.add_paths([repository_directory], 'tuf') @@ -1655,12 +1686,11 @@ def test_revoke(self): keystore_directory = os.path.join('repository_data', 'keystore') public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub') public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath) - target1_filepath = os.path.join(self.targets_directory, 'file1.txt') # Set needed arguments by delegate(). public_keys = [public_key] rolename = 'tuf' - paths = [target1_filepath] + paths = ['file1.txt'] threshold = 1 self.targets_object.delegate(rolename, public_keys, [], threshold, False, @@ -1676,6 +1706,33 @@ def test_revoke(self): + def test_check_path(self): + # Test that correct path does not raise exception: using '/' as a separator + # and does not start with a directory separator + self.targets_object._check_path('file1.txt') + + # Test that non-existent path does not raise exception (_check_path + # checks only the path string for compliance) + self.targets_object._check_path('non-existent.txt') + self.targets_object._check_path('subdir/non-existent') + + # Test improperly formatted pathname argument. + self.assertRaises(securesystemslib.exceptions.FormatError, + self.targets_object._check_path, 3) + + # Test invalid pathname + # Starting with os separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object._check_path, '/file1.txt') + + # Starting with Windows-style separator + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object._check_path, '\\file1.txt') + + # Using Windows-style separator ('\') + self.assertRaises(tuf.exceptions.InvalidNameError, + self.targets_object._check_path, 'subdir\\non-existent') + class TestRepositoryToolFunctions(unittest.TestCase): diff --git a/tests/test_root_versioning_integration.py b/tests/test_root_versioning_integration.py index 0e7bf22543..1eeb08d6ff 100755 --- a/tests/test_root_versioning_integration.py +++ b/tests/test_root_versioning_integration.py @@ -165,15 +165,15 @@ def test_root_role_versioning(self): repository.timestamp.load_signing_key(timestamp_privkey) # (4) Add target files. - target1 = os.path.join(targets_directory, 'file1.txt') - target2 = os.path.join(targets_directory, 'file2.txt') - target3 = os.path.join(targets_directory, 'file3.txt') + target1 = 'file1.txt' + target2 = 'file2.txt' + target3 = 'file3.txt' repository.targets.add_target(target1) repository.targets.add_target(target2) # (5) Perform delegation. - repository.targets.delegate('role1', [role1_pubkey], [os.path.basename(target3)]) + repository.targets.delegate('role1', [role1_pubkey], [target3]) repository.targets('role1').load_signing_key(role1_privkey) # (6) Write repository. diff --git a/tests/test_tutorial.py b/tests/test_tutorial.py index c73a6c737b..e3102c66cc 100644 --- a/tests/test_tutorial.py +++ b/tests/test_tutorial.py @@ -223,14 +223,10 @@ def test_tutorial(self): repository = load_repository('repository') - list_of_targets = repository.get_filepaths_in_directory( - os.path.join('repository', 'targets'), recursive_walk=False, followlinks=True) - - self.assertListEqual(sorted(list_of_targets), [ - os.path.abspath(os.path.join('repository', 'targets', 'file1.txt')), - os.path.abspath(os.path.join('repository', 'targets', 'file2.txt')), - os.path.abspath(os.path.join('repository', 'targets', 'file3.txt'))]) + # TODO: replace the hard-coded list of targets with a helper + # method that returns a list of normalized relative target paths + list_of_targets = ['file1.txt', 'file2.txt', 'file3.txt'] repository.targets.add_targets(list_of_targets) @@ -238,16 +234,16 @@ def test_tutorial(self): self.assertTrue('file2.txt' in repository.targets.target_files) self.assertTrue('file3.txt' in repository.targets.target_files) - - target4_filepath = os.path.abspath(os.path.join( - 'repository', 'targets', 'myproject', 'file4.txt')) - octal_file_permissions = oct(os.stat(target4_filepath).st_mode)[4:] + target4_filepath = 'myproject/file4.txt' + target4_abspath = os.path.abspath(os.path.join( + 'repository', 'targets', target4_filepath)) + octal_file_permissions = oct(os.stat(target4_abspath).st_mode)[4:] custom_file_permissions = {'file_permissions': octal_file_permissions} repository.targets.add_target(target4_filepath, custom_file_permissions) # Note that target filepaths specified in the repo use '/' even on Windows. # (This is important to make metadata platform-independent.) self.assertTrue( - os.path.join('myproject/file4.txt') in repository.targets.target_files) + os.path.join(target4_filepath) in repository.targets.target_files) # Skipping user entry of password @@ -346,8 +342,7 @@ def test_tutorial(self): # ----- Tutorial Section: Delegate to Hashed Bins repository.targets('unclaimed').remove_target("myproject/file4.txt") - targets = repository.get_filepaths_in_directory( - os.path.join('repository', 'targets', 'myproject'), recursive_walk=True) + targets = ['myproject/file4.txt'] # Patch logger to assert that it accurately logs the output of hashed bin # delegation. The logger is called multiple times, first with info level diff --git a/tests/test_updater.py b/tests/test_updater.py index 2947b4d121..d9655aaa80 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -863,7 +863,7 @@ def test_3__update_metadata_if_changed(self): # Modify one target file on the remote repository. repository = repo_tool.load_repository(self.repository_directory) - target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt') + target3 = 'file3.txt' repository.targets.add_target(target3) repository.root.version = repository.root.version + 1 @@ -936,7 +936,7 @@ def test_4_refresh(self): unsafely_update_root_if_necessary=False) repository = repo_tool.load_repository(self.repository_directory) - target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt') + target3 = 'file3.txt' repository.targets.add_target(target3) repository.targets.load_signing_key(self.role_keys['targets']['private']) @@ -969,8 +969,6 @@ def test_4_refresh(self): # Verify that the client's metadata was updated. targets_metadata = self.repository_updater.metadata['current']['targets'] - targets_directory = os.path.join(self.repository_directory, 'targets') - target3 = target3[len(targets_directory) + 1:] self.assertTrue(target3 in targets_metadata['targets']) # Verify the expected version numbers of the updated roles. @@ -1142,12 +1140,13 @@ def test_6_get_one_valid_targetinfo(self): # Test updater.get_one_valid_targetinfo() backtracking behavior (enabled by # default.) targets_directory = os.path.join(self.repository_directory, 'targets') - foo_directory = os.path.join(targets_directory, 'foo') + os.makedirs(os.path.join(targets_directory, 'foo')) + + foo_package = 'foo/foo1.1.tar.gz' foo_pattern = 'foo/foo*.tar.gz' - os.makedirs(foo_directory) - foo_package = os.path.join(foo_directory, 'foo1.1.tar.gz') - with open(foo_package, 'wb') as file_object: + foo_fullpath = os.path.join(targets_directory, foo_package) + with open(foo_fullpath, 'wb') as file_object: file_object.write(b'new release') # Modify delegations on the remote repository to test backtracking behavior. @@ -1452,7 +1451,7 @@ def test_7_updated_targets(self): length, hashes = securesystemslib.util.get_file_details(target1) - repository.targets.add_target(target1) + repository.targets.add_target(os.path.basename(target1)) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) @@ -1461,7 +1460,7 @@ def test_7_updated_targets(self): length, hashes = securesystemslib.util.get_file_details(target1) - repository.targets.add_target(target1) + repository.targets.add_target(os.path.basename(target1)) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) @@ -2065,7 +2064,7 @@ def test_get_valid_targetinfo(self): repository.targets.remove_target(os.path.basename(target1)) custom_field = {"custom": "my_custom_data"} - repository.targets.add_target(target1, custom_field) + repository.targets.add_target(os.path.basename(target1), custom_field) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) @@ -2091,7 +2090,7 @@ def test_get_valid_targetinfo(self): repository.targets.remove_target(os.path.basename(target1)) - repository.targets.add_target(target1) + repository.targets.add_target(os.path.basename(target1)) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 6c77af374c..262e2f16d8 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -1319,7 +1319,7 @@ def generate_targets_metadata(targets_directory, target_files, version, raise securesystemslib.exceptions.Error('use_existing_hashes option set' ' but fileinfo\'s length is not set') - filedict[target.replace('\\', '/').lstrip('/')] = fileinfo + filedict[target] = fileinfo else: filedict = _generate_targets_fileinfo(target_files, targets_directory, @@ -1393,7 +1393,7 @@ def _generate_targets_fileinfo(target_files, targets_directory, # the target's fileinfo dictionary) if specified here. custom_data = fileinfo.get('custom', None) - filedict[relative_targetpath.replace('\\', '/').lstrip('/')] = \ + filedict[relative_targetpath] = \ get_metadata_fileinfo(target_path, custom_data) # Copy 'target_path' to 'digest_target' if consistent hashing is enabled. diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index e036a109f2..a5b24d517e 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -1854,6 +1854,9 @@ def add_paths(self, paths, child_rolename): securesystemslib.exceptions.Error, if 'child_rolename' has not been delegated yet. + tuf.exceptions.InvalidNameError, if any path in 'paths' does not match + pattern. + Modifies this Targets' delegations field. @@ -1868,10 +1871,6 @@ def add_paths(self, paths, child_rolename): securesystemslib.formats.PATHS_SCHEMA.check_match(paths) tuf.formats.ROLENAME_SCHEMA.check_match(child_rolename) - # A list of relative and verified paths or glob patterns to be added to the - # child role's entry in the parent's delegations field. - relative_paths = [] - # Ensure that 'child_rolename' exists, otherwise it will not have an entry # in the parent role's delegations field. if not tuf.roledb.role_exists(child_rolename, self._repository_name): @@ -1879,17 +1878,11 @@ def add_paths(self, paths, child_rolename): ' not exist.') for path in paths: - # Are the delegated paths or glob patterns located in the repository's - # targets directory? If so, log it - the paths don't necessarily need to - # be located in the repository's directory. Append a trailing path - # separator with os.path.join(path, ''). - targets_directory = os.path.join(self._targets_directory, '') - if not path.startswith(targets_directory): - logger.debug(repr(path) + ' is not located in the' - ' repository\'s targets' - ' directory: ' + repr(self._targets_directory)) - - relative_paths.append(path) + # Check if the delegated paths or glob patterns are relative and use + # forward slash as a separator or raise an exception. Paths' existence + # on the file system is not verified. If the path is incorrect, + # the targetfile won't be matched successfully during a client update. + self._check_path(path) # Get the current role's roleinfo, so that its delegations field can be # updated. @@ -1898,7 +1891,7 @@ def add_paths(self, paths, child_rolename): # Update the delegated paths of 'child_rolename' to add relative paths. for role in roleinfo['delegations']['roles']: if role['name'] == child_rolename: - for relative_path in relative_paths: + for relative_path in paths: if relative_path not in role['paths']: role['paths'].append(relative_path) @@ -1915,13 +1908,17 @@ def add_paths(self, paths, child_rolename): def add_target(self, filepath, custom=None, fileinfo=None): """ - Add a filepath (must be located in the repository's targets directory) to - the Targets object. + Add a filepath (must be relative to the repository's targets directory) + to the Targets object. + + This method does not access the file system. 'filepath' must already + exist on the file system. - This method does not actually create 'filepath' on the file system. - 'filepath' must already exist on the file system. If 'filepath' - has already been added, it will be replaced with any new file - or 'custom' information. + If 'filepath' does not exist the file will still be added to 'roleinfo'. + Only later calls to write() and writeall() will fail. + + If 'filepath' has already been added, it will be replaced with any new + file or 'custom' information. >>> >>> @@ -1929,8 +1926,8 @@ def add_target(self, filepath, custom=None, fileinfo=None): filepath: - The path of the target file. It must exist in the repository's targets - directory. + The path of the target file. It must be relative to the repository's + targets directory. custom: An optional object providing additional information about the file. @@ -1943,6 +1940,8 @@ def add_target(self, filepath, custom=None, fileinfo=None): securesystemslib.exceptions.FormatError, if 'filepath' is improperly formatted. + tuf.exceptions.InvalidNameError, if 'filepath' does not match pattern. + Adds 'filepath' to this role's list of targets. This role's 'tuf.roledb.py' entry is also updated. @@ -1955,7 +1954,7 @@ def add_target(self, filepath, custom=None, fileinfo=None): # Ensure the arguments have the appropriate number of objects and object # types, and that all dict keys are properly named. Raise # 'securesystemslib.exceptions.FormatError' if there is a mismatch. - securesystemslib.formats.PATH_SCHEMA.check_match(filepath) + tuf.formats.RELPATH_SCHEMA.check_match(filepath) if fileinfo and custom: raise securesystemslib.exceptions.Error("Can only take one of" @@ -1966,43 +1965,41 @@ def add_target(self, filepath, custom=None, fileinfo=None): if custom is None: custom = {} - else: tuf.formats.CUSTOM_SCHEMA.check_match(custom) - filepath = os.path.join(self._targets_directory, filepath) - # Add 'filepath' (i.e., relative to the targets directory) to the role's # list of targets. 'filepath' will not be verified as an allowed path # according to some delegating role. Not verifying 'filepath' here allows - # freedom to add targets and parent restrictions in any order, minimize the - # number of times these checks are performed, and allow any role to - # delegate trust of packages to this Targes role. + # freedom to add targets and parent restrictions in any order, minimize + # the number of times these checks are performed, and allow any role to + # delegate trust of packages to this Targets role. - # Update the role's 'tuf.roledb.py' entry and avoid duplicates. Make - # sure to exclude the path separator when calculating the length of the - # targets directory. - targets_directory_length = len(self._targets_directory) + 1 + # Check if the target is relative and uses forward slash as a separator + # or raise an exception. File's existence on the file system is not + # verified. If the file does not exist relative to the targets directory, + # later calls to write() will fail. + self._check_path(filepath) + + # Update the role's 'tuf.roledb.py' entry and avoid duplicates. roleinfo = tuf.roledb.get_roleinfo(self._rolename, self._repository_name) - relative_path = filepath[targets_directory_length:].replace('\\', '/') - if relative_path not in roleinfo['paths']: - logger.debug('Adding new target: ' + repr(relative_path)) + if filepath not in roleinfo['paths']: + logger.debug('Adding new target: ' + repr(filepath)) else: - logger.debug('Replacing target: ' + repr(relative_path)) + logger.debug('Replacing target: ' + repr(filepath)) if fileinfo: - roleinfo['paths'].update({relative_path: fileinfo}) + roleinfo['paths'].update({filepath: fileinfo}) else: - roleinfo['paths'].update({relative_path: {'custom': custom}}) + roleinfo['paths'].update({filepath: {'custom': custom}}) tuf.roledb.update_roleinfo(self._rolename, roleinfo, repository_name=self._repository_name) - def add_targets(self, list_of_targets): """ @@ -2023,6 +2020,9 @@ def add_targets(self, list_of_targets): securesystemslib.exceptions.FormatError, if the arguments are improperly formatted. + tuf.exceptions.InvalidNameError, if any target in 'list_of_targets' + does not match pattern. + This Targets' roleinfo is updated with the paths in 'list_of_targets'. @@ -2036,29 +2036,20 @@ def add_targets(self, list_of_targets): # Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch. tuf.formats.RELPATHS_SCHEMA.check_match(list_of_targets) - # Update the tuf.roledb entry. - targets_directory_length = len(self._targets_directory) - relative_list_of_targets = [] - - # Ensure the paths in 'list_of_targets' are valid and are located in the - # repository's targets directory. The paths of 'list_of_targets' will be - # verified as allowed paths according to this Targets parent role when - # write() or writeall() is called. Not verifying filepaths here allows the - # freedom to add targets and parent restrictions in any order, and minimize - # the number of times these checks are performed. + # Ensure the paths in 'list_of_targets' are relative and use forward slash + # as a separator or raise an exception. The paths of 'list_of_targets' + # will be verified as existing and allowed paths according to this Targets + # parent role when write() or writeall() is called. Not verifying + # filepaths here allows the freedom to add targets and parent restrictions + # in any order and minimize the number of times these checks are performed. for target in list_of_targets: - filepath = os.path.join(self._targets_directory, target) - - relative_list_of_targets.append( - filepath[targets_directory_length + 1:].replace('\\', '/')) - + self._check_path(target) # Update this Targets 'tuf.roledb.py' entry. roleinfo = tuf.roledb.get_roleinfo(self._rolename, self._repository_name) - for relative_target in relative_list_of_targets: + for relative_target in list_of_targets: if relative_target not in roleinfo['paths']: logger.debug('Adding new target: ' + repr(relative_target)) - else: logger.debug('Replacing target: ' + repr(relative_target)) roleinfo['paths'].update({relative_target: {}}) @@ -2284,9 +2275,10 @@ def delegate(self, rolename, public_keys, paths, threshold=1, securesystemslib.exceptions.FormatError, if any of the arguments are improperly formatted. - securesystemslib.exceptions.Error, if the delegated role already exists - or if any target in 'list_of_targets' is an invalid path (i.e., not - located in the repository's targets directory). + securesystemslib.exceptions.Error, if the delegated role already exists. + + tuf.exceptions.InvalidNameError, if any path in 'paths' or target in + 'list_of_targets' does not match pattern. A new Target object is created for 'rolename' that is accessible to the @@ -2320,26 +2312,22 @@ def delegate(self, rolename, public_keys, paths, threshold=1, # Ensure the paths of 'list_of_targets' are located in the repository's # targets directory. relative_targetpaths = {} - targets_directory_length = len(self._targets_directory) if list_of_targets: for target in list_of_targets: - target = os.path.join(self._targets_directory, target) - if not os.path.isfile(target): - logger.warning(repr(target) + ' does not exist in the' - ' repository\'s targets directory: ' + repr(self._targets_directory)) - - relative_targetpaths.update({target[targets_directory_length:]: {}}) + # Check if the target path is relative or raise an exception. File's + # existence on the file system is not verified. If the file does not + # exist relative to the targets directory, later calls to write() + # will fail. + self._check_path(target) + relative_targetpaths.update({target: {}}) for path in paths: - if path.startswith(os.sep): - raise tuf.exceptions.Error('One of the given paths contains a leading' - ' path separator: ' + repr(path) + '. All delegated paths should' - ' be relative to the repo\'s targets directory.') - - if not path.startswith(self._targets_directory + os.sep): - logger.warning(repr(path) + ' is not located in the repository\'s' - ' targets directory: ' + repr(self._targets_directory)) + # Check if the delegated paths or glob patterns are relative or + # raise an exception. Paths' existence on the file system is not + # verified. If the path is incorrect, the targetfile won't be matched + # successfully during a client update. + self._check_path(path) # The new targets object is added as an attribute to this Targets object. new_targets_object = self._create_delegated_target(rolename, keyids, @@ -2492,9 +2480,12 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, formatted. securesystemslib.exceptions.Error, if 'number_of_bins' is not a power of - 2, or one of the targets in 'list_of_targets' is not located in the + 2, or one of the targets in 'list_of_targets' is not relative to the repository's targets directory. + tuf.exceptions.InvalidNameError, if any target in 'list_of_targets' + does not match pattern. + Delegates multiple target roles from the current parent role. @@ -2539,11 +2530,16 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, ordered_roles.append(role) for target_path in list_of_targets: + # Check if the target path is relative or raise an exception. File's + # existence on the file system is not verified. If the file does not + # exist relative to the targets directory, later calls to write() and + # writeall() will fail. + self._check_path(target_path) # Determine the hash prefix of 'target_path' by computing the digest of # its path relative to the targets directory. # We must hash a target path as it appears in the metadata - hash_prefix = _get_hash(target_path.replace('\\', '/').lstrip('/'))[:prefix_length] + hash_prefix = _get_hash(target_path)[:prefix_length] ordered_roles[int(hash_prefix, 16) // bin_size]["target_paths"].append(target_path) keyids, keydict = _keys_to_keydict(keys_of_hashed_bins) @@ -2566,9 +2562,8 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, # operation at the end of the iteration. relative_paths = {} - targets_directory_length = len(self._targets_directory) for path in bin_role['target_paths']: - relative_paths.update({path[targets_directory_length:]: {}}) + relative_paths.update({path: {}}) # Delegate from the "unclaimed" targets role to each 'bin_role' target = self._create_delegated_target(bin_role['name'], keyids, 1, @@ -2754,6 +2749,44 @@ def delegations(self): + + def _check_path(self, pathname): + """ + + Check if a path matches the definition of a PATHPATTERN or a + TARGETPATH (uses the forward slash (/) as directory separator and + does not start with a directory separator). Checks are performed only + on the path string, without accessing the file system. + + + pathname: + A file path or a glob pattern. + + + securesystemslib.exceptions.FormatError, if 'pathname' is improperly + formatted. + + tuf.exceptions.InvalidNameError, if 'pathname' does not match pattern. + + + None. + """ + + tuf.formats.RELPATH_SCHEMA.check_match(pathname) + + if '\\' in pathname: + raise tuf.exceptions.InvalidNameError('Path ' + repr(pathname) + + ' does not use the forward slash (/) as directory separator.') + + if pathname.startswith('/'): + raise tuf.exceptions.InvalidNameError('Path ' + repr(pathname) + + ' starts with a directory separator. All paths should be relative' + ' to targets directory.') + + + + + def _keys_to_keydict(keys): """ Iterate over a list of keys and return a list of keyids and a dict mapping @@ -2777,6 +2810,7 @@ def _keys_to_keydict(keys): + def _get_hash(target_filepath): """ @@ -2805,7 +2839,6 @@ def _create_bin_name(low, high, prefix_len): Create a string name of a delegated hash bin, where name will be a range of zero-padded (up to prefix_len) strings i.e. for low=00, high=07, prefix_len=3 the returned name would be '000-007'. - """ if low == high: return "{low:0{len}x}".format(low=low, len=prefix_len)