Commit 5188516

support lower_bound&&upper_bound for parquet writer
1 parent 1bf80e1 commit 5188516

7 files changed: +1008 −454 lines changed


crates/iceberg/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@ lazy_static = { workspace = true }
 log = { workspace = true }
 murmur3 = { workspace = true }
 once_cell = { workspace = true }
-opendal = { workspace = true }
+opendal = { workspace = true, features = ["services-fs"] }
 ordered-float = { workspace = true }
 parquet = { workspace = true, features = ["async"] }
 reqwest = { workspace = true }
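
The newly enabled `services-fs` feature is what lets the rewritten writer test below build a local-filesystem `FileIO`; without it, opendal rejects the "fs" scheme at runtime. A minimal sketch of that entry point, assuming the crate's public `iceberg::io::FileIOBuilder` API:

use iceberg::io::FileIOBuilder;

fn main() -> iceberg::Result<()> {
    // Backed by opendal's filesystem service; this is the call that
    // needs the `services-fs` feature enabled above.
    let file_io = FileIOBuilder::new_fs_io().build()?;
    let _ = file_io;
    Ok(())
}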

crates/iceberg/src/arrow/schema.rs

Lines changed: 15 additions & 9 deletions
@@ -34,6 +34,9 @@ use rust_decimal::prelude::ToPrimitive;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+/// When iceberg map type convert to Arrow map type, the default map field name is "key_value".
+pub const DEFAULT_MAP_FIELD_NAME: &str = "key_value";
+
 /// A post order arrow schema visitor.
 ///
 /// For order of methods called, please refer to [`visit_schema`].
@@ -499,9 +502,10 @@ impl SchemaVisitor for ToArrowSchemaConverter {
             _ => unreachable!(),
         };
         let field = Field::new(
-            "entries",
+            DEFAULT_MAP_FIELD_NAME,
             DataType::Struct(vec![key_field, value_field].into()),
-            map.value_field.required,
+            // Map field is always not nullable
+            false,
         );
 
         Ok(ArrowSchemaOrFieldOrType::Type(DataType::Map(
@@ -561,7 +565,7 @@ impl SchemaVisitor for ToArrowSchemaConverter {
                 Ok(ArrowSchemaOrFieldOrType::Type(DataType::Date32))
             }
             crate::spec::PrimitiveType::Time => Ok(ArrowSchemaOrFieldOrType::Type(
-                DataType::Time32(TimeUnit::Microsecond),
+                DataType::Time64(TimeUnit::Microsecond),
             )),
             crate::spec::PrimitiveType::Timestamp => Ok(ArrowSchemaOrFieldOrType::Type(
                 DataType::Timestamp(TimeUnit::Microsecond, None),
@@ -657,10 +661,9 @@ mod tests {
         let r#struct = DataType::Struct(fields);
         let map = DataType::Map(
             Arc::new(
-                Field::new("entries", r#struct, false).with_metadata(HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    "19".to_string(),
-                )])),
+                Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false).with_metadata(HashMap::from([
+                    (PARQUET_FIELD_ID_META_KEY.to_string(), "19".to_string()),
+                ])),
             ),
             false,
         );
@@ -1022,7 +1025,10 @@ mod tests {
         ]);
 
         let r#struct = DataType::Struct(fields);
-        let map = DataType::Map(Arc::new(Field::new("entries", r#struct, false)), false);
+        let map = DataType::Map(
+            Arc::new(Field::new(DEFAULT_MAP_FIELD_NAME, r#struct, false)),
+            false,
+        );
 
         let fields = Fields::from(vec![
             Field::new("aa", DataType::Int32, false).with_metadata(HashMap::from([(
@@ -1086,7 +1092,7 @@ mod tests {
                 PARQUET_FIELD_ID_META_KEY.to_string(),
                 "8".to_string(),
             )])),
-            Field::new("i", DataType::Time32(TimeUnit::Microsecond), false).with_metadata(
+            Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata(
                 HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "9".to_string())]),
             ),
             Field::new(
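
Two notes on the hunks above. Arrow's `Time32` only admits second and millisecond units, so an Iceberg `time` (microsecond precision) has to map to `Time64`; and the Arrow format requires a map's entries field to be non-nullable regardless of the value field's nullability. A hand-built sketch of the types the converter now produces (illustrative, not copied from the crate):

use std::sync::Arc;
use arrow_schema::{DataType, Field, TimeUnit};

fn main() {
    // An Iceberg `map<string, int>` becomes an Arrow Map whose entries
    // struct is named "key_value" (DEFAULT_MAP_FIELD_NAME) and is
    // always non-nullable.
    let entries = Field::new(
        "key_value",
        DataType::Struct(
            vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Int32, true),
            ]
            .into(),
        ),
        false, // map field is always not nullable
    );
    let map_type = DataType::Map(Arc::new(entries), false);

    // Iceberg `time` is microseconds since midnight; Time32 cannot
    // express that unit.
    let time_type = DataType::Time64(TimeUnit::Microsecond);

    println!("{map_type:?} {time_type:?}");
}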

crates/iceberg/src/spec/datatypes.rs

Lines changed: 17 additions & 0 deletions
@@ -658,6 +658,13 @@ pub struct ListType {
     pub element_field: NestedFieldRef,
 }
 
+impl ListType {
+    /// Construct a list type with the given element field.
+    pub fn new(element_field: NestedFieldRef) -> Self {
+        Self { element_field }
+    }
+}
+
 /// Module for type serialization/deserialization.
 pub(super) mod _serde {
     use crate::spec::datatypes::Type::Map;
@@ -773,6 +780,16 @@ pub struct MapType {
     pub value_field: NestedFieldRef,
 }
 
+impl MapType {
+    /// Construct a map type with the given key and value fields.
+    pub fn new(key_field: NestedFieldRef, value_field: NestedFieldRef) -> Self {
+        Self {
+            key_field,
+            value_field,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use pretty_assertions::assert_eq;
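
The constructors above spare call sites from spelling out the struct literals. A hypothetical usage sketch, assuming the crate's `NestedField::required`/`optional` helpers and using illustrative field ids:

use std::sync::Arc;
use iceberg::spec::{ListType, MapType, NestedField, PrimitiveType, Type};

fn main() {
    // list<long>, element field id 1.
    let list = ListType::new(Arc::new(NestedField::required(
        1,
        "element",
        Type::Primitive(PrimitiveType::Long),
    )));

    // map<string, long>, key id 2, optional value id 3.
    let map = MapType::new(
        Arc::new(NestedField::required(
            2,
            "key",
            Type::Primitive(PrimitiveType::String),
        )),
        Arc::new(NestedField::optional(
            3,
            "value",
            Type::Primitive(PrimitiveType::Long),
        )),
    );

    let _ = (Type::List(list), Type::Map(map));
}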

crates/iceberg/src/spec/schema.rs

Lines changed: 1 addition & 1 deletion
@@ -1179,7 +1179,7 @@ mod tests {
         (schema, record)
     }
 
-    fn table_schema_nested() -> Schema {
+    pub fn table_schema_nested() -> Schema {
         Schema::builder()
             .with_schema_id(1)
             .with_identifier_field_ids(vec![2])

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 17 additions & 174 deletions
@@ -108,10 +108,13 @@ impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
 
 #[cfg(test)]
 mod test {
-    use std::{collections::HashMap, sync::Arc};
+    use std::sync::Arc;
 
-    use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, StructArray};
-    use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
+    use crate::{
+        spec::{DataContentType, Schema, Struct},
+        Result,
+    };
+    use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
     use crate::{
@@ -123,195 +126,35 @@ mod test {
                 location_generator::{test::MockLocationGenerator, DefaultFileNameGenerator},
                 ParquetWriterBuilder,
             },
-            tests::check_parquet_data_file,
             IcebergWriter, IcebergWriterBuilder,
         },
     };
 
     #[tokio::test]
-    async fn test_data_file_writer() -> Result<(), anyhow::Error> {
+    async fn test_parquet_writer() -> Result<()> {
         let temp_dir = TempDir::new().unwrap();
         let file_io = FileIOBuilder::new_fs_io().build().unwrap();
-        let location_gen =
+        let loccation_gen =
             MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
         let file_name_gen =
             DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
 
-        // prepare data
-        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
-        let schema = {
-            let fields = vec![
-                arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
-                    .with_metadata(HashMap::from([(
-                        PARQUET_FIELD_ID_META_KEY.to_string(),
-                        "0".to_string(),
-                    )])),
-                arrow_schema::Field::new(
-                    "col1",
-                    arrow_schema::DataType::Struct(
-                        vec![arrow_schema::Field::new(
-                            "sub_col",
-                            arrow_schema::DataType::Int64,
-                            true,
-                        )
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "5".to_string(),
-                        )]))]
-                        .into(),
-                    ),
-                    true,
-                )
-                .with_metadata(HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    "1".to_string(),
-                )])),
-                arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
-                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
-                ),
-                arrow_schema::Field::new(
-                    "col3",
-                    arrow_schema::DataType::List(Arc::new(
-                        arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
-                            .with_metadata(HashMap::from([(
-                                PARQUET_FIELD_ID_META_KEY.to_string(),
-                                "6".to_string(),
-                            )])),
-                    )),
-                    true,
-                )
-                .with_metadata(HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    "3".to_string(),
-                )])),
-                arrow_schema::Field::new(
-                    "col4",
-                    arrow_schema::DataType::Struct(
-                        vec![arrow_schema::Field::new(
-                            "sub_col",
-                            arrow_schema::DataType::Struct(
-                                vec![arrow_schema::Field::new(
-                                    "sub_sub_col",
-                                    arrow_schema::DataType::Int64,
-                                    true,
-                                )
-                                .with_metadata(HashMap::from([(
-                                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                                    "7".to_string(),
-                                )]))]
-                                .into(),
-                            ),
-                            true,
-                        )
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "8".to_string(),
-                        )]))]
-                        .into(),
-                    ),
-                    true,
-                )
-                .with_metadata(HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    "4".to_string(),
-                )])),
-            ];
-            Arc::new(arrow_schema::Schema::new(fields))
-        };
-        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
-        let col1 = Arc::new(StructArray::new(
-            vec![
-                arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
-                    .with_metadata(HashMap::from([(
-                        PARQUET_FIELD_ID_META_KEY.to_string(),
-                        "5".to_string(),
-                    )])),
-            ]
-            .into(),
-            vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
-            None,
-        ));
-        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
-            "test";
-            1024
-        ])) as ArrayRef;
-        let col3 = Arc::new({
-            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
-                Some(
-                    vec![Some(1),]
-                );
-                1024
-            ])
-            .into_parts();
-            arrow_array::ListArray::new(
-                Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
-                    PARQUET_FIELD_ID_META_KEY.to_string(),
-                    "6".to_string(),
-                )]))),
-                list_parts.1,
-                list_parts.2,
-                list_parts.3,
-            )
-        }) as ArrayRef;
-        let col4 = Arc::new(StructArray::new(
-            vec![arrow_schema::Field::new(
-                "sub_col",
-                arrow_schema::DataType::Struct(
-                    vec![arrow_schema::Field::new(
-                        "sub_sub_col",
-                        arrow_schema::DataType::Int64,
-                        true,
-                    )
-                    .with_metadata(HashMap::from([(
-                        PARQUET_FIELD_ID_META_KEY.to_string(),
-                        "7".to_string(),
-                    )]))]
-                    .into(),
-                ),
-                true,
-            )
-            .with_metadata(HashMap::from([(
-                PARQUET_FIELD_ID_META_KEY.to_string(),
-                "8".to_string(),
-            )]))]
-            .into(),
-            vec![Arc::new(StructArray::new(
-                vec![
-                    arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
-                        .with_metadata(HashMap::from([(
-                            PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "7".to_string(),
-                        )])),
-                ]
-                .into(),
-                vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
-                None,
-            ))],
-            None,
-        ));
-        let to_write =
-            RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();
-
-        // prepare writer
-        let pb = ParquetWriterBuilder::new(
+        let pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
-            to_write.schema(),
+            Arc::new(Schema::builder().build().unwrap()),
             file_io.clone(),
-            location_gen,
+            loccation_gen,
             file_name_gen,
         );
-        let mut data_file_writer = DataFileWriterBuilder::new(pb)
+        let mut data_file_writer = DataFileWriterBuilder::new(pw)
             .build(DataFileWriterConfig::new(None))
             .await?;
 
-        // write
-        data_file_writer.write(to_write.clone()).await?;
-        let res = data_file_writer.close().await?;
-        assert_eq!(res.len(), 1);
-        let data_file = res.into_iter().next().unwrap();
-
-        // check
-        check_parquet_data_file(&file_io, &data_file, &to_write).await;
+        let data_file = data_file_writer.close().await.unwrap();
+        assert_eq!(data_file.len(), 1);
+        assert_eq!(data_file[0].file_format, DataFileFormat::Parquet);
+        assert_eq!(data_file[0].content, DataContentType::Data);
+        assert_eq!(data_file[0].partition, Struct::empty());
 
         Ok(())
    }
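
Per the commit title, the parquet writer now records per-column lower and upper bounds in the `DataFile` metadata it emits; that change lives in the parquet writer source, whose diff is not shown on this page. A hypothetical continuation of the test above that would inspect those bounds; the `lower_bounds`/`upper_bounds` field names are assumptions, not confirmed by these hunks:

// Hypothetical: bounds are keyed by Iceberg field id.
for (field_id, min) in &data_file[0].lower_bounds {
    println!("field {field_id}: lower bound {min:?}");
}
for (field_id, max) in &data_file[0].upper_bounds {
    println!("field {field_id}: upper bound {max:?}");
}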
