Skip to content

Commit 738a4a2

Browse files
committed
remove config in writer builder interface
1 parent 4fba3f4 commit 738a4a2

File tree

7 files changed

+58
-81
lines changed

7 files changed

+58
-81
lines changed

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,45 +22,34 @@ use itertools::Itertools;
2222

2323
use crate::spec::{DataContentType, DataFile, Struct};
2424
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
25-
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
25+
use crate::writer::{CurrentWriterStatus, IcebergWriter, IcebergWriterBuilder};
2626
use crate::Result;
2727

2828
/// Builder for `DataFileWriter`.
2929
#[derive(Clone)]
3030
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
3131
inner: B,
32+
partition_value: Option<Struct>,
3233
}
3334

3435
impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
3536
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
36-
pub fn new(inner: B) -> Self {
37-
Self { inner }
38-
}
39-
}
40-
41-
/// Config for `DataFileWriter`.
42-
pub struct DataFileWriterConfig {
43-
partition_value: Struct,
44-
}
45-
46-
impl DataFileWriterConfig {
47-
/// Create a new `DataFileWriterConfig` with partition value.
48-
pub fn new(partition_value: Option<Struct>) -> Self {
37+
pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
4938
Self {
50-
partition_value: partition_value.unwrap_or(Struct::empty()),
39+
inner,
40+
partition_value,
5141
}
5242
}
5343
}
5444

5545
#[async_trait::async_trait]
5646
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
5747
type R = DataFileWriter<B>;
58-
type C = DataFileWriterConfig;
5948

60-
async fn build(self, config: Self::C) -> Result<Self::R> {
49+
async fn build(self) -> Result<Self::R> {
6150
Ok(DataFileWriter {
6251
inner_writer: Some(self.inner.clone().build().await?),
63-
partition_value: config.partition_value,
52+
partition_value: self.partition_value.unwrap_or(Struct::empty()),
6453
})
6554
}
6655
}
@@ -92,7 +81,7 @@ impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
9281
}
9382
}
9483

95-
impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
84+
impl<B: FileWriterBuilder> CurrentWriterStatus for DataFileWriter<B> {
9685
fn current_file_path(&self) -> String {
9786
self.inner_writer.as_ref().unwrap().current_file_path()
9887
}
@@ -115,9 +104,7 @@ mod test {
115104

116105
use crate::io::FileIOBuilder;
117106
use crate::spec::{DataContentType, DataFileFormat, Schema, Struct};
118-
use crate::writer::base_writer::data_file_writer::{
119-
DataFileWriterBuilder, DataFileWriterConfig,
120-
};
107+
use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
121108
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
122109
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
123110
use crate::writer::file_writer::ParquetWriterBuilder;
@@ -135,14 +122,12 @@ mod test {
135122

136123
let pw = ParquetWriterBuilder::new(
137124
WriterProperties::builder().build(),
138-
Arc::new(Schema::builder().build().unwrap()),
139125
file_io.clone(),
140126
location_gen,
141127
file_name_gen,
128+
Arc::new(Schema::builder().build().unwrap()),
142129
);
143-
let mut data_file_writer = DataFileWriterBuilder::new(pw)
144-
.build(DataFileWriterConfig::new(None))
145-
.await?;
130+
let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await?;
146131

147132
let data_file = data_file_writer.close().await.unwrap();
148133
assert_eq!(data_file.len(), 1);

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,18 @@ use crate::{Error, ErrorKind, Result};
3535
#[derive(Clone)]
3636
pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
3737
inner: B,
38+
config: EqualityDeleteWriterConfig,
3839
}
3940

4041
impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
4142
/// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
42-
pub fn new(inner: B) -> Self {
43-
Self { inner }
43+
pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
44+
Self { inner, config }
4445
}
4546
}
4647

4748
/// Config for `EqualityDeleteWriter`.
49+
#[derive(Clone)]
4850
pub struct EqualityDeleteWriterConfig {
4951
// Field ids used to determine row equality in equality delete files.
5052
equality_ids: Vec<i32>,
@@ -108,14 +110,13 @@ impl EqualityDeleteWriterConfig {
108110
#[async_trait::async_trait]
109111
impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
110112
type R = EqualityDeleteFileWriter<B>;
111-
type C = EqualityDeleteWriterConfig;
112113

113-
async fn build(self, config: Self::C) -> Result<Self::R> {
114+
async fn build(self) -> Result<Self::R> {
114115
Ok(EqualityDeleteFileWriter {
115116
inner_writer: Some(self.inner.clone().build().await?),
116-
projector: config.projector,
117-
equality_ids: config.equality_ids,
118-
partition_value: config.partition_value,
117+
projector: self.config.projector,
118+
equality_ids: self.config.equality_ids,
119+
partition_value: self.config.partition_value,
119120
})
120121
}
121122
}
@@ -386,20 +387,19 @@ mod test {
386387
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
387388
let delete_schema =
388389
arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
389-
let projector = equality_config.projector.clone();
390390

391391
// prepare writer
392392
let pb = ParquetWriterBuilder::new(
393393
WriterProperties::builder().build(),
394-
Arc::new(delete_schema),
395394
file_io.clone(),
396395
location_gen,
397396
file_name_gen,
397+
Arc::new(delete_schema),
398398
);
399-
400-
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
401-
.build(equality_config)
399+
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, equality_config)
400+
.build()
402401
.await?;
402+
let projector = equality_delete_writer.projector.clone();
403403

404404
// write
405405
equality_delete_writer.write(to_write.clone()).await?;
@@ -556,13 +556,13 @@ mod test {
556556

557557
let pb = ParquetWriterBuilder::new(
558558
WriterProperties::builder().build(),
559-
Arc::new(delete_schema),
560559
file_io.clone(),
561560
location_gen,
562561
file_name_gen,
562+
Arc::new(delete_schema),
563563
);
564-
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
565-
.build(config)
564+
let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb, config)
565+
.build()
566566
.await?;
567567

568568
// prepare data
@@ -640,10 +640,12 @@ mod test {
640640
Some(b""),
641641
Some(b"zzzz"),
642642
])) as ArrayRef;
643-
let to_write = RecordBatch::try_new(delete_arrow_schema.clone(), vec![
644-
col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13,
645-
])
646-
.unwrap();
643+
let to_write =
644+
RecordBatch::try_new(Arc::new(schema_to_arrow_schema(&schema).unwrap()), vec![
645+
col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12,
646+
col13,
647+
])
648+
.unwrap();
647649
equality_delete_writer.write(to_write.clone()).await?;
648650
let res = equality_delete_writer.close().await?;
649651
assert_eq!(res.len(), 1);

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use arrow_array::RecordBatch;
2121
use futures::Future;
2222

23-
use super::CurrentFileStatus;
23+
use super::CurrentWriterStatus;
2424
use crate::spec::DataFileBuilder;
2525
use crate::Result;
2626

@@ -41,7 +41,7 @@ pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
4141
}
4242

4343
/// File writer focused on writing record batches to different physical file formats (such as Parquet, ORC).
44-
pub trait FileWriter<O = DefaultOutput>: Send + CurrentFileStatus + 'static {
44+
pub trait FileWriter<O = DefaultOutput>: Send + CurrentWriterStatus + 'static {
4545
/// Write record batch to file.
4646
fn write(&mut self, batch: &RecordBatch) -> impl Future<Output = Result<()>> + Send;
4747
/// Close file writer.

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,36 +42,35 @@ use crate::spec::{
4242
visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef,
4343
PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type,
4444
};
45-
use crate::writer::CurrentFileStatus;
45+
use crate::writer::CurrentWriterStatus;
4646
use crate::{Error, ErrorKind, Result};
4747

4848
/// ParquetWriterBuilder is used to build a [`ParquetWriter`]
4949
#[derive(Clone)]
5050
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
5151
props: WriterProperties,
52-
schema: SchemaRef,
53-
5452
file_io: FileIO,
5553
location_generator: T,
5654
file_name_generator: F,
55+
schema: SchemaRef,
5756
}
5857

5958
impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
6059
/// Create a new `ParquetWriterBuilder`
6160
/// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
6261
pub fn new(
6362
props: WriterProperties,
64-
schema: SchemaRef,
6563
file_io: FileIO,
6664
location_generator: T,
6765
file_name_generator: F,
66+
schema: SchemaRef,
6867
) -> Self {
6968
Self {
7069
props,
71-
schema,
7270
file_io,
7371
location_generator,
7472
file_name_generator,
73+
schema,
7574
}
7675
}
7776
}
@@ -80,20 +79,19 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
8079
type R = ParquetWriter;
8180

8281
async fn build(self) -> crate::Result<Self::R> {
83-
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
8482
let written_size = Arc::new(AtomicI64::new(0));
8583
let out_file = self.file_io.new_output(
8684
self.location_generator
8785
.generate_location(&self.file_name_generator.generate_file_name()),
8886
)?;
8987
let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
9088
let async_writer = AsyncFileWriter::new(inner_writer);
91-
let writer =
92-
AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), Some(self.props))
93-
.map_err(|err| {
94-
Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
95-
.with_source(err)
96-
})?;
89+
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
90+
let writer = AsyncArrowWriter::try_new(async_writer, arrow_schema, Some(self.props))
91+
.map_err(|err| {
92+
Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
93+
.with_source(err)
94+
})?;
9795

9896
Ok(ParquetWriter {
9997
schema: self.schema.clone(),
@@ -422,7 +420,7 @@ impl FileWriter for ParquetWriter {
422420
}
423421
}
424422

425-
impl CurrentFileStatus for ParquetWriter {
423+
impl CurrentWriterStatus for ParquetWriter {
426424
fn current_file_path(&self) -> String {
427425
self.out_file.location().to_string()
428426
}
@@ -667,10 +665,10 @@ mod tests {
667665
// write data
668666
let mut pw = ParquetWriterBuilder::new(
669667
WriterProperties::builder().build(),
670-
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
671668
file_io.clone(),
672669
location_gen,
673670
file_name_gen,
671+
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
674672
)
675673
.build()
676674
.await?;
@@ -863,10 +861,10 @@ mod tests {
863861
// write data
864862
let mut pw = ParquetWriterBuilder::new(
865863
WriterProperties::builder().build(),
866-
Arc::new(schema),
867864
file_io.clone(),
868865
location_gen,
869866
file_name_gen,
867+
Arc::new(schema),
870868
)
871869
.build()
872870
.await?;
@@ -1053,10 +1051,10 @@ mod tests {
10531051
// write data
10541052
let mut pw = ParquetWriterBuilder::new(
10551053
WriterProperties::builder().build(),
1056-
Arc::new(schema),
10571054
file_io.clone(),
10581055
loccation_gen,
10591056
file_name_gen,
1057+
Arc::new(schema),
10601058
)
10611059
.build()
10621060
.await?;

crates/iceberg/src/writer/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,8 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
6363
{
6464
/// The associated writer type.
6565
type R: IcebergWriter<I, O>;
66-
/// The associated writer config type used to build the writer.
67-
type C;
6866
/// Build the iceberg writer.
69-
async fn build(self, config: Self::C) -> Result<Self::R>;
67+
async fn build(self) -> Result<Self::R>;
7068
}
7169

7270
/// The iceberg writer used to write data to iceberg table.
@@ -83,7 +81,7 @@ pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
8381

8482
/// The current file status of the iceberg writer. It is implemented for writers that write a single
8583
/// file.
86-
pub trait CurrentFileStatus {
84+
pub trait CurrentWriterStatus {
8785
/// Get the current file path.
8886
fn current_file_path(&self) -> String;
8987
/// Get the current file row number.

crates/integration_tests/tests/append_data_file_test.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
2424
use futures::TryStreamExt;
2525
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
2626
use iceberg::transaction::Transaction;
27-
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
27+
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
2828
use iceberg::writer::file_writer::location_generator::{
2929
DefaultFileNameGenerator, DefaultLocationGenerator,
3030
};
@@ -92,16 +92,13 @@ async fn test_append_data_file() {
9292
);
9393
let parquet_writer_builder = ParquetWriterBuilder::new(
9494
WriterProperties::default(),
95-
table.metadata().current_schema().clone(),
9695
table.file_io().clone(),
9796
location_generator.clone(),
9897
file_name_generator.clone(),
98+
table.metadata().current_schema().clone(),
9999
);
100-
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
101-
let mut data_file_writer = data_file_writer_builder
102-
.build(DataFileWriterConfig::new(None))
103-
.await
104-
.unwrap();
100+
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
101+
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
105102
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
106103
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
107104
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);

crates/integration_tests/tests/conflict_commit_test.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
2424
use futures::TryStreamExt;
2525
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
2626
use iceberg::transaction::Transaction;
27-
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
27+
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
2828
use iceberg::writer::file_writer::location_generator::{
2929
DefaultFileNameGenerator, DefaultLocationGenerator,
3030
};
@@ -91,16 +91,13 @@ async fn test_append_data_file_conflict() {
9191
);
9292
let parquet_writer_builder = ParquetWriterBuilder::new(
9393
WriterProperties::default(),
94-
table.metadata().current_schema().clone(),
9594
table.file_io().clone(),
9695
location_generator.clone(),
9796
file_name_generator.clone(),
97+
table.metadata().current_schema().clone(),
9898
);
99-
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
100-
let mut data_file_writer = data_file_writer_builder
101-
.build(DataFileWriterConfig::new(None))
102-
.await
103-
.unwrap();
99+
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
100+
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
104101
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
105102
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
106103
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);

0 commit comments

Comments
 (0)