Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,11 +603,23 @@ mod tests {
let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
let c_dic: ArrayRef = Arc::new(dic_array);

let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)])?;
// Data for column string_truncation: ["a".repeat(128), null, "b".repeat(128), null]
let string_truncation: ArrayRef = Arc::new(StringArray::from(vec![
Some("a".repeat(128)),
None,
Some("b".repeat(128)),
None,
]));

let batch1 = RecordBatch::try_from_iter(vec![
("c_dic", c_dic),
("string_truncation", string_truncation),
])?;

// Use store_parquet to write each batch to its own file
// . batch1 written into first file and includes:
// - column c_dic that has 4 rows with no nulls. Stats min and max of the dictionary column are available.
// - column string_truncation that has 4 rows with 2 nulls. Stats min and max of the string column are available but inexact, because the stored values are truncated to 64 characters.
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
Expand Down Expand Up @@ -647,6 +659,19 @@ mod tests {
Precision::Exact(Utf8(Some("a".into())))
);

// column string_truncation
let string_truncation_stats = &stats.column_statistics[1];

assert_eq!(string_truncation_stats.null_count, Precision::Exact(2));
assert_eq!(
string_truncation_stats.max_value,
Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c")))
);
assert_eq!(
string_truncation_stats.min_value,
Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64))))
);

Ok(())
}

Expand Down
182 changes: 154 additions & 28 deletions datafusion/datasource-parquet/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

use std::any::Any;
use std::cell::RefCell;
use std::fmt;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
Expand All @@ -36,7 +36,8 @@ use datafusion_datasource::write::{
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::compute::sum;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::{and, sum};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
#[cfg(feature = "parquet_encryption")]
Expand All @@ -46,7 +47,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION,
DataFusionError, GetExt, HashSet, Result, ScalarValue, DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
Expand Down Expand Up @@ -1170,7 +1171,8 @@ pub async fn fetch_statistics(
/// # When only some columns have statistics:
///
/// For columns with statistics:
/// - Min/max values are properly extracted and represented as Precision::Exact
/// - Min/max values are properly extracted and represented as [Precision::Exact] or [Precision::Inexact]
/// depending on the `is_max_value_exact` and `is_min_value_exact` flags.
/// - Null counts are calculated by summing across row groups
///
/// For columns without statistics,
Expand Down Expand Up @@ -1216,6 +1218,8 @@ pub fn statistics_from_parquet_meta_calc(
let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
let mut null_counts_array =
vec![Precision::Exact(0); table_schema.fields().len()];
let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()];
let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()];

table_schema
.fields()
Expand All @@ -1228,10 +1232,15 @@ pub fn statistics_from_parquet_meta_calc(
file_metadata.schema_descr(),
) {
Ok(stats_converter) => {
let mut accumulators = StatisticsAccumulators {
min_accs: &mut min_accs,
max_accs: &mut max_accs,
null_counts_array: &mut null_counts_array,
is_min_value_exact: &mut is_min_value_exact,
is_max_value_exact: &mut is_max_value_exact,
};
summarize_min_max_null_counts(
&mut min_accs,
&mut max_accs,
&mut null_counts_array,
&mut accumulators,
idx,
num_rows,
&stats_converter,
Expand All @@ -1251,6 +1260,8 @@ pub fn statistics_from_parquet_meta_calc(
null_counts_array,
&mut max_accs,
&mut min_accs,
&mut is_max_value_exact,
&mut is_min_value_exact,
)
} else {
Statistics::unknown_column(&table_schema)
Expand All @@ -1264,32 +1275,57 @@ fn get_col_stats(
null_counts: Vec<Precision<usize>>,
max_values: &mut [Option<MaxAccumulator>],
min_values: &mut [Option<MinAccumulator>],
is_max_value_exact: &mut [Option<bool>],
is_min_value_exact: &mut [Option<bool>],
) -> Vec<ColumnStatistics> {
(0..schema.fields().len())
.map(|i| {
let max_value = match max_values.get_mut(i).unwrap() {
Some(max_value) => max_value.evaluate().ok(),
None => None,
let max_value = match (
max_values.get_mut(i).unwrap(),
is_max_value_exact.get(i).unwrap(),
) {
(Some(max_value), Some(true)) => {
max_value.evaluate().ok().map(Precision::Exact)
}
(Some(max_value), Some(false)) | (Some(max_value), None) => {
max_value.evaluate().ok().map(Precision::Inexact)
}
(None, _) => None,
};
let min_value = match min_values.get_mut(i).unwrap() {
Some(min_value) => min_value.evaluate().ok(),
None => None,
let min_value = match (
min_values.get_mut(i).unwrap(),
is_min_value_exact.get(i).unwrap(),
) {
(Some(min_value), Some(true)) => {
min_value.evaluate().ok().map(Precision::Exact)
}
(Some(min_value), Some(false)) | (Some(min_value), None) => {
min_value.evaluate().ok().map(Precision::Inexact)
}
(None, _) => None,
};
ColumnStatistics {
null_count: null_counts[i],
max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
max_value: max_value.unwrap_or(Precision::Absent),
min_value: min_value.unwrap_or(Precision::Absent),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
}
})
.collect()
}

/// Holds the accumulator state for collecting statistics from row groups.
///
/// Bundles the mutable per-column slices that `summarize_min_max_null_counts`
/// updates, so they can be handed over as a single argument. Each slice is
/// indexed by the column's position in the Arrow table schema.
struct StatisticsAccumulators<'a> {
/// Running minimum per column; a `None` slot is skipped when updating.
min_accs: &'a mut [Option<MinAccumulator>],
/// Running maximum per column; a `None` slot is skipped when updating.
max_accs: &'a mut [Option<MaxAccumulator>],
/// Null count per column: the sum of the row-group null counts, or the
/// row count when that sum is unavailable.
null_counts_array: &'a mut [Precision<usize>],
/// Per-column result of `has_any_exact_match` for the aggregated minimum.
/// Only `Some(true)` is later reported as `Precision::Exact`; `Some(false)`
/// and `None` are reported as `Precision::Inexact`.
is_min_value_exact: &'a mut [Option<bool>],
/// Per-column result of `has_any_exact_match` for the aggregated maximum,
/// interpreted the same way as `is_min_value_exact`.
is_max_value_exact: &'a mut [Option<bool>],
}

fn summarize_min_max_null_counts(
min_accs: &mut [Option<MinAccumulator>],
max_accs: &mut [Option<MaxAccumulator>],
null_counts_array: &mut [Precision<usize>],
accumulators: &mut StatisticsAccumulators,
arrow_schema_index: usize,
num_rows: usize,
stats_converter: &StatisticsConverter,
Expand All @@ -1298,19 +1334,36 @@ fn summarize_min_max_null_counts(
let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;

if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
max_acc.update_batch(&[max_values])?;
let is_max_value_exact_stat =
stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
let is_min_value_exact_stat =
stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;

if let Some(max_acc) = &mut accumulators.max_accs[arrow_schema_index] {
max_acc.update_batch(&[Arc::clone(&max_values)])?;
let mut cur_max_acc = max_acc.clone();
accumulators.is_max_value_exact[arrow_schema_index] = has_any_exact_match(
cur_max_acc.evaluate()?,
max_values,
is_max_value_exact_stat,
);
}

if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
min_acc.update_batch(&[min_values])?;
if let Some(min_acc) = &mut accumulators.min_accs[arrow_schema_index] {
min_acc.update_batch(&[Arc::clone(&min_values)])?;
let mut cur_min_acc = min_acc.clone();
accumulators.is_min_value_exact[arrow_schema_index] = has_any_exact_match(
cur_min_acc.evaluate()?,
min_values,
is_min_value_exact_stat,
);
}

null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
Some(null_count) => null_count as usize,
None => num_rows,
});
accumulators.null_counts_array[arrow_schema_index] =
Precision::Exact(match sum(&null_counts) {
Some(null_count) => null_count as usize,
None => num_rows,
});

Ok(())
}
Expand Down Expand Up @@ -1967,12 +2020,38 @@ fn create_max_min_accs(
(max_values, min_values)
}

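/// Checks if any occurrence of `value` in `array` corresponds to a `true`
/// entry in the `exactness` array.
///
/// This is used to determine if a calculated statistic (e.g., min or max)
/// is exact, by checking if at least one of its source values was exact.
///
/// Returns `None` if `value` cannot be converted to an Arrow scalar, or if
/// the `eq` or `and` kernel returns an error. Otherwise returns `Some(true)`
/// when at least one position is both equal to `value` and flagged exact,
/// and `Some(false)` when there is no such position (including the case
/// where every entry of `array` is null).
///
/// # Example
/// - `value`: `0`
/// - `array`: `[0, 1, 0, 3, 0, 5]`
/// - `exactness`: `[true, false, false, false, false, false]`
///
/// The value `0` appears at indices `[0, 2, 4]`. The corresponding exactness
/// values are `[true, false, false]`. Since at least one is `true`, the
/// function returns `Some(true)`.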
fn has_any_exact_match(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated a unit test with 4 possible scenarios. Also use a struct to make clippy happy, PTAL :)

value: ScalarValue,
array: ArrayRef,
exactness: BooleanArray,
) -> Option<bool> {
let scalar_array = value.to_scalar().ok()?;
let eq_mask = eq(&scalar_array, &array).ok()?;
let combined_mask = and(&eq_mask, &exactness).ok()?;
Some(combined_mask.true_count() > 0)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;

use arrow::array::{BooleanArray, Int32Array};
use arrow::datatypes::DataType;
use parquet::schema::parser::parse_message_type;

Expand Down Expand Up @@ -2182,4 +2261,51 @@ mod tests {

assert_eq!(result, expected_schema);
}

#[test]
fn test_has_any_exact_match() {
    // Table of scenarios:
    // (label, aggregated value, per-row-group values, per-row-group exactness, expected result)
    let scenarios: Vec<(&str, ScalarValue, ArrayRef, BooleanArray, Option<bool>)> = vec![
        (
            "mixed exact and inexact matches",
            ScalarValue::Int32(Some(0)),
            Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef,
            BooleanArray::from(vec![true, false, false, false, false, false]),
            Some(true),
        ),
        (
            "all inexact matches",
            ScalarValue::Int32(Some(0)),
            Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef,
            BooleanArray::from(vec![false, false, false, false, false, false]),
            Some(false),
        ),
        (
            "all exact matches",
            ScalarValue::Int32(Some(5)),
            Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef,
            BooleanArray::from(vec![false, true, true, true, false, true]),
            Some(true),
        ),
        (
            "all maxes are null values",
            ScalarValue::Int32(None),
            Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef,
            BooleanArray::from(vec![None, Some(true), None, Some(false)]),
            Some(false),
        ),
    ];

    // Run every scenario through the helper; the label identifies a failing row.
    for (label, aggregated, row_group_values, exactness, expected) in scenarios {
        let outcome = has_any_exact_match(aggregated, row_group_values, exactness);
        assert_eq!(outcome, expected, "scenario: {label}");
    }
}
}
4 changes: 2 additions & 2 deletions datafusion/functions-aggregate/src/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ macro_rules! min_max {
}

/// An accumulator to compute the maximum value
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MaxAccumulator {
max: ScalarValue,
}
Expand Down Expand Up @@ -1057,7 +1057,7 @@ impl AggregateUDFImpl for Min {
}

/// An accumulator to compute the minimum value
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MinAccumulator {
min: ScalarValue,
}
Expand Down