
Incremental Changelog Scan #240


Open
stefan-pdx opened this issue Dec 26, 2023 · 5 comments

Comments

@stefan-pdx

Feature Request / Improvement

Hello,

The Java API supports performing a table scan for just the incremental changes between two snapshots: https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/IncrementalChangelogScan.html
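Conceptually, a changelog scan between two snapshots answers "which rows were inserted and which were deleted?". The toy sketch below is neither PyIceberg nor Java code. It models a snapshot as a set of hypothetical data-file paths and labels rows `INSERT`/`DELETE`, loosely following the Java `ChangelogOperation` names, and it ignores row-level delete files entirely.

```python
from typing import Dict, FrozenSet, List, Tuple

# Hypothetical toy model, not Iceberg metadata: a snapshot is the set of
# data-file paths it references, and each file holds a tuple of rows.
Snapshot = FrozenSet[str]


def changelog(
    files: Dict[str, Tuple[str, ...]], start: Snapshot, end: Snapshot
) -> List[Tuple[str, str]]:
    """Return (operation, row) pairs describing how `start` became `end`."""
    changes: List[Tuple[str, str]] = []
    # Files referenced by `end` but not `start` contribute inserted rows
    for path in sorted(end - start):
        changes.extend(("INSERT", row) for row in files[path])
    # Files referenced by `start` but not `end` contribute deleted rows
    for path in sorted(start - end):
        changes.extend(("DELETE", row) for row in files[path])
    return changes


# "a.parquet" is rewritten as "c.parquet" without row "r2" (copy-on-write delete)
files = {"a.parquet": ("r1", "r2"), "b.parquet": ("r3",), "c.parquet": ("r1",)}
start = frozenset({"a.parquet", "b.parquet"})
end = frozenset({"b.parquet", "c.parquet"})
print(changelog(files, start, end))
# [('INSERT', 'r1'), ('DELETE', 'r1'), ('DELETE', 'r2')]
```

Even in this toy, the unchanged row `r1` shows up as both a DELETE and an INSERT because its file was rewritten; handling cases like that (and delete files) is one of the special considerations a real implementation would face.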

How feasible would it be to support the same in PyIceberg? Are there any special considerations to make?

Thanks!

@stefan-pdx stefan-pdx changed the title from "Incremental Chnagelog Scan" to "Incremental Changelog Scan" Dec 27, 2023

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

@github-actions github-actions bot added the stale label Jun 24, 2024

github-actions bot commented Jul 9, 2024

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.

@github-actions github-actions bot closed this as not planned Jul 9, 2024
@sungwy sungwy reopened this Jul 24, 2024
@sungwy
Collaborator

sungwy commented Jul 24, 2024

Reopening issue as it was marked stale.

@glesperance

glesperance commented Jul 24, 2024

This would be great. In the meantime, I naively hacked together the workaround below to get newly appended rows; it seems to work for my use case.
Looking at the code, wouldn't this feature be easier to implement if plan_files accepted an optional snapshot_id argument?

def plan_files(self) -> Iterable[FileScanTask]:
    """Plans the relevant files by filtering on the PartitionSpecs.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    snapshot = self.snapshot()
    if not snapshot:
        return iter([])
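To illustrate the snapshot_id suggestion: if plan_files took an explicit snapshot override, an append scan could plan two snapshots without temporarily mutating the scan, unlike the AppendScan workaround in this comment. The sketch below uses a hypothetical `ToyScan` class and `appended_files` helper that only mimic the shape of a scan; they are not the PyIceberg API.

```python
from typing import Dict, List, Optional


class ToyScan:
    """Hypothetical stand-in for a table scan; this is not the PyIceberg API."""

    def __init__(
        self,
        snapshots: Dict[int, List[str]],
        current_snapshot_id: int,
        snapshot_id: Optional[int] = None,
    ) -> None:
        self.snapshots = snapshots  # snapshot id -> data-file paths
        self.current_snapshot_id = current_snapshot_id
        self.snapshot_id = snapshot_id  # None means "use the current snapshot"

    def plan_files(self, snapshot_id: Optional[int] = None) -> List[str]:
        # An explicit argument overrides the scan's snapshot, which in turn
        # falls back to the table's current snapshot
        sid = snapshot_id if snapshot_id is not None else self.snapshot_id
        if sid is None:
            sid = self.current_snapshot_id
        return list(self.snapshots.get(sid, []))


def appended_files(scan: ToyScan, start_snapshot_id: int) -> List[str]:
    """Files in the scan's snapshot that were not in start_snapshot_id."""
    # No need to temporarily overwrite scan.snapshot_id
    previous = set(scan.plan_files(snapshot_id=start_snapshot_id))
    return [path for path in scan.plan_files() if path not in previous]


scan = ToyScan({1: ["a"], 2: ["a", "b"], 3: ["a", "b", "c"]}, current_snapshot_id=3)
print(appended_files(scan, start_snapshot_id=1))  # ['b', 'c']
print(scan.snapshot_id)  # None: the scan's state was never modified
```

Keeping the override as a parameter makes the scan effectively stateless during planning, so there is no try/finally restore step to get wrong.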

from typing import Iterable, List, Optional, Tuple, Union

from pyiceberg.table import (
    ALWAYS_TRUE, EMPTY_DICT, BooleanExpression, DataScan, FileScanTask, Properties, Table
)


class AppendScan(DataScan):
    """DataScan that only returns data files added after start_snapshot_id."""

    start_snapshot_id: Optional[int] = None

    @classmethod
    def from_table(
        cls,
        table: Table,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        start_snapshot_id: Optional[int] = None,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ) -> "AppendScan":
        instance = cls(
            table_metadata=table.metadata,
            io=table.io,
            row_filter=row_filter,
            selected_fields=selected_fields,
            case_sensitive=case_sensitive,
            snapshot_id=snapshot_id,
            options=options,
            limit=limit,
        )
        instance.start_snapshot_id = start_snapshot_id
        return instance

    def plan_files(self) -> Iterable[FileScanTask]:
        # Materialize the plan for the end snapshot before touching snapshot_id
        current_plan: List[FileScanTask] = list(super().plan_files())

        if self.start_snapshot_id is None:
            return current_plan

        # Temporarily point the scan at the start snapshot to collect the files
        # that were already there, then always restore the original snapshot id
        orig_snapshot_id = self.snapshot_id
        try:
            self.snapshot_id = self.start_snapshot_id
            prev_paths = {task.file.file_path for task in super().plan_files()}
        finally:
            self.snapshot_id = orig_snapshot_id

        # Keep only the files that did not exist in the start snapshot
        return [task for task in current_plan if task.file.file_path not in prev_paths]


# `product` is an existing pyiceberg Table; diff against the previous snapshot
append_scan = AppendScan.from_table(product, start_snapshot_id=product.history()[-2].snapshot_id)
append_scan.to_pandas()
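The workaround above plans the whole table twice and picks the start snapshot from `history()`, which is the snapshot log; after a rollback, the previous log entry is not necessarily an ancestor of the current snapshot. A different approach, roughly in the spirit of the Java incremental append scan, walks the parent chain from the end snapshot back to the start snapshot and collects the files added by append snapshots along the way. Iceberg snapshots do record a parent snapshot id and an operation in their summary, but the `ToySnapshot` class and both helper functions below are hypothetical, simplified models rather than PyIceberg code.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToySnapshot:
    """Hypothetical, simplified snapshot record (not a PyIceberg class)."""

    snapshot_id: int
    parent_id: Optional[int]
    operation: str  # e.g. "append", "overwrite", "delete"
    added_files: List[str] = field(default_factory=list)


def ancestors_between(
    snapshots: Dict[int, ToySnapshot], start_id: int, end_id: int
) -> List[ToySnapshot]:
    """Snapshots after start_id up to and including end_id, oldest first."""
    chain: List[ToySnapshot] = []
    current: Optional[int] = end_id
    # Follow parent pointers instead of the snapshot log, so snapshots that
    # are not ancestors of end_id are never included
    while current is not None and current != start_id:
        snap = snapshots[current]
        chain.append(snap)
        current = snap.parent_id
    if current != start_id:
        raise ValueError(f"snapshot {start_id} is not an ancestor of {end_id}")
    return list(reversed(chain))


def incremental_append_files(
    snapshots: Dict[int, ToySnapshot], start_id: int, end_id: int
) -> List[str]:
    """Files added by append snapshots in (start_id, end_id]."""
    files: List[str] = []
    for snap in ancestors_between(snapshots, start_id, end_id):
        # Only appends are considered; other operations are skipped
        if snap.operation == "append":
            files.extend(snap.added_files)
    return files


snapshots = {
    1: ToySnapshot(1, None, "append", ["a"]),
    2: ToySnapshot(2, 1, "append", ["b"]),
    3: ToySnapshot(3, 2, "overwrite", ["c"]),
    4: ToySnapshot(4, 3, "append", ["d"]),
    5: ToySnapshot(5, 1, "append", ["e"]),  # not an ancestor of snapshot 4
}
print(incremental_append_files(snapshots, 1, 4))  # ['b', 'd']
```

Because only the files added in the intervening snapshots are touched, this avoids diffing two full file listings, and asking for a start snapshot that is not an ancestor (such as 5 here) fails loudly instead of returning a misleading diff.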

@smaheshwar-pltr
Contributor

Hi folks, any update on this? I see #533 and #782, but I'm unsure how close they are. Is this being worked on currently?

Reading PyIceberg tables incrementally is crucial for us. Happy to help out here!


4 participants