From cb2e1ffa6924a74691a49274f3ee924a79039ed2 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 6 Sep 2025 18:47:48 -0700 Subject: [PATCH 1/3] refactor writers for the future partitioning writers --- .../writer/base_writer/data_file_writer.rs | 6 +- .../base_writer/equality_delete_writer.rs | 5 +- crates/iceberg/src/writer/file_writer/mod.rs | 5 +- .../src/writer/file_writer/parquet_writer.rs | 61 ++----- .../src/writer/file_writer/rolling_writer.rs | 160 +++++++++++------- crates/iceberg/src/writer/mod.rs | 14 +- .../datafusion/src/physical_plan/write.rs | 30 ++-- 7 files changed, 152 insertions(+), 129 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index ff35b9ae4b..398098f18f 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -19,7 +19,7 @@ use arrow_array::RecordBatch; use itertools::Itertools; - +use crate::io::OutputFile; use crate::Result; use crate::spec::{DataContentType, DataFile, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; @@ -48,9 +48,9 @@ impl DataFileWriterBuilder { impl IcebergWriterBuilder for DataFileWriterBuilder { type R = DataFileWriter; - async fn build(self) -> Result { + async fn build(self, output_file: OutputFile) -> Result { Ok(DataFileWriter { - inner_writer: Some(self.inner.clone().build().await?), + inner_writer: Some(self.inner.clone().build(output_file).await?), partition_value: self.partition_value.unwrap_or(Struct::empty()), partition_spec_id: self.partition_spec_id, }) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 058b743789..89aeece8cc 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -30,6 +30,7 @@ use crate::spec::{DataFile, SchemaRef, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; +use crate::io::OutputFile; /// Builder for `EqualityDeleteWriter`. #[derive(Clone, Debug)] @@ -113,9 +114,9 @@ impl EqualityDeleteWriterConfig { impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder { type R = EqualityDeleteFileWriter; - async fn build(self) -> Result { + async fn build(self, output_file: OutputFile) -> Result { Ok(EqualityDeleteFileWriter { - inner_writer: Some(self.inner.clone().build().await?), + inner_writer: Some(self.inner.clone().build(output_file).await?), projector: self.config.projector, equality_ids: self.config.equality_ids, partition_value: self.config.partition_value, diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 2a5a735534..2da783adc3 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -22,6 +22,7 @@ use futures::Future; use super::CurrentFileStatus; use crate::Result; +use crate::io::OutputFile; use crate::spec::DataFileBuilder; mod parquet_writer; @@ -37,8 +38,8 @@ type DefaultOutput = Vec; pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. type R: FileWriter; - /// Build file writer. - fn build(self) -> impl Future> + Send; + /// Build file writer with the provided output file. 
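+    ///
+    /// A minimal sketch of the new call pattern (`builder`, `file_io`, and `path` are
+    /// illustrative names, not part of this trait):
+    ///
+    /// ```ignore
+    /// let output_file = file_io.new_output(path)?;
+    /// let mut file_writer = builder.build(output_file).await?;
+    /// ```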
+ fn build(self, output_file: OutputFile) -> impl Future> + Send; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 90c0d28c0a..07230b7099 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -34,7 +34,6 @@ use parquet::format::FileMetaData; use parquet::thrift::{TCompactOutputProtocol, TSerializable}; use thrift::protocol::TOutputProtocol; -use super::location_generator::{FileNameGenerator, LocationGenerator}; use super::{FileWriter, FileWriterBuilder}; use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, @@ -43,7 +42,7 @@ use crate::arrow::{ use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, - NestedFieldRef, PartitionKey, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, + NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, StructType, TableMetadata, Type, visit_schema, }; use crate::transform::create_transform_function; @@ -52,78 +51,46 @@ use crate::{Error, ErrorKind, Result}; /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] -pub struct ParquetWriterBuilder { +pub struct ParquetWriterBuilder { props: WriterProperties, schema: SchemaRef, - partition_key: Option, match_mode: FieldMatchMode, - - file_io: FileIO, - location_generator: T, - file_name_generator: F, } -impl ParquetWriterBuilder { +impl ParquetWriterBuilder { /// Create a new `ParquetWriterBuilder` /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field. pub fn new( props: WriterProperties, schema: SchemaRef, - partition_key: Option, - file_io: FileIO, - location_generator: T, - file_name_generator: F, ) -> Self { - Self::new_with_match_mode( - props, - schema, - partition_key, - FieldMatchMode::Id, - file_io, - location_generator, - file_name_generator, - ) + Self::new_with_match_mode(props, schema, FieldMatchMode::Id) } /// Create a new `ParquetWriterBuilder` with custom match mode pub fn new_with_match_mode( props: WriterProperties, schema: SchemaRef, - partition_key: Option, match_mode: FieldMatchMode, - file_io: FileIO, - location_generator: T, - file_name_generator: F, ) -> Self { Self { props, schema, - partition_key, match_mode, - file_io, - location_generator, - file_name_generator, } } } -impl FileWriterBuilder for ParquetWriterBuilder { +impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; - async fn build(self) -> Result { - let out_file = self - .file_io - .new_output(self.location_generator.generate_location( - self.partition_key.as_ref(), - &self.file_name_generator.generate_file_name(), - ))?; - + async fn build(self, output_file: OutputFile) -> Result { Ok(ParquetWriter { schema: self.schema.clone(), inner_writer: None, writer_properties: self.props, current_row_num: 0, - out_file, + output_file, nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), }) } @@ -250,7 +217,7 @@ impl SchemaVisitor for IndexByParquetPathName { /// `ParquetWriter`` is used to write arrow data into parquet file on storage. 
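+/// The destination `OutputFile` is supplied by [`FileWriterBuilder::build`] rather than
+/// generated by the builder itself.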
pub struct ParquetWriter { schema: SchemaRef, - out_file: OutputFile, + output_file: OutputFile, inner_writer: Option>>>, writer_properties: WriterProperties, current_row_num: usize, @@ -555,7 +522,7 @@ impl FileWriter for ParquetWriter { writer } else { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); - let inner_writer = self.out_file.writer().await?; + let inner_writer = self.output_file.writer().await?; let async_writer = AsyncFileWriter::new(inner_writer); let writer = AsyncArrowWriter::try_new( async_writer, @@ -594,7 +561,7 @@ impl FileWriter for ParquetWriter { let written_size = writer.bytes_written(); if self.current_row_num == 0 { - self.out_file.delete().await.map_err(|err| { + self.output_file.delete().await.map_err(|err| { Error::new( ErrorKind::Unexpected, "Failed to delete empty parquet file.", @@ -616,7 +583,7 @@ impl FileWriter for ParquetWriter { self.schema, parquet_metadata, written_size, - self.out_file.location().to_string(), + self.output_file.location().to_string(), self.nan_value_count_visitor.nan_value_counts, )?]) } @@ -625,7 +592,7 @@ impl FileWriter for ParquetWriter { impl CurrentFileStatus for ParquetWriter { fn current_file_path(&self) -> String { - self.out_file.location().to_string() + self.output_file.location().to_string() } fn current_row_num(&self) -> usize { @@ -1681,7 +1648,7 @@ mod tests { .build() .await?; pw.write(&to_write).await?; - let file_path = pw.out_file.location().to_string(); + let file_path = pw.output_file.location().to_string(); pw.close().await.unwrap(); assert!(file_io.exists(file_path).await.unwrap()); @@ -1698,7 +1665,7 @@ mod tests { ) .build() .await?; - let file_path = pw.out_file.location().to_string(); + let file_path = pw.output_file.location().to_string(); pw.close().await.unwrap(); assert!(!file_io.exists(file_path).await.unwrap()); diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 24aa49e4a6..168eeb3dba 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -17,65 +17,75 @@ use arrow_array::RecordBatch; -use crate::spec::DataFileBuilder; -use crate::writer::CurrentFileStatus; -use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::io::FileIO; +use crate::spec::{DataFile, PartitionKey}; +use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; +use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; -/// Builder for creating a `RollingFileWriter` that rolls over to a new file -/// when the data size exceeds a target threshold. -#[derive(Clone)] -pub struct RollingFileWriterBuilder { +/// A writer that automatically rolls over to a new file when the data size +/// exceeds a target threshold. +/// +/// This writer wraps another writer that tracks the amount of data written. +/// When the data size exceeds the target size, it closes the current file and +/// starts writing to a new one. +pub struct RollingWriter +where + B: IcebergWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + inner: Option, inner_builder: B, target_file_size: usize, + location_generator: L, + file_name_generator: F, + file_io: FileIO, + partition_key: Option, + data_files: Vec, // this should be B::R::O? DefaultOutput? 
} -impl RollingFileWriterBuilder { - /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size. +impl RollingWriter +where + B: IcebergWriterBuilder, + B::R: CurrentFileStatus, + L: LocationGenerator, + F: FileNameGenerator, +{ + /// Creates a new `RollingWriter` with the specified inner builder and target size. /// /// # Arguments /// - /// * `inner_builder` - The builder for the underlying file writer + /// * `inner_builder` - The builder for the underlying writer /// * `target_file_size` - The target size in bytes before rolling over to a new file + /// * `location_generator` - The location generator to use for generating file paths + /// * `file_name_generator` - The file name generator to use for generating file names + /// * `file_io` - The file IO to use for creating new files + /// * `partition_key` - The partition key for the files /// /// NOTE: The `target_file_size` does not exactly reflect the final size on physical storage. /// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior. /// The actual file size on disk is expected to be slightly larger than `target_file_size`. - pub fn new(inner_builder: B, target_file_size: usize) -> Self { + pub fn new( + inner_builder: B, + target_file_size: usize, + location_generator: L, + file_name_generator: F, + file_io: FileIO, + partition_key: Option, + ) -> Self { Self { + inner: None, inner_builder, target_file_size, + location_generator, + file_name_generator, + file_io, + partition_key, + data_files: Vec::new(), } } -} - -impl FileWriterBuilder for RollingFileWriterBuilder { - type R = RollingFileWriter; - - async fn build(self) -> Result { - Ok(RollingFileWriter { - inner: None, - inner_builder: self.inner_builder, - target_file_size: self.target_file_size, - data_file_builders: vec![], - }) - } -} - -/// A writer that automatically rolls over to a new file when the data size -/// exceeds a target threshold. -/// -/// This writer wraps another file writer that tracks the amount of data written. -/// When the data size exceeds the target size, it closes the current file and -/// starts writing to a new one. -pub struct RollingFileWriter { - inner: Option, - inner_builder: B, - target_file_size: usize, - data_file_builders: Vec, -} -impl RollingFileWriter { /// Determines if the writer should roll over to a new file. /// /// # Returns @@ -84,28 +94,45 @@ impl RollingFileWriter { fn should_roll(&self) -> bool { self.current_written_size() > self.target_file_size } -} -impl FileWriter for RollingFileWriter { - async fn write(&mut self, input: &RecordBatch) -> Result<()> { + /// Create a new writer for the current partition. + async fn create_new_writer(&mut self) -> Result { + let file_path = self.location_generator.generate_location( + self.partition_key.as_ref(), + &self.file_name_generator.generate_file_name(), + ); + + let output_file = self.file_io.new_output(file_path)?; + let writer = self.inner_builder.clone().build(output_file).await?; + + Ok(writer) + } + + /// Write a record batch to the current file, rolling over to a new file if necessary. 
+ /// + /// # Arguments + /// + /// * `batch` - The record batch to write + pub async fn write(&mut self, batch: RecordBatch) -> Result<()> { if self.inner.is_none() { // initialize inner writer - self.inner = Some(self.inner_builder.clone().build().await?); + self.inner = Some(self.create_new_writer().await?); } if self.should_roll() { - if let Some(inner) = self.inner.take() { + if let Some(mut inner) = self.inner.take() { // close the current writer, roll to a new file - self.data_file_builders.extend(inner.close().await?); + let mut data_files = inner.close().await?; + self.data_files.append(&mut data_files); // start a new writer - self.inner = Some(self.inner_builder.clone().build().await?); + self.inner = Some(self.create_new_writer().await?); } } // write the input - if let Some(writer) = self.inner.as_mut() { - Ok(writer.write(input).await?) + if let Some(writer) = &mut self.inner { + writer.write(batch).await } else { Err(Error::new( ErrorKind::Unexpected, @@ -114,28 +141,47 @@ impl FileWriter for RollingFileWriter { } } - async fn close(mut self) -> Result> { + /// Close the writer and return the data files for all files. + pub async fn close(&mut self) -> Result> { // close the current writer and merge the output - if let Some(current_writer) = self.inner { - self.data_file_builders - .extend(current_writer.close().await?); + if let Some(mut current_writer) = self.inner.take() { + let data_files = current_writer.close().await?; + self.data_files.extend(data_files); } - Ok(self.data_file_builders) + Ok(std::mem::take(&mut self.data_files)) } } -impl CurrentFileStatus for RollingFileWriter { +impl CurrentFileStatus for RollingWriter +where + B: IcebergWriterBuilder, + B::R: IcebergWriter + CurrentFileStatus, + L: LocationGenerator, + F: FileNameGenerator, +{ fn current_file_path(&self) -> String { - self.inner.as_ref().unwrap().current_file_path() + if let Some(inner) = &self.inner { + inner.current_file_path() + } else { + "".to_string() + } } fn current_row_num(&self) -> usize { - self.inner.as_ref().unwrap().current_row_num() + if let Some(inner) = &self.inner { + inner.current_row_num() + } else { + 0 + } } fn current_written_size(&self) -> usize { - self.inner.as_ref().unwrap().current_written_size() + if let Some(inner) = &self.inner { + inner.current_written_size() + } else { + 0 + } } } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 8f17d50e27..90c5d015a3 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -229,6 +229,7 @@ pub mod file_writer; use arrow_array::RecordBatch; use crate::Result; +use crate::io::OutputFile; use crate::spec::DataFile; type DefaultInput = RecordBatch; @@ -241,8 +242,8 @@ pub trait IcebergWriterBuilder: { /// The associated writer type. type R: IcebergWriter; - /// Build the iceberg writer. - async fn build(self) -> Result; + /// Build the iceberg writer with the provided output file. + async fn build(self, output_file: OutputFile) -> Result; } /// The iceberg writer used to write data to iceberg table. @@ -268,6 +269,15 @@ pub trait CurrentFileStatus { fn current_written_size(&self) -> usize; } +/// The partitioning writer used to write data to multiple partitions. +pub trait PartitioningWriter { + /// Write a record batch, which may contain rows for multiple partitions. + fn write(&mut self, batch: RecordBatch) -> Result<()>; + + /// Close all writers and return the data files. 
+ fn close(&mut self) -> Result>; +} + #[cfg(test)] mod tests { use arrow_array::RecordBatch; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 625405c95b..9797d73c50 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -47,8 +47,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; -use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; -use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::writer::file_writer::rolling_writer::RollingWriter; use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; use uuid::Uuid; @@ -231,17 +230,11 @@ impl ExecutionPlan for IcebergWriteExec { ))); } - // Create data file writer builder + // Create rolling writer let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode( WriterProperties::default(), self.table.metadata().current_schema().clone(), - None, FieldMatchMode::Name, - self.table.file_io().clone(), - DefaultLocationGenerator::new(self.table.metadata().clone()) - .map_err(to_datafusion_error)?, - // todo filename prefix/suffix should be configurable - DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format), ); let target_file_size = match self .table @@ -261,10 +254,18 @@ impl ExecutionPlan for IcebergWriteExec { .map_err(to_datafusion_error)?, None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, }; - let rolling_writer_builder = - RollingFileWriterBuilder::new(parquet_file_writer_builder, target_file_size); let data_file_writer_builder = - DataFileWriterBuilder::new(rolling_writer_builder, None, spec_id); + DataFileWriterBuilder::new(parquet_file_writer_builder, None, spec_id); + let rolling_writer = RollingWriter::new( + data_file_writer_builder, + target_file_size, + DefaultLocationGenerator::new(self.table.metadata().clone()) + .map_err(to_datafusion_error)?, + // todo filename prefix/suffix should be configurable + DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format), + self.table.file_io().clone(), + None, + ); // Get input data let data = execute_input_stream( @@ -279,10 +280,7 @@ impl ExecutionPlan for IcebergWriteExec { // Create write stream let stream = futures::stream::once(async move { - let mut writer = data_file_writer_builder - .build() - .await - .map_err(to_datafusion_error)?; + let mut writer = rolling_writer; let mut input_stream = data; while let Some(batch) = input_stream.next().await { From d7b4eeb582f06cb41ce7c3e35272babcd75efd07 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Sat, 6 Sep 2025 19:05:30 -0700 Subject: [PATCH 2/3] some minor changes --- .../iceberg/src/writer/base_writer/data_file_writer.rs | 3 ++- .../src/writer/base_writer/equality_delete_writer.rs | 2 +- crates/iceberg/src/writer/file_writer/parquet_writer.rs | 9 +++------ crates/iceberg/src/writer/file_writer/rolling_writer.rs | 8 +++----- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index 398098f18f..19c419152c 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -19,8 +19,9 @@ use arrow_array::RecordBatch; use 
itertools::Itertools; -use crate::io::OutputFile; + use crate::Result; +use crate::io::OutputFile; use crate::spec::{DataContentType, DataFile, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 89aeece8cc..d85ace5447 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -26,11 +26,11 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::record_batch_projector::RecordBatchProjector; use crate::arrow::schema_to_arrow_schema; +use crate::io::OutputFile; use crate::spec::{DataFile, SchemaRef, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; -use crate::io::OutputFile; /// Builder for `EqualityDeleteWriter`. #[derive(Clone, Debug)] diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 07230b7099..c7588cabfe 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -42,8 +42,8 @@ use crate::arrow::{ use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, - NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, - Struct, StructType, TableMetadata, Type, visit_schema, + NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, + StructType, TableMetadata, Type, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -60,10 +60,7 @@ pub struct ParquetWriterBuilder { impl ParquetWriterBuilder { /// Create a new `ParquetWriterBuilder` /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field. - pub fn new( - props: WriterProperties, - schema: SchemaRef, - ) -> Self { + pub fn new(props: WriterProperties, schema: SchemaRef) -> Self { Self::new_with_match_mode(props, schema, FieldMatchMode::Id) } diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 168eeb3dba..d2ab9fec87 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -42,7 +42,7 @@ where file_name_generator: F, file_io: FileIO, partition_key: Option, - data_files: Vec, // this should be B::R::O? DefaultOutput? + data_files: Vec, // todo this should be B::R::O? DefaultOutput? 
} impl RollingWriter @@ -122,8 +122,7 @@ where if self.should_roll() { if let Some(mut inner) = self.inner.take() { // close the current writer, roll to a new file - let mut data_files = inner.close().await?; - self.data_files.append(&mut data_files); + self.data_files.extend(inner.close().await?); // start a new writer self.inner = Some(self.create_new_writer().await?); @@ -145,8 +144,7 @@ where pub async fn close(&mut self) -> Result> { // close the current writer and merge the output if let Some(mut current_writer) = self.inner.take() { - let data_files = current_writer.close().await?; - self.data_files.extend(data_files); + self.data_files.extend(current_writer.close().await?); } Ok(std::mem::take(&mut self.data_files)) From ac264fcd1be2d1eba9b8c6748bb6c9dc427ab70e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 9 Sep 2025 11:22:58 -0700 Subject: [PATCH 3/3] expose IO in rollingwriter --- .../src/writer/file_writer/rolling_writer.rs | 38 +++++++++++-------- crates/iceberg/src/writer/mod.rs | 20 +++++----- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index d2ab9fec87..022ac898c4 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -18,9 +18,11 @@ use arrow_array::RecordBatch; use crate::io::FileIO; -use crate::spec::{DataFile, PartitionKey}; +use crate::spec::PartitionKey; use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; -use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use crate::writer::{ + CurrentFileStatus, DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder, +}; use crate::{Error, ErrorKind, Result}; /// A writer that automatically rolls over to a new file when the data size @@ -29,11 +31,12 @@ use crate::{Error, ErrorKind, Result}; /// This writer wraps another writer that tracks the amount of data written. /// When the data size exceeds the target size, it closes the current file and /// starts writing to a new one. -pub struct RollingWriter +pub struct RollingWriter where - B: IcebergWriterBuilder, + B: IcebergWriterBuilder, L: LocationGenerator, F: FileNameGenerator, + O: IntoIterator, { inner: Option, inner_builder: B, @@ -42,15 +45,17 @@ where file_name_generator: F, file_io: FileIO, partition_key: Option, - data_files: Vec, // todo this should be B::R::O? DefaultOutput? + data_files: Vec, } -impl RollingWriter +impl RollingWriter where - B: IcebergWriterBuilder, + B: IcebergWriterBuilder, B::R: CurrentFileStatus, L: LocationGenerator, F: FileNameGenerator, + O: IntoIterator, + O::Item: Clone, { /// Creates a new `RollingWriter` with the specified inner builder and target size. /// @@ -108,12 +113,12 @@ where Ok(writer) } - /// Write a record batch to the current file, rolling over to a new file if necessary. + /// Write input data to the current file, rolling over to a new file if necessary. 
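+    /// On the first call this lazily creates the inner writer (and its `OutputFile`) via
+    /// `create_new_writer`.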
/// /// # Arguments /// - /// * `batch` - The record batch to write - pub async fn write(&mut self, batch: RecordBatch) -> Result<()> { + /// * `input` - The input data to write + pub async fn write(&mut self, input: I) -> Result<()> { if self.inner.is_none() { // initialize inner writer self.inner = Some(self.create_new_writer().await?); @@ -131,7 +136,7 @@ where // write the input if let Some(writer) = &mut self.inner { - writer.write(batch).await + writer.write(input).await } else { Err(Error::new( ErrorKind::Unexpected, @@ -140,8 +145,8 @@ where } } - /// Close the writer and return the data files for all files. - pub async fn close(&mut self) -> Result> { + /// Close the writer and return the output items for all files. + pub async fn close(&mut self) -> Result> { // close the current writer and merge the output if let Some(mut current_writer) = self.inner.take() { self.data_files.extend(current_writer.close().await?); @@ -151,12 +156,13 @@ where } } -impl CurrentFileStatus for RollingWriter +impl CurrentFileStatus for RollingWriter where - B: IcebergWriterBuilder, - B::R: IcebergWriter + CurrentFileStatus, + B: IcebergWriterBuilder, + B::R: IcebergWriter + CurrentFileStatus, L: LocationGenerator, F: FileNameGenerator, + O: IntoIterator, { fn current_file_path(&self) -> String { if let Some(inner) = &self.inner { diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 90c5d015a3..dc1a6ec358 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -230,11 +230,20 @@ use arrow_array::RecordBatch; use crate::Result; use crate::io::OutputFile; -use crate::spec::DataFile; +use crate::spec::{DataFile, PartitionKey}; type DefaultInput = RecordBatch; type DefaultOutput = Vec; +/// The partitioning writer used to write data to multiple partitions. +pub trait PartitioningWriter { + /// Write a record batch, all rows from this record batch should come from one partition + fn write(&mut self, partition_key: PartitionKey, batch: RecordBatch) -> Result<()>; + + /// Close all writers and return the data files. + fn close(&mut self) -> Result>; +} + /// The builder for iceberg writer. #[async_trait::async_trait] pub trait IcebergWriterBuilder: @@ -269,15 +278,6 @@ pub trait CurrentFileStatus { fn current_written_size(&self) -> usize; } -/// The partitioning writer used to write data to multiple partitions. -pub trait PartitioningWriter { - /// Write a record batch, which may contain rows for multiple partitions. - fn write(&mut self, batch: RecordBatch) -> Result<()>; - - /// Close all writers and return the data files. - fn close(&mut self) -> Result>; -} - #[cfg(test)] mod tests { use arrow_array::RecordBatch;
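
Taken together, the series moves file creation out of the individual writer builders: RollingWriter now owns the FileIO, LocationGenerator, and FileNameGenerator, creates a fresh OutputFile for each rolled file, and hands it to the wrapped IcebergWriterBuilder. A minimal sketch of the resulting assembly, mirroring the DataFusion write path above; identifiers such as `schema`, `table_metadata`, `file_io`, `spec_id`, `target_file_size`, and `batch` are placeholders rather than part of the diff:

    // Imports as in crates/integrations/datafusion/src/physical_plan/write.rs above.
    let parquet_builder = ParquetWriterBuilder::new(WriterProperties::default(), schema);
    let data_file_builder = DataFileWriterBuilder::new(parquet_builder, None, spec_id);

    let mut writer = RollingWriter::new(
        data_file_builder,
        target_file_size, // e.g. PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
        DefaultLocationGenerator::new(table_metadata)?,
        DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, DataFileFormat::Parquet),
        file_io,
        None, // partition key; unpartitioned in this sketch
    );

    writer.write(batch).await?;
    let data_files = writer.close().await?;

The PartitioningWriter trait added in patch 1 and reshaped in patch 3 (its write method now takes a PartitionKey alongside the batch) is the hook that the future partitioning writers named in the subject line are expected to implement on top of this stack.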