Skip to content

Incremental Changelog Scan #782

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions dev/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,52 @@
(array(), map(), array(struct(1)))
"""
)

# Provision `test_table_read_from_snapshots`: a small table with a linear commit
# history (single-row INSERTs and one DELETE) used by the snapshot-range /
# incremental changelog integration tests, which index the table history by
# position. NOTE(review): the tests appear to expect five history entries
# (four INSERT snapshots + one DELETE snapshot) — confirm against test_reads.py.
spark.sql(
f"""
CREATE OR REPLACE TABLE {catalog_name}.default.test_table_read_from_snapshots (
dt date,
number integer,
letter string
)
USING iceberg
TBLPROPERTIES (
'format-version'='2'
);
"""
)

# Snapshot 1: first row for 2022-03-01.
spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
VALUES (CAST('2022-03-01' AS date), 1, 'a')
"""
)

# Snapshot 2: second row for 2022-03-01 (deleted again below).
spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
VALUES (CAST('2022-03-01' AS date), 2, 'b')
"""
)

# Snapshot 3: delete the row inserted by the previous commit, so the changelog
# contains a Delete change in the middle of the history.
spark.sql(
f"""
DELETE FROM {catalog_name}.default.test_table_read_from_snapshots
WHERE number = 2
"""
)

# Snapshot 4: first row for 2022-03-02.
spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
VALUES (CAST('2022-03-02' AS date), 3, 'c')
"""
)

# Snapshot 5: second row for 2022-03-02.
spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_table_read_from_snapshots
VALUES (CAST('2022-03-02' AS date), 4, 'd')
"""
)
150 changes: 149 additions & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import collections
import itertools
import uuid
import warnings
Expand Down Expand Up @@ -1235,6 +1236,24 @@ def scan(
limit=limit,
)

def incremental_changelog_scan(
    self,
    row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
    selected_fields: Tuple[str, ...] = ("*",),
    case_sensitive: bool = True,
    options: Properties = EMPTY_DICT,
    limit: Optional[int] = None,
) -> IncrementalChangelogScan:
    """Build an IncrementalChangelogScan over this table.

    The scan reports row-level changes between two snapshots; the snapshot
    range is set on the returned scan (e.g. via from_snapshot_exclusive /
    to_snapshot).

    Args:
        row_filter: Filter expression applied to the scanned rows.
        selected_fields: Field names to project; "*" selects everything.
        case_sensitive: Whether column name matching is case sensitive.
        options: Additional scan properties.
        limit: Optional cap on the number of rows returned.
    """
    return IncrementalChangelogScan(
        self.metadata,
        self.io,
        row_filter=row_filter,
        selected_fields=selected_fields,
        case_sensitive=case_sensitive,
        options=options,
        limit=limit,
    )

@property
def format_version(self) -> TableVersion:
    """The Iceberg format version recorded in the table's current metadata."""
    current_metadata = self.metadata
    return current_metadata.format_version
Expand Down Expand Up @@ -1587,15 +1606,43 @@ def __init__(
self.length = length or data_file.file_size_in_bytes


# Kind of row-level change carried by a ChangelogScanTask. The functional Enum
# API assigns values 1..4 in declaration order, matching the original members.
ChangelogOperation = Enum("ChangelogOperation", ["Insert", "Delete", "UpdateBefore", "UpdateAfter"])


@dataclass(init=False)
class ChangelogScanTask(ScanTask):
    """A scan task describing one changed data file within a changelog scan.

    Produced by IncrementalChangelogScan planning: one task per data file that
    was added or removed by a snapshot in the scanned range.
    """

    # The data file that was added or removed.
    file: DataFile
    # Whether this file represents an insert, delete, or update image.
    change_type: ChangelogOperation
    # Position of the producing snapshot within the scanned range (0-based,
    # oldest first).
    change_ordinal: int
    # ID of the snapshot that committed this change.
    change_snapshot_id: int

    def __init__(
        self,
        data_file: DataFile,
        change_type: ChangelogOperation,
        change_ordinal: int,
        change_snapshot_id: int,
    ) -> None:
        """Initialize the task; `data_file` is stored on the `file` attribute."""
        self.file = data_file
        self.change_type = change_type
        self.change_ordinal = change_ordinal
        self.change_snapshot_id = change_snapshot_id


def _open_manifest(
io: FileIO,
manifest: ManifestFile,
partition_filter: Callable[[DataFile], bool],
metrics_evaluator: Callable[[DataFile], bool],
discard_deleted: bool = True,
) -> List[ManifestEntry]:
return [
manifest_entry
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=discard_deleted)
if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file)
]

Expand Down Expand Up @@ -1778,6 +1825,107 @@ def to_ray(self) -> ray.data.dataset.Dataset:
return ray.data.from_arrow(self.to_arrow())


class IncrementalChangelogScan(BaseIncrementalScan):
    """An incremental scan producing row-level change tasks between two snapshots.

    Plans one ChangelogScanTask per data file added (Insert) or removed (Delete)
    by each snapshot in the range (from_snapshot_id_exclusive, to_snapshot_id].
    REPLACE snapshots are skipped. Delete files (merge-on-read) are not
    supported yet and cause planning to fail.
    """

    def __init__(
        self,
        table_metadata: TableMetadata,
        io: FileIO,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
        to_snapshot_id: Optional[int] = None,
        from_snapshot_id_exclusive: Optional[int] = None,
    ):
        super().__init__(
            table_metadata,
            io,
            row_filter,
            selected_fields,
            case_sensitive,
            options,
            limit,
            to_snapshot_id,
            from_snapshot_id_exclusive,
        )

    def use_ref(self: S, name: str) -> S:
        """Named refs (branches/tags) are not supported for changelog scans yet."""
        raise NotImplementedError("Not implemented for IncrementalChangelogScan yet.")

    def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[ChangelogScanTask]:
        """Yield one ChangelogScanTask per changed data file in the snapshot range.

        Raises:
            ValueError: If a snapshot in the range carries delete manifests, or a
                manifest entry has an unexpected status.
        """
        # Walk ancestors newest -> oldest and appendleft so the deque ends up
        # ordered oldest -> newest; the resulting position is the change ordinal.
        snapshots_between: collections.deque = collections.deque()
        for snapshot in ancestors_between(to_snapshot_id, from_snapshot_id_exclusive, self.table_metadata):
            if snapshot.summary.operation != Operation.REPLACE:  # type: ignore
                for manifest_file in self.table_metadata.snapshot_by_id(snapshot.snapshot_id).manifests(self.io):
                    if manifest_file.content == ManifestContent.DELETES:
                        # Bug fix: `raise "<str>"` raises TypeError (exceptions must
                        # derive from BaseException); raise a real exception instead.
                        raise ValueError("Delete files are currently not supported in changelog scans")
                snapshots_between.appendleft(snapshot)

        if not snapshots_between:
            # Nothing changed in the requested range: empty generator.
            return

        snapshot_ids = {snapshot.snapshot_id for snapshot in snapshots_between}
        snapshot_ordinal = {snapshot.snapshot_id: ordinal for ordinal, snapshot in enumerate(snapshots_between)}

        # step 1: filter manifests using partition summaries
        # the filter depends on the partition spec used to write the manifest file,
        # so create a cache of filters for each spec id
        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

        # Only manifests added by snapshots inside the range can contribute changes.
        manifests = {
            manifest_file
            for snapshot_id in snapshot_ids
            for manifest_file in self.table_metadata.snapshot_by_id(snapshot_id).manifests(self.io)  # type: ignore
            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
            if manifest_file.added_snapshot_id in snapshot_ids
        }

        # step 2: filter the data files in each manifest
        # this filter depends on the partition spec used to write the manifest file
        partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
        metrics_evaluator = _InclusiveMetricsEvaluator(
            self.table_metadata.schema(),
            self.row_filter,
            self.case_sensitive,
            self.options.get("include_empty_files") == "true",
        ).eval

        min_data_sequence_number = _min_data_file_sequence_number(manifests)  # type: ignore

        changelog_entries: List[ManifestEntry] = []

        # Open manifests in parallel, keeping deleted entries (discard_deleted=False)
        # so deletions are reported, then keep only entries written in the range.
        executor = ExecutorFactory.get_or_create()
        for manifest_entry in chain(
            *executor.map(
                lambda args: _open_manifest(*args),
                [
                    (self.io, manifest, partition_evaluators[manifest.partition_spec_id], metrics_evaluator, False)
                    for manifest in manifests
                    if self._check_sequence_number(min_data_sequence_number, manifest)
                ],
            )
        ):
            if manifest_entry.snapshot_id in snapshot_ids:
                changelog_entries.append(manifest_entry)

        for entry in changelog_entries:
            if entry.status == ManifestEntryStatus.ADDED:
                operation = ChangelogOperation.Insert
            elif entry.status == ManifestEntryStatus.DELETED:
                operation = ChangelogOperation.Delete
            else:
                # Bug fix: raising a string is a TypeError; raise a real exception.
                raise ValueError(f"Unexpected entry status: {entry.status}")

            yield ChangelogScanTask(
                data_file=entry.data_file,
                change_snapshot_id=entry.snapshot_id,
                change_type=operation,
                change_ordinal=snapshot_ordinal[entry.snapshot_id],
            )


class MoveOperation(Enum):
First = 1
Before = 2
Expand Down
19 changes: 19 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,25 @@ def test_scan_branch(catalog: Catalog) -> None:
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]


@pytest.mark.integration
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')])
def test_incremental_changelog_scan(catalog: Catalog) -> None:
    """Check task counts of changelog scans bounded at various history positions."""
    test_table = catalog.load_table("default.test_table_read_from_snapshots")

    # Open-ended upper bound: every change after the first snapshot.
    after_first = test_table.incremental_changelog_scan().from_snapshot_exclusive(test_table.history()[0].snapshot_id)
    assert len(list(after_first.plan_files())) == 4

    # Open-ended lower bound: every change up to the fifth snapshot.
    up_to_fifth = test_table.incremental_changelog_scan().to_snapshot(test_table.history()[4].snapshot_id)
    assert len(list(up_to_fifth.plan_files())) == 5

    # Bounded on both ends: changes in (first, third].
    bounded = (
        test_table.incremental_changelog_scan()
        .from_snapshot_exclusive(test_table.history()[0].snapshot_id)
        .to_snapshot(test_table.history()[2].snapshot_id)
    )
    assert len(list(bounded.plan_files())) == 2


@pytest.mark.integration
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')])
def test_filter_on_new_column(catalog: Catalog) -> None:
Expand Down