From 5bbcb11021e61c12135ff54f3df5a376f1c00435 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Mon, 24 Mar 2025 14:14:10 -0700 Subject: [PATCH 1/6] produce overwrite operation --- tests/integration/test_writes/test_writes.py | 70 ++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 1fe29c684c..45a1cc4c05 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -262,6 +262,76 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi } +@pytest.mark.integration +def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.test_summaries_partial_overwrite" + TEST_DATA = { + "id": [1, 2, 3, 1, 1], + "name": ["AB", "CD", "EF", "CD", "EF"], + } + pa_schema = pa.schema( + [ + pa.field("id", pa.dictionary(pa.int32(), pa.int32(), False)), + pa.field("name", pa.dictionary(pa.int32(), pa.string(), False)), + ] + ) + arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema) + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema) + with tbl.update_spec() as txn: + txn.add_identity("id") # partition by `id` to create 3 data files + tbl.append(arrow_table) # append + tbl.delete(delete_filter="id == 1 and name = 'AB'") # partial overwrite data from 1 data file + + rows = spark.sql( + f""" + SELECT operation, summary + FROM {identifier}.snapshots + ORDER BY committed_at ASC + """ + ).collect() + + operations = [row.operation for row in rows] + assert operations == ["append", "overwrite"] + + summaries = [row.summary for row in rows] + + file_size = int(summaries[0]["added-files-size"]) + assert file_size > 0 + + # APPEND + assert summaries[0] == { + "added-data-files": "3", + "added-files-size": "2848", + "added-records": "5", + "changed-partition-count": "3", + "total-data-files": "3", + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": "2848", + "total-position-deletes": "0", + "total-records": "5", + } + # BUG `deleted-data-files` property is being replaced by the previous summary's `total-data-files` value + # OVERWRITE from tbl.delete + assert summaries[1] == { + "added-data-files": "1", + "added-files-size": "859", + "added-records": "2", # wrong should be 0 + "changed-partition-count": "1", + "deleted-data-files": "3", # wrong should be 1 + "deleted-records": "5", # wrong should be 1 + "removed-files-size": "2848", + "total-data-files": "1", # wrong should be 3 + "total-delete-files": "0", + "total-equality-deletes": "0", + "total-files-size": "859", + "total-position-deletes": "0", + "total-records": "2", # wrong should be 4 + } + assert len(tbl.inspect.data_files()) == 3 + assert len(tbl.scan().to_pandas()) == 4 + + @pytest.mark.integration def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_data_files" From 06ceb128cf778b3abc8e761478fd0a261886b4ac Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 3 Apr 2025 13:10:09 +0200 Subject: [PATCH 2/6] Fix the snapshot summary of a partial overwrite --- pyiceberg/table/snapshots.py | 6 +++ pyiceberg/table/update/snapshot.py | 1 - tests/integration/test_writes/test_writes.py | 52 ++++++++++++++------ tests/table/test_snapshots.py | 2 - 4 files changed, 44 insertions(+), 17 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index a5515f12b0..db3fdb94ff 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -28,6 +28,7 @@ from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.utils.deprecated import deprecation_message if TYPE_CHECKING: from pyiceberg.table.metadata import TableMetadata @@ -356,6 +357,11 @@ def update_snapshot_summaries( raise ValueError(f"Operation not implemented: {summary.operation}") if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None: + deprecation_message( + deprecated_in="0.10.0", + removed_in="0.11.0", + help_message="The truncate-full-table should be used.", + ) summary = _truncate_table_summary(summary, previous_summary) if not previous_summary: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c705f3b9fd..0aff68520b 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -236,7 +236,6 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: return update_snapshot_summaries( summary=Summary(operation=self._operation, **ssc.build(), **snapshot_properties), previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, - truncate_full_table=self._operation == Operation.OVERWRITE, ) def _commit(self) -> UpdatesAndRequirements: diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 45a1cc4c05..f6545ea959 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -271,15 +271,19 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal } pa_schema = pa.schema( [ - pa.field("id", pa.dictionary(pa.int32(), pa.int32(), False)), - pa.field("name", pa.dictionary(pa.int32(), pa.string(), False)), + pa.field("id", pa.int32(), pa.int32()), + pa.field("name", pa.int32(), pa.string()), ] ) arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema) tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=pa_schema) with tbl.update_spec() as txn: - txn.add_identity("id") # partition by `id` to create 3 data files - tbl.append(arrow_table) # append + txn.add_identity("id") + tbl.append(arrow_table) + + # TODO: We might want to check why this ends up in 3 files + assert len(tbl.inspect.data_files()) == 3 + tbl.delete(delete_filter="id == 1 and name = 'AB'") # partial overwrite data from 1 data file rows = spark.sql( @@ -311,24 +315,44 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal "total-position-deletes": "0", "total-records": "5", } - # BUG `deleted-data-files` property is being replaced by the previous summary's `total-data-files` value - # OVERWRITE from tbl.delete + # Java produces: + # { + # "added-data-files": "1", + # "added-files-size": "707", + # "added-records": "2", + # "app-id": "local-1743678304626", + # "changed-partition-count": "1", + # "deleted-data-files": "1", + # "deleted-records": "3", + # "engine-name": "spark", + # "engine-version": "3.5.5", + # "iceberg-version": "Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)", + # "removed-files-size": "693", + # "spark.app.id": "local-1743678304626", + # "total-data-files": "3", + # "total-delete-files": "0", + # "total-equality-deletes": "0", + # "total-files-size": "1993", + # "total-position-deletes": "0", + # "total-records": "4" + # } + files = tbl.inspect.data_files() + assert len(files) == 3 assert summaries[1] == { "added-data-files": "1", "added-files-size": "859", - "added-records": "2", # wrong should be 0 + "added-records": "2", "changed-partition-count": "1", - "deleted-data-files": "3", # wrong should be 1 - "deleted-records": "5", # wrong should be 1 - "removed-files-size": "2848", - "total-data-files": "1", # wrong should be 3 + "deleted-data-files": "1", + "deleted-records": "3", + "removed-files-size": "950", + "total-data-files": "3", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "859", + "total-files-size": "2757", "total-position-deletes": "0", - "total-records": "2", # wrong should be 4 + "total-records": "4", } - assert len(tbl.inspect.data_files()) == 3 assert len(tbl.scan().to_pandas()) == 4 diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index b4dde217d4..c6fb401d89 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -289,7 +289,6 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: "total-position-deletes": "1", "total-records": "1", }, - truncate_full_table=True, ) expected = { @@ -337,7 +336,6 @@ def test_invalid_type() -> None: }, ), previous_summary={"total-data-files": "abc"}, # should be a number - truncate_full_table=True, ) assert "Could not parse summary property total-data-files to an int: abc" in str(e.value) From ecf029d44295fcba50a6bceedd942f27fe1bd5cd Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 3 Apr 2025 20:18:33 +0200 Subject: [PATCH 3/6] Fix moar tests --- tests/integration/test_writes/test_writes.py | 2 +- tests/table/test_snapshots.py | 18 ++++++------------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index f6545ea959..2918782e50 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -248,7 +248,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi "total-records": "0", } - # Overwrite + # Append assert summaries[3] == { "added-data-files": "1", "added-files-size": str(file_size), diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index c6fb401d89..f9f7fb026e 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -298,18 +298,12 @@ def test_merge_snapshot_summaries_overwrite_summary() -> None: "added-files-size": "4", "added-position-deletes": "5", "added-records": "6", - "total-data-files": "1", - "total-records": "6", - "total-delete-files": "2", - "total-equality-deletes": "3", - "total-files-size": "4", - "total-position-deletes": "5", - "deleted-data-files": "1", - "removed-delete-files": "1", - "deleted-records": "1", - "removed-files-size": "1", - "removed-position-deletes": "1", - "removed-equality-deletes": "1", + "total-data-files": "2", + "total-delete-files": "3", + "total-records": "7", + "total-files-size": "5", + "total-position-deletes": "6", + "total-equality-deletes": "4", } assert actual.additional_properties == expected From 3f211f9c31f54237b658024ea5baaa6184f7b162 Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 3 Apr 2025 20:47:08 +0200 Subject: [PATCH 4/6] Fix moar tests --- tests/integration/test_deletes.py | 20 +++++++++----------- tests/integration/test_writes/test_writes.py | 12 ++++++------ 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index ae03beea53..527f659640 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -467,21 +467,19 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio assert snapshots[2].summary == Summary( Operation.OVERWRITE, **{ - "added-files-size": snapshots[2].summary["total-files-size"], "added-data-files": "1", + "added-files-size": snapshots[2].summary["added-files-size"], "added-records": "2", "changed-partition-count": "1", - "total-files-size": snapshots[2].summary["total-files-size"], - "total-delete-files": "0", - "total-data-files": "1", - "total-position-deletes": "0", - "total-records": "2", - "total-equality-deletes": "0", - "deleted-data-files": "2", - "removed-delete-files": "1", - "deleted-records": "5", + "deleted-data-files": "1", + "deleted-records": "3", "removed-files-size": snapshots[2].summary["removed-files-size"], - "removed-position-deletes": "1", + "total-data-files": "2", + "total-delete-files": "1", + "total-equality-deletes": "0", + "total-files-size": snapshots[2].summary["total-files-size"], + "total-position-deletes": "1", + "total-records": "4", }, ) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 2918782e50..5bd8aab180 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -271,8 +271,8 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal } pa_schema = pa.schema( [ - pa.field("id", pa.int32(), pa.int32()), - pa.field("name", pa.int32(), pa.string()), + pa.field("id", pa.int32()), + pa.field("name", pa.string()), ] ) arrow_table = pa.Table.from_pydict(TEST_DATA, schema=pa_schema) @@ -305,13 +305,13 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal # APPEND assert summaries[0] == { "added-data-files": "3", - "added-files-size": "2848", + "added-files-size": "2570", "added-records": "5", "changed-partition-count": "3", "total-data-files": "3", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "2848", + "total-files-size": "2570", "total-position-deletes": "0", "total-records": "5", } @@ -345,11 +345,11 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal "changed-partition-count": "1", "deleted-data-files": "1", "deleted-records": "3", - "removed-files-size": "950", + "removed-files-size": "866", "total-data-files": "3", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "2757", + "total-files-size": "2563", "total-position-deletes": "0", "total-records": "4", } From b2d2c79d84a37eeace5715f5c60ff513cfd7bfbf Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 15 Apr 2025 12:15:30 +0200 Subject: [PATCH 5/6] Remove comment Co-authored-by: Kevin Liu --- tests/integration/test_writes/test_writes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 5bd8aab180..857ac1cdac 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -281,7 +281,6 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal txn.add_identity("id") tbl.append(arrow_table) - # TODO: We might want to check why this ends up in 3 files assert len(tbl.inspect.data_files()) == 3 tbl.delete(delete_filter="id == 1 and name = 'AB'") # partial overwrite data from 1 data file From 6ea575e0d72defbce172e58fe84ef1cdf4e478b2 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Tue, 15 Apr 2025 12:16:58 +0200 Subject: [PATCH 6/6] Update deprecation --- pyiceberg/table/snapshots.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index db3fdb94ff..af3f040482 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -360,7 +360,7 @@ def update_snapshot_summaries( deprecation_message( deprecated_in="0.10.0", removed_in="0.11.0", - help_message="The truncate-full-table should be used.", + help_message="The truncate-full-table shouldn't be used.", ) summary = _truncate_table_summary(summary, previous_summary)