Skip to content

Commit c1ea00a

Browse files
committed
refine
1 parent 4407bc1 commit c1ea00a

File tree

4 files changed

+57
-39
lines changed

4 files changed

+57
-39
lines changed

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! This module provides `DataFileWriter`.
1919
20-
use crate::spec::{DataContentType, DataFileBuilder};
20+
use crate::spec::{DataContentType, DataFile, Struct};
2121
use crate::writer::file_writer::FileWriter;
2222
use crate::writer::CurrentFileStatus;
2323
use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, IcebergWriterBuilder};
@@ -38,14 +38,30 @@ impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
3838
}
3939
}
4040

41-
#[allow(async_fn_in_trait)]
41+
/// Config for `DataFileWriter`.
42+
pub struct DataFileWriterConfig {
43+
partition_value: Struct,
44+
}
45+
46+
impl DataFileWriterConfig {
47+
/// Create a new `DataFileWriterConfig` with partition value.
48+
pub fn new(partition_value: Option<Struct>) -> Self {
49+
Self {
50+
partition_value: partition_value.unwrap_or(Struct::empty()),
51+
}
52+
}
53+
}
54+
55+
#[async_trait::async_trait]
4256
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
4357
type R = DataFileWriter<B>;
58+
type C = DataFileWriterConfig;
4459

45-
async fn build(self) -> Result<Self::R> {
60+
async fn build(self, config: Self::C) -> Result<Self::R> {
4661
Ok(DataFileWriter {
4762
inner_writer: self.inner.clone().build().await?,
4863
builder: self.inner,
64+
partition_value: config.partition_value,
4965
})
5066
}
5167
}
@@ -54,6 +70,7 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
5470
pub struct DataFileWriter<B: FileWriterBuilder> {
5571
builder: B,
5672
inner_writer: B::R,
73+
partition_value: Struct,
5774
}
5875

5976
#[async_trait::async_trait]
@@ -62,15 +79,16 @@ impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
6279
self.inner_writer.write(&batch).await
6380
}
6481

65-
async fn flush(&mut self) -> Result<Vec<DataFileBuilder>> {
82+
async fn flush(&mut self) -> Result<Vec<DataFile>> {
6683
let writer = std::mem::replace(&mut self.inner_writer, self.builder.clone().build().await?);
6784
let res = writer
6885
.close()
6986
.await?
7087
.into_iter()
7188
.map(|mut res| {
7289
res.content(DataContentType::Data);
73-
res
90+
res.partition(self.partition_value.clone());
91+
res.build().expect("Guaranteed to be valid")
7492
})
7593
.collect_vec();
7694
Ok(res)
@@ -101,9 +119,9 @@ mod test {
101119

102120
use crate::{
103121
io::FileIOBuilder,
104-
spec::{DataFileFormat, Struct},
122+
spec::DataFileFormat,
105123
writer::{
106-
base_writer::data_file_writer::DataFileWriterBuilder,
124+
base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig},
107125
file_writer::{
108126
location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
109127
ParquetWriterBuilder,
@@ -141,7 +159,7 @@ mod test {
141159
)
142160
.with_metadata(HashMap::from([(
143161
PARQUET_FIELD_ID_META_KEY.to_string(),
144-
"-1".to_string(),
162+
"5".to_string(),
145163
)]))]
146164
.into(),
147165
),
@@ -160,7 +178,7 @@ mod test {
160178
arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
161179
.with_metadata(HashMap::from([(
162180
PARQUET_FIELD_ID_META_KEY.to_string(),
163-
"-1".to_string(),
181+
"6".to_string(),
164182
)])),
165183
)),
166184
true,
@@ -182,15 +200,15 @@ mod test {
182200
)
183201
.with_metadata(HashMap::from([(
184202
PARQUET_FIELD_ID_META_KEY.to_string(),
185-
"-1".to_string(),
203+
"7".to_string(),
186204
)]))]
187205
.into(),
188206
),
189207
true,
190208
)
191209
.with_metadata(HashMap::from([(
192210
PARQUET_FIELD_ID_META_KEY.to_string(),
193-
"-1".to_string(),
211+
"8".to_string(),
194212
)]))]
195213
.into(),
196214
),
@@ -209,7 +227,7 @@ mod test {
209227
arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
210228
.with_metadata(HashMap::from([(
211229
PARQUET_FIELD_ID_META_KEY.to_string(),
212-
"-1".to_string(),
230+
"5".to_string(),
213231
)])),
214232
]
215233
.into(),
@@ -231,7 +249,7 @@ mod test {
231249
arrow_array::ListArray::new(
232250
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
233251
PARQUET_FIELD_ID_META_KEY.to_string(),
234-
"-1".to_string(),
252+
"6".to_string(),
235253
)]))),
236254
list_parts.1,
237255
list_parts.2,
@@ -249,23 +267,23 @@ mod test {
249267
)
250268
.with_metadata(HashMap::from([(
251269
PARQUET_FIELD_ID_META_KEY.to_string(),
252-
"-1".to_string(),
270+
"7".to_string(),
253271
)]))]
254272
.into(),
255273
),
256274
true,
257275
)
258276
.with_metadata(HashMap::from([(
259277
PARQUET_FIELD_ID_META_KEY.to_string(),
260-
"-1".to_string(),
278+
"8".to_string(),
261279
)]))]
262280
.into(),
263281
vec![Arc::new(StructArray::new(
264282
vec![
265283
arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
266284
.with_metadata(HashMap::from([(
267285
PARQUET_FIELD_ID_META_KEY.to_string(),
268-
"-1".to_string(),
286+
"7".to_string(),
269287
)])),
270288
]
271289
.into(),
@@ -285,20 +303,16 @@ mod test {
285303
location_gen,
286304
file_name_gen,
287305
);
288-
let mut data_file_writer = DataFileWriterBuilder::new(pb).build().await?;
306+
let mut data_file_writer = DataFileWriterBuilder::new(pb)
307+
.build(DataFileWriterConfig::new(None))
308+
.await?;
289309

290310
for _ in 0..3 {
291311
// write
292312
data_file_writer.write(to_write.clone()).await?;
293313
let res = data_file_writer.flush().await?;
294314
assert_eq!(res.len(), 1);
295-
let data_file = res
296-
.into_iter()
297-
.next()
298-
.unwrap()
299-
.partition(Struct::empty())
300-
.build()
301-
.unwrap();
315+
let data_file = res.into_iter().next().unwrap();
302316

303317
// check
304318
check_parquet_data_file(&file_io, &data_file, &to_write).await;

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
//! This module contains the writer for data file format supported by iceberg: parquet, orc.
1919
20-
use super::{CurrentFileStatus, DefaultOutput};
21-
use crate::Result;
20+
use super::CurrentFileStatus;
21+
use crate::{spec::DataFileBuilder, Result};
2222
use arrow_array::RecordBatch;
2323
use futures::Future;
2424

@@ -28,6 +28,8 @@ mod track_writer;
2828

2929
pub mod location_generator;
3030

31+
type DefaultOutput = Vec<DataFileBuilder>;
32+
3133
/// File writer builder trait.
3234
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
3335
/// The associated file writer type.

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ mod tests {
352352
)
353353
.with_metadata(HashMap::from([(
354354
PARQUET_FIELD_ID_META_KEY.to_string(),
355-
"-1".to_string(),
355+
"5".to_string(),
356356
)]))]
357357
.into(),
358358
),
@@ -371,7 +371,7 @@ mod tests {
371371
arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
372372
.with_metadata(HashMap::from([(
373373
PARQUET_FIELD_ID_META_KEY.to_string(),
374-
"-1".to_string(),
374+
"6".to_string(),
375375
)])),
376376
)),
377377
true,
@@ -393,15 +393,15 @@ mod tests {
393393
)
394394
.with_metadata(HashMap::from([(
395395
PARQUET_FIELD_ID_META_KEY.to_string(),
396-
"-1".to_string(),
396+
"7".to_string(),
397397
)]))]
398398
.into(),
399399
),
400400
true,
401401
)
402402
.with_metadata(HashMap::from([(
403403
PARQUET_FIELD_ID_META_KEY.to_string(),
404-
"-1".to_string(),
404+
"8".to_string(),
405405
)]))]
406406
.into(),
407407
),
@@ -420,7 +420,7 @@ mod tests {
420420
arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
421421
.with_metadata(HashMap::from([(
422422
PARQUET_FIELD_ID_META_KEY.to_string(),
423-
"-1".to_string(),
423+
"5".to_string(),
424424
)])),
425425
]
426426
.into(),
@@ -442,7 +442,7 @@ mod tests {
442442
arrow_array::ListArray::new(
443443
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
444444
PARQUET_FIELD_ID_META_KEY.to_string(),
445-
"-1".to_string(),
445+
"6".to_string(),
446446
)]))),
447447
list_parts.1,
448448
list_parts.2,
@@ -460,23 +460,23 @@ mod tests {
460460
)
461461
.with_metadata(HashMap::from([(
462462
PARQUET_FIELD_ID_META_KEY.to_string(),
463-
"-1".to_string(),
463+
"7".to_string(),
464464
)]))]
465465
.into(),
466466
),
467467
true,
468468
)
469469
.with_metadata(HashMap::from([(
470470
PARQUET_FIELD_ID_META_KEY.to_string(),
471-
"-1".to_string(),
471+
"8".to_string(),
472472
)]))]
473473
.into(),
474474
vec![Arc::new(StructArray::new(
475475
vec![
476476
arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
477477
.with_metadata(HashMap::from([(
478478
PARQUET_FIELD_ID_META_KEY.to_string(),
479-
"-1".to_string(),
479+
"7".to_string(),
480480
)])),
481481
]
482482
.into(),

crates/iceberg/src/writer/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,23 @@
4848
pub mod base_writer;
4949
pub mod file_writer;
5050

51-
use crate::{spec::DataFileBuilder, Result};
51+
use crate::{spec::DataFile, Result};
5252
use arrow_array::RecordBatch;
5353

5454
type DefaultInput = RecordBatch;
55-
type DefaultOutput = Vec<DataFileBuilder>;
55+
type DefaultOutput = Vec<DataFile>;
5656

5757
/// The builder for iceberg writer.
58-
#[allow(async_fn_in_trait)]
58+
#[async_trait::async_trait]
5959
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
6060
Send + Clone + 'static
6161
{
6262
/// The associated writer type.
6363
type R: IcebergWriter<I, O>;
64+
/// The associated writer config type used to build the writer.
65+
type C;
6466
/// Build the iceberg writer.
65-
async fn build(self) -> Result<Self::R>;
67+
async fn build(self, config: Self::C) -> Result<Self::R>;
6668
}
6769

6870
/// The iceberg writer used to write data to iceberg table.

0 commit comments

Comments
 (0)