diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index 56574ff471..c80f104e46 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -122,3 +122,7 @@ class CommitStateUnknownException(RESTError): class WaitingForLockException(Exception): """Need to wait for a lock, try again.""" + + +class ValidationException(Exception): + """Raised when validation fails.""" diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index af3f040482..927a071a78 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -435,3 +435,16 @@ def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMeta if snapshot.parent_snapshot_id is None: break snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id) + + +def ancestors_between( + to_snapshot: Snapshot, from_snapshot: Optional[Snapshot], table_metadata: TableMetadata +) -> Iterable[Snapshot]: + """Get the ancestors of and including the given snapshot between the to and from snapshots.""" + if from_snapshot is not None: + for snapshot in ancestors_of(to_snapshot, table_metadata): + yield snapshot + if snapshot == from_snapshot: + break + else: + yield from ancestors_of(to_snapshot, table_metadata) diff --git a/pyiceberg/table/update/validate.py b/pyiceberg/table/update/validate.py new file mode 100644 index 0000000000..7caaf1d521 --- /dev/null +++ b/pyiceberg/table/update/validate.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyiceberg.exceptions import ValidationException +from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between + + +def validation_history( + table: Table, + to_snapshot: Snapshot, + from_snapshot: Snapshot, + matching_operations: set[Operation], + manifest_content_filter: ManifestContent, +) -> tuple[list[ManifestFile], set[int]]: + """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. + + Args: + table: Table to get the history from + to_snapshot: Starting snapshot + from_snapshot: Parent snapshot to get the history from + matching_operations: Operations to match on + manifest_content_filter: Manifest content type to filter + + Raises: + ValidationException: If no matching snapshot is found or only one snapshot is found + + Returns: + List of manifest files and set of snapshots ID's matching conditions + """ + manifests_files: list[ManifestFile] = [] + snapshots: set[int] = set() + + last_snapshot = None + for snapshot in ancestors_between(to_snapshot, from_snapshot, table.metadata): + last_snapshot = snapshot + summary = snapshot.summary + if summary is None: + raise ValidationException(f"No summary found for snapshot {snapshot}!") + if summary.operation not in matching_operations: + continue + + snapshots.add(snapshot.snapshot_id) + # TODO: Maybe do the IO in a separate thread at some point, and collect at the bottom (we can easily merge the sets + manifests_files.extend( + [ + manifest + for manifest in snapshot.manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter + ] + ) + + if last_snapshot is not None and last_snapshot.snapshot_id != from_snapshot.snapshot_id: + raise ValidationException("No matching snapshot found.") + + return manifests_files, snapshots diff --git a/tests/table/test_init.py b/tests/table/test_init.py index acc31d4722..dbac84bd81 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -57,7 +57,6 @@ Snapshot, SnapshotLogEntry, Summary, - ancestors_of, ) from pyiceberg.table.sorting import ( NullOrder, @@ -225,44 +224,6 @@ def test_snapshot_by_timestamp(table_v2: Table) -> None: assert table_v2.snapshot_as_of_timestamp(1515100955770, inclusive=False) is None -def test_ancestors_of(table_v2: Table) -> None: - assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ - Snapshot( - snapshot_id=3055729675574597004, - parent_snapshot_id=3051729675574597004, - sequence_number=1, - timestamp_ms=1555100955770, - manifest_list="s3://a/b/2.avro", - summary=Summary(Operation.APPEND), - schema_id=1, - ), - Snapshot( - snapshot_id=3051729675574597004, - parent_snapshot_id=None, - sequence_number=0, - timestamp_ms=1515100955770, - manifest_list="s3://a/b/1.avro", - summary=Summary(Operation.APPEND), - schema_id=None, - ), - ] - - -def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None: - # Test RecursionError: maximum recursion depth exceeded - assert ( - len( - list( - ancestors_of( - table_v2_with_extensive_snapshots.current_snapshot(), - table_v2_with_extensive_snapshots.metadata, - ) - ) - ) - == 2000 - ) - - def test_snapshot_by_id_does_not_exist(table_v2: Table) -> None: assert table_v2.snapshot_by_id(-1) is None diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 24d5f0ffff..0f7e9fb5c4 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -15,12 +15,23 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name,eval-used +from typing import cast + import pytest from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries +from pyiceberg.table import Table +from pyiceberg.table.snapshots import ( + Operation, + Snapshot, + SnapshotSummaryCollector, + Summary, + ancestors_between, + ancestors_of, + update_snapshot_summaries, +) from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import Record from pyiceberg.types import ( @@ -333,3 +344,58 @@ def test_invalid_type() -> None: ) assert "Could not parse summary property total-data-files to an int: abc" in str(e.value) + + +def test_ancestors_of(table_v2: Table) -> None: + assert list(ancestors_of(table_v2.current_snapshot(), table_v2.metadata)) == [ + Snapshot( + snapshot_id=3055729675574597004, + parent_snapshot_id=3051729675574597004, + sequence_number=1, + timestamp_ms=1555100955770, + manifest_list="s3://a/b/2.avro", + summary=Summary(Operation.APPEND), + schema_id=1, + ), + Snapshot( + snapshot_id=3051729675574597004, + parent_snapshot_id=None, + sequence_number=0, + timestamp_ms=1515100955770, + manifest_list="s3://a/b/1.avro", + summary=Summary(Operation.APPEND), + schema_id=None, + ), + ] + + +def test_ancestors_of_recursive_error(table_v2_with_extensive_snapshots: Table) -> None: + # Test RecursionError: maximum recursion depth exceeded + assert ( + len( + list( + ancestors_of( + table_v2_with_extensive_snapshots.current_snapshot(), + table_v2_with_extensive_snapshots.metadata, + ) + ) + ) + == 2000 + ) + + +def test_ancestors_between(table_v2_with_extensive_snapshots: Table) -> None: + oldest_snapshot = table_v2_with_extensive_snapshots.snapshots()[0] + current_snapshot = cast(Snapshot, table_v2_with_extensive_snapshots.current_snapshot()) + assert ( + len( + list( + ancestors_between( + current_snapshot, + oldest_snapshot, + table_v2_with_extensive_snapshots.metadata, + ) + ) + ) + == 2000 + ) diff --git a/tests/table/test_validate.py b/tests/table/test_validate.py new file mode 100644 index 0000000000..eac3733f2d --- /dev/null +++ b/tests/table/test_validate.py @@ -0,0 +1,138 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name,eval-used +from typing import cast +from unittest.mock import patch + +import pytest + +from pyiceberg.exceptions import ValidationException +from pyiceberg.io import FileIO +from pyiceberg.manifest import ManifestContent, ManifestFile +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation, Snapshot +from pyiceberg.table.update.validate import validation_history + + +@pytest.fixture +def table_v2_with_extensive_snapshots_and_manifests( + table_v2_with_extensive_snapshots: Table, +) -> tuple[Table, dict[int, list[ManifestFile]]]: + """Fixture to create a table with extensive snapshots and manifests.""" + mock_manifests = {} + + for i, snapshot in enumerate(table_v2_with_extensive_snapshots.snapshots()): + mock_manifest = ManifestFile.from_args( + manifest_path=f"foo/bar/{i}", + manifest_length=1, + partition_spec_id=1, + content=ManifestContent.DATA if i % 2 == 0 else ManifestContent.DELETES, + sequence_number=1, + min_sequence_number=1, + added_snapshot_id=snapshot.snapshot_id, + ) + + # Store the manifest for this specific snapshot + mock_manifests[snapshot.snapshot_id] = [mock_manifest] + + return table_v2_with_extensive_snapshots, mock_manifests + + +def test_validation_history(table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]]) -> None: + """Test the validation history function.""" + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + expected_manifest_data_counts = len([m for m in mock_manifests.values() if m[0].content == ManifestContent.DATA]) + + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + manifests, snapshots = validation_history( + table, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, + ) + + assert len(manifests) == expected_manifest_data_counts + + +def test_validation_history_fails_on_snapshot_with_no_summary( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + """Test the validation history function fails on snapshot with no summary.""" + table, _ = table_v2_with_extensive_snapshots_and_manifests + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + # Create a snapshot with no summary + snapshot_with_no_summary = Snapshot( + snapshot_id="1234", + parent_id="5678", + timestamp_ms=0, + operation=Operation.APPEND, + summary=None, + manifest_list="foo/bar", + ) + with patch("pyiceberg.table.update.validate.ancestors_between", return_value=[snapshot_with_no_summary]): + with pytest.raises(ValidationException): + validation_history( + table, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, + ) + + +def test_validation_history_fails_on_from_snapshot_not_matching_last_snapshot( + table_v2_with_extensive_snapshots_and_manifests: tuple[Table, dict[int, list[ManifestFile]]], +) -> None: + """Test the validation history function fails when from_snapshot doesn't match last_snapshot.""" + table, mock_manifests = table_v2_with_extensive_snapshots_and_manifests + + oldest_snapshot = table.snapshots()[0] + newest_snapshot = cast(Snapshot, table.current_snapshot()) + + def mock_read_manifest_side_effect(self: Snapshot, io: FileIO) -> list[ManifestFile]: + """Mock the manifests method to use the snapshot_id for lookup.""" + snapshot_id = self.snapshot_id + if snapshot_id in mock_manifests: + return mock_manifests[snapshot_id] + return [] + + missing_oldest_snapshot = table.snapshots()[1:] + + with patch("pyiceberg.table.snapshots.Snapshot.manifests", new=mock_read_manifest_side_effect): + with patch("pyiceberg.table.update.validate.ancestors_between", return_value=missing_oldest_snapshot): + with pytest.raises(ValidationException): + validation_history( + table, + newest_snapshot, + oldest_snapshot, + {Operation.APPEND}, + ManifestContent.DATA, + )