From f331ad5a5567e15fdd6710ea8fccc3ab313c7d5f Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Sat, 25 May 2024 14:52:54 +0530 Subject: [PATCH 1/8] incremental changelog implementation --- pyiceberg/table/__init__.py | 161 ++++++++++++++++++++++++++++++++ tests/integration/test_reads.py | 20 ++++ 2 files changed, 181 insertions(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c57f0d1297..cca6abf578 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import collections import itertools import uuid import warnings @@ -1235,6 +1236,24 @@ def scan( limit=limit, ) + def incremental_changelog_scan( + self, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + ) -> IncrementalChangelogScan: + return IncrementalChangelogScan( + table_metadata=self.metadata, + io=self.io, + row_filter=row_filter, + selected_fields=selected_fields, + case_sensitive=case_sensitive, + options=options, + limit=limit, + ) + @property def format_version(self) -> TableVersion: return self.metadata.format_version @@ -1587,6 +1606,42 @@ def __init__( self.length = length or data_file.file_size_in_bytes +class ChangelogOperation(Enum): + Insert = 1 + Delete = 2 + UpdateBefore = 3 + UpdateAfter = 4 + + +@dataclass(init=False) +class ChangelogScanTask(ScanTask): + file: DataFile + delete_files: Set[DataFile] + start: int + length: int + change_type: ChangelogOperation + change_ordinal: int + commit_snapshot_id: int + + def __init__( + self, + data_file: DataFile, + change_type: ChangelogOperation, + change_ordinal: int, + commit_snapshot_id: int, + delete_files: Optional[Set[DataFile]] = None, + start: Optional[int] = None, + length: Optional[int] = None, + ) -> None: + self.file = data_file + self.change_type = change_type + self.change_ordinal = change_ordinal + self.commit_snapshot_id = commit_snapshot_id + self.delete_files = delete_files or set() + self.start = start or 0 + self.length = length or data_file.file_size_in_bytes + + def _open_manifest( io: FileIO, manifest: ManifestFile, @@ -1778,6 +1833,112 @@ def to_ray(self) -> ray.data.dataset.Dataset: return ray.data.from_arrow(self.to_arrow()) +class IncrementalChangelogScan(BaseIncrementalScan): + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + to_snapshot_id: Optional[int] = None, + from_snapshot_id_exclusive: Optional[int] = None, + ): + super().__init__( + table_metadata, + io, + row_filter, + selected_fields, + case_sensitive, + options, + limit, + to_snapshot_id, + from_snapshot_id_exclusive, + ) + + def use_ref(self: S, name: str) -> S: + raise NotImplementedError("Not implemented for IncrementalChangelogScan yet.") + + def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[ChangelogScanTask]: + snapshots_between = collections.deque() + for snapshot in ancestors_between(to_snapshot_id, from_snapshot_id_exclusive, self.table_metadata): + if snapshot.summary.operation != Operation.REPLACE: # type: ignore + for manifest_file in self.table_metadata.snapshot_by_id(snapshot.snapshot_id).manifests(self.io): + if manifest_file.content == ManifestContent.DELETES: + raise "Delete files are currently not supported in changelog scans" + snapshots_between.appendleft(snapshot) + + if not snapshots_between: + return iter([]) + + snapshot_ids = {snapshot.snapshot_id for snapshot in snapshots_between} + snapshot_ordinal = {snapshot.snapshot_id: ordinal for ordinal, snapshot in enumerate(snapshots_between)} + + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id + + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + + manifests = { + manifest_file + for snapshot_id in snapshot_ids + for manifest_file in self.table_metadata.snapshot_by_id(snapshot_id).manifests(self.io) # type: ignore + if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + if manifest_file.added_snapshot_id in snapshot_ids + } + + # step 2: filter the data files in each manifest + # this filter depends on the partition spec used to write the manifest file + + partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) + metrics_evaluator = _InclusiveMetricsEvaluator( + self.table_metadata.schema(), + self.row_filter, + self.case_sensitive, + self.options.get("include_empty_files") == "true", + ).eval + + min_data_sequence_number = _min_data_file_sequence_number(manifests) # type: ignore + + changelog_entries: List[ManifestEntry] = [] + + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + ( + self.io, + manifest, + partition_evaluators[manifest.partition_spec_id], + metrics_evaluator, + ) + for manifest in manifests + if self._check_sequence_number(min_data_sequence_number, manifest) + ], + ) + ): + if manifest_entry.snapshot_id in snapshot_ids: + changelog_entries.append(manifest_entry) + + for entry in changelog_entries: + if entry.status == ManifestEntryStatus.ADDED: + operation = ChangelogOperation.Insert + elif entry.status == ManifestEntryStatus.DELETED: + operation = ChangelogOperation.Delete + else: + raise f"Unexpected entry status: {entry.status}" + + yield ChangelogScanTask( + data_file=entry.data_file, + commit_snapshot_id=entry.snapshot_id, + change_type=operation, + change_ordinal=snapshot_ordinal[entry.snapshot_id] + ) + + class MoveOperation(Enum): First = 1 Before = 2 diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 2a10e37ba9..8d6af7691a 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -423,6 +423,26 @@ def test_scan_branch(catalog: Catalog) -> None: assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12] +@pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) +def test_incremental_changelog_scan(catalog: Catalog) -> None: + print(catalog.list_tables("default")) + test_table = catalog.load_table("default.test_table_read_from_snapshots") + + scan = test_table.incremental_changelog_scan().from_snapshot_exclusive(test_table.history()[0].snapshot_id) + assert len(list(scan.plan_files())) == 3 + + scan = test_table.incremental_changelog_scan().to_snapshot(test_table.history()[1].snapshot_id) + assert len(list(scan.plan_files())) == 2 + + scan = ( + test_table.incremental_changelog_scan() + .from_snapshot_exclusive(test_table.history()[0].snapshot_id) + .to_snapshot(test_table.history()[2].snapshot_id) + ) + assert len(list(scan.plan_files())) == 2 + + @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) def test_filter_on_new_column(catalog: Catalog) -> None: From 3b84a3abcdd21a8fa00a5f7dae16c745f0691e3b Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Thu, 30 May 2024 15:00:26 +0530 Subject: [PATCH 2/8] add table to provision --- dev/provision.py | 49 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/dev/provision.py b/dev/provision.py index 44086caf20..6572cadb29 100644 --- a/dev/provision.py +++ b/dev/provision.py @@ -342,3 +342,52 @@ (array(), map(), array(struct(1))) """ ) + + spark.sql( + f""" + CREATE OR REPLACE TABLE {catalog_name}.default.test_table_read_from_snapshots ( + dt date, + number integer, + letter string + ) + USING iceberg + TBLPROPERTIES ( + 'format-version'='2' + ); + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-01' AS date), 1, 'a') + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-01' AS date), 2, 'b') + """ + ) + + spark.sql( + f""" + DELETE FROM {catalog_name}.default.test_table_read_from_snapshots + WHERE number = 2 + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-02' AS date), 3, 'c') + """ + ) + + spark.sql( + f""" + INSERT INTO {catalog_name}.default.test_table_read_from_snapshots + VALUES (CAST('2022-03-02' AS date), 4, 'd') + """ + ) From 34daf7cde0c0ce8806aabd8e815f9f5365f357c9 Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Thu, 30 May 2024 15:17:25 +0530 Subject: [PATCH 3/8] don't discard manifest entries of delete status by default --- pyiceberg/table/__init__.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cca6abf578..bbf4d5d6f2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1647,10 +1647,11 @@ def _open_manifest( manifest: ManifestFile, partition_filter: Callable[[DataFile], bool], metrics_evaluator: Callable[[DataFile], bool], + discard_deleted: bool = True, ) -> List[ManifestEntry]: return [ manifest_entry - for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True) + for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=discard_deleted) if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file) ] @@ -1909,12 +1910,7 @@ def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) - *executor.map( lambda args: _open_manifest(*args), [ - ( - self.io, - manifest, - partition_evaluators[manifest.partition_spec_id], - metrics_evaluator, - ) + (self.io, manifest, partition_evaluators[manifest.partition_spec_id], metrics_evaluator, False) for manifest in manifests if self._check_sequence_number(min_data_sequence_number, manifest) ], From a0e28cce7cb020cf347db98d80e7a2b97a36da5a Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Thu, 30 May 2024 15:19:55 +0530 Subject: [PATCH 4/8] fix indent --- pyiceberg/table/__init__.py | 144 ++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 72 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index bbf4d5d6f2..4dbbf69207 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1624,14 +1624,14 @@ class ChangelogScanTask(ScanTask): commit_snapshot_id: int def __init__( - self, - data_file: DataFile, - change_type: ChangelogOperation, - change_ordinal: int, - commit_snapshot_id: int, - delete_files: Optional[Set[DataFile]] = None, - start: Optional[int] = None, - length: Optional[int] = None, + self, + data_file: DataFile, + change_type: ChangelogOperation, + change_ordinal: int, + commit_snapshot_id: int, + delete_files: Optional[Set[DataFile]] = None, + start: Optional[int] = None, + length: Optional[int] = None, ) -> None: self.file = data_file self.change_type = change_type @@ -1859,80 +1859,80 @@ def __init__( from_snapshot_id_exclusive, ) - def use_ref(self: S, name: str) -> S: - raise NotImplementedError("Not implemented for IncrementalChangelogScan yet.") - - def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[ChangelogScanTask]: - snapshots_between = collections.deque() - for snapshot in ancestors_between(to_snapshot_id, from_snapshot_id_exclusive, self.table_metadata): - if snapshot.summary.operation != Operation.REPLACE: # type: ignore - for manifest_file in self.table_metadata.snapshot_by_id(snapshot.snapshot_id).manifests(self.io): - if manifest_file.content == ManifestContent.DELETES: - raise "Delete files are currently not supported in changelog scans" - snapshots_between.appendleft(snapshot) - - if not snapshots_between: - return iter([]) + def use_ref(self: S, name: str) -> S: + raise NotImplementedError("Not implemented for IncrementalChangelogScan yet.") + + def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[ChangelogScanTask]: + snapshots_between = collections.deque() + for snapshot in ancestors_between(to_snapshot_id, from_snapshot_id_exclusive, self.table_metadata): + if snapshot.summary.operation != Operation.REPLACE: # type: ignore + for manifest_file in self.table_metadata.snapshot_by_id(snapshot.snapshot_id).manifests(self.io): + if manifest_file.content == ManifestContent.DELETES: + raise "Delete files are currently not supported in changelog scans" + snapshots_between.appendleft(snapshot) + + if not snapshots_between: + return iter([]) - snapshot_ids = {snapshot.snapshot_id for snapshot in snapshots_between} - snapshot_ordinal = {snapshot.snapshot_id: ordinal for ordinal, snapshot in enumerate(snapshots_between)} + snapshot_ids = {snapshot.snapshot_id for snapshot in snapshots_between} + snapshot_ordinal = {snapshot.snapshot_id: ordinal for ordinal, snapshot in enumerate(snapshots_between)} - # step 1: filter manifests using partition summaries - # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id + # step 1: filter manifests using partition summaries + # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id - manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) - manifests = { - manifest_file - for snapshot_id in snapshot_ids - for manifest_file in self.table_metadata.snapshot_by_id(snapshot_id).manifests(self.io) # type: ignore - if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) - if manifest_file.added_snapshot_id in snapshot_ids - } + manifests = { + manifest_file + for snapshot_id in snapshot_ids + for manifest_file in self.table_metadata.snapshot_by_id(snapshot_id).manifests(self.io) # type: ignore + if manifest_evaluators[manifest_file.partition_spec_id](manifest_file) + if manifest_file.added_snapshot_id in snapshot_ids + } - # step 2: filter the data files in each manifest - # this filter depends on the partition spec used to write the manifest file + # step 2: filter the data files in each manifest + # this filter depends on the partition spec used to write the manifest file - partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) - metrics_evaluator = _InclusiveMetricsEvaluator( - self.table_metadata.schema(), - self.row_filter, - self.case_sensitive, - self.options.get("include_empty_files") == "true", - ).eval + partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator) + metrics_evaluator = _InclusiveMetricsEvaluator( + self.table_metadata.schema(), + self.row_filter, + self.case_sensitive, + self.options.get("include_empty_files") == "true", + ).eval - min_data_sequence_number = _min_data_file_sequence_number(manifests) # type: ignore + min_data_sequence_number = _min_data_file_sequence_number(manifests) # type: ignore - changelog_entries: List[ManifestEntry] = [] + changelog_entries: List[ManifestEntry] = [] - executor = ExecutorFactory.get_or_create() - for manifest_entry in chain( - *executor.map( - lambda args: _open_manifest(*args), - [ - (self.io, manifest, partition_evaluators[manifest.partition_spec_id], metrics_evaluator, False) - for manifest in manifests - if self._check_sequence_number(min_data_sequence_number, manifest) - ], - ) - ): - if manifest_entry.snapshot_id in snapshot_ids: - changelog_entries.append(manifest_entry) - - for entry in changelog_entries: - if entry.status == ManifestEntryStatus.ADDED: - operation = ChangelogOperation.Insert - elif entry.status == ManifestEntryStatus.DELETED: - operation = ChangelogOperation.Delete - else: - raise f"Unexpected entry status: {entry.status}" + executor = ExecutorFactory.get_or_create() + for manifest_entry in chain( + *executor.map( + lambda args: _open_manifest(*args), + [ + (self.io, manifest, partition_evaluators[manifest.partition_spec_id], metrics_evaluator, False) + for manifest in manifests + if self._check_sequence_number(min_data_sequence_number, manifest) + ], + ) + ): + if manifest_entry.snapshot_id in snapshot_ids: + changelog_entries.append(manifest_entry) + + for entry in changelog_entries: + if entry.status == ManifestEntryStatus.ADDED: + operation = ChangelogOperation.Insert + elif entry.status == ManifestEntryStatus.DELETED: + operation = ChangelogOperation.Delete + else: + raise f"Unexpected entry status: {entry.status}" - yield ChangelogScanTask( - data_file=entry.data_file, - commit_snapshot_id=entry.snapshot_id, - change_type=operation, - change_ordinal=snapshot_ordinal[entry.snapshot_id] - ) + yield ChangelogScanTask( + data_file=entry.data_file, + commit_snapshot_id=entry.snapshot_id, + change_type=operation, + change_ordinal=snapshot_ordinal[entry.snapshot_id] + ) class MoveOperation(Enum): From 30f34d751bf697c40f5ee1d64c58688109da7e15 Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Thu, 30 May 2024 15:20:08 +0530 Subject: [PATCH 5/8] update test --- tests/integration/test_reads.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 8d6af7691a..34ca00208a 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -430,10 +430,10 @@ def test_incremental_changelog_scan(catalog: Catalog) -> None: test_table = catalog.load_table("default.test_table_read_from_snapshots") scan = test_table.incremental_changelog_scan().from_snapshot_exclusive(test_table.history()[0].snapshot_id) - assert len(list(scan.plan_files())) == 3 + assert len(list(scan.plan_files())) == 4 - scan = test_table.incremental_changelog_scan().to_snapshot(test_table.history()[1].snapshot_id) - assert len(list(scan.plan_files())) == 2 + scan = test_table.incremental_changelog_scan().to_snapshot(test_table.history()[4].snapshot_id) + assert len(list(scan.plan_files())) == 5 scan = ( test_table.incremental_changelog_scan() From 774acf0c092b1997d220bc688a6591be3ccaba0a Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Thu, 30 May 2024 15:33:21 +0530 Subject: [PATCH 6/8] remove unused parameters --- pyiceberg/table/__init__.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 4dbbf69207..662d6c502d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1616,9 +1616,6 @@ class ChangelogOperation(Enum): @dataclass(init=False) class ChangelogScanTask(ScanTask): file: DataFile - delete_files: Set[DataFile] - start: int - length: int change_type: ChangelogOperation change_ordinal: int commit_snapshot_id: int @@ -1629,17 +1626,11 @@ def __init__( change_type: ChangelogOperation, change_ordinal: int, commit_snapshot_id: int, - delete_files: Optional[Set[DataFile]] = None, - start: Optional[int] = None, - length: Optional[int] = None, ) -> None: self.file = data_file self.change_type = change_type self.change_ordinal = change_ordinal self.commit_snapshot_id = commit_snapshot_id - self.delete_files = delete_files or set() - self.start = start or 0 - self.length = length or data_file.file_size_in_bytes def _open_manifest( From a15fef6b4102802877dbe6ba97f0cfd195b15ab3 Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Thu, 30 May 2024 16:03:22 +0530 Subject: [PATCH 7/8] rename variable --- pyiceberg/table/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 662d6c502d..af9d767a22 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1618,19 +1618,19 @@ class ChangelogScanTask(ScanTask): file: DataFile change_type: ChangelogOperation change_ordinal: int - commit_snapshot_id: int + change_snapshot_id: int def __init__( self, data_file: DataFile, change_type: ChangelogOperation, change_ordinal: int, - commit_snapshot_id: int, + change_snapshot_id: int, ) -> None: self.file = data_file self.change_type = change_type self.change_ordinal = change_ordinal - self.commit_snapshot_id = commit_snapshot_id + self.change_snapshot_id = change_snapshot_id def _open_manifest( @@ -1920,7 +1920,7 @@ def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) - yield ChangelogScanTask( data_file=entry.data_file, - commit_snapshot_id=entry.snapshot_id, + change_snapshot_id=entry.snapshot_id, change_type=operation, change_ordinal=snapshot_ordinal[entry.snapshot_id] ) From d7bf28b7311cd710db46eade9b56c12598eb5541 Mon Sep 17 00:00:00 2001 From: chinmay-bhat <12948588+chinmay-bhat@users.noreply.github.com> Date: Thu, 30 May 2024 16:06:08 +0530 Subject: [PATCH 8/8] remove print --- tests/integration/test_reads.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 34ca00208a..b97fea6959 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -426,7 +426,6 @@ def test_scan_branch(catalog: Catalog) -> None: @pytest.mark.integration @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) def test_incremental_changelog_scan(catalog: Catalog) -> None: - print(catalog.list_tables("default")) test_table = catalog.load_table("default.test_table_read_from_snapshots") scan = test_table.incremental_changelog_scan().from_snapshot_exclusive(test_table.history()[0].snapshot_id)