|
5 | 5 |
|
6 | 6 | """Test ngclient Updater top-level metadata update workflow"""
|
7 | 7 |
|
| 8 | +import builtins |
8 | 9 | import os
|
9 | 10 | import sys
|
10 | 11 | import tempfile
|
11 | 12 | import unittest
|
12 | 13 | from datetime import datetime, timedelta
|
13 | 14 | from typing import Iterable, Optional
|
| 15 | +from unittest.mock import MagicMock, patch |
14 | 16 |
|
15 | 17 | from tests import utils
|
16 | 18 | from tests.repository_simulator import RepositorySimulator
|
17 | 19 | from tuf.api.metadata import (
|
| 20 | + SPECIFICATION_VERSION, |
18 | 21 | TOP_LEVEL_ROLE_NAMES,
|
| 22 | + DelegatedRole, |
19 | 23 | Metadata,
|
20 | 24 | Root,
|
21 | 25 | Snapshot,
|
@@ -575,6 +579,63 @@ def test_new_targets_fast_forward_recovery(self) -> None:
|
575 | 579 | self._run_refresh()
|
576 | 580 | self._assert_version_equals(Targets.type, 1)
|
577 | 581 |
|
| 582 | + @patch.object(builtins, "open", wraps=builtins.open) |
| 583 | + def test_not_loading_targets_twice(self, wrapped_open: MagicMock) -> None: |
| 584 | + # Do not load targets roles more than once when traversing |
| 585 | + # the delegations tree |
| 586 | + |
| 587 | + # Add new delegated targets, update the snapshot |
| 588 | + spec_version = ".".join(SPECIFICATION_VERSION) |
| 589 | + targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) |
| 590 | + role = DelegatedRole("role1", [], 1, False, ["*"], None) |
| 591 | + self.sim.add_delegation("targets", role, targets) |
| 592 | + self.sim.update_snapshot() |
| 593 | + |
| 594 | + # Run refresh, top-level roles are loaded |
| 595 | + updater = self._run_refresh() |
| 596 | + # Clean up calls to open during refresh() |
| 597 | + wrapped_open.reset_mock() |
| 598 | + |
| 599 | + # First time looking for "somepath", only 'role1' must be loaded |
| 600 | + updater.get_targetinfo("somepath") |
| 601 | + wrapped_open.assert_called_once_with( |
| 602 | + os.path.join(self.metadata_dir, "role1.json"), "rb" |
| 603 | + ) |
| 604 | + wrapped_open.reset_mock() |
| 605 | + # Second call to get_targetinfo, all metadata is already loaded |
| 606 | + updater.get_targetinfo("somepath") |
| 607 | + wrapped_open.assert_not_called() |
| 608 | + |
| 609 | + def test_snapshot_rollback_with_local_snapshot_hash_mismatch(self) -> None: |
| 610 | + # Test triggering snapshot rollback check on a newly downloaded snapshot |
| 611 | + # when the local snapshot is loaded even when there is a hash mismatch |
| 612 | + # with timestamp.snapshot_meta. |
| 613 | + |
| 614 | + # By raising this flag on timestamp update the simulator would: |
| 615 | + # 1) compute the hash of the new modified version of snapshot |
| 616 | + # 2) assign the hash to timestamp.snapshot_meta |
| 617 | + # The purpose is to create a hash mismatch between timestamp.meta and |
| 618 | + # the local snapshot, but to have hash match between timestamp.meta and |
| 619 | + # the next snapshot version. |
| 620 | + self.sim.compute_metafile_hashes_length = True |
| 621 | + |
| 622 | + # Initialize all metadata and assign targets version higher than 1. |
| 623 | + self.sim.targets.version = 2 |
| 624 | + self.sim.update_snapshot() |
| 625 | + self._run_refresh() |
| 626 | + |
| 627 | + # The new targets must have a lower version than the local trusted one. |
| 628 | + self.sim.targets.version = 1 |
| 629 | + self.sim.update_snapshot() |
| 630 | + |
| 631 | + # During the snapshot update, the local snapshot will be loaded even if |
| 632 | + # there is a hash mismatch with timestamp.snapshot_meta, because it will |
| 633 | + # be considered as trusted. |
| 634 | + # Should fail as a new version of snapshot will be fetched which lowers |
| 635 | + # the snapshot.meta["targets.json"] version by 1 and throws an error. |
| 636 | + with self.assertRaises(BadVersionNumberError): |
| 637 | + self._run_refresh() |
| 638 | + |
578 | 639 |
|
579 | 640 | if __name__ == "__main__":
|
580 | 641 | if "--dump" in sys.argv:
|
|
0 commit comments