Commit bd228fe
Partition Writer Support Part 1: add partition splitter (#1040)
## Which issue does this PR close?

This PR is part 1 of the work to close #342.

## What changes are included in this PR?

Partition writer support will be split across three PRs:

1. Add a partition splitter, which is used to split a record batch based on its partition values.
2. Add a fanout partition writer, which computes the partition values of the input and splits it accordingly.
3. Add a precompute partition writer, which uses partition values provided by the user.

This PR is the first part.

## Are these changes tested?

Yes, a unit test for the splitter is included in this PR.
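For orientation, here is a minimal sketch of how the splitter added in this PR is meant to be driven. This is hypothetical crate-internal code, not part of the patch: the splitter is `pub(crate)`, so the sketch mirrors the unit test in the diff below, and the helper name `split_by_partition` is an assumption for illustration.

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;

use crate::arrow::record_batch_partition_splitter::RecordBatchPartitionSplitter;
use crate::arrow::schema_to_arrow_schema;
use crate::spec::{PartitionSpecRef, SchemaRef, Struct};
use crate::Result;

/// Hypothetical helper: split one input batch into per-partition sub-batches.
fn split_by_partition(
    iceberg_schema: SchemaRef,
    partition_spec: PartitionSpecRef,
    batch: &RecordBatch,
) -> Result<Vec<(Struct, RecordBatch)>> {
    // Derive the Arrow schema from the Iceberg schema, as the unit test does.
    let input_schema = Arc::new(schema_to_arrow_schema(&iceberg_schema)?);
    let splitter =
        RecordBatchPartitionSplitter::new(input_schema, iceberg_schema, partition_spec)?;
    // Each returned element pairs a partition value (an Iceberg `Struct` literal)
    // with the rows of `batch` that fall into that partition.
    splitter.split(batch)
}
```

Part 2 of the series (the fanout partition writer) would then route each sub-batch to the writer for its partition value.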
1 parent 8bc44a7 commit bd228fe

File tree

2 files changed: 305 additions & 0 deletions

crates/iceberg/src/arrow/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -34,3 +34,4 @@ mod value;
 
 pub use reader::*;
 pub use value::*;
+pub(crate) mod record_batch_partition_splitter;
crates/iceberg/src/arrow/record_batch_partition_splitter.rs

Lines changed: 304 additions & 0 deletions
@@ -0,0 +1,304 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StructArray};
use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef};
use arrow_select::filter::filter_record_batch;
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use super::arrow_struct_to_literal;
use super::record_batch_projector::RecordBatchProjector;
use crate::arrow::type_to_arrow_type;
use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
use crate::transform::{BoxedTransformFunction, create_transform_function};
use crate::{Error, ErrorKind, Result};
/// The splitter used to split a record batch into multiple record batches by the partition spec.
/// 1. It projects and transforms the input record batch based on the partition spec to compute the partition value of each row.
/// 2. It then splits the input record batch into one record batch per distinct partition value.
// # TODO
// Remove this once the partition writer is supported.
#[allow(dead_code)]
pub struct RecordBatchPartitionSplitter {
    schema: SchemaRef,
    partition_spec: PartitionSpecRef,
    /// Projects the partition source columns out of the input batch.
    projector: RecordBatchProjector,
    /// One transform function per partition field, in spec order.
    transform_functions: Vec<BoxedTransformFunction>,

    /// The Iceberg struct type of the partition values.
    partition_type: StructType,
    /// The Arrow equivalent of `partition_type`.
    partition_arrow_type: DataType,
}

// # TODO
// Remove this once the partition writer is supported.
#[allow(dead_code)]
impl RecordBatchPartitionSplitter {
    /// Create a new splitter from the Arrow schema of the input batches, the Iceberg
    /// schema, and the partition spec.
    pub fn new(
        input_schema: ArrowSchemaRef,
        iceberg_schema: SchemaRef,
        partition_spec: PartitionSpecRef,
    ) -> Result<Self> {
        let projector = RecordBatchProjector::new(
            input_schema,
            &partition_spec
                .fields()
                .iter()
                .map(|field| field.source_id)
                .collect::<Vec<_>>(),
            // The source columns, selected by id, must be primitive types; they cannot be
            // contained in a map or list, but may be nested in a struct.
            // ref: https://iceberg.apache.org/spec/#partitioning
            |field| {
                if !field.data_type().is_primitive() {
                    return Ok(None);
                }
                field
                    .metadata()
                    .get(PARQUET_FIELD_ID_META_KEY)
                    .map(|s| {
                        s.parse::<i64>()
                            .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))
                    })
                    .transpose()
            },
            |_| true,
        )?;
        let transform_functions = partition_spec
            .fields()
            .iter()
            .map(|field| create_transform_function(&field.transform))
            .collect::<Result<Vec<_>>>()?;

        let partition_type = partition_spec.partition_type(&iceberg_schema)?;
        let partition_arrow_type = type_to_arrow_type(&Type::Struct(partition_type.clone()))?;

        Ok(Self {
            schema: iceberg_schema,
            partition_spec,
            projector,
            transform_functions,
            partition_type,
            partition_arrow_type,
        })
    }

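    /// Convert the transformed partition columns into one Iceberg `Struct` literal per
    /// input row; each literal serves as the partition key of that row.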
    fn partition_columns_to_struct(&self, partition_columns: Vec<ArrayRef>) -> Result<Vec<Struct>> {
        let arrow_struct_array = {
            let partition_arrow_fields = {
                let DataType::Struct(fields) = &self.partition_arrow_type else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        "The partition arrow type is not a struct type",
                    ));
                };
                fields.clone()
            };
            Arc::new(StructArray::try_new(
                partition_arrow_fields,
                partition_columns,
                None,
            )?) as ArrayRef
        };
        let struct_array = {
            let struct_array = arrow_struct_to_literal(&arrow_struct_array, &self.partition_type)?;
            struct_array
                .into_iter()
                .map(|s| {
                    if let Some(s) = s {
                        if let Literal::Struct(s) = s {
                            Ok(s)
                        } else {
                            Err(Error::new(
                                ErrorKind::DataInvalid,
                                "The struct is not a struct literal",
                            ))
                        }
                    } else {
                        Err(Error::new(ErrorKind::DataInvalid, "The struct is null"))
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };

        Ok(struct_array)
    }

    /// Split the record batch into multiple record batches based on the partition spec.
    pub fn split(&self, batch: &RecordBatch) -> Result<Vec<(Struct, RecordBatch)>> {
        let source_columns = self.projector.project_column(batch.columns())?;
        let partition_columns = source_columns
            .into_iter()
            .zip_eq(self.transform_functions.iter())
            .map(|(source_column, transform_function)| transform_function.transform(source_column))
            .collect::<Result<Vec<_>>>()?;

        let partition_structs = self.partition_columns_to_struct(partition_columns)?;

        // Group row indices by their partition value.
        let mut group_ids = HashMap::new();
        partition_structs
            .iter()
            .enumerate()
            .for_each(|(row_id, row)| {
                group_ids.entry(row.clone()).or_insert(vec![]).push(row_id);
            });
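        // For example, with identity partitioning on "id" and ids [1, 2, 1, 3, 2, 3, 1]
        // (the data used in the test below), group_ids maps partition value 1 to rows
        // [0, 2, 6], 2 to rows [1, 4], and 3 to rows [3, 5].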

        // Partition the batch into sub-batches that share the same partition values.
        let mut partition_batches = Vec::with_capacity(group_ids.len());
        for (row, row_ids) in group_ids.into_iter() {
            // Generate the boolean filter array from the row ids.
            let filter_array: BooleanArray = {
                let mut filter = vec![false; batch.num_rows()];
                row_ids.into_iter().for_each(|row_id| {
                    filter[row_id] = true;
                });
                filter.into()
            };

            // Filter the RecordBatch down to this partition's rows.
            partition_batches.push((row, filter_record_batch(batch, &filter_array)?));
        }

        Ok(partition_batches)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, StringArray};

    use super::*;
    use crate::arrow::schema_to_arrow_schema;
    use crate::spec::{
        NestedField, PartitionSpecBuilder, PrimitiveLiteral, Schema, Transform,
        UnboundPartitionField,
    };

    #[test]
    fn test_record_batch_partition_split() {
        let schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
                    NestedField::required(
                        1,
                        "id",
                        Type::Primitive(crate::spec::PrimitiveType::Int),
                    )
                    .into(),
                    NestedField::required(
                        2,
                        "name",
                        Type::Primitive(crate::spec::PrimitiveType::String),
                    )
                    .into(),
                ])
                .build()
                .unwrap(),
        );
        let partition_spec = Arc::new(
            PartitionSpecBuilder::new(schema.clone())
                .with_spec_id(1)
                .add_unbound_field(UnboundPartitionField {
                    source_id: 1,
                    field_id: None,
                    name: "id_bucket".to_string(),
                    transform: Transform::Identity,
                })
                .unwrap()
                .build()
                .unwrap(),
        );
        let input_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
        let partition_splitter =
            RecordBatchPartitionSplitter::new(input_schema.clone(), schema.clone(), partition_spec)
                .expect("Failed to create splitter");

        let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]);
        let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]);
        let batch = RecordBatch::try_new(input_schema.clone(), vec![
            Arc::new(id_array),
            Arc::new(data_array),
        ])
        .expect("Failed to create RecordBatch");

        let mut partitioned_batches = partition_splitter
            .split(&batch)
            .expect("Failed to split RecordBatch");
        // Sort the partitions by their int partition value for deterministic assertions.
        partitioned_batches.sort_by_key(|(row, _)| {
            if let PrimitiveLiteral::Int(i) = row.fields()[0]
                .as_ref()
                .unwrap()
                .as_primitive_literal()
                .unwrap()
            {
                i
            } else {
                panic!("The partition value is not an int");
            }
        });
        assert_eq!(partitioned_batches.len(), 3);
        {
            // Check the first partition.
            let expected_id_array = Int32Array::from(vec![1, 1, 1]);
            let expected_data_array = StringArray::from(vec!["a", "c", "g"]);
            let expected_batch = RecordBatch::try_new(input_schema.clone(), vec![
                Arc::new(expected_id_array),
                Arc::new(expected_data_array),
            ])
            .expect("Failed to create expected RecordBatch");
            assert_eq!(partitioned_batches[0].1, expected_batch);
        }
        {
            // Check the second partition.
            let expected_id_array = Int32Array::from(vec![2, 2]);
            let expected_data_array = StringArray::from(vec!["b", "e"]);
            let expected_batch = RecordBatch::try_new(input_schema.clone(), vec![
                Arc::new(expected_id_array),
                Arc::new(expected_data_array),
            ])
            .expect("Failed to create expected RecordBatch");
            assert_eq!(partitioned_batches[1].1, expected_batch);
        }
        {
            // Check the third partition.
            let expected_id_array = Int32Array::from(vec![3, 3]);
            let expected_data_array = StringArray::from(vec!["d", "f"]);
            let expected_batch = RecordBatch::try_new(input_schema.clone(), vec![
                Arc::new(expected_id_array),
                Arc::new(expected_data_array),
            ])
            .expect("Failed to create expected RecordBatch");
            assert_eq!(partitioned_batches[2].1, expected_batch);
        }

        let partition_values = partitioned_batches
            .iter()
            .map(|(row, _)| row.clone())
            .collect::<Vec<_>>();
        // Check that the partition values are struct(1), struct(2), struct(3).
        assert_eq!(partition_values, vec![
            Struct::from_iter(vec![Some(Literal::int(1))]),
            Struct::from_iter(vec![Some(Literal::int(2))]),
            Struct::from_iter(vec![Some(Literal::int(3))]),
        ]);
    }
}
