
Incremental Append Scan #533


Open: hililiwei wants to merge 4 commits into main
Conversation

@hililiwei (Contributor) commented Mar 19, 2024

Hi @Fokko, long time no see. 😄
I have written some preliminary code for incremental reading. It still needs a lot of work, but I would like to discuss it with you at an early stage to make sure I stay on the right track. Could you please take a look when you have a chance? Thank you.

@hililiwei changed the title from "Incremental Append Scan" to "WIP: Incremental Append Scan" on Mar 19, 2024
@ndrluis mentioned this pull request on Mar 19, 2024
@@ -1578,6 +1595,120 @@ def to_ray(self) -> ray.data.dataset.Dataset:
return ray.data.from_arrow(self.to_arrow())


class IncrementalAppendScan(DataScan):
hililiwei (Contributor Author) commented:

I'm a bit confused here. Should I modify the inheritance structure? The current hierarchy is TableScan -> DataScan -> IncrementalAppendScan. Should I create a class, like BaseScan, make both DataScan and IncrementalAppendScan inherit from it, and move snapshot_id and snapshot() from TableScan into DataScan? Like:

TableScan
    > BaseScan
        > DataScan
        > IncrementalAppendScan

@hililiwei (Contributor Author) commented:

In the latest commit, I reworked the class inheritance by introducing a new base class, BaseIncrementalScan, which inherits from TableScan. I also pushed snapshot_id down to DataScan and shuffled a few methods around (which might cause some backward-compatibility issues 💔). What do you think, and how can I improve it? @Fokko
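For readers skimming the thread, a minimal sketch of the hierarchy being described (class bodies are elided; only the parent-child relationships matter here):

from typing import Optional


class TableScan:  # simplified stand-in for pyiceberg's TableScan
    ...


class DataScan(TableScan):
    # snapshot_id / snapshot() now live here instead of on TableScan
    snapshot_id: Optional[int] = None


class BaseIncrementalScan(TableScan):
    # bounds of the incremental window, resolved lazily when planning files
    to_snapshot_id: Optional[int] = None
    from_snapshot_id_exclusive: Optional[int] = None


class IncrementalAppendScan(BaseIncrementalScan):
    ...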

@hililiwei changed the title from "WIP: Incremental Append Scan" to "Incremental Append Scan" on Apr 2, 2024
@Fokko (Contributor) left a comment:

Hey @hililiwei, sorry for the late reply; for some reason this fell off my radar. This looks like a great start. Excited to see this coming to PyIceberg as well 👍

Comment on lines +1632 to +1980
to_snapshot_id: Optional[int] = None,
from_snapshot_id_exclusive: Optional[int] = None,
Contributor commented:

I think it is worthwhile to add a docstring there to describe these parameters to help the end-user.
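For example, something along these lines (suggested wording only; the defaults match the behavior discussed later in this thread):

from typing import Optional


def __init__(
    self,
    to_snapshot_id: Optional[int] = None,
    from_snapshot_id_exclusive: Optional[int] = None,
) -> None:
    """Scan the rows appended between two snapshots.

    Args:
        to_snapshot_id: ID of the snapshot to read up to, inclusive.
            Defaults to the table's current snapshot when omitted.
        from_snapshot_id_exclusive: ID of the snapshot to start from,
            exclusive. When omitted, the scan starts at the oldest
            ancestor of to_snapshot_id.
    """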

hililiwei (Contributor Author) replied:

I added some comments, but I'm not sure if they are well-written. Please review them, thx.

@hililiwei force-pushed the incremental branch 4 times, most recently from 57f6551 to d1207f8 on April 30, 2024 10:47
@hililiwei (Contributor Author) commented:

Sorry for the late correction. I've adjusted the code based on the latest comments. Could you please take a look?

@Fokko (Contributor) commented May 23, 2024:

@hililiwei I'm sorry, this also fell off my radar.

@Fokko mentioned this pull request on May 23, 2024
Returns:
this for method chaining
"""
return self.update(to_snapshot_id=to_snapshot_id)
@chinmay-bhat (Contributor) commented May 30, 2024:

Hi, thank you for the contribution!
I think in this implementation, to_snapshot_id doesn't get a default value when it isn't provided. Can you add the default value mentioned in the docstring?

hililiwei (Contributor Author) replied:

When the parameter to_snapshot_id is not set, we assume the user wants the latest data in the table, so I fetch the table's latest snapshot_id:

        # Default to the table's current snapshot when no end snapshot was given
        if self.to_snapshot_id is None:
            current_snapshot = self.table_metadata.current_snapshot()
            if current_snapshot is None:
                raise ValueError("End snapshot is not set and table has no current snapshot")
            self.to_snapshot_id = current_snapshot.snapshot_id

Currently this is done in plan_files(), but we could also move it earlier, into __init__.

Contributor replied:

Thanks, I think keeping it in plan_files() should be fine! Since to_snapshot_id and from_snapshot_id_exclusive are Optional[int], do we want to handle the case where the ids are None?

For example:

def to_snapshot(self: S, to_snapshot_id: Optional[int]) -> S:
    if to_snapshot_id is None:
        # nothing to update; return the scan unchanged
        return self
    return self.update(to_snapshot_id=to_snapshot_id)

hililiwei (Contributor Author) replied:

Done.

Returns:
this for method chaining
"""
return self.update(from_snapshot_id_exclusive=from_snapshot_id)
Contributor commented:

Here too, from_snapshot_id doesn't get a default value, as mentioned in the docstring. Should we add it here, or is it being set somewhere else?

hililiwei (Contributor Author) replied:

The helper method ancestors_between handles the case where from_snapshot_id is None.
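For readers following along, a rough sketch of what such a helper could look like (the lookup callback is illustrative; the actual PR resolves snapshots through the table metadata):

from typing import Callable, Iterator, Optional

from pyiceberg.table.snapshots import Snapshot


def ancestors_between(
    to_snapshot_id: int,
    from_snapshot_id: Optional[int],
    lookup: Callable[[int], Optional[Snapshot]],
) -> Iterator[Snapshot]:
    # Walk the parent chain from to_snapshot_id back to, but excluding,
    # from_snapshot_id; a None from_snapshot_id walks to the oldest ancestor.
    current: Optional[int] = to_snapshot_id
    while current is not None and current != from_snapshot_id:
        snapshot = lookup(current)
        if snapshot is None:
            break
        yield snapshot
        current = snapshot.parent_snapshot_id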

Contributor replied:

Makes sense, thanks

@chinmay-bhat mentioned this pull request on May 30, 2024
@abstractmethod
def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[FileScanTask]: ...

def plan_files(self) -> Iterable[FileScanTask]:
@chinmay-bhat (Contributor) commented Jun 1, 2024:

Is it possible for plan_files() to return Iterable[ScanTask]?
Since this class is the base for incremental scans, that change would allow returning iterables of any ScanTask subclass, including FileScanTask.
For example, ChangelogScanTask(ScanTask), which will be introduced in the incremental changelog scan PR.
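Concretely, the suggestion amounts to widening the base class's return type, roughly like this (a sketch only; ScanTask here is pyiceberg's task base class, and the class name is illustrative):

from abc import ABC, abstractmethod
from typing import Iterable

from pyiceberg.table import ScanTask  # base class of FileScanTask


class IncrementalScanBase(ABC):
    from_snapshot_id_exclusive: int
    to_snapshot_id: int

    @abstractmethod
    def _do_plan_files(
        self, from_snapshot_id_exclusive: int, to_snapshot_id: int
    ) -> Iterable[ScanTask]: ...

    def plan_files(self) -> Iterable[ScanTask]:
        # Widened from Iterable[FileScanTask]: subclasses may now yield any
        # ScanTask subtype, e.g. a changelog task.
        return self._do_plan_files(self.from_snapshot_id_exclusive, self.to_snapshot_id)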

hililiwei (Contributor Author) replied:

Done.

@glesperance commented:
I managed to get a poor man's append scan with this: #240 (comment)

Looking at this PR, wouldn't it be simpler to implement the append scan in the API by adding an append_scan method to Table, refactoring plan_files to take an optional snapshot_id, and providing a lightweight AppendScan class that makes two calls to plan_files and then compares the results?

In my case there was no need to touch __eq__ or __hash__.
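For context, a minimal sketch of that idea using today's public API (catalog, table, and snapshot identifiers are placeholders): plan the files at both snapshots via table.scan(snapshot_id=...) and keep the data files that only appear in the newer plan. Note this is only correct if the snapshots in between are pure appends.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # placeholder catalog name
table = catalog.load_table("db.events")  # placeholder table name

old_snapshot_id = 1234567890             # placeholder snapshot ID
# assumes the table has a current snapshot
new_snapshot_id = table.current_snapshot().snapshot_id

# File paths already present at the old snapshot
old_paths = {task.file.file_path for task in table.scan(snapshot_id=old_snapshot_id).plan_files()}

# Tasks for files that were appended after the old snapshot
appended = [
    task
    for task in table.scan(snapshot_id=new_snapshot_id).plan_files()
    if task.file.file_path not in old_paths
]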

Xiao-zhen-Liu added a commit to Texera/texera that referenced this pull request Jan 30, 2025
This PR adds a storage layer implementation on the Python side of
Texera's codebase, mirroring the implementation of our Java-based
storage layer.

## Motivation
- The primary motivation for having a storage layer in Python is to let
Python UDF operators' ports write directly to result tables without
needing to send the results back to Java.
- In the future we will also use the Python storage layer for UDF logs
and workflow runtime statistics.

## Storage APIs
- There are 3 abstract classes in Java's storage implementation:
  - `ReadOnlyVirtualDocument` for read-only tables
- `VirtualDocument` for tables supporting both read and write
operations.
  - `BufferedItemWriter` as a writer class of `VirtualDocument`
- We mirror the implementation in Python, but keep only the APIs
relevant to table storage (e.g., APIs related to dataset storage are not
kept in Python); a rough sketch follows the list.
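A rough sketch of the mirrored Python ABCs (method names are illustrative, not Texera's actual signatures):

from abc import ABC, abstractmethod
from typing import Generic, Iterator, TypeVar

T = TypeVar("T")


class ReadOnlyVirtualDocument(ABC, Generic[T]):
    """Read-only table."""

    @abstractmethod
    def get(self) -> Iterator[T]: ...


class BufferedItemWriter(ABC, Generic[T]):
    """Buffered writer handed out by a writable document."""

    @abstractmethod
    def put_one(self, item: T) -> None: ...

    @abstractmethod
    def close(self) -> None: ...


class VirtualDocument(ReadOnlyVirtualDocument[T]):
    """Table supporting both read and write operations."""

    @abstractmethod
    def writer(self) -> BufferedItemWriter[T]: ...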

## Iceberg Document
Following #3147, we add a table-storage implementation based on Apache
Iceberg (pyiceberg), including `IcebergDocument`, `IcebergTableWriter`,
`IcebergCatalogInstance`, and related util functions and tests.

### Limitations of / TODOs for the Python implementation
pyiceberg is less mature than its Java-based counterpart. As a result,
a few functionalities are not supported in our current Python storage
implementation.

#### Incremental Read
Incremental read is not supported by pyiceberg; it will be supported [in
the future](apache/iceberg-python#533). Until
then we will not include incremental read in our Python codebase (it is
also not currently needed).
#### Concurrent writers
Iceberg uses optimistic concurrency control for concurrent writers. Java
Iceberg natively supports retry with configurable retry parameters,
using exponential backoff (without randomness). However, pyiceberg does
not currently support retry. We implemented an ad-hoc custom retry
mechanism in `IcebergTableWriter`, using exponential random backoff
based on the [tenacity](https://tenacity.readthedocs.io/en/latest/)
library. It performs well (~0.6 s for 10 concurrent writers writing
20K tuples) and is faster than Java's Iceberg-native retry (~6 s
for the same test). We may need to re-evaluate this custom
implementation if pyiceberg supports retry natively in the future.
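As a rough illustration of that kind of ad-hoc retry (the backoff parameters are invented here; CommitFailedException is pyiceberg's commit-conflict error):

import pyarrow as pa
from pyiceberg.exceptions import CommitFailedException
from pyiceberg.table import Table
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential


@retry(
    retry=retry_if_exception_type(CommitFailedException),
    wait=wait_random_exponential(multiplier=0.05, max=2),  # exponential random backoff
    stop=stop_after_attempt(10),
    reraise=True,
)
def append_with_retry(table: Table, batch: pa.Table) -> None:
    # Refresh so the commit is attempted against the latest table metadata,
    # then append; a conflicting commit raises CommitFailedException.
    table.refresh()
    table.append(batch)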

## Iceberg Catalog
pyiceberg only supports the SQL catalog (PostgreSQL, to be specific) and the REST
catalog for production. We use the PostgreSQL-based SQL catalog in this
implementation for the following reasons (a configuration sketch follows the list):
- It supports local storage.
- We tested that it works with both the Java and Python Iceberg storage.
- It is easier for developers to set up (compared to REST services).
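For reference, a PostgreSQL-backed SQL catalog can be configured in pyiceberg roughly like this (connection string and warehouse path are placeholders):

from pyiceberg.catalog.sql import SqlCatalog

# The "iceberg" database must already exist; pyiceberg creates its own tables in it.
catalog = SqlCatalog(
    "default",
    uri="postgresql+psycopg2://user:password@localhost:5432/iceberg",
    warehouse="file:///tmp/warehouse",  # local storage, as noted above
)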

### PostgreSQL setup
The Python storage layer requires a running PostgreSQL service in the
environment, and an empty database for Iceberg to work.
- **A script to set up a new Postgres database for Texera's Iceberg
storage has been added for CI tests.**
- The database will be used by pyiceberg to manage the catalog.
- The logic to set up the database has been added to the GitHub CI config.
- The Java side can continue using the Hadoop-based catalog for now, until
we add storage on operator ports for both Java and Python.
- As the Python storage is not currently used by Python workers, no
action is required from developers for now.

### REST catalogs (feel free to skip this section)
I also explored 3 major REST catalog implementations
([lakekeeper](https://lakekeeper.io),
[polaris](https://polaris.apache.org), and
[gravitino](https://gravitino.apache.org)) and here are some
observations:
- REST catalogs are the trend primarily because different query engines
(Spark, Flink, Snowflake, etc.) relying on iceberg need a central place
to keep and manage the catalogs. Under the hood they all still use some
database as their storage layer.
- Most of them support / recommend cloud storage only in production and
do not support local storage.
- They are incubating projects and lack documentation. For example, I
found it very hard to set up authentication (which pyiceberg requires
to work with REST catalogs) using gravitino, and using
them would add a lot more burden for our developers.
- I successfully made polaris work with our implementation after
setting up auth, but somehow it was very slow.
- As the Postgres catalog is working, we will explore REST
catalogs further in the future if we migrate to cloud storage and hit
scalability issues.

## Storage configurations

A static class `StorageConfigs` is added to manage storage-related
configurations. We do NOT read the configs from files. Instead, we let
Java pass the configs to the Python worker, and the config is
filled in when the worker is initialized. The storage config is hardcoded in
CI tests.
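A minimal sketch of such a config holder (field names are illustrative):

from dataclasses import dataclass


@dataclass(frozen=True)
class StorageConfigs:
    # Filled from values passed in by the Java side when the worker starts,
    # never read from config files.
    iceberg_catalog_uri: str
    iceberg_warehouse_path: str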

## Other items

`VFSURIFactory` and `DocumentFactory` are added in Python storage layer
mirroring the Java implementations.

## TODO for Java Storage
- Add SQL catalog as another type of iceberg catalog

---------

Co-authored-by: Jiadong Bai <[email protected]>
@mrendi29 commented:
Is anyone still working on this PR? If not, I'd like to give it a try and pick it up from where it was left off. :D

@glesperance commented Mar 12, 2025:

@mrendi29 unclear.

For now I'm running with this: #240 (comment).

@Fokko would the code in my comment make sense? If so, I could open a PR for it.

Ma77Ball pushed a commit to Texera/texera that referenced this pull request Apr 2, 2025