
Commit ef90d05

Consolidate Parquet Metadata handling
1 parent ff77b70 commit ef90d05

File tree

7 files changed: +577 -551 lines changed


datafusion/common/src/encryption.rs

Lines changed: 2 additions & 0 deletions
@@ -24,8 +24,10 @@ pub use parquet::encryption::decrypt::FileDecryptionProperties;
 pub use parquet::encryption::encrypt::FileEncryptionProperties;
 
 #[cfg(not(feature = "parquet_encryption"))]
+#[derive(Default, Debug)]
 pub struct FileDecryptionProperties;
 #[cfg(not(feature = "parquet_encryption"))]
+#[derive(Default, Debug)]
 pub struct FileEncryptionProperties;
 
 pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties};
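
Why the new derives: these structs are zero-sized stand-ins that only exist when the parquet_encryption feature is disabled, and deriving Default and Debug lets feature-independent code construct and log them the same way it would the real parquet types. A minimal sketch of the pattern (the log_properties helper is illustrative, not part of this commit):

    // With the feature disabled, the stub still satisfies Default + Debug,
    // so shared code paths compile against either the real or the stub type.
    #[cfg(not(feature = "parquet_encryption"))]
    #[derive(Default, Debug)]
    pub struct FileDecryptionProperties;

    // Hypothetical caller exercising both derives:
    fn log_properties() {
        let props = FileDecryptionProperties::default(); // needs Default
        println!("using decryption properties: {props:?}"); // needs Debug
    }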

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 67 additions & 159 deletions
@@ -133,8 +133,7 @@ mod tests {
     use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
     use datafusion_datasource::{ListingTableUrl, PartitionedFile};
     use datafusion_datasource_parquet::{
-        fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc,
-        ObjectStoreFetch, ParquetFormat, ParquetFormatFactory, ParquetSink,
+        ParquetFormat, ParquetFormatFactory, ParquetSink,
     };
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::runtime_env::RuntimeEnv;
@@ -143,13 +142,15 @@
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion_physical_plan::{collect, ExecutionPlan};
 
+    use crate::test_util::bounded_stream;
     use arrow::array::{
         types::Int32Type, Array, ArrayRef, DictionaryArray, Int32Array, Int64Array,
         StringArray,
     };
     use arrow::datatypes::{DataType, Field};
     use async_trait::async_trait;
     use datafusion_datasource::file_groups::FileGroup;
+    use datafusion_datasource_parquet::metadata::DFParquetMetadata;
     use futures::stream::BoxStream;
     use futures::StreamExt;
     use insta::assert_snapshot;
@@ -167,8 +168,6 @@
     use parquet::format::FileMetaData;
     use tokio::fs::File;
 
-    use crate::test_util::bounded_stream;
-
     enum ForceViews {
         Yes,
         No,
@@ -195,31 +194,24 @@
         let format = ParquetFormat::default().with_force_view_types(force_views);
         let schema = format.infer_schema(&ctx, &store, &meta).await?;
 
-        let stats = fetch_statistics(
-            store.as_ref(),
-            schema.clone(),
-            &meta[0],
-            None,
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let file_metadata_cache =
+            ctx.runtime_env().cache_manager.get_file_metadata_cache();
+        let stats = DFParquetMetadata::new(&store, &meta[0])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
         let c2_stats = &stats.column_statistics[1];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
 
-        let stats = fetch_statistics(
-            store.as_ref(),
-            schema,
-            &meta[1],
-            None,
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let stats = DFParquetMetadata::new(&store, &meta[1])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
+
         assert_eq!(stats.num_rows, Precision::Exact(3));
         let c1_stats = &stats.column_statistics[0];
         let c2_stats = &stats.column_statistics[1];
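
The remaining hunks repeat this same migration: the removed free functions fetch_parquet_metadata, fetch_statistics, and statistics_from_parquet_meta_calc all collapse into the single DFParquetMetadata builder. A sketch of the consolidated call shape, using only methods visible in this diff (store, meta, schema, size_hint, and file_metadata_cache are the surrounding test fixtures):

    // One builder replaces the old positional-argument free functions:
    let stats = DFParquetMetadata::new(&store, &meta[0])
        // optional hint for how many trailing bytes to prefetch for the footer
        .with_metadata_size_hint(Some(size_hint))
        // optional session-level cache that avoids re-reading the footer
        .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
        // fetch_statistics now folds in what statistics_from_parquet_meta_calc did
        .fetch_statistics(&schema)
        .await?;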
@@ -392,51 +384,27 @@
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let file_metadata_cache =
+            ctx.runtime_env().cache_manager.get_file_metadata_cache();
+        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
+            .with_metadata_size_hint(Some(9));
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
 
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+
         // Increases by 3 because cache has no entries yet
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 5);
 
         // No increase because cache has an entry
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 5);
 
         // Increase by 2 because `get_file_metadata_cache()` is None
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
-            &meta[0],
-            Some(9),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta = df_meta.with_file_metadata_cache(None);
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 7);
 
         let force_views = match force_views {
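
The request counts follow from Parquet's layout: a file ends with an 8-byte footer (a 4-byte metadata length plus the PAR1 magic), so a 9-byte hint covers the footer but not the metadata itself, forcing a second ranged read, while a hint that covers metadata plus footer is served in one coalesced request. A rough model of that arithmetic (an illustrative helper, not the parquet crate's actual reader):

    // Hypothetical: how a size hint maps to the number of ranged reads.
    fn reads_needed(file_size: u64, metadata_len: u64, size_hint: u64) -> u32 {
        const FOOTER_LEN: u64 = 8; // 4-byte metadata length + b"PAR1"
        let prefetch = size_hint.clamp(FOOTER_LEN, file_size);
        if prefetch >= metadata_len + FOOTER_LEN {
            1 // footer and metadata arrived in a single coalesced request
        } else {
            2 // footer first, then a second request for the rest of the metadata
        }
    }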
@@ -454,15 +422,9 @@
         assert_eq!(store.request_count(), 10);
 
         // No increase, cache being used
-        let stats = fetch_statistics(
-            store.upcast().as_ref(),
-            schema.clone(),
-            &meta[0],
-            Some(9),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let stats = df_meta.fetch_statistics(&schema).await?;
         assert_eq!(store.request_count(), 10);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
@@ -477,55 +439,30 @@
 
         // Use the file size as the hint so we can get the full metadata from the first fetch
         let size_hint = meta[0].size as usize;
+        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
+            .with_metadata_size_hint(Some(size_hint));
 
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
 
         let session = SessionContext::new();
         let ctx = session.state();
+        let file_metadata_cache =
+            ctx.runtime_env().cache_manager.get_file_metadata_cache();
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
         // Increases by 1 because cache has no entries yet and new session context
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
 
         // No increase because cache has an entry
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 2);
 
         // Increase by 1 because `get_file_metadata_cache` is None
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta = df_meta.with_file_metadata_cache(None);
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 3);
 
         let format = ParquetFormat::default()
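
One idiom worth noting in the hunks above: with_file_metadata_cache appears to consume the builder, so the tests toggle caching by rebinding df_meta rather than mutating it, while fetch_metadata can be called repeatedly on the same binding. Reduced to its shape (a sketch; store, meta, size_hint, and cache come from the surrounding setup):

    let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
        .with_metadata_size_hint(Some(size_hint));
    let df_meta = df_meta.with_file_metadata_cache(Some(Arc::clone(&cache)));
    df_meta.fetch_metadata().await?; // first call populates the cache
    df_meta.fetch_metadata().await?; // second call is served from the cache
    let df_meta = df_meta.with_file_metadata_cache(None); // rebind to disable caching
    df_meta.fetch_metadata().await?; // hits object storage again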
@@ -538,15 +475,9 @@
         let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
         assert_eq!(store.request_count(), 4);
         // No increase, cache being used
-        let stats = fetch_statistics(
-            store.upcast().as_ref(),
-            schema.clone(),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        let stats = df_meta.fetch_statistics(&schema).await?;
         assert_eq!(store.request_count(), 4);
 
         assert_eq!(stats.num_rows, Precision::Exact(3));
@@ -559,29 +490,18 @@
             LocalFileSystem::new(),
         )));
 
-        // Use the a size hint larger than the file size to make sure we don't panic
+        // Use a size hint larger than the file size to make sure we don't panic
         let size_hint = (meta[0].size + 100) as usize;
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            None,
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
+            .with_metadata_size_hint(Some(size_hint));
+
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 1);
 
         // No increase because cache has an entry
-        fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
-            &meta[0],
-            Some(size_hint),
-            None,
-            Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await
-        .expect("error reading metadata with hint");
+        let df_meta =
+            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
+        df_meta.fetch_metadata().await?;
         assert_eq!(store.request_count(), 1);
 
         Ok(())
@@ -622,16 +542,12 @@
         assert_eq!(store.request_count(), 3);
 
         // No increase in request count because cache is not empty
-        let pq_meta = fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref(), &files[0]),
-            &files[0],
-            None,
-            None,
-            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
-        assert_eq!(store.request_count(), 3);
-        let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
+        let file_metadata_cache =
+            state.runtime_env().cache_manager.get_file_metadata_cache();
+        let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
         assert_eq!(stats.num_rows, Precision::Exact(4));
 
         // column c_dic
@@ -691,16 +607,13 @@
         };
 
         // No increase in request count because cache is not empty
-        let pq_meta = fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref(), &files[0]),
-            &files[0],
-            None,
-            None,
-            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let file_metadata_cache =
+            state.runtime_env().cache_manager.get_file_metadata_cache();
+        let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
         assert_eq!(store.request_count(), 6);
-        let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1
         let c1_stats = &stats.column_statistics[0];
@@ -725,16 +638,11 @@
         assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
 
         // No increase in request count because cache is not empty
-        let pq_meta = fetch_parquet_metadata(
-            ObjectStoreFetch::new(store.as_ref(), &files[1]),
-            &files[1],
-            None,
-            None,
-            Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
-        )
-        .await?;
+        let stats = DFParquetMetadata::new(store.as_ref(), &files[1])
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&schema)
+            .await?;
         assert_eq!(store.request_count(), 6);
-        let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
         assert_eq!(stats.num_rows, Precision::Exact(3));
         // column c1: missing from the file so the table treats all 3 rows as null
         let c1_stats = &stats.column_statistics[0];
