Skip to content

Conversation

CTTY
Copy link
Contributor

@CTTY CTTY commented Sep 7, 2025

Which issue does this PR close?

What changes are included in this PR?

Refactored the writer layers; from a bird’s-eye view, the structure now looks like this:

flowchart TD
    subgraph PartitioningWriter
        PW[PartitioningWriter]

        subgraph RollingWriter
            RW[RollingWriter]

            subgraph DataFileWriter
                DFW[DataFileWriter]

                subgraph FileWriter
                    FW[FileWriter]
                end

                DFW --> FW
            end

            RW --> DFW
        end

        PW --> RW
    end


Loading
  • Modified Writer Interfaces:

    • Updated the FileWriterBuilder and IcebergWriterBuilder interfaces to accept an OutputFile parameter in their build methods
    • Added a new PartitioningWriter trait in writer/mod.rs with methods for partitioning-aware writers (not implemented yet, we can use a separate PR to add this trait if needed)
  • Transformed RollingFileWriter to RollingWriter:

    • Renamed RollingFileWriter to RollingWriter
    • Changed from implementing FileWriter to being a standalone writer that uses IcebergWriterBuilder
    • Added support for location generation and file naming within the writer itself
    • Made it generic over IcebergWriterBuilder, LocationGenerator, and FileNameGenerator
    • Added methods to create new writers for partitions
  • Updated ParquetWriter:

    • Removed direct dependencies on LocationGenerator and FileNameGenerator
    • Renamed out_file to output_file for consistency
    • Simplified the interface to focus on writing to a provided output file
  • Updated DataFileWriter and EqualityDeleteWriter:

    • Modified to pass the OutputFile parameter to their inner writers
  • Updated DataFusion Integration:

    • Modified IcebergWriteExec to use the new RollingWriter directly instead of using builder patterns
    • Simplified the writer creation process in the execution plan
    • NOTE: Technically DataFusion or any engine should use TaskWriter -> PartitioningWriter -> RollingWriter -> ..., but TaskWriter and PartitioningWriter are not included in this draft so far

Are these changes tested?

Not yet, but changing the existing tests accordingly should be enough

/// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
impl<B, L, F> RollingWriter<B, L, F>
where
B: IcebergWriterBuilder,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing need to noticed is that following is what IcebergWriterBuilder looks like.

#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
    Send + Clone + 'static
{
    /// The associated writer type.
    type R: IcebergWriter<I, O>;
    /// Build the iceberg writer.
    async fn build(self) -> Result<Self::R>;
}

For writer like position delete writer, it has different input like following, see: #704

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput>>
    for PositionDeleteWriterBuilder<B>
{
    type R = PositionDeleteWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(PositionDeleteWriter {
            inner_writer: Some(self.inner.build().await?),
            partition_value: self.partition_value.unwrap_or(Struct::empty()),
        })
    }
}

And that's why rolling writer is a FileWriter at first. After we adopt this design, how can we something like

RollingWriter<PostitionDeletWriter>

Copy link
Contributor Author

@CTTY CTTY Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your concern is valid, we may need to expose I and O in the RollingWriter as well, and that should solve this problem?

pub struct RollingWriter<B, L, F, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    L: LocationGenerator,
    F: FileNameGenerator,

Meanwhile I've been wondering how useful is the abstraction of IcebergWriter... If we separate RollingWriter into RollingPositionalDeletesWriter and RollingXXXWriter and have them use concrete types then this would be a lot easier

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meanwhile I've been wondering how useful is the abstraction of IcebergWriter

E.g the user want to custom their own writer with to track some metrics like following:

RollingWriter<TrackPositionalDeletesWriter>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think custom writers can either implement FileWriter (lightweighted, file-level customization) or PartitioningWriter (heavier, customization across multiple partitions).

In your example, the custom writer can implement FileWriter and be used like this:

RollingPositionalDeletesWriter<TrackWriter>

/// Close all writers and return the data files.
fn close(&mut self) -> Result<Vec<DataFile>>;
}

/// The builder for iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we will also need to change the DefaultOutput for IcebergWriter from Vec<DataFile> to Vec<DataFileBuilder> since IcebergWriter is no longer the outermost writer

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Decouple ParquetWriter and LocationGenerator
2 participants