Skip to content

Commit bf10479

Browse files
committed
fix: respect inexact flags in row group metadata
1 parent 8103396 commit bf10479

File tree

3 files changed

+112
-18
lines changed

3 files changed

+112
-18
lines changed

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,11 +603,23 @@ mod tests {
603603
let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
604604
let c_dic: ArrayRef = Arc::new(dic_array);
605605

606-
let batch1 = RecordBatch::try_from_iter(vec![("c_dic", c_dic)])?;
606+
// Data for column string_truncation: ["a".repeat(128), null, "b".repeat(128), null]
607+
let string_truncation: ArrayRef = Arc::new(StringArray::from(vec![
608+
Some("a".repeat(128)),
609+
None,
610+
Some("b".repeat(128)),
611+
None,
612+
]));
613+
614+
let batch1 = RecordBatch::try_from_iter(vec![
615+
("c_dic", c_dic),
616+
("string_truncation", string_truncation),
617+
])?;
607618

608619
// Use store_parquet to write each batch to its own file
609620
// . batch1 written into first file and includes:
610621
// - column c_dic that has 4 rows with no null. Stats min and max of dictionary column is available.
622+
// - column string_truncation that has 4 rows with 2 nulls. Stats min and max of string column are available but not exact.
611623
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
612624
LocalFileSystem::new(),
613625
)));
@@ -647,6 +659,19 @@ mod tests {
647659
Precision::Exact(Utf8(Some("a".into())))
648660
);
649661

662+
// column string_truncation
663+
let string_truncation_stats = &stats.column_statistics[1];
664+
665+
assert_eq!(string_truncation_stats.null_count, Precision::Exact(2));
666+
assert_eq!(
667+
string_truncation_stats.max_value,
668+
Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c")))
669+
);
670+
assert_eq!(
671+
string_truncation_stats.min_value,
672+
Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64))))
673+
);
674+
650675
Ok(())
651676
}
652677

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919
2020
use std::any::Any;
2121
use std::cell::RefCell;
22-
use std::fmt;
2322
use std::fmt::Debug;
2423
use std::ops::Range;
2524
use std::rc::Rc;
2625
use std::sync::Arc;
26+
use std::{fmt, vec};
2727

28-
use arrow::array::RecordBatch;
28+
use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
2929
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
3030
use datafusion_datasource::file_compression_type::FileCompressionType;
3131
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
@@ -36,7 +36,8 @@ use datafusion_datasource::write::{
3636
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
3737
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
3838

39-
use arrow::compute::sum;
39+
use arrow::compute::kernels::cmp::eq;
40+
use arrow::compute::{and, sum};
4041
use arrow::datatypes::{DataType, Field, FieldRef};
4142
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
4243
#[cfg(feature = "parquet_encryption")]
@@ -46,7 +47,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
4647
use datafusion_common::stats::Precision;
4748
use datafusion_common::{
4849
internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics,
49-
DataFusionError, GetExt, HashSet, Result, DEFAULT_PARQUET_EXTENSION,
50+
DataFusionError, GetExt, HashSet, Result, ScalarValue, DEFAULT_PARQUET_EXTENSION,
5051
};
5152
use datafusion_common::{HashMap, Statistics};
5253
use datafusion_common_runtime::{JoinSet, SpawnedTask};
@@ -1170,7 +1171,8 @@ pub async fn fetch_statistics(
11701171
/// # When only some columns have statistics:
11711172
///
11721173
/// For columns with statistics:
1173-
/// - Min/max values are properly extracted and represented as Precision::Exact
1174+
/// - Min/max values are properly extracted and represented as [Precision::Exact] or [Precision::Inexact]
1175+
/// depending on the `is_max_value_exact` and `is_min_value_exact` flags.
11741176
/// - Null counts are calculated by summing across row groups
11751177
///
11761178
/// For columns without statistics,
@@ -1216,6 +1218,8 @@ pub fn statistics_from_parquet_meta_calc(
12161218
let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema);
12171219
let mut null_counts_array =
12181220
vec![Precision::Exact(0); table_schema.fields().len()];
1221+
let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()];
1222+
let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()];
12191223

12201224
table_schema
12211225
.fields()
@@ -1232,6 +1236,8 @@ pub fn statistics_from_parquet_meta_calc(
12321236
&mut min_accs,
12331237
&mut max_accs,
12341238
&mut null_counts_array,
1239+
&mut is_min_value_exact,
1240+
&mut is_max_value_exact,
12351241
idx,
12361242
num_rows,
12371243
&stats_converter,
@@ -1251,6 +1257,8 @@ pub fn statistics_from_parquet_meta_calc(
12511257
null_counts_array,
12521258
&mut max_accs,
12531259
&mut min_accs,
1260+
&mut is_max_value_exact,
1261+
&mut is_min_value_exact,
12541262
)
12551263
} else {
12561264
Statistics::unknown_column(&table_schema)
@@ -1264,21 +1272,39 @@ fn get_col_stats(
12641272
null_counts: Vec<Precision<usize>>,
12651273
max_values: &mut [Option<MaxAccumulator>],
12661274
min_values: &mut [Option<MinAccumulator>],
1275+
is_max_value_exact: &mut [Option<bool>],
1276+
is_min_value_exact: &mut [Option<bool>],
12671277
) -> Vec<ColumnStatistics> {
12681278
(0..schema.fields().len())
12691279
.map(|i| {
1270-
let max_value = match max_values.get_mut(i).unwrap() {
1271-
Some(max_value) => max_value.evaluate().ok(),
1272-
None => None,
1280+
let max_value = match (
1281+
max_values.get_mut(i).unwrap(),
1282+
is_max_value_exact.get(i).unwrap(),
1283+
) {
1284+
(Some(max_value), Some(true)) => {
1285+
max_value.evaluate().ok().map(Precision::Exact)
1286+
}
1287+
(Some(max_value), Some(false)) | (Some(max_value), None) => {
1288+
max_value.evaluate().ok().map(Precision::Inexact)
1289+
}
1290+
(None, _) => None,
12731291
};
1274-
let min_value = match min_values.get_mut(i).unwrap() {
1275-
Some(min_value) => min_value.evaluate().ok(),
1276-
None => None,
1292+
let min_value = match (
1293+
min_values.get_mut(i).unwrap(),
1294+
is_min_value_exact.get(i).unwrap(),
1295+
) {
1296+
(Some(min_value), Some(true)) => {
1297+
min_value.evaluate().ok().map(Precision::Exact)
1298+
}
1299+
(Some(min_value), Some(false)) | (Some(min_value), None) => {
1300+
min_value.evaluate().ok().map(Precision::Inexact)
1301+
}
1302+
(None, _) => None,
12771303
};
12781304
ColumnStatistics {
12791305
null_count: null_counts[i],
1280-
max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
1281-
min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
1306+
max_value: max_value.unwrap_or(Precision::Absent),
1307+
min_value: min_value.unwrap_or(Precision::Absent),
12821308
sum_value: Precision::Absent,
12831309
distinct_count: Precision::Absent,
12841310
}
@@ -1290,6 +1316,8 @@ fn summarize_min_max_null_counts(
12901316
min_accs: &mut [Option<MinAccumulator>],
12911317
max_accs: &mut [Option<MaxAccumulator>],
12921318
null_counts_array: &mut [Precision<usize>],
1319+
is_min_value_exact: &mut [Option<bool>],
1320+
is_max_value_exact: &mut [Option<bool>],
12931321
arrow_schema_index: usize,
12941322
num_rows: usize,
12951323
stats_converter: &StatisticsConverter,
@@ -1298,13 +1326,29 @@ fn summarize_min_max_null_counts(
12981326
let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
12991327
let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
13001328
let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
1329+
let is_max_value_exact_stat =
1330+
stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
1331+
let is_min_value_exact_stat =
1332+
stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;
13011333

13021334
if let Some(max_acc) = &mut max_accs[arrow_schema_index] {
1303-
max_acc.update_batch(&[max_values])?;
1335+
max_acc.update_batch(&[Arc::clone(&max_values)])?;
1336+
let mut cur_max_acc = max_acc.clone();
1337+
is_max_value_exact[arrow_schema_index] = has_any_exact_match(
1338+
cur_max_acc.evaluate()?,
1339+
max_values,
1340+
is_max_value_exact_stat,
1341+
);
13041342
}
13051343

13061344
if let Some(min_acc) = &mut min_accs[arrow_schema_index] {
1307-
min_acc.update_batch(&[min_values])?;
1345+
min_acc.update_batch(&[Arc::clone(&min_values)])?;
1346+
let mut cur_min_acc = min_acc.clone();
1347+
is_min_value_exact[arrow_schema_index] = has_any_exact_match(
1348+
cur_min_acc.evaluate()?,
1349+
min_values,
1350+
is_min_value_exact_stat,
1351+
);
13081352
}
13091353

13101354
null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) {
@@ -1967,6 +2011,31 @@ fn create_max_min_accs(
19672011
(max_values, min_values)
19682012
}
19692013

2014+
/// Checks if any occurrence of `value` in `array` corresponds to a `true`
2015+
/// entry in the `exactness` array.
2016+
///
2017+
/// This is used to determine if a calculated statistic (e.g., min or max)
2018+
/// is exact, by checking if at least one of its source values was exact. Returns `None` if `to_scalar`, `eq`, or `and` fails.
2019+
///
2020+
/// # Example
2021+
/// - `value`: `0`
2022+
/// - `array`: `[0, 1, 0, 3, 0, 5]`
2023+
/// - `exactness`: `[true, false, false, false, false, false]`
2024+
///
2025+
/// The value `0` appears at indices `[0, 2, 4]`. The corresponding exactness
2026+
/// values are `[true, false, false]`. Since at least one is `true`, the
2027+
/// function returns `Some(true)`; with no `true` among them it would return `Some(false)`.
2028+
fn has_any_exact_match(
2029+
value: ScalarValue,
2030+
array: ArrayRef,
2031+
exactness: BooleanArray,
2032+
) -> Option<bool> {
2033+
let scalar_array = value.to_scalar().ok()?;
2034+
let eq_mask = eq(&scalar_array, &array).ok()?;
2035+
let combined_mask = and(&eq_mask, &exactness).ok()?;
2036+
Some(combined_mask.true_count() > 0)
2036+
}
2038+
19702039
#[cfg(test)]
19712040
mod tests {
19722041
use std::sync::Arc;

datafusion/functions-aggregate/src/min_max.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ macro_rules! min_max {
736736
}
737737

738738
/// An accumulator to compute the maximum value
739-
#[derive(Debug)]
739+
#[derive(Debug, Clone)]
740740
pub struct MaxAccumulator {
741741
max: ScalarValue,
742742
}
@@ -1057,7 +1057,7 @@ impl AggregateUDFImpl for Min {
10571057
}
10581058

10591059
/// An accumulator to compute the minimum value
1060-
#[derive(Debug)]
1060+
#[derive(Debug, Clone)]
10611061
pub struct MinAccumulator {
10621062
min: ScalarValue,
10631063
}

0 commit comments

Comments
 (0)