
Supply a hint arrow schema for casting Parquet field types during scans #814


Draft · gruuya wants to merge 1 commit into main
30 changes: 23 additions & 7 deletions crates/iceberg/src/arrow/reader.rs
@@ -41,7 +41,7 @@ use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
-use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
+use crate::arrow::{arrow_schema_to_schema, get_arrow_datum, schema_to_arrow_schema};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
@@ -50,7 +50,7 @@ use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::runtime::spawn;
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, PrimitiveType, Schema};
+use crate::spec::{Datum, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
@@ -195,12 +195,28 @@ impl ArrowReader {
 
         let should_load_page_index = row_selection_enabled && task.predicate.is_some();
 
+        let mut reader_options = ArrowReaderOptions::new().with_page_index(should_load_page_index);
+        if task.schema.as_struct().fields().iter().any(|field| {
+            matches!(
+                field.field_type.as_ref(),
+                Type::Primitive(PrimitiveType::Int)
+            )
+        }) {
[Review comment on lines +199 to +204]
@gruuya (Contributor, Author) · Dec 17, 2024:
Maybe this should be done only if the field with this type is also one of the projected ones.
(A sketch of this refinement follows the diff below.)

+            // If there's a `PrimitiveType::Int` in the schema we should coerce the arrow reader to
+            // the canonical arrow schema for it (e.g. in this case with `DataType::Int32`), because
+            // Parquet fields with arrow type hints for `DataType::Int8` and `DataType::Int16` get
+            // converted to `PrimitiveType::Int`.
+            //
+            // Otherwise, we may end up with a discrepancy between the reported schema and the
+            // actual schema (the one inferred from the parquet files).
+            let arrow_schema = schema_to_arrow_schema(task.schema())?;
+            reader_options = reader_options.with_schema(arrow_schema.into());
+        }
+
         // Start creating the record batch stream, which wraps the parquet file reader
-        let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
-            parquet_file_reader,
-            ArrowReaderOptions::new().with_page_index(should_load_page_index),
-        )
-        .await?;
+        let mut record_batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, reader_options)
+                .await?;
 
         // Create a projection mask for the batch stream to select which columns in the
         // Parquet file that we want in the response
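
As background for the change above, here is a minimal, self-contained sketch (not part of the PR) of the mismatch the in-diff comment describes, using the parquet and arrow crates directly; the file path and field name are made up for illustration. A Parquet file written from an arrow Int16 column carries an embedded arrow schema hint, so a plain reader reports Int16, while Iceberg models the column as `PrimitiveType::Int`, whose canonical arrow type is `DataType::Int32`. Supplying the Iceberg-derived schema via `ArrowReaderOptions::with_schema`, as the diff does, coerces the reader back to the reported schema.

use std::fs::File;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int16Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Write a Parquet file from an arrow Int16 column; the writer embeds an
    // arrow schema hint recording the original Int16 type.
    let written = Arc::new(Schema::new(vec![Field::new("v", DataType::Int16, false)]));
    let batch = RecordBatch::try_new(
        written.clone(),
        vec![Arc::new(Int16Array::from(vec![1i16, 2, 3])) as ArrayRef],
    )?;
    let mut writer = ArrowWriter::try_new(File::create("/tmp/int16.parquet")?, written, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Without a hint schema, the reader honors the embedded Int16 hint...
    let builder = ParquetRecordBatchReaderBuilder::try_new(File::open("/tmp/int16.parquet")?)?;
    assert_eq!(builder.schema().field(0).data_type(), &DataType::Int16);

    // ...whereas Iceberg reports the column as `PrimitiveType::Int`, i.e. Int32.
    // Passing that schema through `with_schema` (as the PR does with
    // `schema_to_arrow_schema(task.schema())`) makes the reader produce Int32.
    let hint = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let options = ArrowReaderOptions::new().with_schema(hint);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
        File::open("/tmp/int16.parquet")?,
        options,
    )?;
    assert_eq!(builder.schema().field(0).data_type(), &DataType::Int32);
    Ok(())
}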
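
The review comment suggests coercing only when a `PrimitiveType::Int` field is actually projected. One possible shape for that refinement, as an untested sketch that is not part of the PR, assuming a `project_field_ids` list on `FileScanTask` like the one this reader uses to build its projection mask:

// Hypothetical refinement per the review comment: only supply the hint schema
// when a `PrimitiveType::Int` field is among the projected columns.
let needs_hint = task.schema.as_struct().fields().iter().any(|field| {
    task.project_field_ids.contains(&field.id)
        && matches!(
            field.field_type.as_ref(),
            Type::Primitive(PrimitiveType::Int)
        )
});
if needs_hint {
    let arrow_schema = schema_to_arrow_schema(task.schema())?;
    reader_options = reader_options.with_schema(arrow_schema.into());
}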