diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 3e11082ba8..13bb55003a 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -12,7 +12,7 @@ import unittest import copy -from typing import Dict, Callable +from typing import Dict from tests import utils @@ -31,28 +31,12 @@ logger = logging.getLogger(__name__) -# DataSet is only here so type hints can be used: -# It is a dict of name to test dict -DataSet = Dict[str, str] - -# Test runner decorator: Runs the test as a set of N SubTests, -# (where N is number of items in dataset), feeding the actual test -# function one test case at a time -def run_sub_tests_with_dataset(dataset: DataSet): - def real_decorator(function: Callable[["TestSerialization", str], None]): - def wrapper(test_cls: "TestSerialization"): - for case, data in dataset.items(): - with test_cls.subTest(case=case): - function(test_cls, data) - return wrapper - return real_decorator - class TestSerialization(unittest.TestCase): # Snapshot instances with meta = {} are valid, but for a full valid # repository it's required that meta has at least one element inside it. - invalid_signed: DataSet = { + invalid_signed: utils.DataSet = { "no _type": '{"spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', @@ -81,14 +65,14 @@ class TestSerialization(unittest.TestCase): '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', } - @run_sub_tests_with_dataset(invalid_signed) + @utils.run_sub_tests_with_dataset(invalid_signed) def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((KeyError, ValueError, TypeError)): Snapshot.from_dict(copy.deepcopy(case_dict)) - valid_keys: DataSet = { + valid_keys: utils.DataSet = { "all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ "keyval": {"public": "foo"}}', "unrecognized field": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ @@ -97,14 +81,14 @@ def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]): "keyval": {"public": "foo", "foo": "bar"}}', } - @run_sub_tests_with_dataset(valid_keys) + @utils.run_sub_tests_with_dataset(valid_keys) def test_valid_key_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) key = Key.from_dict("id", copy.copy(case_dict)) self.assertDictEqual(case_dict, key.to_dict()) - invalid_keys: DataSet = { + invalid_keys: utils.DataSet = { "no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}', "no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}', "no scheme": '{"keyid": "id", "keytype": "rsa", "keyval": {"public": "foo"}}', @@ -115,14 +99,14 @@ def test_valid_key_serialization(self, test_case_data: str): "keyval wrong type": '{"keyid": "id", "keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": 1}', } - @run_sub_tests_with_dataset(invalid_keys) + @utils.run_sub_tests_with_dataset(invalid_keys) def test_invalid_key_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((TypeError, KeyError)): keyid = case_dict.pop("keyid") Key.from_dict(keyid, copy.copy(case_dict)) - invalid_roles: DataSet = { + invalid_roles: utils.DataSet = { "no threshold": '{"keyids": ["keyid"]}', "no keyids": '{"threshold": 3}', "wrong threshold type": '{"keyids": ["keyid"], "threshold": "a"}', @@ -130,28 +114,28 @@ def test_invalid_key_serialization(self, test_case_data: Dict[str, str]): "duplicate keyids": '{"keyids": ["keyid", "keyid"], "threshold": 3}', } - @run_sub_tests_with_dataset(invalid_roles) + @utils.run_sub_tests_with_dataset(invalid_roles) def test_invalid_role_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((KeyError, TypeError, ValueError)): Role.from_dict(copy.deepcopy(case_dict)) - valid_roles: DataSet = { + valid_roles: utils.DataSet = { "all": '{"keyids": ["keyid"], "threshold": 3}', "many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}', "empty keyids": '{"keyids": [], "threshold": 1}', "unrecognized field": '{"keyids": ["keyid"], "threshold": 3, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_roles) + @utils.run_sub_tests_with_dataset(valid_roles) def test_role_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) role = Role.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, role.to_dict()) - valid_roots: DataSet = { + valid_roots: utils.DataSet = { "all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ "keys": { \ @@ -178,14 +162,14 @@ def test_role_serialization(self, test_case_data: str): "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_roots) + @utils.run_sub_tests_with_dataset(valid_roots) def test_root_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) root = Root.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, root.to_dict()) - invalid_metafiles: DataSet = { + invalid_metafiles: utils.DataSet = { "wrong length type": '{"version": 1, "length": "a", "hashes": {"sha256" : "abc"}}', "length 0": '{"version": 1, "length": 0, "hashes": {"sha256" : "abc"}}', "length below 0": '{"version": 1, "length": -1, "hashes": {"sha256" : "abc"}}', @@ -194,14 +178,14 @@ def test_root_serialization(self, test_case_data: str): "hashes values wrong type": '{"version": 1, "length": 1, "hashes": {"sha256": 1}}', } - @run_sub_tests_with_dataset(invalid_metafiles) + @utils.run_sub_tests_with_dataset(invalid_metafiles) def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((TypeError, ValueError, AttributeError)): MetaFile.from_dict(copy.deepcopy(case_dict)) - valid_metafiles: DataSet = { + valid_metafiles: utils.DataSet = { "all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}', "no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }', "no hashes": '{"length": 12, "version": 1}', @@ -209,38 +193,38 @@ def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]): "many hashes": '{"hashes": {"sha256" : "abc", "sha512": "cde"}, "length": 12, "version": 1}', } - @run_sub_tests_with_dataset(valid_metafiles) + @utils.run_sub_tests_with_dataset(valid_metafiles) def test_metafile_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) metafile = MetaFile.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, metafile.to_dict()) - invalid_timestamps: DataSet = { + invalid_timestamps: utils.DataSet = { "no metafile": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z"}', } - @run_sub_tests_with_dataset(invalid_timestamps) + @utils.run_sub_tests_with_dataset(invalid_timestamps) def test_invalid_timestamp_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((ValueError, KeyError)): Timestamp.from_dict(copy.deepcopy(case_dict)) - valid_timestamps: DataSet = { + valid_timestamps: utils.DataSet = { "all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}', "unrecognized field": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_timestamps) + @utils.run_sub_tests_with_dataset(valid_timestamps) def test_timestamp_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) timestamp = Timestamp.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, timestamp.to_dict()) - valid_snapshots: DataSet = { + valid_snapshots: utils.DataSet = { "all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": { \ "file1.txt": {"hashes": {"sha256" : "abc"}, "version": 1}, \ @@ -253,14 +237,14 @@ def test_timestamp_serialization(self, test_case_data: str): "meta": { "file.txt": { "hashes": {"sha256" : "abc"}, "version": 1 }}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_snapshots) + @utils.run_sub_tests_with_dataset(valid_snapshots) def test_snapshot_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) snapshot = Snapshot.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, snapshot.to_dict()) - valid_delegated_roles: DataSet = { + valid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the valid_roles. "no hash prefix attribute": '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ @@ -279,14 +263,14 @@ def test_snapshot_serialization(self, test_case_data: str): "terminating": false, "threshold": 1}', } - @run_sub_tests_with_dataset(valid_delegated_roles) + @utils.run_sub_tests_with_dataset(valid_delegated_roles) def test_delegated_role_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, deserialized_role.to_dict()) - invalid_delegated_roles: DataSet = { + invalid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the invalid_roles. "missing hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', @@ -295,14 +279,14 @@ def test_delegated_role_serialization(self, test_case_data: str): "paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}', } - @run_sub_tests_with_dataset(invalid_delegated_roles) + @utils.run_sub_tests_with_dataset(invalid_delegated_roles) def test_invalid_delegated_role_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) with self.assertRaises(ValueError): DelegatedRole.from_dict(copy.copy(case_dict)) - invalid_delegations: DataSet = { + invalid_delegations: utils.DataSet = { "empty delegations": '{}', "bad keys": '{"keys": "foo", \ "roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}', @@ -316,14 +300,14 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str): }', } - @run_sub_tests_with_dataset(invalid_delegations) + @utils.run_sub_tests_with_dataset(invalid_delegations) def test_invalid_delegation_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) with self.assertRaises((ValueError, KeyError, AttributeError)): Delegations.from_dict(copy.deepcopy(case_dict)) - valid_delegations: DataSet = { + valid_delegations: utils.DataSet = { "all": '{"keys": { \ "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ @@ -341,28 +325,28 @@ def test_invalid_delegation_serialization(self, test_case_data: str): }', } - @run_sub_tests_with_dataset(valid_delegations) + @utils.run_sub_tests_with_dataset(valid_delegations) def test_delegation_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) delegation = Delegations.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, delegation.to_dict()) - invalid_targetfiles: DataSet = { + invalid_targetfiles: utils.DataSet = { "no hashes": '{"length": 1}', "no length": '{"hashes": {"sha256": "abc"}}' # The remaining cases are the same as for invalid_hashes and # invalid_length datasets. } - @run_sub_tests_with_dataset(invalid_targetfiles) + @utils.run_sub_tests_with_dataset(invalid_targetfiles) def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises(KeyError): TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt") - valid_targetfiles: DataSet = { + valid_targetfiles: utils.DataSet = { "all": '{"length": 12, "hashes": {"sha256" : "abc"}, \ "custom" : {"foo": "bar"} }', "no custom": '{"length": 12, "hashes": {"sha256" : "abc"}}', @@ -370,14 +354,14 @@ def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): "custom" : {"foo": "bar"}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_targetfiles) + @utils.run_sub_tests_with_dataset(valid_targetfiles) def test_targetfile_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt") self.assertDictEqual(case_dict, target_file.to_dict()) - valid_targets: DataSet = { + valid_targets: utils.DataSet = { "all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": { \ "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} }, \ @@ -403,7 +387,7 @@ def test_targetfile_serialization(self, test_case_data: str): "targets": {}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_targets) + @utils.run_sub_tests_with_dataset(valid_targets) def test_targets_serialization(self, test_case_data): case_dict = json.loads(test_case_data) targets = Targets.from_dict(copy.deepcopy(case_dict)) diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index f0e08f4680..d42c0ccebd 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -10,7 +10,7 @@ import os import sys import tempfile -from typing import Optional +from typing import Optional, Tuple from tuf.exceptions import UnsignedMetadataError import unittest @@ -19,6 +19,7 @@ from tests import utils from tests.repository_simulator import RepositorySimulator + class TestUpdater(unittest.TestCase): # set dump_dir to trigger repository state dumps dump_dir:Optional[str] = None @@ -79,39 +80,50 @@ def test_refresh(self): self._run_refresh() - def test_targets(self): - targets = { - "targetpath": b"content", - "åäö": b"more content" - } + targets: utils.DataSet = { + "standard case": ("targetpath", b"content", "targetpath"), + "non-asci case": ("åäö", b"more content", "%C3%A5%C3%A4%C3%B6"), + "subdirectory case": ("a/b/c/targetpath", b"dir target content", "a%2Fb%2Fc%2Ftargetpath"), + } + + @utils.run_sub_tests_with_dataset(targets) + def test_targets(self, test_case_data: Tuple[str, bytes, str]): + targetpath, content, encoded_path = test_case_data + # target does not exist yet + updater = self._run_refresh() + self.assertIsNone(updater.get_one_valid_targetinfo(targetpath)) # Add targets to repository self.sim.targets.version += 1 - for targetpath, content in targets.items(): - self.sim.add_target("targets", content, targetpath) + self.sim.add_target("targets", content, targetpath) self.sim.update_snapshot() updater = self._run_refresh() - for targetpath, content in targets.items(): - # target now exists, is not in cache yet - file_info = updater.get_one_valid_targetinfo(targetpath) - self.assertIsNotNone(file_info) - self.assertEqual( - updater.updated_targets([file_info], self.targets_dir), - [file_info] - ) - - # download target, assert it is in cache and content is correct - local_path = updater.download_target(file_info, self.targets_dir) - self.assertEqual( - updater.updated_targets([file_info], self.targets_dir), [] - ) - self.assertTrue(local_path.startswith(self.targets_dir)) - with open(local_path, "rb") as f: - self.assertEqual(f.read(), content) - - # TODO: run the same download tests for target paths like "dir/file2") - # This currently fails because issue #1576 + # target now exists, is not in cache yet + file_info = updater.get_one_valid_targetinfo(targetpath) + self.assertIsNotNone(file_info) + self.assertEqual( + updater.updated_targets([file_info], self.targets_dir), + [file_info] + ) + + # Assert consistent_snapshot is True and downloaded targets have prefix. + self.assertTrue(self.sim.root.consistent_snapshot) + self.assertTrue(updater.config.prefix_targets_with_hash) + # download target, assert it is in cache and content is correct + local_path = updater.download_target(file_info, self.targets_dir) + self.assertEqual( + updater.updated_targets([file_info], self.targets_dir), [] + ) + self.assertTrue(local_path.startswith(self.targets_dir)) + with open(local_path, "rb") as f: + self.assertEqual(f.read(), content) + + # Assert that the targetpath was URL encoded as expected. + encoded_absolute_path = os.path.join(self.targets_dir, encoded_path) + self.assertEqual(local_path, encoded_absolute_path) + + def test_keys_and_signatures(self): """Example of the two trickiest test areas: keys and root updates""" diff --git a/tests/utils.py b/tests/utils.py index 6a3ee66b1d..15f2892414 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,8 +20,10 @@ Provide common utilities for TUF tests """ -import argparse from contextlib import contextmanager +from typing import Dict, Any, Callable +import unittest +import argparse import errno import logging import socket @@ -40,6 +42,22 @@ TEST_HOST_ADDRESS = '127.0.0.1' +# DataSet is only here so type hints can be used. +DataSet = Dict[str, Any] + +# Test runner decorator: Runs the test as a set of N SubTests, +# (where N is number of items in dataset), feeding the actual test +# function one test case at a time +def run_sub_tests_with_dataset(dataset: DataSet): + def real_decorator(function: Callable[[unittest.TestCase, Any], None]): + def wrapper(test_cls: unittest.TestCase): + for case, data in dataset.items(): + with test_cls.subTest(case=case): + function(test_cls, data) + return wrapper + return real_decorator + + class TestServerProcessError(Exception): def __init__(self, value="TestServerProcess"): diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 20fda0e400..fdb62ac4fc 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -229,6 +229,7 @@ def download_target( download URL. Default is the value provided in Updater() Raises: + ValueError: Invalid arguments TODO: download-related errors TODO: file write errors @@ -251,8 +252,9 @@ def download_target( consistent_snapshot = self._trusted_set.root.signed.consistent_snapshot if consistent_snapshot and self.config.prefix_targets_with_hash: hashes = list(targetinfo.hashes.values()) - target_filepath = f"{hashes[0]}.{target_filepath}" - full_url = parse.urljoin(target_base_url, target_filepath) + dirname, sep, basename = target_filepath.rpartition("/") + target_filepath = f"{dirname}{sep}{hashes[0]}.{basename}" + full_url = f"{target_base_url}{target_filepath}" with self._fetcher.download_file( full_url, targetinfo.length