2 changes: 2 additions & 0 deletions datafusion/common/src/encryption.rs
@@ -24,8 +24,10 @@ pub use parquet::encryption::decrypt::FileDecryptionProperties;
pub use parquet::encryption::encrypt::FileEncryptionProperties;

#[cfg(not(feature = "parquet_encryption"))]
#[derive(Default, Debug)]
pub struct FileDecryptionProperties;
#[cfg(not(feature = "parquet_encryption"))]
#[derive(Default, Debug)]
pub struct FileEncryptionProperties;

pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties};
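
For context on the two added derives: when the parquet_encryption feature is off, these unit structs stand in for the real parquet types, and deriving Default and Debug lets downstream code construct and print the stubs without extra feature gates. A minimal sketch of what this enables (hypothetical call site, not part of this PR):

// With `parquet_encryption` disabled, the stub can be constructed and
// debug-printed like any ordinary value (hypothetical usage example).
fn main() {
    let props = FileDecryptionProperties::default();
    println!("decryption properties: {props:?}");
}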
226 changes: 67 additions & 159 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -133,8 +133,7 @@ mod tests {
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::{ListingTableUrl, PartitionedFile};
use datafusion_datasource_parquet::{
fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc,
ObjectStoreFetch, ParquetFormat, ParquetFormatFactory, ParquetSink,
ParquetFormat, ParquetFormatFactory, ParquetSink,
};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -143,13 +142,15 @@ mod tests {
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{collect, ExecutionPlan};

use crate::test_util::bounded_stream;
use arrow::array::{
types::Int32Type, Array, ArrayRef, DictionaryArray, Int32Array, Int64Array,
StringArray,
};
use arrow::datatypes::{DataType, Field};
use async_trait::async_trait;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource_parquet::metadata::DFParquetMetadata;
use futures::stream::BoxStream;
use futures::StreamExt;
use insta::assert_snapshot;
@@ -167,8 +168,6 @@ mod tests {
use parquet::format::FileMetaData;
use tokio::fs::File;

use crate::test_util::bounded_stream;

enum ForceViews {
Yes,
No,
@@ -195,31 +194,24 @@ mod tests {
let format = ParquetFormat::default().with_force_view_types(force_views);
let schema = format.infer_schema(&ctx, &store, &meta).await?;

let stats = fetch_statistics(
store.as_ref(),
schema.clone(),
&meta[0],
None,
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;
let file_metadata_cache =
ctx.runtime_env().cache_manager.get_file_metadata_cache();

[Comment from Contributor Author] this shows the key API difference -- instead of calling a bunch of free functions, you now construct a DFParquetMetadata and call methods on that struct instead

[Comment from Contributor] It looks way cleaner now.

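To make the difference concrete, here is the same metadata fetch in both styles, assembled from calls that appear elsewhere in this diff (a sketch for illustration, not additional code in the PR):

// Before: free functions with a list of positional optional arguments.
fetch_parquet_metadata(
    ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
    &meta[0],
    Some(9), // metadata size hint
    None,    // optional argument left unset in these tests
    Some(Arc::clone(&file_metadata_cache)),
)
.await?;

// After: one DFParquetMetadata value configured through named builder
// methods, with fetch_metadata/fetch_statistics as methods on the struct.
let df_meta = DFParquetMetadata::new(&store, &meta[0])
    .with_metadata_size_hint(Some(9))
    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let metadata = df_meta.fetch_metadata().await?;
let stats = df_meta.fetch_statistics(&schema).await?;
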
let stats = DFParquetMetadata::new(&store, &meta[0])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.fetch_statistics(&schema)
.await?;

assert_eq!(stats.num_rows, Precision::Exact(3));
let c1_stats = &stats.column_statistics[0];
let c2_stats = &stats.column_statistics[1];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c2_stats.null_count, Precision::Exact(3));

let stats = fetch_statistics(
store.as_ref(),
schema,
&meta[1],
None,
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;
let stats = DFParquetMetadata::new(&store, &meta[1])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.fetch_statistics(&schema)
.await?;

assert_eq!(stats.num_rows, Precision::Exact(3));
let c1_stats = &stats.column_statistics[0];
let c2_stats = &stats.column_statistics[1];
@@ -392,51 +384,27 @@ mod tests {

// Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
// for the remaining metadata. (A parquet file ends with an 8-byte tail: a 4-byte metadata length plus the
// 4-byte "PAR1" magic, so the 9-byte hint below covers the footer but not the full metadata.)
fetch_parquet_metadata(
ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
&meta[0],
Some(9),
None,
None,
)
.await
.expect("error reading metadata with hint");
let file_metadata_cache =
ctx.runtime_env().cache_manager.get_file_metadata_cache();
let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
.with_metadata_size_hint(Some(9));
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 2);

let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));

// Increases by 3 because cache has no entries yet
[Comment from Contributor Author] I think the new struct makes it much clearer what is being tested vs what is test setup functionality and I find the updated tests to be much easier to read

fetch_parquet_metadata(
ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
&meta[0],
Some(9),
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await
.expect("error reading metadata with hint");
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 5);

// No increase because cache has an entry
fetch_parquet_metadata(
ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
&meta[0],
Some(9),
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await
.expect("error reading metadata with hint");
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 5);

// Increase by 2 because `get_file_metadata_cache()` is None
fetch_parquet_metadata(
ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]),
&meta[0],
Some(9),
None,
None,
)
.await
.expect("error reading metadata with hint");
let df_meta = df_meta.with_file_metadata_cache(None);
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 7);

let force_views = match force_views {
@@ -454,15 +422,9 @@
assert_eq!(store.request_count(), 10);

// No increase, cache being used
let stats = fetch_statistics(
store.upcast().as_ref(),
schema.clone(),
&meta[0],
Some(9),
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let stats = df_meta.fetch_statistics(&schema).await?;
assert_eq!(store.request_count(), 10);

assert_eq!(stats.num_rows, Precision::Exact(3));
@@ -477,55 +439,30 @@ mod tests {

// Use the file size as the hint so we can get the full metadata from the first fetch
let size_hint = meta[0].size as usize;
let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
.with_metadata_size_hint(Some(size_hint));

fetch_parquet_metadata(
ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
&meta[0],
Some(size_hint),
None,
None,
)
.await
.expect("error reading metadata with hint");
df_meta.fetch_metadata().await?;
// ensure the requests were coalesced into a single request
assert_eq!(store.request_count(), 1);

let session = SessionContext::new();
let ctx = session.state();
let file_metadata_cache =
ctx.runtime_env().cache_manager.get_file_metadata_cache();
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
// Increases by 1 because cache has no entries yet and new session context
fetch_parquet_metadata(
ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
&meta[0],
Some(size_hint),
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await
.expect("error reading metadata with hint");
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 2);

// No increase because cache has an entry
fetch_parquet_metadata(
ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
&meta[0],
Some(size_hint),
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await
.expect("error reading metadata with hint");
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 2);

// Increase by 1 because `get_file_metadata_cache` is None
fetch_parquet_metadata(
ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
&meta[0],
Some(size_hint),
None,
None,
)
.await
.expect("error reading metadata with hint");
let df_meta = df_meta.with_file_metadata_cache(None);
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 3);

let format = ParquetFormat::default()
Expand All @@ -538,15 +475,9 @@ mod tests {
let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
assert_eq!(store.request_count(), 4);
// No increase, cache being used
let stats = fetch_statistics(
store.upcast().as_ref(),
schema.clone(),
&meta[0],
Some(size_hint),
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
let stats = df_meta.fetch_statistics(&schema).await?;
assert_eq!(store.request_count(), 4);

assert_eq!(stats.num_rows, Precision::Exact(3));
@@ -559,29 +490,18 @@
LocalFileSystem::new(),
)));

// Use the a size hint larger than the file size to make sure we don't panic
// Use a size hint larger than the file size to make sure we don't panic
let size_hint = (meta[0].size + 100) as usize;
fetch_parquet_metadata(
ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
&meta[0],
Some(size_hint),
None,
None,
)
.await
.expect("error reading metadata with hint");
let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
.with_metadata_size_hint(Some(size_hint));

df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 1);

// No increase because cache has an entry
fetch_parquet_metadata(
ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]),
&meta[0],
Some(size_hint),
None,
Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await
.expect("error reading metadata with hint");
let df_meta =
df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
df_meta.fetch_metadata().await?;
assert_eq!(store.request_count(), 1);

Ok(())
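
The panic being guarded against above is an over-long suffix read; the usual guard is to clamp the hint to the file size before issuing the read. A minimal sketch of that idea (hypothetical helper, not the actual reader code):

// Clamp a metadata size hint so the suffix read never reaches past the
// start of the file (hypothetical helper illustrating the guard).
fn clamped_hint(file_size: u64, hint: usize) -> usize {
    hint.min(file_size as usize)
}

// A hint 100 bytes past the end of a 1024-byte file is reduced to the
// whole file.
assert_eq!(clamped_hint(1024, 1124), 1024);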
@@ -634,16 +554,12 @@ mod tests {
assert_eq!(store.request_count(), 3);

// No increase in request count because cache is not empty
let pq_meta = fetch_parquet_metadata(
ObjectStoreFetch::new(store.as_ref(), &files[0]),
&files[0],
None,
None,
Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;
assert_eq!(store.request_count(), 3);
let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.fetch_statistics(&schema)
.await?;
assert_eq!(stats.num_rows, Precision::Exact(4));

// column c_dic
@@ -716,16 +632,13 @@ mod tests {
};

// No increase in request count because cache is not empty
let pq_meta = fetch_parquet_metadata(
ObjectStoreFetch::new(store.as_ref(), &files[0]),
&files[0],
None,
None,
Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;
let file_metadata_cache =
state.runtime_env().cache_manager.get_file_metadata_cache();
let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.fetch_statistics(&schema)
.await?;
assert_eq!(store.request_count(), 6);
let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
assert_eq!(stats.num_rows, Precision::Exact(3));
// column c1
let c1_stats = &stats.column_statistics[0];
Expand All @@ -750,16 +663,11 @@ mod tests {
assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));

// No increase in request count because cache is not empty
let pq_meta = fetch_parquet_metadata(
ObjectStoreFetch::new(store.as_ref(), &files[1]),
&files[1],
None,
None,
Some(state.runtime_env().cache_manager.get_file_metadata_cache()),
)
.await?;
let stats = DFParquetMetadata::new(store.as_ref(), &files[1])
.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
.fetch_statistics(&schema)
.await?;
assert_eq!(store.request_count(), 6);
let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?;
assert_eq!(stats.num_rows, Precision::Exact(3));
// column c1: missing from the file so the table treats all 3 rows as null
let c1_stats = &stats.column_statistics[0];