From 954331c8af75e1d192e0be293621bfd043ebd6fd Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Thu, 14 Oct 2021 15:14:17 +0300 Subject: [PATCH 1/5] ngtests: Add top-level-roles update tests Add ngclient/updater tests following the top-level-roles metadata update from the specification (Detailed client workflow) using RepositorySimulator. Signed-off-by: Teodora Sechkova --- tests/test_updater_top_level_update.py | 373 +++++++++++++++++++++++++ 1 file changed, 373 insertions(+) create mode 100644 tests/test_updater_top_level_update.py diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py new file mode 100644 index 0000000000..45eea20375 --- /dev/null +++ b/tests/test_updater_top_level_update.py @@ -0,0 +1,373 @@ +#!/usr/bin/env python + +# Copyright 2021, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +"""Test ngclient Updater top-level metadata update workflow""" + +import os +import sys +import tempfile +import unittest +from datetime import datetime, timedelta + +from tests import utils +from tests.repository_simulator import RepositorySimulator +from tuf.api.metadata import Metadata +from tuf.exceptions import ( + BadVersionNumberError, + ExpiredMetadataError, + ReplayedMetadataError, + RepositoryError, + UnsignedMetadataError, +) +from tuf.ngclient import Updater + +class TestRefresh(unittest.TestCase): + """Test update of top-level metadata following + 'Detailed client workflow' in the specification.""" + + past_datetime = datetime.utcnow().replace(microsecond=0) - timedelta(days=5) + + def setUp(self): + self.temp_dir = tempfile.TemporaryDirectory() + self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") + self.targets_dir = os.path.join(self.temp_dir.name, "targets") + os.mkdir(self.metadata_dir) + os.mkdir(self.targets_dir) + + self.sim = RepositorySimulator() + + with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: + root = self.sim.download_bytes( + "https://example.com/metadata/1.root.json", 100000 + ) + f.write(root) + + def tearDown(self): + self.temp_dir.cleanup() + + def _run_refresh(self) -> Updater: + """Create a new Updater instance and refresh""" + updater = Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + self.sim, + ) + updater.refresh() + return updater + + def _init_updater(self) -> Updater: + """Create a new Updater instance""" + return Updater( + self.metadata_dir, + "https://example.com/metadata/", + self.targets_dir, + "https://example.com/targets/", + self.sim, + ) + + def test_first_time_refresh(self) -> None: + # Metadata dir contains only the mandatory initial root.json + metadata_files = os.listdir(self.metadata_dir) + self.assertListEqual(metadata_files, ["root.json"]) + + # Add one more root version to repository so that + # refresh() updates from local trusted root (v1) to + # remote root (v2) + self.sim.root.version += 1 + self.sim.publish_root() + + self._run_refresh() + + # Top-level metadata can be found in metadata dir + metadata_files_after_refresh = os.listdir(self.metadata_dir) + metadata_files_after_refresh.sort() + self.assertListEqual( + metadata_files_after_refresh, + ["root.json", "snapshot.json", "targets.json", "timestamp.json"], + ) + + def test_trusted_root_missing(self) -> None: + os.remove(os.path.join(self.metadata_dir, "root.json")) + with self.assertRaises(OSError): + self._run_refresh() + + def test_trusted_root_expired(self) -> None: + # Create an expired root version + self.sim.root.expires = self.past_datetime + self.sim.root.version += 1 + self.sim.publish_root() + + # Update to latest root which is expired but still + # saved as a local root. + updater = self._init_updater() + with self.assertRaises(ExpiredMetadataError): + updater.refresh() + + with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f: + self.assertEqual(f.read(), self.sim.signed_roots[-1]) + + # Local root metadata can be loaded even if expired + updater = self._init_updater() + + # Create a non-expired root version and refresh + self.sim.root.expires = self.sim.safe_expiry + self.sim.root.version += 1 + self.sim.publish_root() + + updater.refresh() + + with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f: + self.assertEqual(f.read(), self.sim.signed_roots[-1]) + + def test_trusted_root_unsigned(self) -> None: + # Local trusted root is not signed + root_path = os.path.join(self.metadata_dir, "root.json") + md_root = Metadata.from_file(root_path) + md_root.signatures.clear() + md_root.to_file(root_path) + + with self.assertRaises(UnsignedMetadataError): + self._run_refresh() + + def test_max_root_rotations(self) -> None: + # Root must stop looking for new versions after Y number of + # intermediate files were downloaded. + updater = self._init_updater() + updater.config.max_root_rotations = 3 + + # Create some number of roots greater than 'max_root_rotations' + while self.sim.root.version < updater.config.max_root_rotations + 3: + self.sim.root.version += 1 + self.sim.publish_root() + + root_path = os.path.join(self.metadata_dir, "root.json") + md_root = Metadata.from_file(root_path) + initial_root_version = md_root.signed.version + + updater.refresh() + + # Assert that root version was increased with no more than 'max_root_rotations' + md_root = Metadata.from_file(root_path) + self.assertEqual( + md_root.signed.version, + initial_root_version + updater.config.max_root_rotations, + ) + + def test_intermediate_root_incorrectly_signed(self) -> None: + # Check for an arbitrary software attack + + # Intermediate root v2 is unsigned + self.sim.root.version += 1 + root_signers = self.sim.signers["root"].copy() + self.sim.signers["root"].clear() + self.sim.publish_root() + + # Final root v3 is correctly signed + self.sim.root.version += 1 + self.sim.signers["root"] = root_signers + self.sim.publish_root() + + # Incorrectly signed intermediate root is detected + with self.assertRaises(UnsignedMetadataError): + self._run_refresh() + + def test_intermediate_root_expired(self) -> None: + # The expiration of the new (intermediate) root metadata file + # does not matter yet + + # Intermediate root v2 is expired + self.sim.root.expires = self.past_datetime + self.sim.root.version += 1 + self.sim.publish_root() + + # Final root v3 is up to date + self.sim.root.expires = self.sim.safe_expiry + self.sim.root.version += 1 + self.sim.publish_root() + + self._run_refresh() + md_root = Metadata.from_file( + os.path.join(self.metadata_dir, "root.json") + ) + self.assertEqual(md_root.signed.version, self.sim.root.version) + + def test_final_root_incorrectly_signed(self) -> None: + # Check for an arbitrary software attack + self.sim.root.version += 1 + self.sim.signers["root"].clear() + self.sim.publish_root() + + with self.assertRaises(UnsignedMetadataError): + self._run_refresh() + + def test_new_root_same_version(self) -> None: + # Check for a rollback_attack + # Repository serves a root file with the same version as previous + self.sim.publish_root() + with self.assertRaises(ReplayedMetadataError): + self._run_refresh() + + def test_new_root_nonconsecutive_version(self) -> None: + # Repository serves non-consecutive root version + self.sim.root.version += 2 + self.sim.publish_root() + with self.assertRaises(ReplayedMetadataError): + self._run_refresh() + + def test_final_root_expired(self) -> None: + # Check for a freeze attack + # Final root is expired + self.sim.root.expires = self.past_datetime + self.sim.root.version += 1 + self.sim.publish_root() + + with self.assertRaises(ExpiredMetadataError): + self._run_refresh() + + def test_new_timestamp_unsigned(self) -> None: + # Check for an arbitrary software attack + self.sim.signers["timestamp"].clear() + with self.assertRaises(UnsignedMetadataError): + self._run_refresh() + + def test_new_timestamp_version_rollback(self) -> None: + # Check for a rollback attack + self.sim.timestamp.version = 2 + self._run_refresh() + + self.sim.timestamp.version = 1 + with self.assertRaises(ReplayedMetadataError): + self._run_refresh() + + def test_new_timestamp_snapshot_rollback(self) -> None: + # Check for a rollback attack. + self.sim.snapshot.version = 2 + self.sim.update_timestamp() + self._run_refresh() + + # Snapshot meta version is smaller than previous + self.sim.timestamp.snapshot_meta.version = 1 + self.sim.timestamp.version += 1 + + with self.assertRaises(ReplayedMetadataError): + self._run_refresh() + + def test_new_timestamp_expired(self) -> None: + # Check for a freeze attack + self.sim.timestamp.expires = self.past_datetime + self.sim.update_timestamp() + + with self.assertRaises(ExpiredMetadataError): + self._run_refresh() + + def test_new_snapshot_hash_mismatch(self) -> None: + # Check against timestamp role’s snapshot hash + + # Update timestamp with snapshot's hashes + self.sim.compute_metafile_hashes_length = True + self.sim.update_timestamp() # timestamp v2 + self._run_refresh() + + # Modify snapshot contents without updating + # timestamp's snapshot hash + self.sim.snapshot.expires += timedelta(days=1) + self.sim.snapshot.version += 1 # snapshot v2 + self.sim.timestamp.snapshot_meta.version = self.sim.snapshot.version + self.sim.timestamp.version += 1 # timestamp v3 + + # Hash mismatch error + with self.assertRaises(RepositoryError): + self._run_refresh() + + def test_new_snapshot_unsigned(self) -> None: + # Check for an arbitrary software attack + self.sim.signers["snapshot"].clear() + with self.assertRaises(UnsignedMetadataError): + self._run_refresh() + + # TODO: RepositorySimulator works always with consistent snapshot + # enabled which forces the client to look for the snapshot version + # written in timestamp (which leads to "Unknown snapshot version"). + # This fails the test for a snapshot version mismatch. + + # def test_new_snapshot_version_mismatch(self): + # # Check against timestamp role’s snapshot version + + # # Increase snapshot version without updating + # # timestamp's snapshot version + # self.sim.snapshot.version += 1 + # with self.assertRaises(BadVersionNumberError): + # self._run_refresh() + + def test_new_snapshot_version_rollback(self) -> None: + # Check for a rollback attack + self.sim.snapshot.version = 2 + self.sim.update_timestamp() + self._run_refresh() + + self.sim.snapshot.version = 1 + self.sim.update_timestamp() + + with self.assertRaises(ReplayedMetadataError): + self._run_refresh() + + def test_new_snapshot_expired(self) -> None: + # Check for a freeze attack + self.sim.snapshot.expires = self.past_datetime + self.sim.update_snapshot() + + with self.assertRaises(ExpiredMetadataError): + self._run_refresh() + + def test_new_targets_hash_mismatch(self) -> None: + # Check against snapshot role’s targets hashes + + # Update snapshot with target's hashes + self.sim.compute_metafile_hashes_length = True + self.sim.update_snapshot() + self._run_refresh() + + # Modify targets contents without updating + # snapshot's targets hashes + self.sim.targets.version += 1 + self.sim.snapshot.meta["targets.json"].version = self.sim.targets.version + self.sim.snapshot.version += 1 + self.sim.update_timestamp() + + with self.assertRaises(RepositoryError): + self._run_refresh() + + def test_new_targets_unsigned(self) -> None: + # Check for an arbitrary software attack + self.sim.signers["targets"].clear() + with self.assertRaises(UnsignedMetadataError): + self._run_refresh() + + # TODO: RepositorySimulator works always with consistent snapshot + # enabled which forces the client to look for the targets version + # written in snapshot (which leads to "Unknown targets version"). + # This fails the test for a targets version mismatch. + + # def test_new_targets_version_mismatch(self): + # # Check against snapshot role’s targets version + # self.sim.targets.version += 1 + # with self.assertRaises(BadVersionNumberError): + # self._run_refresh() + + def test_new_targets_expired(self) -> None: + # Check for a freeze attack. + self.sim.targets.expires = self.past_datetime + self.sim.update_snapshot() + + with self.assertRaises(ExpiredMetadataError): + self._run_refresh() + + +if __name__ == "__main__": + + utils.configure_test_logging(sys.argv) + unittest.main() From 8a2c7857ac94fffea519ac0bb2d599844a33abde Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Fri, 29 Oct 2021 14:56:57 +0300 Subject: [PATCH 2/5] ngtests: Add addtional asserts for files on disk Extend the TestRefresh cases with additional checks for expected metadata files and their content written on the file system. Signed-off-by: Teodora Sechkova --- tests/test_updater_top_level_update.py | 116 ++++++++++++++++++++----- 1 file changed, 94 insertions(+), 22 deletions(-) diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 45eea20375..564a26a525 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -8,12 +8,13 @@ import os import sys import tempfile +from typing import List, Optional import unittest from datetime import datetime, timedelta from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import Metadata +from tuf.api.metadata import Metadata, TOP_LEVEL_ROLE_NAMES from tuf.exceptions import ( BadVersionNumberError, ExpiredMetadataError, @@ -69,10 +70,21 @@ def _init_updater(self) -> Updater: self.sim, ) + def _assert_files_exist(self, roles: List[str]) -> None: + """Assert that local metadata files exist for 'roles'""" + expected_files = sorted([f"{role}.json" for role in roles]) + local_metadata_files = sorted(os.listdir(self.metadata_dir)) + self.assertListEqual(local_metadata_files, expected_files) + + def _assert_content_equals(self, role: str, version: Optional[int]=None) -> None: + """Assert that local file content is the expected""" + expected_content = self.sim._fetch_metadata(role, version) + with open(os.path.join(self.metadata_dir, f"{role}.json"), "rb") as f: + self.assertEqual(f.read(), expected_content) + def test_first_time_refresh(self) -> None: # Metadata dir contains only the mandatory initial root.json - metadata_files = os.listdir(self.metadata_dir) - self.assertListEqual(metadata_files, ["root.json"]) + self._assert_files_exist(["root"]) # Add one more root version to repository so that # refresh() updates from local trusted root (v1) to @@ -82,19 +94,19 @@ def test_first_time_refresh(self) -> None: self._run_refresh() - # Top-level metadata can be found in metadata dir - metadata_files_after_refresh = os.listdir(self.metadata_dir) - metadata_files_after_refresh.sort() - self.assertListEqual( - metadata_files_after_refresh, - ["root.json", "snapshot.json", "targets.json", "timestamp.json"], - ) + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + for role in TOP_LEVEL_ROLE_NAMES: + version = 2 if role == "root" else None + self._assert_content_equals(role, version) def test_trusted_root_missing(self) -> None: os.remove(os.path.join(self.metadata_dir, "root.json")) with self.assertRaises(OSError): self._run_refresh() + # Metadata dir is empty + self.assertFalse(os.listdir(self.metadata_dir)) + def test_trusted_root_expired(self) -> None: # Create an expired root version self.sim.root.expires = self.past_datetime @@ -107,8 +119,8 @@ def test_trusted_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): updater.refresh() - with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f: - self.assertEqual(f.read(), self.sim.signed_roots[-1]) + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 2) # Local root metadata can be loaded even if expired updater = self._init_updater() @@ -117,11 +129,11 @@ def test_trusted_root_expired(self) -> None: self.sim.root.expires = self.sim.safe_expiry self.sim.root.version += 1 self.sim.publish_root() - updater.refresh() - with open(os.path.join(self.metadata_dir, "root.json"), "rb") as f: - self.assertEqual(f.read(), self.sim.signed_roots[-1]) + # Root is successfully updated to latest version + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + self._assert_content_equals("root", 3) def test_trusted_root_unsigned(self) -> None: # Local trusted root is not signed @@ -133,6 +145,11 @@ def test_trusted_root_unsigned(self) -> None: with self.assertRaises(UnsignedMetadataError): self._run_refresh() + # The update failed, no changes in metadata + self._assert_files_exist(["root"]) + md_root_after = Metadata.from_file(root_path) + self.assertEqual(md_root.to_bytes(), md_root_after.to_bytes()) + def test_max_root_rotations(self) -> None: # Root must stop looking for new versions after Y number of # intermediate files were downloaded. @@ -175,6 +192,11 @@ def test_intermediate_root_incorrectly_signed(self) -> None: with self.assertRaises(UnsignedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + + def test_intermediate_root_expired(self) -> None: # The expiration of the new (intermediate) root metadata file # does not matter yet @@ -190,20 +212,24 @@ def test_intermediate_root_expired(self) -> None: self.sim.publish_root() self._run_refresh() - md_root = Metadata.from_file( - os.path.join(self.metadata_dir, "root.json") - ) - self.assertEqual(md_root.signed.version, self.sim.root.version) + + # Successfully updated to root v3 + self._assert_files_exist(TOP_LEVEL_ROLE_NAMES) + self._assert_content_equals("root", 3) def test_final_root_incorrectly_signed(self) -> None: # Check for an arbitrary software attack - self.sim.root.version += 1 + self.sim.root.version += 1 # root v2 self.sim.signers["root"].clear() self.sim.publish_root() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + def test_new_root_same_version(self) -> None: # Check for a rollback_attack # Repository serves a root file with the same version as previous @@ -211,6 +237,10 @@ def test_new_root_same_version(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + def test_new_root_nonconsecutive_version(self) -> None: # Repository serves non-consecutive root version self.sim.root.version += 2 @@ -218,6 +248,10 @@ def test_new_root_nonconsecutive_version(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + # The update failed, latest root version is v1 + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 1) + def test_final_root_expired(self) -> None: # Check for a freeze attack # Final root is expired @@ -228,12 +262,18 @@ def test_final_root_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + # The update failed but final root is persisted on the file system + self._assert_files_exist(["root"]) + self._assert_content_equals("root", 2) + def test_new_timestamp_unsigned(self) -> None: # Check for an arbitrary software attack self.sim.signers["timestamp"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + self._assert_files_exist(["root"]) + def test_new_timestamp_version_rollback(self) -> None: # Check for a rollback attack self.sim.timestamp.version = 2 @@ -243,19 +283,25 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + self.assertEqual(md_timestamp.signed.version, 2) + def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. self.sim.snapshot.version = 2 - self.sim.update_timestamp() + self.sim.update_timestamp() # timestamp v2 self._run_refresh() # Snapshot meta version is smaller than previous self.sim.timestamp.snapshot_meta.version = 1 - self.sim.timestamp.version += 1 + self.sim.timestamp.version += 1 # timestamp v3 with self.assertRaises(ReplayedMetadataError): self._run_refresh() + md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + self.assertEqual(md_timestamp.signed.version, 2) + def test_new_timestamp_expired(self) -> None: # Check for a freeze attack self.sim.timestamp.expires = self.past_datetime @@ -264,6 +310,8 @@ def test_new_timestamp_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + self._assert_files_exist(["root"]) + def test_new_snapshot_hash_mismatch(self) -> None: # Check against timestamp role’s snapshot hash @@ -283,12 +331,20 @@ def test_new_snapshot_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() + md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + self.assertEqual(md_timestamp.signed.version, 3) + + md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + self.assertEqual(md_snapshot.signed.version, 1) + def test_new_snapshot_unsigned(self) -> None: # Check for an arbitrary software attack self.sim.signers["snapshot"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp"]) + # TODO: RepositorySimulator works always with consistent snapshot # enabled which forces the client to look for the snapshot version # written in timestamp (which leads to "Unknown snapshot version"). @@ -315,6 +371,9 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() + md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + self.assertEqual(md_snapshot.signed.version, 2) + def test_new_snapshot_expired(self) -> None: # Check for a freeze attack self.sim.snapshot.expires = self.past_datetime @@ -323,6 +382,9 @@ def test_new_snapshot_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp"]) + + def test_new_targets_hash_mismatch(self) -> None: # Check against snapshot role’s targets hashes @@ -341,12 +403,20 @@ def test_new_targets_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() + md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + self.assertEqual(md_snapshot.signed.version, 3) + + md_targets = Metadata.from_file(os.path.join(self.metadata_dir, "targets.json")) + self.assertEqual(md_targets.signed.version, 1) + def test_new_targets_unsigned(self) -> None: # Check for an arbitrary software attack self.sim.signers["targets"].clear() with self.assertRaises(UnsignedMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp", "snapshot"]) + # TODO: RepositorySimulator works always with consistent snapshot # enabled which forces the client to look for the targets version # written in snapshot (which leads to "Unknown targets version"). @@ -366,6 +436,8 @@ def test_new_targets_expired(self) -> None: with self.assertRaises(ExpiredMetadataError): self._run_refresh() + self._assert_files_exist(["root", "timestamp", "snapshot"]) + if __name__ == "__main__": From e51642a290b29474d136b24f65b6ae4819f2b32c Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Fri, 29 Oct 2021 17:10:17 +0300 Subject: [PATCH 3/5] ngtests: Fix formatiing and linter issues Fix formatting and some potential linter and typing errors. Signed-off-by: Teodora Sechkova --- tests/test_updater_top_level_update.py | 62 +++++++++++++++++--------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 564a26a525..40ad3aa634 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -8,13 +8,13 @@ import os import sys import tempfile -from typing import List, Optional import unittest from datetime import datetime, timedelta +from typing import Iterable, Optional from tests import utils from tests.repository_simulator import RepositorySimulator -from tuf.api.metadata import Metadata, TOP_LEVEL_ROLE_NAMES +from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES, Metadata from tuf.exceptions import ( BadVersionNumberError, ExpiredMetadataError, @@ -24,13 +24,14 @@ ) from tuf.ngclient import Updater + class TestRefresh(unittest.TestCase): """Test update of top-level metadata following 'Detailed client workflow' in the specification.""" past_datetime = datetime.utcnow().replace(microsecond=0) - timedelta(days=5) - def setUp(self): + def setUp(self) -> None: self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") self.targets_dir = os.path.join(self.temp_dir.name, "targets") @@ -45,7 +46,7 @@ def setUp(self): ) f.write(root) - def tearDown(self): + def tearDown(self) -> None: self.temp_dir.cleanup() def _run_refresh(self) -> Updater: @@ -70,13 +71,15 @@ def _init_updater(self) -> Updater: self.sim, ) - def _assert_files_exist(self, roles: List[str]) -> None: + def _assert_files_exist(self, roles: Iterable[str]) -> None: """Assert that local metadata files exist for 'roles'""" expected_files = sorted([f"{role}.json" for role in roles]) local_metadata_files = sorted(os.listdir(self.metadata_dir)) self.assertListEqual(local_metadata_files, expected_files) - def _assert_content_equals(self, role: str, version: Optional[int]=None) -> None: + def _assert_content_equals( + self, role: str, version: Optional[int] = None + ) -> None: """Assert that local file content is the expected""" expected_content = self.sim._fetch_metadata(role, version) with open(os.path.join(self.metadata_dir, f"{role}.json"), "rb") as f: @@ -167,7 +170,8 @@ def test_max_root_rotations(self) -> None: updater.refresh() - # Assert that root version was increased with no more than 'max_root_rotations' + # Assert that root version was increased with no more + # than 'max_root_rotations' md_root = Metadata.from_file(root_path) self.assertEqual( md_root.signed.version, @@ -196,7 +200,6 @@ def test_intermediate_root_incorrectly_signed(self) -> None: self._assert_files_exist(["root"]) self._assert_content_equals("root", 1) - def test_intermediate_root_expired(self) -> None: # The expiration of the new (intermediate) root metadata file # does not matter yet @@ -219,7 +222,7 @@ def test_intermediate_root_expired(self) -> None: def test_final_root_incorrectly_signed(self) -> None: # Check for an arbitrary software attack - self.sim.root.version += 1 # root v2 + self.sim.root.version += 1 # root v2 self.sim.signers["root"].clear() self.sim.publish_root() @@ -283,23 +286,27 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + md_timestamp = Metadata.from_file( + os.path.join(self.metadata_dir, "timestamp.json") + ) self.assertEqual(md_timestamp.signed.version, 2) def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. self.sim.snapshot.version = 2 - self.sim.update_timestamp() # timestamp v2 + self.sim.update_timestamp() # timestamp v2 self._run_refresh() # Snapshot meta version is smaller than previous self.sim.timestamp.snapshot_meta.version = 1 - self.sim.timestamp.version += 1 # timestamp v3 + self.sim.timestamp.version += 1 # timestamp v3 with self.assertRaises(ReplayedMetadataError): self._run_refresh() - md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + md_timestamp = Metadata.from_file( + os.path.join(self.metadata_dir, "timestamp.json") + ) self.assertEqual(md_timestamp.signed.version, 2) def test_new_timestamp_expired(self) -> None: @@ -317,7 +324,7 @@ def test_new_snapshot_hash_mismatch(self) -> None: # Update timestamp with snapshot's hashes self.sim.compute_metafile_hashes_length = True - self.sim.update_timestamp() # timestamp v2 + self.sim.update_timestamp() # timestamp v2 self._run_refresh() # Modify snapshot contents without updating @@ -325,16 +332,20 @@ def test_new_snapshot_hash_mismatch(self) -> None: self.sim.snapshot.expires += timedelta(days=1) self.sim.snapshot.version += 1 # snapshot v2 self.sim.timestamp.snapshot_meta.version = self.sim.snapshot.version - self.sim.timestamp.version += 1 # timestamp v3 + self.sim.timestamp.version += 1 # timestamp v3 # Hash mismatch error with self.assertRaises(RepositoryError): self._run_refresh() - md_timestamp = Metadata.from_file(os.path.join(self.metadata_dir, "timestamp.json")) + md_timestamp = Metadata.from_file( + os.path.join(self.metadata_dir, "timestamp.json") + ) self.assertEqual(md_timestamp.signed.version, 3) - md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + md_snapshot = Metadata.from_file( + os.path.join(self.metadata_dir, "snapshot.json") + ) self.assertEqual(md_snapshot.signed.version, 1) def test_new_snapshot_unsigned(self) -> None: @@ -371,7 +382,9 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + md_snapshot = Metadata.from_file( + os.path.join(self.metadata_dir, "snapshot.json") + ) self.assertEqual(md_snapshot.signed.version, 2) def test_new_snapshot_expired(self) -> None: @@ -384,7 +397,6 @@ def test_new_snapshot_expired(self) -> None: self._assert_files_exist(["root", "timestamp"]) - def test_new_targets_hash_mismatch(self) -> None: # Check against snapshot role’s targets hashes @@ -396,17 +408,23 @@ def test_new_targets_hash_mismatch(self) -> None: # Modify targets contents without updating # snapshot's targets hashes self.sim.targets.version += 1 - self.sim.snapshot.meta["targets.json"].version = self.sim.targets.version + self.sim.snapshot.meta[ + "targets.json" + ].version = self.sim.targets.version self.sim.snapshot.version += 1 self.sim.update_timestamp() with self.assertRaises(RepositoryError): self._run_refresh() - md_snapshot = Metadata.from_file(os.path.join(self.metadata_dir, "snapshot.json")) + md_snapshot = Metadata.from_file( + os.path.join(self.metadata_dir, "snapshot.json") + ) self.assertEqual(md_snapshot.signed.version, 3) - md_targets = Metadata.from_file(os.path.join(self.metadata_dir, "targets.json")) + md_targets = Metadata.from_file( + os.path.join(self.metadata_dir, "targets.json") + ) self.assertEqual(md_targets.signed.version, 1) def test_new_targets_unsigned(self) -> None: From 8418d5267fef4030b52ddbd026081951f772e617 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Tue, 9 Nov 2021 13:59:44 +0200 Subject: [PATCH 4/5] ngtests: Add asserts for expected version Define _assert_version_equals for checking if the local metadata file's version is as expected. Signed-off-by: Teodora Sechkova --- tests/test_updater_top_level_update.py | 53 +++++++++----------------- 1 file changed, 17 insertions(+), 36 deletions(-) diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 40ad3aa634..df9649fd9a 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -85,6 +85,11 @@ def _assert_content_equals( with open(os.path.join(self.metadata_dir, f"{role}.json"), "rb") as f: self.assertEqual(f.read(), expected_content) + def _assert_version_equals(self, role: str, expected_version: int) -> None: + """Assert that local metadata version is the expected""" + md = Metadata.from_file(os.path.join(self.metadata_dir, f"{role}.json")) + self.assertEqual(md.signed.version, expected_version) + def test_first_time_refresh(self) -> None: # Metadata dir contains only the mandatory initial root.json self._assert_files_exist(["root"]) @@ -164,18 +169,17 @@ def test_max_root_rotations(self) -> None: self.sim.root.version += 1 self.sim.publish_root() - root_path = os.path.join(self.metadata_dir, "root.json") - md_root = Metadata.from_file(root_path) + md_root = Metadata.from_file( + os.path.join(self.metadata_dir, "root.json") + ) initial_root_version = md_root.signed.version updater.refresh() # Assert that root version was increased with no more # than 'max_root_rotations' - md_root = Metadata.from_file(root_path) - self.assertEqual( - md_root.signed.version, - initial_root_version + updater.config.max_root_rotations, + self._assert_version_equals( + "root", initial_root_version + updater.config.max_root_rotations ) def test_intermediate_root_incorrectly_signed(self) -> None: @@ -286,10 +290,7 @@ def test_new_timestamp_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - md_timestamp = Metadata.from_file( - os.path.join(self.metadata_dir, "timestamp.json") - ) - self.assertEqual(md_timestamp.signed.version, 2) + self._assert_version_equals("timestamp", 2) def test_new_timestamp_snapshot_rollback(self) -> None: # Check for a rollback attack. @@ -304,10 +305,7 @@ def test_new_timestamp_snapshot_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - md_timestamp = Metadata.from_file( - os.path.join(self.metadata_dir, "timestamp.json") - ) - self.assertEqual(md_timestamp.signed.version, 2) + self._assert_version_equals("timestamp", 2) def test_new_timestamp_expired(self) -> None: # Check for a freeze attack @@ -338,15 +336,8 @@ def test_new_snapshot_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - md_timestamp = Metadata.from_file( - os.path.join(self.metadata_dir, "timestamp.json") - ) - self.assertEqual(md_timestamp.signed.version, 3) - - md_snapshot = Metadata.from_file( - os.path.join(self.metadata_dir, "snapshot.json") - ) - self.assertEqual(md_snapshot.signed.version, 1) + self._assert_version_equals("timestamp", 3) + self._assert_version_equals("snapshot", 1) def test_new_snapshot_unsigned(self) -> None: # Check for an arbitrary software attack @@ -382,10 +373,7 @@ def test_new_snapshot_version_rollback(self) -> None: with self.assertRaises(ReplayedMetadataError): self._run_refresh() - md_snapshot = Metadata.from_file( - os.path.join(self.metadata_dir, "snapshot.json") - ) - self.assertEqual(md_snapshot.signed.version, 2) + self._assert_version_equals("snapshot", 2) def test_new_snapshot_expired(self) -> None: # Check for a freeze attack @@ -417,15 +405,8 @@ def test_new_targets_hash_mismatch(self) -> None: with self.assertRaises(RepositoryError): self._run_refresh() - md_snapshot = Metadata.from_file( - os.path.join(self.metadata_dir, "snapshot.json") - ) - self.assertEqual(md_snapshot.signed.version, 3) - - md_targets = Metadata.from_file( - os.path.join(self.metadata_dir, "targets.json") - ) - self.assertEqual(md_targets.signed.version, 1) + self._assert_version_equals("snapshot", 3) + self._assert_version_equals("targets", 1) def test_new_targets_unsigned(self) -> None: # Check for an arbitrary software attack From d66c3baf2753523c600ee7f6aca6371e671d0c12 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Tue, 9 Nov 2021 14:04:54 +0200 Subject: [PATCH 5/5] RepoSim: remove metadata version check Except for 'root' role, RepositorySimulator does not keep previous metadata versions, it always serves the latest one. The metadata version check during fetch serves mostly for informative purposes and removing it allows generating test metadata with mismatching version. Signed-off-by: Teodora Sechkova --- tests/repository_simulator.py | 2 -- tests/test_updater_top_level_update.py | 35 +++++++++++--------------- 2 files changed, 15 insertions(+), 22 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index f5f4209bef..56bb9dafe0 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -249,8 +249,6 @@ def _fetch_metadata( if md is None: raise FetcherHTTPError(f"Unknown role {role}", 404) - if version is not None and version != md.signed.version: - raise FetcherHTTPError(f"Unknown {role} version {version}", 404) md.signatures.clear() for signer in self.signers[role].values(): diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index df9649fd9a..2dbb973979 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -347,19 +347,15 @@ def test_new_snapshot_unsigned(self) -> None: self._assert_files_exist(["root", "timestamp"]) - # TODO: RepositorySimulator works always with consistent snapshot - # enabled which forces the client to look for the snapshot version - # written in timestamp (which leads to "Unknown snapshot version"). - # This fails the test for a snapshot version mismatch. + def test_new_snapshot_version_mismatch(self): + # Check against timestamp role’s snapshot version - # def test_new_snapshot_version_mismatch(self): - # # Check against timestamp role’s snapshot version + # Increase snapshot version without updating timestamp + self.sim.snapshot.version += 1 + with self.assertRaises(BadVersionNumberError): + self._run_refresh() - # # Increase snapshot version without updating - # # timestamp's snapshot version - # self.sim.snapshot.version += 1 - # with self.assertRaises(BadVersionNumberError): - # self._run_refresh() + self._assert_files_exist(["root", "timestamp"]) def test_new_snapshot_version_rollback(self) -> None: # Check for a rollback attack @@ -416,16 +412,15 @@ def test_new_targets_unsigned(self) -> None: self._assert_files_exist(["root", "timestamp", "snapshot"]) - # TODO: RepositorySimulator works always with consistent snapshot - # enabled which forces the client to look for the targets version - # written in snapshot (which leads to "Unknown targets version"). - # This fails the test for a targets version mismatch. + def test_new_targets_version_mismatch(self): + # Check against snapshot role’s targets version - # def test_new_targets_version_mismatch(self): - # # Check against snapshot role’s targets version - # self.sim.targets.version += 1 - # with self.assertRaises(BadVersionNumberError): - # self._run_refresh() + # Increase targets version without updating snapshot + self.sim.targets.version += 1 + with self.assertRaises(BadVersionNumberError): + self._run_refresh() + + self._assert_files_exist(["root", "timestamp", "snapshot"]) def test_new_targets_expired(self) -> None: # Check for a freeze attack.