From ef90d05faab2d36cbc990f4705fa99ea9df81ff7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 11 Aug 2025 13:33:16 -0400 Subject: [PATCH 1/4] Consolidate Parquet Metadata handling --- datafusion/common/src/encryption.rs | 2 + .../src/datasource/file_format/parquet.rs | 226 +++------- .../core/tests/parquet/custom_reader.rs | 26 +- .../datasource-parquet/src/file_format.rs | 394 +++------------- datafusion/datasource-parquet/src/metadata.rs | 422 ++++++++++++++++++ datafusion/datasource-parquet/src/mod.rs | 1 + datafusion/datasource-parquet/src/reader.rs | 57 +-- 7 files changed, 577 insertions(+), 551 deletions(-) create mode 100644 datafusion/datasource-parquet/src/metadata.rs diff --git a/datafusion/common/src/encryption.rs b/datafusion/common/src/encryption.rs index 5dd603a08112..b764ad77cff1 100644 --- a/datafusion/common/src/encryption.rs +++ b/datafusion/common/src/encryption.rs @@ -24,8 +24,10 @@ pub use parquet::encryption::decrypt::FileDecryptionProperties; pub use parquet::encryption::encrypt::FileEncryptionProperties; #[cfg(not(feature = "parquet_encryption"))] +#[derive(Default, Debug)] pub struct FileDecryptionProperties; #[cfg(not(feature = "parquet_encryption"))] +#[derive(Default, Debug)] pub struct FileEncryptionProperties; pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties}; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index b7d66e4f2789..db704bee8801 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -133,8 +133,7 @@ mod tests { use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::{ListingTableUrl, PartitionedFile}; use datafusion_datasource_parquet::{ - fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc, - ObjectStoreFetch, ParquetFormat, ParquetFormatFactory, ParquetSink, + ParquetFormat, ParquetFormatFactory, ParquetSink, }; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; @@ -143,6 +142,7 @@ mod tests { use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{collect, ExecutionPlan}; + use crate::test_util::bounded_stream; use arrow::array::{ types::Int32Type, Array, ArrayRef, DictionaryArray, Int32Array, Int64Array, StringArray, @@ -150,6 +150,7 @@ mod tests { use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; use datafusion_datasource::file_groups::FileGroup; + use datafusion_datasource_parquet::metadata::DFParquetMetadata; use futures::stream::BoxStream; use futures::StreamExt; use insta::assert_snapshot; @@ -167,8 +168,6 @@ mod tests { use parquet::format::FileMetaData; use tokio::fs::File; - use crate::test_util::bounded_stream; - enum ForceViews { Yes, No, @@ -195,15 +194,12 @@ mod tests { let format = ParquetFormat::default().with_force_view_types(force_views); let schema = format.infer_schema(&ctx, &store, &meta).await?; - let stats = fetch_statistics( - store.as_ref(), - schema.clone(), - &meta[0], - None, - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let file_metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); + let stats = DFParquetMetadata::new(&store, &meta[0]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(stats.num_rows, 
Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -211,15 +207,11 @@ mod tests { assert_eq!(c1_stats.null_count, Precision::Exact(1)); assert_eq!(c2_stats.null_count, Precision::Exact(3)); - let stats = fetch_statistics( - store.as_ref(), - schema, - &meta[1], - None, - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let stats = DFParquetMetadata::new(&store, &meta[1]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; + assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; let c2_stats = &stats.column_statistics[1]; @@ -392,51 +384,27 @@ mod tests { // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch // for the remaining metadata - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let file_metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); + let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0]) + .with_metadata_size_hint(Some(9)); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 2); + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + // Increases by 3 because cache has no entries yet - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 5); // No increase because cache has an entry - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 5); // Increase by 2 because `get_file_metadata_cache()` is None - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let df_meta = df_meta.with_file_metadata_cache(None); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 7); let force_views = match force_views { @@ -454,15 +422,9 @@ mod tests { assert_eq!(store.request_count(), 10); // No increase, cache being used - let stats = fetch_statistics( - store.upcast().as_ref(), - schema.clone(), - &meta[0], - Some(9), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + let stats = df_meta.fetch_statistics(&schema).await?; assert_eq!(store.request_count(), 10); assert_eq!(stats.num_rows, Precision::Exact(3)); @@ -477,55 +439,30 @@ mod tests { // Use the file size as the hint so we can get the full metadata from the first fetch let size_hint = meta[0].size as usize; + let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0]) + .with_metadata_size_hint(Some(size_hint)); - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - None, - ) - .await - .expect("error reading 
metadata with hint"); + df_meta.fetch_metadata().await?; // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); let session = SessionContext::new(); let ctx = session.state(); + let file_metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); // Increases by 1 because cache has no entries yet and new session context - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 2); // No increase because cache has an entry - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 2); // Increase by 1 because `get_file_metadata_cache` is None - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let df_meta = df_meta.with_file_metadata_cache(None); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 3); let format = ParquetFormat::default() @@ -538,15 +475,9 @@ mod tests { let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?; assert_eq!(store.request_count(), 4); // No increase, cache being used - let stats = fetch_statistics( - store.upcast().as_ref(), - schema.clone(), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + let stats = df_meta.fetch_statistics(&schema).await?; assert_eq!(store.request_count(), 4); assert_eq!(stats.num_rows, Precision::Exact(3)); @@ -559,29 +490,18 @@ mod tests { LocalFileSystem::new(), ))); - // Use the a size hint larger than the file size to make sure we don't panic + // Use a size hint larger than the file size to make sure we don't panic let size_hint = (meta[0].size + 100) as usize; - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0]) + .with_metadata_size_hint(Some(size_hint)); + + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 1); // No increase because cache has an entry - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 1); Ok(()) @@ -622,16 +542,12 @@ mod tests { assert_eq!(store.request_count(), 3); // No increase in request count because cache is not empty - let pq_meta = fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref(), &files[0]), - &files[0], - None, - None, 
- Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; - assert_eq!(store.request_count(), 3); - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let stats = DFParquetMetadata::new(store.as_ref(), &files[0]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(stats.num_rows, Precision::Exact(4)); // column c_dic @@ -691,16 +607,13 @@ mod tests { }; // No increase in request count because cache is not empty - let pq_meta = fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref(), &files[0]), - &files[0], - None, - None, - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let stats = DFParquetMetadata::new(store.as_ref(), &files[0]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(store.request_count(), 6); - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1 let c1_stats = &stats.column_statistics[0]; @@ -725,16 +638,11 @@ mod tests { assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone())); // No increase in request count because cache is not empty - let pq_meta = fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref(), &files[1]), - &files[1], - None, - None, - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let stats = DFParquetMetadata::new(store.as_ref(), &files[1]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(store.request_count(), 6); - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1: missing from the file so the table treats all 3 rows as null let c1_stats = &stats.column_statistics[0]; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index f7e48fa9cb91..f7e491146792 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -23,7 +23,6 @@ use std::time::SystemTime; use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray}; use arrow::datatypes::{Field, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; -use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ @@ -38,7 +37,7 @@ use datafusion_common::Result; use bytes::Bytes; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; -use datafusion_datasource_parquet::ObjectStoreFetch; +use datafusion_datasource_parquet::metadata::DFParquetMetadata; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use insta::assert_snapshot; @@ -238,20 +237,15 @@ impl AsyncFileReader for ParquetFileReader { _options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result>> { Box::pin(async move { - let fetch = ObjectStoreFetch::new(self.store.as_ref(), &self.meta); - let metadata = fetch_parquet_metadata( - fetch, - &self.meta, - 
self.metadata_size_hint, - None, - None, - ) - .await - .map_err(|e| { - ParquetError::General(format!( - "AsyncChunkReader::get_metadata error: {e}" - )) - })?; + let metadata = DFParquetMetadata::new(self.store.as_ref(), &self.meta) + .with_metadata_size_hint(self.metadata_size_hint) + .fetch_metadata() + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_metadata error: {e}" + )) + })?; Ok(metadata) }) } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index ab4d84ee368e..52a8d3d26567 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -36,17 +36,15 @@ use datafusion_datasource::write::{ use datafusion_datasource::file_format::{FileFormat, FileFormatFactory}; use datafusion_datasource::write::demux::DemuxedStreamReceiver; -use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; #[cfg(feature = "parquet_encryption")] use datafusion_common::encryption::map_config_decryption_to_decryption; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::parsers::CompressionTypeVariant; -use datafusion_common::stats::Precision; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics, - DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION, + internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt, + HashSet, Result, DEFAULT_PARQUET_EXTENSION, }; use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; @@ -57,13 +55,11 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; -use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use datafusion_physical_plan::Accumulator; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; -use crate::reader::{CachedParquetFileReaderFactory, CachedParquetMetaData}; +use crate::reader::CachedParquetFileReaderFactory; use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; @@ -71,22 +67,21 @@ use datafusion_datasource::source::DataSourceExec; use datafusion_execution::runtime_env::RuntimeEnv; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; -use log::debug; use object_store::buffered::BufWriter; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; -use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_writer::{ compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowWriterOptions, }; use parquet::arrow::async_reader::MetadataFetch; -use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter}; +use parquet::arrow::{ArrowSchemaConverter, AsyncArrowWriter}; use parquet::basic::Type; +use crate::metadata::DFParquetMetadata; use datafusion_execution::cache::cache_manager::FileMetadataCache; use parquet::errors::ParquetError; -use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; +use 
parquet::file::metadata::ParquetMetaData; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; @@ -306,30 +301,6 @@ fn clear_metadata( }) } -async fn fetch_schema_with_location( - state: &dyn Session, - store: &dyn ObjectStore, - options: &TableParquetOptions, - file: &ObjectMeta, - metadata_size_hint: Option, - coerce_int96: Option, - file_metadata_cache: Option>, -) -> Result<(Path, Schema)> { - let file_decryption_properties = - get_file_decryption_properties(state, options, &file.location)?; - let loc_path = file.location.clone(); - let schema = fetch_schema( - store, - file, - metadata_size_hint, - file_decryption_properties.as_ref(), - coerce_int96, - file_metadata_cache, - ) - .await?; - Ok((loc_path, schema)) -} - #[cfg(feature = "parquet_encryption")] fn get_file_decryption_properties( state: &dyn Session, @@ -399,19 +370,27 @@ impl FileFormat for ParquetFormat { None => None, }; + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let mut schemas: Vec<_> = futures::stream::iter(objects) - .map(|object| { - fetch_schema_with_location( + .map(|object| async { + let file_decryption_properties = get_file_decryption_properties( state, - store.as_ref(), &self.options, - object, - self.metadata_size_hint(), - coerce_int96, - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) + &object.location, + )?; + let result = DFParquetMetadata::new(store.as_ref(), object) + .with_metadata_size_hint(self.metadata_size_hint()) + .with_decryption_properties(file_decryption_properties.as_ref()) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .with_coerce_int96(coerce_int96) + .fetch_schema_with_location() + .await?; + Ok::<_, DataFusionError>(result) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 + // fetch schemas concurrently, if requested .buffered(state.config_options().execution.meta_fetch_concurrency) .try_collect() .await?; @@ -459,16 +438,14 @@ impl FileFormat for ParquetFormat { ) -> Result { let file_decryption_properties = get_file_decryption_properties(state, &self.options, &object.location)?; - let stats = fetch_statistics( - store.as_ref(), - table_schema, - object, - self.metadata_size_hint(), - file_decryption_properties.as_ref(), - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; - Ok(stats) + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + DFParquetMetadata::new(store, object) + .with_metadata_size_hint(self.metadata_size_hint()) + .with_decryption_properties(file_decryption_properties.as_ref()) + .with_file_metadata_cache(Some(file_metadata_cache)) + .fetch_statistics(&table_schema) + .await } async fn create_physical_plan( @@ -1038,98 +1015,32 @@ impl MetadataFetch for ObjectStoreFetch<'_> { /// through [`ParquetFileReaderFactory`]. 
/// /// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory -pub async fn fetch_parquet_metadata( - fetch: F, +#[deprecated( + since = "50.0.0", + note = "Use `DFParquetMetadata::fetch_metadata` instead" +)] +pub async fn fetch_parquet_metadata( + store: &dyn ObjectStore, object_meta: &ObjectMeta, size_hint: Option, #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>, file_metadata_cache: Option>, ) -> Result> { - let cache_metadata = - !cfg!(feature = "parquet_encryption") || decryption_properties.is_none(); - - if cache_metadata { - if let Some(parquet_metadata) = file_metadata_cache - .as_ref() - .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) - .and_then(|file_metadata| { - file_metadata - .as_any() - .downcast_ref::() - .map(|cached_parquet_metadata| { - Arc::clone(cached_parquet_metadata.parquet_metadata()) - }) - }) - { - return Ok(parquet_metadata); - } - } - - let mut reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint); - - #[cfg(feature = "parquet_encryption")] - if let Some(decryption_properties) = decryption_properties { - reader = reader.with_decryption_properties(Some(decryption_properties)); - } - - if cache_metadata && file_metadata_cache.is_some() { - // Need to retrieve the entire metadata for the caching to be effective. - reader = reader.with_page_indexes(true); - } - - let metadata = Arc::new( - reader - .load_and_finish(fetch, object_meta.size) - .await - .map_err(DataFusionError::from)?, - ); - - if cache_metadata { - if let Some(file_metadata_cache) = file_metadata_cache { - file_metadata_cache.put( - object_meta, - Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), - ); - } - } - - Ok(metadata) -} - -/// Read and parse the schema of the Parquet file at location `path` -async fn fetch_schema( - store: &dyn ObjectStore, - file: &ObjectMeta, - metadata_size_hint: Option, - file_decryption_properties: Option<&FileDecryptionProperties>, - coerce_int96: Option, - file_metadata_cache: Option>, -) -> Result { - let fetch = ObjectStoreFetch::new(store, file); - let metadata = fetch_parquet_metadata( - fetch, - file, - metadata_size_hint, - file_decryption_properties, - file_metadata_cache, - ) - .await?; - let file_metadata = metadata.file_metadata(); - let schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - )?; - let schema = coerce_int96 - .and_then(|time_unit| { - coerce_int96_to_resolution(file_metadata.schema_descr(), &schema, &time_unit) - }) - .unwrap_or(schema); - Ok(schema) + DFParquetMetadata::new(store, object_meta) + .with_metadata_size_hint(size_hint) + .with_decryption_properties(decryption_properties) + .with_file_metadata_cache(file_metadata_cache) + .fetch_metadata() + .await } /// Read and parse the statistics of the Parquet file at location `path` /// /// See [`statistics_from_parquet_meta_calc`] for more details +#[deprecated( + since = "50.0.0", + note = "Use `DFParquetMetadata::fetch_statistics` instead" +)] pub async fn fetch_statistics( store: &dyn ObjectStore, table_schema: SchemaRef, @@ -1138,181 +1049,23 @@ pub async fn fetch_statistics( decryption_properties: Option<&FileDecryptionProperties>, file_metadata_cache: Option>, ) -> Result { - let fetch = ObjectStoreFetch::new(store, file); - let metadata = fetch_parquet_metadata( - fetch, - file, - metadata_size_hint, - decryption_properties, - file_metadata_cache, - ) - .await?; - statistics_from_parquet_meta_calc(&metadata, table_schema) + 
DFParquetMetadata::new(store, file) + .with_metadata_size_hint(metadata_size_hint) + .with_decryption_properties(decryption_properties) + .with_file_metadata_cache(file_metadata_cache) + .fetch_statistics(&table_schema) + .await } -/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`] -/// -/// The statistics are calculated for each column in the table schema -/// using the row group statistics in the parquet metadata. -/// -/// # Key behaviors: -/// -/// 1. Extracts row counts and byte sizes from all row groups -/// 2. Applies schema type coercions to align file schema with table schema -/// 3. Collects and aggregates statistics across row groups when available -/// -/// # When there are no statistics: -/// -/// If the Parquet file doesn't contain any statistics (has_statistics is false), the function returns a Statistics object with: -/// - Exact row count -/// - Exact byte size -/// - All column statistics marked as unknown via Statistics::unknown_column(&table_schema) -/// # When only some columns have statistics: -/// -/// For columns with statistics: -/// - Min/max values are properly extracted and represented as Precision::Exact -/// - Null counts are calculated by summing across row groups -/// -/// For columns without statistics, -/// - For min/max, there are two situations: -/// 1. The column isn't in arrow schema, then min/max values are set to Precision::Absent -/// 2. The column is in arrow schema, but not in parquet schema due to schema revolution, min/max values are set to Precision::Exact(null) -/// - Null counts are set to Precision::Exact(num_rows) (conservatively assuming all values could be null) +#[deprecated( + since = "50.0.0", + note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead" +)] pub fn statistics_from_parquet_meta_calc( metadata: &ParquetMetaData, table_schema: SchemaRef, ) -> Result { - let row_groups_metadata = metadata.row_groups(); - - let mut statistics = Statistics::new_unknown(&table_schema); - let mut has_statistics = false; - let mut num_rows = 0_usize; - let mut total_byte_size = 0_usize; - for row_group_meta in row_groups_metadata { - num_rows += row_group_meta.num_rows() as usize; - total_byte_size += row_group_meta.total_byte_size() as usize; - - if !has_statistics { - has_statistics = row_group_meta - .columns() - .iter() - .any(|column| column.statistics().is_some()); - } - } - statistics.num_rows = Precision::Exact(num_rows); - statistics.total_byte_size = Precision::Exact(total_byte_size); - - let file_metadata = metadata.file_metadata(); - let mut file_schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - )?; - - if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &file_schema) { - file_schema = merged; - } - - statistics.column_statistics = if has_statistics { - let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); - let mut null_counts_array = - vec![Precision::Exact(0); table_schema.fields().len()]; - - table_schema - .fields() - .iter() - .enumerate() - .for_each(|(idx, field)| { - match StatisticsConverter::try_new( - field.name(), - &file_schema, - file_metadata.schema_descr(), - ) { - Ok(stats_converter) => { - summarize_min_max_null_counts( - &mut min_accs, - &mut max_accs, - &mut null_counts_array, - idx, - num_rows, - &stats_converter, - row_groups_metadata, - ) - .ok(); - } - Err(e) => { - debug!("Failed to create statistics converter: {e}"); - null_counts_array[idx] = 
Precision::Exact(num_rows); - } - } - }); - - get_col_stats( - &table_schema, - null_counts_array, - &mut max_accs, - &mut min_accs, - ) - } else { - Statistics::unknown_column(&table_schema) - }; - - Ok(statistics) -} - -fn get_col_stats( - schema: &Schema, - null_counts: Vec>, - max_values: &mut [Option], - min_values: &mut [Option], -) -> Vec { - (0..schema.fields().len()) - .map(|i| { - let max_value = match max_values.get_mut(i).unwrap() { - Some(max_value) => max_value.evaluate().ok(), - None => None, - }; - let min_value = match min_values.get_mut(i).unwrap() { - Some(min_value) => min_value.evaluate().ok(), - None => None, - }; - ColumnStatistics { - null_count: null_counts[i], - max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent), - min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent), - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - } - }) - .collect() -} - -fn summarize_min_max_null_counts( - min_accs: &mut [Option], - max_accs: &mut [Option], - null_counts_array: &mut [Precision], - arrow_schema_index: usize, - num_rows: usize, - stats_converter: &StatisticsConverter, - row_groups_metadata: &[RowGroupMetaData], -) -> Result<()> { - let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; - let min_values = stats_converter.row_group_mins(row_groups_metadata)?; - let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; - - if let Some(max_acc) = &mut max_accs[arrow_schema_index] { - max_acc.update_batch(&[max_values])?; - } - - if let Some(min_acc) = &mut min_accs[arrow_schema_index] { - min_acc.update_batch(&[min_values])?; - } - - null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) { - Some(null_count) => null_count as usize, - None => num_rows, - }); - - Ok(()) + DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema) } /// Implements [`DataSink`] for writing to a parquet file. @@ -1935,40 +1688,9 @@ async fn output_single_parquet_file_parallelized( Ok(file_metadata) } -/// Min/max aggregation can take Dictionary encode input but always produces unpacked -/// (aka non Dictionary) output. We need to adjust the output data type to reflect this. -/// The reason min/max aggregate produces unpacked output because there is only one -/// min/max value per group; there is no needs to keep them Dictionary encode -fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType { - if let DataType::Dictionary(_, value_type) = input_type { - value_type.as_ref() - } else { - input_type - } -} - -fn create_max_min_accs( - schema: &Schema, -) -> (Vec>, Vec>) { - let max_values: Vec> = schema - .fields() - .iter() - .map(|field| { - MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() - }) - .collect(); - let min_values: Vec> = schema - .fields() - .iter() - .map(|field| { - MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() - }) - .collect(); - (max_values, min_values) -} - #[cfg(test)] mod tests { + use parquet::arrow::parquet_to_arrow_schema; use std::sync::Arc; use super::*; diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs new file mode 100644 index 000000000000..7a7392d52e78 --- /dev/null +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics +//! and schema information. + +use crate::{ + apply_file_schema_type_coercions, coerce_int96_to_resolution, ObjectStoreFetch, +}; +use arrow::compute::sum; +use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; +use datafusion_common::encryption::FileDecryptionProperties; +use datafusion_common::stats::Precision; +use datafusion_common::{ColumnStatistics, DataFusionError, Result, Statistics}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; +use datafusion_physical_plan::Accumulator; +use log::debug; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; +use parquet::arrow::parquet_to_arrow_schema; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; +use std::any::Any; +use std::sync::Arc; + +/// Handles fetching Parquet file schema, metadata and statistics +/// from object store. +/// +/// This component is exposed for low level integrations through +/// [`ParquetFileReaderFactory`]. 
+/// +/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory +#[derive(Debug)] +pub struct DFParquetMetadata<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + metadata_size_hint: Option, + decryption_properties: Option<&'a FileDecryptionProperties>, + file_metadata_cache: Option>, + /// timeunit to coerce INT96 timestamps to + pub coerce_int96: Option, +} + +impl<'a> DFParquetMetadata<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + metadata_size_hint: None, + decryption_properties: None, + file_metadata_cache: None, + coerce_int96: None, + } + } + + /// set metadata size hint + pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option) -> Self { + self.metadata_size_hint = metadata_size_hint; + self + } + + /// set decryption properties + pub fn with_decryption_properties( + mut self, + decryption_properties: Option<&'a FileDecryptionProperties>, + ) -> Self { + self.decryption_properties = decryption_properties; + self + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// Set timeunit to coerce INT96 timestamps to + pub fn with_coerce_int96(mut self, time_unit: Option) -> Self { + self.coerce_int96 = time_unit; + self + } + + /// fetch parquet metadata + pub async fn fetch_metadata(&self) -> Result> { + let Self { + store, + object_meta, + metadata_size_hint, + decryption_properties, + file_metadata_cache, + coerce_int96: _, + } = self; + + let fetch = ObjectStoreFetch::new(*store, object_meta); + + // implementation to fetch parquet metadata + let cache_metadata = + !cfg!(feature = "parquet_encryption") || decryption_properties.is_none(); + + if cache_metadata { + if let Some(parquet_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::() + .map(|cached_parquet_metadata| { + Arc::clone(cached_parquet_metadata.parquet_metadata()) + }) + }) + { + return Ok(parquet_metadata); + } + } + + let mut reader = + ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint); + + #[cfg(feature = "parquet_encryption")] + if let Some(decryption_properties) = decryption_properties { + reader = reader.with_decryption_properties(Some(decryption_properties)); + } + + if cache_metadata && file_metadata_cache.is_some() { + // Need to retrieve the entire metadata for the caching to be effective. 
+ reader = reader.with_page_indexes(true); + } + + let metadata = Arc::new( + reader + .load_and_finish(fetch, object_meta.size) + .await + .map_err(DataFusionError::from)?, + ); + + if cache_metadata { + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put( + object_meta, + Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), + ); + } + } + + Ok(metadata) + } + + /// Read and parse the schema of the Parquet file + pub async fn fetch_schema(&self) -> Result { + let metadata = self.fetch_metadata().await?; + + let file_metadata = metadata.file_metadata(); + let schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; + let schema = self + .coerce_int96 + .as_ref() + .and_then(|time_unit| { + coerce_int96_to_resolution( + file_metadata.schema_descr(), + &schema, + time_unit, + ) + }) + .unwrap_or(schema); + Ok(schema) + } + + /// Return (path, schema) tuple by fetching the schema from Parquet file + pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> { + let loc_path = self.object_meta.location.clone(); + let schema = self.fetch_schema().await?; + Ok((loc_path, schema)) + } + + pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result { + let metadata = self.fetch_metadata().await?; + Self::statistics_from_parquet_metadata(&metadata, table_schema) + } + + /// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`] + /// + /// The statistics are calculated for each column in the table schema + /// using the row group statistics in the parquet metadata. + /// + /// # Key behaviors: + /// + /// 1. Extracts row counts and byte sizes from all row groups + /// 2. Applies schema type coercions to align file schema with table schema + /// 3. Collects and aggregates statistics across row groups when available + /// + /// # When there are no statistics: + /// + /// If the Parquet file doesn't contain any statistics (has_statistics is false), the function returns a Statistics object with: + /// - Exact row count + /// - Exact byte size + /// - All column statistics marked as unknown via Statistics::unknown_column(&table_schema) + /// # When only some columns have statistics: + /// + /// For columns with statistics: + /// - Min/max values are properly extracted and represented as Precision::Exact + /// - Null counts are calculated by summing across row groups + /// + /// For columns without statistics, + /// - For min/max, there are two situations: + /// 1. The column isn't in arrow schema, then min/max values are set to Precision::Absent + /// 2. 
The column is in arrow schema, but not in parquet schema due to schema evolution, min/max values are set to Precision::Exact(null)
+    ///    - Null counts are set to Precision::Exact(num_rows) (conservatively assuming all values could be null)
+    pub fn statistics_from_parquet_metadata(
+        metadata: &ParquetMetaData,
+        table_schema: &SchemaRef,
+    ) -> Result<Statistics> {
+        let row_groups_metadata = metadata.row_groups();
+
+        let mut statistics = Statistics::new_unknown(table_schema);
+        let mut has_statistics = false;
+        let mut num_rows = 0_usize;
+        let mut total_byte_size = 0_usize;
+        for row_group_meta in row_groups_metadata {
+            num_rows += row_group_meta.num_rows() as usize;
+            total_byte_size += row_group_meta.total_byte_size() as usize;
+
+            if !has_statistics {
+                has_statistics = row_group_meta
+                    .columns()
+                    .iter()
+                    .any(|column| column.statistics().is_some());
+            }
+        }
+        statistics.num_rows = Precision::Exact(num_rows);
+        statistics.total_byte_size = Precision::Exact(total_byte_size);
+
+        let file_metadata = metadata.file_metadata();
+        let mut file_schema = parquet_to_arrow_schema(
+            file_metadata.schema_descr(),
+            file_metadata.key_value_metadata(),
+        )?;
+
+        if let Some(merged) = apply_file_schema_type_coercions(table_schema, &file_schema)
+        {
+            file_schema = merged;
+        }
+
+        statistics.column_statistics = if has_statistics {
+            let (mut max_accs, mut min_accs) = create_max_min_accs(table_schema);
+            let mut null_counts_array =
+                vec![Precision::Exact(0); table_schema.fields().len()];
+
+            table_schema
+                .fields()
+                .iter()
+                .enumerate()
+                .for_each(|(idx, field)| {
+                    match StatisticsConverter::try_new(
+                        field.name(),
+                        &file_schema,
+                        file_metadata.schema_descr(),
+                    ) {
+                        Ok(stats_converter) => {
+                            summarize_min_max_null_counts(
+                                &mut min_accs,
+                                &mut max_accs,
+                                &mut null_counts_array,
+                                idx,
+                                num_rows,
+                                &stats_converter,
+                                row_groups_metadata,
+                            )
+                            .ok();
+                        }
+                        Err(e) => {
+                            debug!("Failed to create statistics converter: {e}");
+                            null_counts_array[idx] = Precision::Exact(num_rows);
+                        }
+                    }
+                });
+
+            get_col_stats(
+                table_schema,
+                null_counts_array,
+                &mut max_accs,
+                &mut min_accs,
+            )
+        } else {
+            Statistics::unknown_column(table_schema)
+        };
+
+        Ok(statistics)
+    }
+}
+
+/// Min/max aggregation can take Dictionary encode input but always produces unpacked
+/// (aka non Dictionary) output. We need to adjust the output data type to reflect this.
+/// The reason min/max aggregate produces unpacked output because there is only one +/// min/max value per group; there is no needs to keep them Dictionary encode +fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType { + if let DataType::Dictionary(_, value_type) = input_type { + value_type.as_ref() + } else { + input_type + } +} + +fn create_max_min_accs( + schema: &Schema, +) -> (Vec>, Vec>) { + let max_values: Vec> = schema + .fields() + .iter() + .map(|field| { + MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() + }) + .collect(); + let min_values: Vec> = schema + .fields() + .iter() + .map(|field| { + MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() + }) + .collect(); + (max_values, min_values) +} + +fn get_col_stats( + schema: &Schema, + null_counts: Vec>, + max_values: &mut [Option], + min_values: &mut [Option], +) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match max_values.get_mut(i).unwrap() { + Some(max_value) => max_value.evaluate().ok(), + None => None, + }; + let min_value = match min_values.get_mut(i).unwrap() { + Some(min_value) => min_value.evaluate().ok(), + None => None, + }; + ColumnStatistics { + null_count: null_counts[i], + max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent), + min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + } + }) + .collect() +} + +fn summarize_min_max_null_counts( + min_accs: &mut [Option], + max_accs: &mut [Option], + null_counts_array: &mut [Precision], + arrow_schema_index: usize, + num_rows: usize, + stats_converter: &StatisticsConverter, + row_groups_metadata: &[RowGroupMetaData], +) -> Result<()> { + let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; + let min_values = stats_converter.row_group_mins(row_groups_metadata)?; + let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; + + if let Some(max_acc) = &mut max_accs[arrow_schema_index] { + max_acc.update_batch(&[max_values])?; + } + + if let Some(min_acc) = &mut min_accs[arrow_schema_index] { + min_acc.update_batch(&[min_values])?; + } + + null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) { + Some(null_count) => null_count as usize, + None => num_rows, + }); + + Ok(()) +} + +/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. +pub struct CachedParquetMetaData(Arc); + +impl CachedParquetMetaData { + pub fn new(metadata: Arc) -> Self { + Self(metadata) + } + + pub fn parquet_metadata(&self) -> &Arc { + &self.0 + } +} + +impl FileMetadata for CachedParquetMetaData { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.0.memory_size() + } +} diff --git a/datafusion/datasource-parquet/src/mod.rs b/datafusion/datasource-parquet/src/mod.rs index ad59e7261cba..2f64f34bc09b 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -21,6 +21,7 @@ pub mod access_plan; pub mod file_format; +pub mod metadata; mod metrics; mod opener; mod page_filter; diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index df375818689c..68bf37fe9709 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -18,10 +18,11 @@ //! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for //! 
low level control of parquet file readers -use crate::{fetch_parquet_metadata, ParquetFileMetrics}; +use crate::metadata::DFParquetMetadata; +use crate::ParquetFileMetrics; use bytes::Bytes; use datafusion_datasource::file_meta::FileMeta; -use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::future::BoxFuture; use futures::FutureExt; @@ -29,7 +30,6 @@ use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::file::metadata::ParquetMetaData; -use std::any::Any; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -201,6 +201,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { }; Ok(Box::new(CachedParquetFileReader { + store: Arc::clone(&self.store), inner, file_metrics, file_meta, @@ -214,6 +215,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { /// updates the cache. pub struct CachedParquetFileReader { pub file_metrics: ParquetFileMetrics, + store: Arc, pub inner: ParquetObjectReader, file_meta: FileMeta, metadata_cache: Arc, @@ -256,44 +258,19 @@ impl AsyncFileReader for CachedParquetFileReader { #[cfg(not(feature = "parquet_encryption"))] let file_decryption_properties = None; - fetch_parquet_metadata( - &mut self.inner, - &file_meta.object_meta, - None, - file_decryption_properties, - Some(metadata_cache), - ) - .await - .map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to fetch metadata for file {}: {e}", - file_meta.object_meta.location, - )) - }) + // TODO should there be metadata prefetch hint here? + DFParquetMetadata::new(&self.store, &file_meta.object_meta) + .with_decryption_properties(file_decryption_properties) + .with_file_metadata_cache(Some(Arc::clone(&metadata_cache))) + .fetch_metadata() + .await + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch metadata for file {}: {e}", + file_meta.object_meta.location, + )) + }) } .boxed() } } - -/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. -pub struct CachedParquetMetaData(Arc); - -impl CachedParquetMetaData { - pub fn new(metadata: Arc) -> Self { - Self(metadata) - } - - pub fn parquet_metadata(&self) -> &Arc { - &self.0 - } -} - -impl FileMetadata for CachedParquetMetaData { - fn as_any(&self) -> &dyn Any { - self - } - - fn memory_size(&self) -> usize { - self.0.memory_size() - } -} From e0a49f04a2870464cc4b1de97e072feca68ab2af Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 21 Aug 2025 17:04:23 -0400 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> --- datafusion/datasource-parquet/src/metadata.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs index 833338c4beaf..dbfd2c3720b7 100644 --- a/datafusion/datasource-parquet/src/metadata.rs +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -327,7 +327,7 @@ impl<'a> DFParquetMetadata<'a> { /// Min/max aggregation can take Dictionary encode input but always produces unpacked /// (aka non Dictionary) output. We need to adjust the output data type to reflect this. 
 /// The reason min/max aggregate produces unpacked output because there is only one
-/// min/max value per group; there is no needs to keep them Dictionary encode
+/// min/max value per group; there is no needs to keep them Dictionary encoded
 fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
     if let DataType::Dictionary(_, value_type) = input_type {
         value_type.as_ref()

From 8b402f818246cfbeb257f962e04b4ee922110255 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 21 Aug 2025 17:05:35 -0400
Subject: [PATCH 3/4] Update datafusion/datasource-parquet/src/metadata.rs

Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
---
 datafusion/datasource-parquet/src/metadata.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs
index dbfd2c3720b7..5a042cbb7a2f 100644
--- a/datafusion/datasource-parquet/src/metadata.rs
+++ b/datafusion/datasource-parquet/src/metadata.rs
@@ -104,7 +104,7 @@ impl<'a> DFParquetMetadata<'a> {
         self
     }

-    /// fetch parquet metadata
+    /// Fetch parquet metadata
     pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
         let Self {
             store,

From b0d1e5d165828da0967c4a98c110a5db4339d9a9 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 21 Aug 2025 17:14:35 -0400
Subject: [PATCH 4/4] Improved comments

---
 datafusion/datasource-parquet/src/metadata.rs | 4 +++-
 datafusion/datasource-parquet/src/reader.rs   | 3 ++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs
index 5a042cbb7a2f..71c81a25001b 100644
--- a/datafusion/datasource-parquet/src/metadata.rs
+++ b/datafusion/datasource-parquet/src/metadata.rs
@@ -104,7 +104,7 @@ impl<'a> DFParquetMetadata<'a> {
         self
     }

-    /// Fetch parquet metadata
+    /// Fetch parquet metadata from the remote object store
     pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
         let Self {
             store,
@@ -200,6 +200,8 @@ impl<'a> DFParquetMetadata<'a> {
         Ok((loc_path, schema))
     }

+    /// Fetch the metadata from the Parquet file via [`Self::fetch_metadata`] and convert
+    /// the statistics in the metadata using [`Self::statistics_from_parquet_metadata`]
     pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result<Statistics> {
         let metadata = self.fetch_metadata().await?;
         Self::statistics_from_parquet_metadata(&metadata, table_schema)
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index fe676ba65992..9d2c52f721ba 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -261,7 +261,8 @@ impl AsyncFileReader for CachedParquetFileReader {
             #[cfg(not(feature = "parquet_encryption"))]
             let file_decryption_properties = None;

-            // TODO should there be metadata prefetch hint here?
+            // TODO there should be metadata prefetch hint here
+            // https://github.com/apache/datafusion/issues/17279
             DFParquetMetadata::new(&self.store, &file_meta.object_meta)
                 .with_decryption_properties(file_decryption_properties)
                 .with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
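
For reference, a minimal usage sketch of the consolidated `DFParquetMetadata` builder this patch series introduces, based only on the methods visible in the diff. The local-filesystem store, the file path, and the size-hint value below are illustrative assumptions, not part of the change:

// Usage sketch only. Assumptions: a LocalFileSystem store and a hypothetical file path;
// the DFParquetMetadata methods themselves are the ones added in patch 1.
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_datasource_parquet::metadata::DFParquetMetadata;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

async fn inspect_parquet_file() -> Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let location = Path::from("data/example.parquet"); // hypothetical path
    let object_meta = store.head(&location).await?;

    // One builder now carries the size hint, decryption properties, metadata cache
    // and INT96 coercion that were previously separate positional arguments.
    let df_meta = DFParquetMetadata::new(store.as_ref(), &object_meta)
        .with_metadata_size_hint(Some(64 * 1024));

    // Parquet footer metadata; adding .with_file_metadata_cache(..) would also load
    // the page indexes and cache the result for reuse across calls.
    let metadata = df_meta.fetch_metadata().await?;
    println!("row groups: {}", metadata.num_row_groups());

    // Arrow schema and table statistics derived from the same metadata.
    let schema = Arc::new(df_meta.fetch_schema().await?);
    let stats = df_meta.fetch_statistics(&schema).await?;
    println!("num_rows: {:?}", stats.num_rows);

    Ok(())
}

The previous free functions (`fetch_parquet_metadata`, `fetch_statistics`, `statistics_from_parquet_meta_calc`) remain as deprecated wrappers around these methods, so existing callers keep compiling while new code migrates to the builder.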