Add write_parquet API for writing Parquet files without committing #1742


Closed

Conversation


@andormarkus andormarkus commented Feb 28, 2025

This PR adds a new API method write_parquet() to the Table class, which allows writing a PyArrow table to Parquet files in Iceberg-compatible format without committing them to the table metadata. This provides a way to decouple the write and commit process, which is particularly useful in high-concurrency scenarios.

Key features

  • write_parquet(df) writes Parquet files compatible with Iceberg table format
  • Returns a list of file paths to the written files
  • Files can later be committed using add_files() API
  • Helps manage concurrency by separating write operations from metadata commits

Use case

This is especially useful for high-concurrency ingestion scenarios where multiple writers could be writing data to an Iceberg table simultaneously. By separating the write and commit phases, applications can implement a queue system where the commit process (which requires a lock) is handled separately from the data writing phase:

# Write data but don't commit
file_paths = table.write_parquet(df)

# Later, commit the files to make them visible in queries
table.add_files(file_paths=file_paths)

Documentation

Added comprehensive documentation to the API docs, including explanations and examples of how to use the new method alongside the existing add_files API.

Seeking guidance

I would appreciate guidance from project maintainers on:

  1. Which test cases would be most appropriate for this new API?
  2. Is there a preferred location or approach for testing this functionality?
  3. Should we add tests that specifically verify the interaction between write_parquet() and add_files()?
  4. Are there any performance considerations or edge cases that should be covered in testing?
  5. Are any further documentation or API changes needed before this is ready for review?

Closes #1737

This method allows users to write a PyArrow table to the table's storage as Parquet files without committing them to the table metadata. The method returns a list of the file paths that were written, enabling workflows that require access to the data files before committing metadata changes.

It also adds an include_field_ids parameter to the underlying write_file and _dataframe_to_data_files functions to provide more control over the Parquet writing process.
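To make the proposal concrete, here is a rough sketch of what the proposed Table.write_parquet could look like, inferred from the description above rather than taken from the PR diff. _dataframe_to_data_files is a private helper in pyiceberg.io.pyarrow, so its exact signature may differ between releases, and include_field_ids handling is omitted.

import pyarrow as pa
from pyiceberg.io.pyarrow import _dataframe_to_data_files

def write_parquet(table, df: pa.Table) -> list[str]:
    # Write Iceberg-compatible Parquet files into the table's location
    # without creating a snapshot, then return only the file paths.
    data_files = _dataframe_to_data_files(
        table_metadata=table.metadata, df=df, io=table.io
    )
    # Dropping the DataFile metadata and keeping plain path strings is what
    # makes the result easy to hand to add_files() later.
    return [data_file.file_path for data_file in data_files]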
Contributor

@kevinjqliu kevinjqliu left a comment


Thanks for the PR @andormarkus

This is useful functionality, but I'm not sure it's something we would want to add to the pyiceberg library. I understand the motivation, but I think it is specific to your use case.

The write_parquet API is not really related to the Iceberg Table class; it deals only with the underlying data files, so adding it to the Table class could lead to confusion.

As a user of the pyiceberg library, I can manually flush dataframes to Parquet and then register them with the Iceberg table using add_files, similar to what you describe:

with tbl.update_snapshot().fast_append() as update_snapshot:
    data_files = _dataframe_to_data_files(..., df, ...)
    for data_file in data_files:
        update_snapshot.append_data_file(data_file)
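
Spelled out as a self-contained sketch (assuming _dataframe_to_data_files is imported from pyiceberg.io.pyarrow, a private helper whose signature may change, and using placeholder catalog and table names):

import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.io.pyarrow import _dataframe_to_data_files

catalog = load_catalog("default")        # placeholder catalog name
tbl = catalog.load_table("db.events")    # placeholder table identifier
df = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())})

# Write Parquet files and build the corresponding DataFile objects.
data_files = _dataframe_to_data_files(table_metadata=tbl.metadata, df=df, io=tbl.io)

# Register them in a single fast-append snapshot.
with tbl.update_snapshot().fast_append() as update_snapshot:
    for data_file in data_files:
        update_snapshot.append_data_file(data_file)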

Please let me know if that resolves your specific issue.

@andormarkus
Author

Thanks for your feedback!

I understand your concern about adding this to the Table class. I agree it's not the ideal location and could lead to confusion.

The primary issue I'm trying to solve involves distributed environments. While your suggested approach works well in a single process, my use case involves multiple distributed processes. One process writes data files and another commits them to the table, requiring simple communication between these processes.

Passing DataFile objects between processes requires serialization, which I've found challenging to implement properly. I tried jsonpickle and custom serialization methods but encountered significant issues.

What I need is a simpler workflow where:

  1. A process writes Parquet files in Iceberg-compatible format (like Table.append does)
  2. It returns just simple strings (easily passed between systems)
  3. Another process can take these simple strings and use any API to commit them

This approach avoids having to pass complex objects like DataFile between distributed components. I'm open to alternative implementations that meet these requirements.
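
As a minimal illustration of that split, the sketch below uses an in-memory queue as a stand-in for whatever messaging layer actually connects the two processes; write_parquet here is the API proposed in this PR, not an existing pyiceberg method.

from queue import Queue

path_queue: Queue = Queue()  # stand-in for SQS, Kafka, a database table, ...

def writer_process(table, df) -> None:
    # Write Iceberg-compatible Parquet files, but do not commit.
    file_paths = table.write_parquet(df)  # proposed API
    # Plain strings are trivially serializable, so they cross any boundary.
    path_queue.put(file_paths)

def committer_process(table) -> None:
    # A single committer drains the queue and commits under one lock.
    file_paths = path_queue.get()
    table.add_files(file_paths=file_paths)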

@Fokko
Contributor

Fokko commented Mar 2, 2025

Thanks @andormarkus for working on this. I've just commented on #1737 and would love to hear your thoughts there as well.

Passing DataFile objects between processes requires serialization, which I've found challenging to implement properly. I tried jsonpickle and custom serialization methods but encountered significant issues.

DataFiles tend to be pretty big. Java solves this by writing the DataFiles out into a manifest, then serializing the ManifestFile and sending it to the main process.

@andormarkus
Author

andormarkus commented Mar 3, 2025

Hi @Fokko

We ended up building a dynamic custom serialize/deserialize function that supports gzip and zlib compression to deal with the size issue. Once the DataFile is compressed, the payload is not so big and we can easily pass it through the queue.

We needed to build it dynamically so that it can cope with changes to the DataFile class. If we could add serialize/deserialize methods to DataFile, the logic could be much simpler.

Manifest files are implemented in PyIceberg as well; however, from reading the source code, the read and write methods are deeply nested and not as easy to access as append_data_file.

I'm happy to contribute the serialize/deserialize functions if the maintainers agree on this approach.
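
For context, the compress-to-shrink idea can be approximated with something as small as the following; this is only a rough stand-in for the dynamic serializer described above, and it assumes the DataFile instance pickles cleanly, which is exactly the fragile part under discussion.

import pickle
import zlib

def serialize_data_file(data_file) -> bytes:
    # Compress the pickled DataFile so it fits comfortably in a queue message.
    return zlib.compress(pickle.dumps(data_file))

def deserialize_data_file(payload: bytes):
    return pickle.loads(zlib.decompress(payload))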

@Fokko
Contributor

Fokko commented Mar 3, 2025

I understand that it is a streaming workload? In that case, writing the manifest doesn't help a lot. I understand the problem. Let me think out loud:

The problem with the current approach (in this PR) is that when the schema or partition spec changes, add_files will compare the files against the latest schema. While it is perfectly fine to still commit files written with an older schema to the table, I think add_files will not allow this. Another concern is around ordering and precedence: for append operations this works perfectly fine, but when we want to do distributed rewrites or deletes, the order of operations is very important.

We needed to build it dynamically so that it can cope with changes to the DataFile class. If we could add serialize/deserialize methods to DataFile, the logic could be much simpler.

The most obvious way of serializing it is by using Avro. This is efficient over the wire as well (I expect it to be much smaller than jsonpickle or regular pickle). I would be in favor of having this in combination with append_data_file because it is much more robust. This would also play very well with the logic suggested in #1678, resulting in far fewer conflicts.

@andormarkus
Author

We want to avoid streaming and Spark at all costs; we have burned ourselves with both.

I have no problem with Avro (ManifestFile) combined with append_data_file.

Yes, the current PR is kind of obsolete, because I think append_data_file can be the better approach.

I like your #1678; however, it would create too many commits. We have already implemented this suggestion, and the resulting number of commits was a real performance killer.

I will close this PR and the related issue and reopen everything as a distributed-write proposal.
