Commit 865f774

refine: separate parquet reader and arrow convert (#313)
1 parent: 3dcb3a9

File tree

3 files changed: +213 -170 lines changed

crates/iceberg/src/arrow/mod.rs

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Conversion between Iceberg and Arrow schema

mod schema;
pub use schema::*;
mod reader;
pub use reader::*;

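Because `mod.rs` glob-re-exports both submodules, the split is invisible to downstream callers; an import along these lines should continue to resolve (a sketch, assuming the items are public exactly as shown in this diff):

    use iceberg::arrow::{ArrowReader, ArrowReaderBuilder};
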
crates/iceberg/src/arrow/reader.rs

Lines changed: 189 additions & 0 deletions
@@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet file data reader

use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use futures::stream::StreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::schema::types::SchemaDescriptor;
use std::collections::HashMap;
use std::str::FromStr;

use crate::arrow::arrow_schema_to_schema;
use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::SchemaRef;
use crate::{Error, ErrorKind};

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
    batch_size: Option<usize>,
    field_ids: Vec<usize>,
    file_io: FileIO,
    schema: SchemaRef,
}

impl ArrowReaderBuilder {
    /// Create a new ArrowReaderBuilder
    pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
        ArrowReaderBuilder {
            batch_size: None,
            field_ids: vec![],
            file_io,
            schema,
        }
    }

    /// Sets the desired size of batches in the response
    /// to something other than the default
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = Some(batch_size);
        self
    }

    /// Sets the desired column projection with a list of field ids.
    pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
        self.field_ids = field_ids.into_iter().collect();
        self
    }

    /// Build the ArrowReader.
    pub fn build(self) -> ArrowReader {
        ArrowReader {
            batch_size: self.batch_size,
            field_ids: self.field_ids,
            schema: self.schema,
            file_io: self.file_io,
        }
    }
}

/// Reads data from Parquet files
pub struct ArrowReader {
    batch_size: Option<usize>,
    field_ids: Vec<usize>,
    #[allow(dead_code)]
    schema: SchemaRef,
    file_io: FileIO,
}

impl ArrowReader {
    /// Takes a stream of FileScanTasks and reads all the files.
    /// Returns a stream of Arrow RecordBatches containing the data from the files
    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
        let file_io = self.file_io.clone();

        Ok(try_stream! {
            while let Some(Ok(task)) = tasks.next().await {
                let parquet_reader = file_io
                    .new_input(task.data().data_file().file_path())?
                    .reader()
                    .await?;

                let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
                    .await?;

                let parquet_schema = batch_stream_builder.parquet_schema();
                let arrow_schema = batch_stream_builder.schema();
                let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
                batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

                if let Some(batch_size) = self.batch_size {
                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
                }

                let mut batch_stream = batch_stream_builder.build()?;

                while let Some(batch) = batch_stream.next().await {
                    yield batch?;
                }
            }
        }
        .boxed())
    }

    fn get_arrow_projection_mask(
        &self,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
    ) -> crate::Result<ProjectionMask> {
        if self.field_ids.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            // Build the map between field id and column index in Parquet schema.
            let mut column_map = HashMap::new();

            let fields = arrow_schema.fields();
            let iceberg_schema = arrow_schema_to_schema(arrow_schema)?;
            fields.filter_leaves(|idx, field| {
                let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY);
                if field_id.is_none() {
                    return false;
                }

                let field_id = i32::from_str(field_id.unwrap());
                if field_id.is_err() {
                    return false;
                }
                let field_id = field_id.unwrap();

                if !self.field_ids.contains(&(field_id as usize)) {
                    return false;
                }

                let iceberg_field = self.schema.field_by_id(field_id);
                let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

                if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                    return false;
                }

                if iceberg_field.unwrap().field_type != parquet_iceberg_field.unwrap().field_type {
                    return false;
                }

                column_map.insert(field_id, idx);
                true
            });

            if column_map.len() != self.field_ids.len() {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Parquet schema {} and Iceberg schema {} do not match.",
                        iceberg_schema, self.schema
                    ),
                ));
            }

            let mut indices = vec![];
            for field_id in &self.field_ids {
                if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
                    indices.push(*col_idx);
                } else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Field {} is not found in Parquet schema.", field_id),
                    ));
                }
            }
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }
}

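For orientation, here is a minimal usage sketch of the relocated reader, assuming an async context and that a FileIO, a table SchemaRef, and a FileScanTaskStream are already in hand (`file_io`, `schema`, and `task_stream` are illustrative names, not part of this commit):

    use futures::stream::StreamExt;
    use iceberg::arrow::ArrowReaderBuilder;

    // Hypothetical wiring around the builder API added above.
    let reader = ArrowReaderBuilder::new(file_io, schema)
        .with_batch_size(1024)       // optional: override the default batch size
        .with_field_ids(vec![1, 2])  // optional: project columns by Iceberg field id
        .build();

    // `read` consumes the task stream and yields Arrow RecordBatches;
    // with no field ids set, the projection mask defaults to all columns.
    let mut batches = reader.read(task_stream)?;
    while let Some(batch) = batches.next().await {
        let batch = batch?;
        // ... process the RecordBatch ...
    }

Note that `with_field_ids` takes Iceberg field ids, not positional column indices: `get_arrow_projection_mask` maps each id to a Parquet leaf column via the PARQUET_FIELD_ID_META_KEY metadata, and errors out if a requested id is missing or its type disagrees between the two schemas.
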
crates/iceberg/src/arrow.rs renamed to crates/iceberg/src/arrow/schema.rs

Lines changed: 1 addition & 170 deletions
@@ -15,19 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Parquet file data reader
-
-use arrow_schema::SchemaRef as ArrowSchemaRef;
-use async_stream::try_stream;
-use futures::stream::StreamExt;
-use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
-use parquet::schema::types::SchemaDescriptor;
-use std::collections::HashMap;
-use std::str::FromStr;
-
-use crate::io::FileIO;
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
-use crate::spec::SchemaRef;
+//! Conversion between Arrow schema and Iceberg schema.
 
 use crate::error::Result;
 use crate::spec::{
(The 157 lines deleted by the hunk below are the ArrowReaderBuilder and ArrowReader definitions, identical to the code added in crates/iceberg/src/arrow/reader.rs above; they are elided here rather than repeated.)

@@ -37,163 +25,6 @@ use crate::{Error, ErrorKind};
 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
 use std::sync::Arc;
 
-[ ... ArrowReaderBuilder and ArrowReader, moved to arrow/reader.rs ... ]
 
 /// A post order arrow schema visitor.
 ///
 /// For order of methods called, please refer to [`visit_schema`].
