
Commit 26f305f

ZENOTME committed
Add DataFileBuilder and remove **WriteResult trait
1 parent 982b54b

File tree

3 files changed: +24 additions, -32 deletions

crates/iceberg/src/spec/manifest.rs

Lines changed: 15 additions & 2 deletions
```diff
@@ -32,7 +32,6 @@ use serde_json::to_vec;
 use std::cmp::min;
 use std::collections::HashMap;
 use std::str::FromStr;
-
 /// A manifest contains metadata and a list of entries.
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub struct Manifest {
@@ -851,7 +850,11 @@ impl TryFrom<i32> for ManifestStatus {
 }
 
 /// Data file carries data file path, partition tuple, metrics, …
-#[derive(Debug, PartialEq, Clone, Eq)]
+#[derive(Debug, PartialEq, Clone, Eq, Builder)]
+/// For optional fields, we use `#[builder(default)]` or `#[builder(setter(strip_option), default)]` so that
+/// the field is set to its `Default` value when it is not set explicitly.
+/// For required fields, the build will fail if they are not set.
+#[builder(name = "DataFileBuilder", setter(prefix = "with"))]
 pub struct DataFile {
     /// field id: 134
     ///
@@ -886,25 +889,29 @@ pub struct DataFile {
     /// Map from column id to the total size on disk of all regions that
     /// store the column. Does not include bytes necessary to read other
     /// columns, like footers. Leave null for row-oriented formats (Avro)
+    #[builder(default)]
     column_sizes: HashMap<i32, u64>,
     /// field id: 109
     /// key field id: 119
     /// value field id: 120
     ///
     /// Map from column id to number of values in the column (including null
     /// and NaN values)
+    #[builder(default)]
     value_counts: HashMap<i32, u64>,
     /// field id: 110
     /// key field id: 121
     /// value field id: 122
     ///
     /// Map from column id to number of null values in the column
+    #[builder(default)]
     null_value_counts: HashMap<i32, u64>,
     /// field id: 137
     /// key field id: 138
     /// value field id: 139
     ///
     /// Map from column id to number of NaN values in the column
+    #[builder(default)]
     nan_value_counts: HashMap<i32, u64>,
     /// field id: 125
     /// key field id: 126
@@ -917,6 +924,7 @@ pub struct DataFile {
     /// Reference:
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+    #[builder(default)]
     lower_bounds: HashMap<i32, Literal>,
     /// field id: 128
     /// key field id: 129
@@ -929,16 +937,19 @@ pub struct DataFile {
     /// Reference:
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
+    #[builder(default)]
     upper_bounds: HashMap<i32, Literal>,
     /// field id: 131
     ///
     /// Implementation-specific key metadata for encryption
+    #[builder(default)]
     key_metadata: Vec<u8>,
     /// field id: 132
     /// element field id: 133
     ///
     /// Split offsets for the data file. For example, all row group offsets
     /// in a Parquet file. Must be sorted ascending
+    #[builder(default)]
     split_offsets: Vec<i64>,
     /// field id: 135
     /// element field id: 136
@@ -947,6 +958,7 @@ pub struct DataFile {
     /// Required when content is EqualityDeletes and should be null
     /// otherwise. Fields with ids listed in this column must be present
     /// in the delete file
+    #[builder(default)]
     equality_ids: Vec<i32>,
     /// field id: 140
     ///
@@ -958,6 +970,7 @@ pub struct DataFile {
     /// sorted by file and position, not a table order, and should set sort
     /// order id to null. Readers must ignore sort order id for position
     /// delete files.
+    #[builder(setter(strip_option), default)]
     sort_order_id: Option<i32>,
 }
```
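A minimal sketch of how the generated builder might be used. The `with_*` prefix and the `#[builder(default)]` / `strip_option` behavior come from the attributes above, but the required field names (`content`, `file_path`, `file_format`, `partition`, `record_count`, `file_size_in_bytes`) and the `DataFileFormat` enum belong to parts of `DataFile` not shown in this hunk, so treat them as assumptions:

```rust
use iceberg::spec::{DataContentType, DataFile, DataFileBuilder, DataFileFormat, Struct};

// Hypothetical usage sketch. Fields annotated with `#[builder(default)]`
// above (column_sizes, value_counts, bounds, ...) can simply be omitted.
fn build_data_file(partition: Struct) -> DataFile {
    DataFileBuilder::default()
        .with_content(DataContentType::Data)
        .with_file_path("s3://bucket/table/data/00000-0.parquet".to_string())
        .with_file_format(DataFileFormat::Parquet)
        .with_partition(partition)
        .with_record_count(1024)
        .with_file_size_in_bytes(4096)
        // `strip_option` lets callers pass the bare value for `Option` fields.
        .with_sort_order_id(0)
        .build()
        // With derive_builder, `build()` returns an error if any required
        // field was never set.
        .expect("all required fields set")
}
```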

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 3 additions & 13 deletions
```diff
@@ -17,8 +17,8 @@
 
 //! Iceberg File Writer
 
-use super::{CurrentFileStatus, IcebergWriteResult};
-use crate::Result;
+use super::CurrentFileStatus;
+use crate::{spec::DataFileBuilder, Result};
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 
@@ -34,18 +34,8 @@ pub trait FileWriterBuilder: Send + Clone + 'static {
 /// File writer focuses on writing record batches to a physical file format (such as parquet, orc).
 #[async_trait::async_trait]
 pub trait FileWriter: Send + 'static + CurrentFileStatus {
-    /// The associated file write result type.
-    type R: FileWriteResult;
     /// Write record batch to file.
     async fn write(&mut self, batch: &RecordBatch) -> Result<()>;
     /// Close file writer.
-    async fn close(self) -> Result<Vec<Self::R>>;
-}
-
-/// File write result.
-pub trait FileWriteResult: Send + 'static {
-    /// The associated iceberg write result type.
-    type R: IcebergWriteResult;
-    /// Convert to iceberg write result.
-    fn to_iceberg_result(self) -> Self::R;
+    async fn close(self) -> Result<Vec<DataFileBuilder>>;
 }
```
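With the `FileWriteResult` indirection gone, `close` now hands back partially-filled `DataFileBuilder`s directly. A hedged sketch of what an implementor might do; `SketchFileWriter` and its fields are invented for illustration, and the `with_*` field names are assumptions, only the `close` signature comes from the diff:

```rust
use iceberg::spec::DataFileBuilder;
use iceberg::Result;

// Illustrative writer state; these field names are not part of the commit.
struct SketchFileWriter {
    out_path: String,
    written_rows: u64,
    written_bytes: u64,
}

impl SketchFileWriter {
    // The physical writer records only the file-level facts it knows (path,
    // row count, byte size) and leaves table-level fields such as content
    // type and partition for the iceberg-level writer to fill in later.
    async fn close(self) -> Result<Vec<DataFileBuilder>> {
        let mut builder = DataFileBuilder::default();
        builder
            .with_file_path(self.out_path)
            .with_record_count(self.written_rows)
            .with_file_size_in_bytes(self.written_bytes);
        Ok(vec![builder])
    }
}
```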

crates/iceberg/src/writer/mod.rs

Lines changed: 6 additions & 17 deletions
```diff
@@ -48,6 +48,8 @@
 //! iceberg_writer.write(input).await?;
 //!
 //! let write_result = iceberg_writer.flush().await?;
+//!
+//! let data_file = write_result.into_iter().map(|builder| builder.build()).collect::<Vec<_>>();
 //! ```
 //!
 //! # Complex Case 2: Create a fanout partition data file writer using parquet file format.
@@ -65,12 +67,11 @@
 //! iceberg_writer.write(input).await?;
 //!
 //! let write_result = iceberg_writer.flush().await?;
+//!
+//! let data_file = write_result.into_iter().map(|builder| builder.build()).collect::<Vec<_>>();
 //! ```
 
-use crate::{
-    spec::{DataContentType, Struct},
-    Result,
-};
+use crate::{spec::DataFileBuilder, Result};
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 
@@ -90,22 +91,10 @@ pub trait IcebergWriterBuilder<I = DefaultInput>: Send + Clone + 'static {
 /// The iceberg writer used to write data to iceberg table.
 #[async_trait::async_trait]
 pub trait IcebergWriter<I = DefaultInput>: Send + 'static {
-    /// The associated write result type.
-    type R: IcebergWriteResult;
     /// Write data to iceberg table.
     async fn write(&mut self, input: I) -> Result<()>;
     /// Flush the writer and return the write result.
-    async fn flush(&mut self) -> Result<Vec<Self::R>>;
-}
-
-/// The write result of iceberg writer.
-pub trait IcebergWriteResult: Send + Sync + 'static {
-    /// Set the content type of the write result.
-    fn set_content(&mut self, content: DataContentType) -> &mut Self;
-    /// Set the equality ids of the write result.
-    fn set_equality_ids(&mut self, equality_ids: Vec<i32>) -> &mut Self;
-    /// Set the partition of the write result.
-    fn set_partition(&mut self, partition_value: Struct) -> &mut Self;
+    async fn flush(&mut self) -> Result<Vec<DataFileBuilder>>;
 }
 
 /// The current file status of iceberg writer. It implement for the writer which write a single
```
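The removed `IcebergWriteResult` setters map directly onto the builder: `set_content`, `set_equality_ids`, and `set_partition` become `with_*` calls on `DataFileBuilder`. A hedged sketch of a caller completing the builders returned by `flush`, assuming an equality-delete writer and that `Struct` implements `Clone`; the `complete` helper and the `with_content` / `with_partition` setter names are invented for illustration (only `equality_ids` is visible in this diff):

```rust
use iceberg::spec::{DataContentType, DataFile, DataFileBuilder, Struct};

// Stamp the table-level fields onto each flushed builder, then build the
// final DataFile values. The file-level fields (path, metrics, ...) were
// already set by the file writer before `flush` handed the builders back.
fn complete(
    builders: Vec<DataFileBuilder>,
    partition: Struct,
    equality_ids: Vec<i32>,
) -> Vec<DataFile> {
    builders
        .into_iter()
        .map(|mut builder| {
            builder
                .with_content(DataContentType::EqualityDeletes)
                .with_equality_ids(equality_ids.clone())
                .with_partition(partition.clone())
                .build()
                .expect("file writer set the file-level fields")
        })
        .collect()
}
```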
