diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index e50886d781..f7d315781f 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -44,11 +44,11 @@ updater.refresh() """ +import datetime import logging import os import tempfile from dataclasses import dataclass, field -from datetime import datetime, timedelta from typing import Dict, Iterator, List, Optional, Tuple from urllib import parse @@ -125,8 +125,10 @@ def __init__(self) -> None: self.fetch_tracker = FetchTracker() - now = datetime.utcnow() - self.safe_expiry = now.replace(microsecond=0) + timedelta(days=30) + now = datetime.datetime.utcnow() + self.safe_expiry = now.replace(microsecond=0) + datetime.timedelta( + days=30 + ) self._initialize() diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index 4ddd25ba64..9ecf4956fa 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -6,13 +6,13 @@ """Test ngclient Updater top-level metadata update workflow""" import builtins +import datetime import os import sys import tempfile import unittest -from datetime import datetime, timedelta from typing import Iterable, Optional -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, Mock, call, patch from tests import utils from tests.repository_simulator import RepositorySimulator @@ -43,7 +43,9 @@ class TestRefresh(unittest.TestCase): # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None - past_datetime = datetime.utcnow().replace(microsecond=0) - timedelta(days=5) + past_datetime = datetime.datetime.utcnow().replace( + microsecond=0 + ) - datetime.timedelta(days=5) def setUp(self) -> None: # pylint: disable=consider-using-with @@ -306,6 +308,81 @@ def test_new_timestamp_unsigned(self) -> None: self._assert_files_exist([Root.type]) + @patch.object(datetime, "datetime", wraps=datetime.datetime) + def test_expired_timestamp_version_rollback(self, mock_time: Mock) -> None: + """Verifies that local timestamp is used in rollback checks even if it is expired. + + The timestamp updates and rollback checks are performed + with the following timing: + - Timestamp v1 expiry set to day 7 + - First updater refresh performed on day 0 + - Repository publishes timestamp v2 on day 0 + - Timestamp v2 expiry set to day 21 + - Second updater refresh performed on day 18: + assert that rollback check uses expired timestamp v1""" + + now = datetime.datetime.utcnow() + self.sim.timestamp.expires = now + datetime.timedelta(days=7) + + self.sim.timestamp.version = 2 + + # Make a successful update of valid metadata which stores it in cache + self._run_refresh() + + self.sim.timestamp.expires = now + datetime.timedelta(days=21) + + self.sim.timestamp.version = 1 + + mock_time.utcnow.return_value = ( + datetime.datetime.utcnow() + datetime.timedelta(days=18) + ) + with patch("datetime.datetime", mock_time): + # Check that a rollback protection is performed even if + # local timestamp has expired + with self.assertRaises(BadVersionNumberError): + self._run_refresh() + + self._assert_version_equals(Timestamp.type, 2) + + @patch.object(datetime, "datetime", wraps=datetime.datetime) + def test_expired_timestamp_snapshot_rollback(self, mock_time: Mock) -> None: + """Verifies that rollback protection is done even if local timestamp has expired. + + The snapshot updates and rollback protection checks are performed + with the following timing: + - Timestamp v1 expiry set to day 7 + - Repository bumps snapshot to v3 on day 0 + - First updater refresh performed on day 0 + - Timestamp v2 expiry set to day 21 + - Second updater refresh performed on day 18: + assert that rollback protection is done with expired timestamp v1""" + + now = datetime.datetime.utcnow() + self.sim.timestamp.expires = now + datetime.timedelta(days=7) + + # Bump the snapshot version number to 3 + self.sim.update_snapshot() + self.sim.update_snapshot() + + # Make a successful update of valid metadata which stores it in cache + self._run_refresh() + + self.sim.snapshot.version = 1 + # Snapshot version number is set to 2, which is still less than 3 + self.sim.update_snapshot() + self.sim.timestamp.expires = now + datetime.timedelta(days=21) + + mock_time.utcnow.return_value = ( + datetime.datetime.utcnow() + datetime.timedelta(days=18) + ) + with patch("datetime.datetime", mock_time): + # Assert that rollback protection is done even if + # local timestamp has expired + with self.assertRaises(BadVersionNumberError): + self._run_refresh() + + self._assert_version_equals(Timestamp.type, 3) + def test_new_timestamp_version_rollback(self) -> None: # Check for a rollback attack self.sim.timestamp.version = 2 @@ -379,7 +456,7 @@ def test_new_snapshot_hash_mismatch(self) -> None: # Modify snapshot contents without updating # timestamp's snapshot hash - self.sim.snapshot.expires += timedelta(days=1) + self.sim.snapshot.expires += datetime.timedelta(days=1) self.sim.snapshot.version += 1 # snapshot v2 self.sim.timestamp.snapshot_meta.version = self.sim.snapshot.version self.sim.timestamp.version += 1 # timestamp v3 @@ -662,6 +739,44 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None: expected_calls = [("root", 2), ("timestamp", None)] self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls) + @patch.object(datetime, "datetime", wraps=datetime.datetime) + def test_expired_metadata(self, mock_time: Mock) -> None: + """Verifies that expired local timestamp/snapshot can be used for + updating from remote. + + The updates and verifications are performed with the following timing: + - Timestamp v1 expiry set to day 7 + - First updater refresh performed on day 0 + - Repository bumps snapshot and targets to v2 on day 0 + - Timestamp v2 expiry set to day 21 + - Second updater refresh performed on day 18, + it is successful and timestamp/snaphot final versions are v2""" + + now = datetime.datetime.utcnow() + self.sim.timestamp.expires = now + datetime.timedelta(days=7) + + # Make a successful update of valid metadata which stores it in cache + self._run_refresh() + + self.sim.targets.version += 1 + self.sim.update_snapshot() + self.sim.timestamp.expires = now + datetime.timedelta(days=21) + + # Mocking time so that local timestam has expired + # but the new timestamp has not + mock_time.utcnow.return_value = ( + datetime.datetime.utcnow() + datetime.timedelta(days=18) + ) + with patch("datetime.datetime", mock_time): + self._run_refresh() + + # Assert that the final version of timestamp/snapshot is version 2 + # which means a successful refresh is performed + # with expired local metadata + for role in ["timestamp", "snapshot", "targets"]: + md = Metadata.from_file(f"{self.metadata_dir}/{role}.json") + self.assertEqual(md.signed.version, 2) + if __name__ == "__main__": if "--dump" in sys.argv: diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index 754fb8962e..d08694f091 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -59,9 +59,9 @@ >>> trusted_set.update_snapshot(f.read()) """ +import datetime import logging from collections import abc -from datetime import datetime from typing import Dict, Iterator, Optional from tuf.api import exceptions @@ -91,7 +91,7 @@ def __init__(self, root_data: bytes): error type and content will contain more details. """ self._trusted_set: Dict[str, Metadata] = {} - self.reference_time = datetime.utcnow() + self.reference_time = datetime.datetime.utcnow() # Load and validate the local root metadata. Valid initial trusted root # metadata is required