diff --git a/Cargo.toml b/Cargo.toml
index 296cb5d42e..97326667a4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -59,6 +59,7 @@ bytes = "1.5"
 chrono = "0.4.34"
 ctor = "0.2.8"
 derive_builder = "0.20"
+dyn-clone = "1"
 either = "1"
 env_logger = "0.11.0"
 fnv = "1"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 1307cc6f32..3b44dd853c 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -58,6 +58,7 @@ bitvec = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
+dyn-clone = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index 6f9c0a8920..654aa9c9c1 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -42,7 +42,6 @@ impl DataFileWriterBuilder {
     }
 }
 
-#[async_trait::async_trait]
 impl IcebergWriterBuilder for DataFileWriterBuilder {
     type R = DataFileWriter;
 
@@ -61,13 +60,12 @@ pub struct DataFileWriter {
     partition_value: Struct,
 }
 
-#[async_trait::async_trait]
 impl IcebergWriter for DataFileWriter {
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
         self.inner_writer.as_mut().unwrap().write(&batch).await
     }
 
-    async fn close(&mut self) -> Result<Vec<DataFile>> {
+    async fn close(mut self) -> Result<Vec<DataFile>> {
         let writer = self.inner_writer.take().unwrap();
         Ok(writer
             .close()
@@ -128,7 +126,7 @@ mod test {
             location_gen,
             file_name_gen,
         );
-        let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await?;
+        let data_file_writer = DataFileWriterBuilder::new(pw, None).build().await?;
 
         let data_file = data_file_writer.close().await.unwrap();
         assert_eq!(data_file.len(), 1);
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 328e2b93da..f18e2d4d88 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -107,7 +107,6 @@ impl EqualityDeleteWriterConfig {
     }
 }
 
-#[async_trait::async_trait]
 impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder {
     type R = EqualityDeleteFileWriter;
 
@@ -130,7 +129,6 @@ pub struct EqualityDeleteFileWriter {
     partition_value: Struct,
 }
 
-#[async_trait::async_trait]
 impl IcebergWriter for EqualityDeleteFileWriter {
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
         let batch = self.projector.project_bacth(batch)?;
@@ -144,7 +142,7 @@ impl IcebergWriter for EqualityDeleteFileWriter {
         }
     }
 
-    async fn close(&mut self) -> Result<Vec<DataFile>> {
+    async fn close(mut self) -> Result<Vec<DataFile>> {
         if let Some(writer) = self.inner_writer.take() {
             Ok(writer
                 .close()
@@ -342,11 +340,11 @@ mod test {
         ])) as ArrayRef;
         let col3 = Arc::new({
             let list_parts = arrow_array::ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
-                Some(
-                    vec![Some(1),]
-                );
-                1024
-            ])
+            Some(
+                vec![Some(1), ]
+            );
+            1024
+        ])
             .into_parts();
             arrow_array::ListArray::new(
                 if let DataType::List(field) = arrow_schema.fields.get(3).unwrap().data_type() {
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 64357a0fe2..7e42a596ff 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -48,6 +48,8 @@ pub mod base_writer;
 pub mod file_writer;
 
+use std::future::Future;
+
 use arrow_array::RecordBatch;
 
 use crate::spec::DataFile;
@@ -57,28 +59,128 @@
 type DefaultInput = RecordBatch;
 type DefaultOutput = Vec<DataFile>;
 
 /// The builder for iceberg writer.
-#[async_trait::async_trait]
 pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + Clone + 'static {
     /// The associated writer type.
     type R: IcebergWriter<I, O>;
     /// Build the iceberg writer.
-    async fn build(self) -> Result<Self::R>;
+    fn build(self) -> impl Future<Output = Result<Self::R>> + Send;
 }
 
 /// The iceberg writer used to write data to iceberg table.
-#[async_trait::async_trait]
 pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
     /// Write data to iceberg table.
-    async fn write(&mut self, input: I) -> Result<()>;
+    fn write(&mut self, input: I) -> impl Future<Output = Result<()>> + Send + '_;
     /// Close the writer and return the written data files.
     /// If close fails, data written before may be lost. The user may need to recreate the writer and rewrite the data.
     /// # NOTE
     /// After close, regardless of success or failure, the writer should never be used again, otherwise the writer will panic.
-    async fn close(&mut self) -> Result<O>;
+    fn close(self) -> impl Future<Output = Result<O>> + Send;
 }
 
+mod dyn_trait {
+    use dyn_clone::{clone_trait_object, DynClone};
+
+    use super::Result;
+    use crate::writer::{DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder};
+
+    #[async_trait::async_trait]
+    pub trait DynIcebergWriterBuilder<I, O>: Send + DynClone + 'static {
+        async fn dyn_build(self: Box<Self>) -> Result<BoxedIcebergWriter<I, O>>;
+    }
+
+    clone_trait_object!(<I, O> DynIcebergWriterBuilder<I, O>);
+
+    #[async_trait::async_trait]
+    impl<I: Send + 'static, O: Send + 'static, B: IcebergWriterBuilder<I, O>>
+        DynIcebergWriterBuilder<I, O> for B
+    {
+        async fn dyn_build(self: Box<Self>) -> Result<BoxedIcebergWriter<I, O>> {
+            Ok(<B as IcebergWriterBuilder<I, O>>::build(*self)
+                .await?
+                .boxed())
+        }
+    }
+
+    /// Type alias for `Box<dyn DynIcebergWriterBuilder>`
+    pub type BoxedIcebergWriterBuilder<I = DefaultInput, O = DefaultOutput> =
+        Box<dyn DynIcebergWriterBuilder<I, O>>;
+
+    impl<I: Send + 'static, O: Send + 'static> IcebergWriterBuilder<I, O>
+        for BoxedIcebergWriterBuilder<I, O>
+    {
+        type R = BoxedIcebergWriter<I, O>;
+
+        async fn build(self) -> Result<Self::R> {
+            self.dyn_build().await
+        }
+    }
+
+    /// Extension methods for `IcebergWriterBuilder`
+    pub trait IcebergWriterBuilderDynExt<I: Send + 'static, O: Send + 'static>:
+        IcebergWriterBuilder<I, O> + Sized
+    {
+        /// Create a type erased `IcebergWriterBuilder` wrapped with `Box`.
+        fn boxed(self) -> BoxedIcebergWriterBuilder<I, O> {
+            Box::new(self) as _
+        }
+    }
+
+    impl<I: Send + 'static, O: Send + 'static, B> IcebergWriterBuilderDynExt<I, O> for B where
+        B: IcebergWriterBuilder<I, O>
+    {
+    }
+
+    /// The dyn iceberg writer used to write data to iceberg table.
+    #[async_trait::async_trait]
+    pub trait DynIcebergWriter<I, O>: Send + 'static {
+        /// `write` of trait `IcebergWriter`
+        async fn dyn_write(&mut self, input: I) -> Result<()>;
+        /// `close` of trait `IcebergWriter`
+        async fn dyn_close(self: Box<Self>) -> Result<O>;
+    }
+
+    #[async_trait::async_trait]
+    impl<I: Send + 'static, O: Send + 'static, W: IcebergWriter<I, O>> DynIcebergWriter<I, O> for W {
+        async fn dyn_write(&mut self, input: I) -> Result<()> {
+            self.write(input).await
+        }
+
+        async fn dyn_close(self: Box<Self>) -> Result<O> {
+            (*self).close().await
+        }
+    }
+
+    /// Type alias for `Box<dyn DynIcebergWriter>`
+    pub type BoxedIcebergWriter<I = DefaultInput, O = DefaultOutput> =
+        Box<dyn DynIcebergWriter<I, O>>;
+
+    impl<I: Send + 'static, O: Send + 'static> IcebergWriter<I, O> for BoxedIcebergWriter<I, O> {
+        async fn write(&mut self, input: I) -> Result<()> {
+            (**self).dyn_write(input).await
+        }
+
+        async fn close(self) -> Result<O> {
+            self.dyn_close().await
+        }
+    }
+
+    /// Extension methods for `IcebergWriter`
+    pub trait IcebergWriterDynExt<I: Send + 'static, O: Send + 'static>:
+        IcebergWriter<I, O> + Sized
+    {
+        /// Create a type erased `IcebergWriter` wrapped with `Box`.
+        fn boxed(self) -> BoxedIcebergWriter<I, O> {
+            Box::new(self) as _
+        }
+    }
+
+    impl<I: Send + 'static, O: Send + 'static, W: IcebergWriter<I, O>> IcebergWriterDynExt<I, O> for W {}
+}
+
+pub use dyn_trait::{
+    BoxedIcebergWriter, BoxedIcebergWriterBuilder, IcebergWriterBuilderDynExt, IcebergWriterDynExt,
+};
+
 /// The current file status of the iceberg writer. It is implemented for writers that write a
 /// single file.
 pub trait CurrentFileStatus {
@@ -92,21 +194,47 @@
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use arrow_array::RecordBatch;
     use arrow_schema::Schema;
     use arrow_select::concat::concat_batches;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::file::properties::WriterProperties;
 
-    use super::IcebergWriter;
-    use crate::io::FileIO;
+    use super::{
+        IcebergWriter, IcebergWriterBuilder, IcebergWriterBuilderDynExt, IcebergWriterDynExt,
+    };
+    use crate::io::{FileIO, FileIOBuilder};
     use crate::spec::{DataFile, DataFileFormat};
+    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+    use crate::writer::file_writer::ParquetWriterBuilder;
 
     // This function is used to guarantee the trait can be used as an object-safe trait.
-    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
-        let _ = w
+    async fn _guarantee_dyn_trait(builder: impl IcebergWriterBuilder) {
+        fn ensure_writer_builder<WB: IcebergWriterBuilder>(builder: WB) -> WB {
+            builder
+        }
+
+        fn ensure_writer<W: IcebergWriter>(writer: W) -> W {
+            writer
+        }
+
+        let writer = ensure_writer(builder.clone().build().await.unwrap());
+        let mut boxed_writer = ensure_writer(writer.boxed());
+        let _ = boxed_writer
+            .write(RecordBatch::new_empty(Schema::empty().into()))
+            .await;
+        let _ = boxed_writer.close().await;
+        let boxed_builder = ensure_writer_builder(builder.boxed());
+        let mut boxed_writer = ensure_writer(boxed_builder.clone().build().await.unwrap());
+
+        let _ = boxed_writer
             .write(RecordBatch::new_empty(Schema::empty().into()))
             .await;
-        let _ = w.close().await;
+        let _ = boxed_writer.close().await;
     }
 
     // This function check:
@@ -131,4 +259,25 @@
         let res = concat_batches(&batch.schema(), &batches).unwrap();
         assert_eq!(*batch, res);
     }
+
+    #[tokio::test]
+    async fn test_build_box_writer() {
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        let pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(crate::spec::Schema::builder().build().unwrap()),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+        let data_file_builder = DataFileWriterBuilder::new(pw, None).boxed();
+
+        let _writer = data_file_builder.build().await.unwrap();
+    }
 }
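
Not part of the patch itself: a minimal usage sketch of the type-erased API the patch introduces, added here for illustration. It assumes the `iceberg` crate re-exports visible in the diff (`iceberg::writer::{BoxedIcebergWriterBuilder, IcebergWriterBuilderDynExt, ...}`, `iceberg::spec::DataFile`, `iceberg::Result`); the `erase` and `collect_data_files` helpers are hypothetical names used only for this sketch.

```rust
use arrow_array::RecordBatch;
use iceberg::spec::DataFile;
use iceberg::writer::{
    BoxedIcebergWriterBuilder, IcebergWriter, IcebergWriterBuilder, IcebergWriterBuilderDynExt,
};
use iceberg::Result;

// Erase a concrete builder (e.g. `DataFileWriterBuilder`) so that differently
// typed builders can be stored in the same `Vec<BoxedIcebergWriterBuilder>`.
fn erase<B: IcebergWriterBuilder>(builder: B) -> BoxedIcebergWriterBuilder {
    builder.boxed()
}

// Hypothetical driver: build every type-erased builder, feed it the same
// batches, and collect the produced data files. `BoxedIcebergWriterBuilder`
// implements `IcebergWriterBuilder` itself, so it is driven like any
// concrete builder.
async fn collect_data_files(
    builders: Vec<BoxedIcebergWriterBuilder>,
    batches: &[RecordBatch],
) -> Result<Vec<DataFile>> {
    let mut data_files = Vec::new();
    for builder in builders {
        let mut writer = builder.build().await?;
        for batch in batches {
            writer.write(batch.clone()).await?;
        }
        // `close` now takes `self` by value, so a closed writer cannot be
        // reused; the move is checked at compile time.
        data_files.extend(writer.close().await?);
    }
    Ok(data_files)
}
```

Because `clone_trait_object!` gives the boxed builder a `Clone` impl (hence the new `dyn-clone` dependency), a `BoxedIcebergWriterBuilder` can also be cloned and handed to several tasks, which is what the reworked `_guarantee_dyn_trait` test exercises via `boxed_builder.clone().build()`.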