Fix the snapshot summary of a partial overwrite #1879

Merged: 7 commits, Apr 16, 2025
6 changes: 6 additions & 0 deletions pyiceberg/table/snapshots.py

@@ -28,6 +28,7 @@
 from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
+from pyiceberg.utils.deprecated import deprecation_message

 if TYPE_CHECKING:
     from pyiceberg.table.metadata import TableMetadata

@@ -356,6 +357,11 @@ def update_snapshot_summaries(
         raise ValueError(f"Operation not implemented: {summary.operation}")

     if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
+        deprecation_message(
+            deprecated_in="0.10.0",
+            removed_in="0.11.0",
+            help_message="The truncate-full-table shouldn't be used.",
+        )
         summary = _truncate_table_summary(summary, previous_summary)

     if not previous_summary:
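For callers, the effect for now is only a warning. A minimal sketch of the deprecated path, assuming the public update_snapshot_summaries signature exercised in the tests below (not part of the PR):

from pyiceberg.table.snapshots import Operation, Summary, update_snapshot_summaries

# Still truncates, but now emits the deprecation message added above;
# the truncate_full_table flag is slated for removal in 0.11.0.
merged = update_snapshot_summaries(
    summary=Summary(operation=Operation.OVERWRITE, **{"added-records": "2"}),
    previous_summary={"total-records": "5"},
    truncate_full_table=True,
)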
1 change: 0 additions & 1 deletion pyiceberg/table/update/snapshot.py

@@ -236,7 +236,6 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
         return update_snapshot_summaries(
             summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties),
             previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
-            truncate_full_table=self._operation == Operation.OVERWRITE,
         )

     def _commit(self) -> UpdatesAndRequirements:
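This removal is the actual fix: _summary previously truncated every OVERWRITE summary as if the whole table had been replaced, which is wrong when the overwrite only touches some partitions. An illustrative before/after, using the record counts from the integration test added below (the old expected value comes from the test change in test_deletes.py):

# Before: truncation kept only the added records in the totals.
total_records_before_fix = "2"            # added-records, as if a full replace
# After: the deltas are merged into the previous totals.
total_records_after_fix = str(5 - 3 + 2)  # 5 existing, 3 deleted, 2 rewritten
assert total_records_after_fix == "4"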
20 changes: 9 additions & 11 deletions tests/integration/test_deletes.py

@@ -467,21 +467,19 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession
     assert snapshots[2].summary == Summary(
Contributor commented:

I ran this locally. Snapshot 0 has the following metadata, which is correct:

{
    'added-data-files': '2',
    'added-files-size': '1490',
    'added-records': '5',
    'changed-partition-count': '2',
    'total-data-files': '2',
    'total-delete-files': '0',
    'total-equality-deletes': '0',
    'total-files-size': '1490',
    'total-position-deletes': '0',
    'total-records': '5'
}

But snapshot 1, the DELETE op with the positional delete, has the following metadata:

{
    'added-delete-files': '1',
    'added-files-size': '1710',
    'added-position-delete-files': '1',
    'added-position-deletes': '1',
    'changed-partition-count': '1',
    'total-data-files': '2',
    'total-delete-files': '1',
    'total-equality-deletes': '0',
    'total-files-size': '3200',
    'total-position-deletes': '1',
    'total-records': '5'
}

Everything looks right except for total-records: we started with 5 records, and the DELETE op removed 1 record, so total-records should be 4 here. Is this a bug in the Spark snapshot summary?
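For reference, the arithmetic behind that observation, written out as a small sketch (the variable names are illustrative, not from the test):

prev_total_records = 5   # total-records after snapshot 0
added_records = 0        # the DELETE snapshot adds no data records
deleted_records = 1      # one row is position-deleted
# Expected: 5 + 0 - 1 == 4, yet the summary above reports total-records '5'.
assert prev_total_records + added_records - deleted_records == 4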

Fokko (Contributor, Author) commented on Apr 15, 2025:

Interesting. Looking at it, snapshot summary 1 seems incorrect. Could you open up an issue on the Java side? It would be good to get some historical context around this.

Contributor commented:

I was able to reproduce it using #1926. I'll also open an issue on the Java side.

Contributor commented:

Opened an issue on the Java side: apache/iceberg#12823.

         Operation.OVERWRITE,
         **{
-            "added-files-size": snapshots[2].summary["total-files-size"],
             "added-data-files": "1",
+            "added-files-size": snapshots[2].summary["added-files-size"],
             "added-records": "2",
             "changed-partition-count": "1",
-            "total-files-size": snapshots[2].summary["total-files-size"],
-            "total-delete-files": "0",
-            "total-data-files": "1",
-            "total-position-deletes": "0",
-            "total-records": "2",
-            "total-equality-deletes": "0",
-            "deleted-data-files": "2",
-            "removed-delete-files": "1",
-            "deleted-records": "5",
+            "deleted-data-files": "1",
+            "deleted-records": "3",
             "removed-files-size": snapshots[2].summary["removed-files-size"],
             "removed-position-deletes": "1",
+            "total-data-files": "2",
+            "total-delete-files": "1",
+            "total-equality-deletes": "0",
+            "total-files-size": snapshots[2].summary["total-files-size"],
+            "total-position-deletes": "1",
+            "total-records": "4",
         },
     )
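Tying this to the thread above: the previous snapshot still reports total-records 5, and the partial overwrite merges its deltas into that, which is where the new expected values come from (a sanity sketch, not part of the test):

assert 5 - 3 + 2 == 4  # previous total-records - deleted-records + added-records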

95 changes: 94 additions & 1 deletion tests/integration/test_writes/test_writes.py

@@ -250,7 +250,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null
         "total-records": "0",
     }

-    # Overwrite
+    # Append
     assert summaries[3] == {
         "added-data-files": "1",
         "added-files-size": str(file_size),

@@ -264,6 +264,99 @@
     }

@pytest.mark.integration
def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None:
    identifier = "default.test_summaries_partial_overwrite"
    TEST_DATA = {
        "id": [1, 2, 3, 1, 1],
        "name": ["AB", "CD", "EF", "CD", "EF"],
    }
    pa_schema = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("name", pa.string()),
        ]
    )
    arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema)
    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema)
    with tbl.update_spec() as txn:
        txn.add_identity("id")
    tbl.append(arrow_table)

    assert len(tbl.inspect.data_files()) == 3

    tbl.delete(delete_filter="id == 1 and name = 'AB'")  # partial overwrite: rewrites the data from one data file

    rows = spark.sql(
        f"""
        SELECT operation, summary
        FROM {identifier}.snapshots
        ORDER BY committed_at ASC
        """
    ).collect()

    operations = [row.operation for row in rows]
    assert operations == ["append", "overwrite"]

    summaries = [row.summary for row in rows]

    file_size = int(summaries[0]["added-files-size"])
    assert file_size > 0

    # APPEND
    assert summaries[0] == {
        "added-data-files": "3",
        "added-files-size": "2570",
        "added-records": "5",
        "changed-partition-count": "3",
        "total-data-files": "3",
        "total-delete-files": "0",
        "total-equality-deletes": "0",
        "total-files-size": "2570",
        "total-position-deletes": "0",
        "total-records": "5",
    }
    # Java produces:
    # {
    #     "added-data-files": "1",
    #     "added-files-size": "707",
    #     "added-records": "2",
    #     "app-id": "local-1743678304626",
    #     "changed-partition-count": "1",
    #     "deleted-data-files": "1",
    #     "deleted-records": "3",
    #     "engine-name": "spark",
    #     "engine-version": "3.5.5",
    #     "iceberg-version": "Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)",
    #     "removed-files-size": "693",
    #     "spark.app.id": "local-1743678304626",
    #     "total-data-files": "3",
    #     "total-delete-files": "0",
    #     "total-equality-deletes": "0",
    #     "total-files-size": "1993",
    #     "total-position-deletes": "0",
    #     "total-records": "4"
    # }
    files = tbl.inspect.data_files()
    assert len(files) == 3
    assert summaries[1] == {
        "added-data-files": "1",
        "added-files-size": "859",
        "added-records": "2",
        "changed-partition-count": "1",
        "deleted-data-files": "1",
        "deleted-records": "3",
        "removed-files-size": "866",
        "total-data-files": "3",
        "total-delete-files": "0",
        "total-equality-deletes": "0",
        "total-files-size": "2563",
        "total-position-deletes": "0",
        "total-records": "4",
    }
    assert len(tbl.scan().to_pandas()) == 4
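The expected overwrite summary follows from merging the deltas into the append totals. A quick cross-check of the asserted numbers (a sketch, not part of the test):

assert 5 - 3 + 2 == 4            # total-records: 5 appended, 3 deleted, 2 rewritten
assert 3 - 1 + 1 == 3            # total-data-files: one file replaced in the id=1 partition
assert 2570 - 866 + 859 == 2563  # total-files-size: removed-files-size out, added-files-size in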


@pytest.mark.integration
def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
    identifier = "default.arrow_data_files"
20 changes: 6 additions & 14 deletions tests/table/test_snapshots.py

@@ -289,7 +289,6 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None:
             "total-position-deletes": "1",
             "total-records": "1",
         },
-        truncate_full_table=True,
     )

     expected = {
@@ -299,18 +298,12 @@
"added-files-size": "4",
"added-position-deletes": "5",
"added-records": "6",
"total-data-files": "1",
"total-records": "6",
"total-delete-files": "2",
"total-equality-deletes": "3",
"total-files-size": "4",
"total-position-deletes": "5",
"deleted-data-files": "1",
"removed-delete-files": "1",
"deleted-records": "1",
"removed-files-size": "1",
"removed-position-deletes": "1",
"removed-equality-deletes": "1",
"total-data-files": "2",
"total-delete-files": "3",
"total-records": "7",
"total-files-size": "5",
"total-position-deletes": "6",
"total-equality-deletes": "4",
}

assert actual.additional_properties == expected
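The new expectations encode plain accumulation: with truncation gone, each total-* equals the previous total plus the matching added-* delta. A sketch of that arithmetic; the previous totals not visible in this hunk are inferred from the expected values:

assert 1 + 6 == 7  # total-records = previous total-records + added-records
assert 1 + 5 == 6  # total-position-deletes = previous + added-position-deletes
assert 1 + 4 == 5  # total-files-size = previous + added-files-size
assert 1 + 3 == 4  # total-equality-deletes = previous + added-equality-deletes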
@@ -337,7 +330,6 @@ def test_invalid_type() -> None:
                 },
             ),
             previous_summary={"total-data-files": "abc"},  # should be a number
-            truncate_full_table=True,
         )

assert "Could not parse summary property total-data-files to an int: abc" in str(e.value)