From a1a4eefb06f89a294fdae3f22f50e2b55c08c4a6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 19 Aug 2025 07:25:41 -0500 Subject: [PATCH 01/16] Fix HashJoinExec sideways information passing for partitioned queries --- .../physical-plan/src/joins/hash_join.rs | 374 ++++++++++++++---- .../test_files/push_down_filter.slt | 75 ++++ 2 files changed, 383 insertions(+), 66 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 65b9a54f9ae6..b37f1ce817e1 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -20,7 +20,7 @@ use std::fmt; use std::mem::size_of; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::task::Poll; use std::{any::Any, vec}; @@ -98,6 +98,151 @@ use parking_lot::Mutex; const HASH_JOIN_SEED: RandomState = RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); +/// Coordinates dynamic filter bounds collection across multiple partitions +/// +/// This structure ensures that dynamic filters are built with complete information from all +/// relevant partitions before being applied to probe-side scans. Incomplete filters would +/// incorrectly eliminate valid join results. +/// +/// ## Synchronization Strategy +/// +/// 1. Each partition computes bounds from its build-side data +/// 2. Bounds are stored in the shared HashMap (indexed by partition_id) +/// 3. A counter tracks how many partitions have reported their bounds +/// 4. When the last partition reports (completed == total), bounds are merged and filter is updated +/// +/// ## Partition Counting +/// +/// The `total_partitions` count represents how many times `collect_build_side` will be called: +/// - **CollectLeft**: Number of output partitions (each accesses shared build data) +/// - **Partitioned**: Number of input partitions (each builds independently) +/// +/// ## Thread Safety +/// +/// All fields use atomic operations or mutexes to ensure correct coordination between concurrent +/// partition executions. +struct SharedBoundsAccumulator { + /// Bounds from completed partitions. + bounds: Mutex>>, + /// Number of partitions that have reported completion. + completed_partitions: AtomicUsize, + /// Total number of partitions. + /// Need to know this so that we can update the dynamic filter once we are done + /// building *all* of the hash tables. + total_partitions: usize, +} + +impl SharedBoundsAccumulator { + /// Creates a new SharedBoundsAccumulator configured for the given partition mode + /// + /// This method calculates how many times `collect_build_side` will be called based on the + /// partition mode's execution pattern. This count is critical for determining when we have + /// complete information from all partitions to build the dynamic filter. + /// + /// ## Partition Mode Execution Patterns + /// + /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut` + /// across all output partitions. Each output partition calls `collect_build_side` to access + /// the shared build data. Expected calls = number of output partitions. + /// + /// - **Partitioned**: Each partition independently builds its own hash table by calling + /// `collect_build_side` once. Expected calls = number of build partitions. + /// + /// - **Auto**: Placeholder mode resolved during optimization. 
Uses 1 as safe default since
+    ///   the actual mode will be determined and a new bounds_accumulator created before execution.
+    ///
+    /// ## Why This Matters
+    ///
+    /// We cannot build a partial filter from some partitions - it would incorrectly eliminate
+    /// valid join results. We must wait until we have complete bounds information from ALL
+    /// relevant partitions before updating the dynamic filter.
+    fn new_from_partition_mode(
+        partition_mode: PartitionMode,
+        left_child: &dyn ExecutionPlan,
+        right_child: &dyn ExecutionPlan,
+    ) -> Self {
+        // Troubleshooting: If partition counts are incorrect, verify this logic matches
+        // the actual execution pattern in collect_build_side()
+        let expected_calls = match partition_mode {
+            // Each output partition accesses shared build data
+            PartitionMode::CollectLeft => {
+                right_child.output_partitioning().partition_count()
+            }
+            // Each partition builds its own data
+            PartitionMode::Partitioned => {
+                left_child.output_partitioning().partition_count()
+            }
+            // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two)
+            PartitionMode::Auto => 1,
+        };
+        Self {
+            bounds: Mutex::new(Vec::new()),
+            completed_partitions: AtomicUsize::new(0),
+            total_partitions: expected_calls,
+        }
+    }
+
+    /// Merge all bounds from completed partitions into global min/max.
+    ///
+    /// Troubleshooting: If this returns None when you expect bounds, check:
+    /// 1. All partitions called collect_build_side with bounds data
+    /// 2. collect_left_input was called with should_compute_bounds=true
+    /// 3. The build side had at least one non-empty batch
+    fn merge_bounds(&self) -> Option<Vec<(ScalarValue, ScalarValue)>> {
+        let bounds = self.bounds.lock();
+        let all_bounds: Vec<_> = bounds.iter().collect();
+
+        if all_bounds.is_empty() {
+            return None;
+        }
+
+        let num_columns = all_bounds[0].len();
+        let mut merged = Vec::with_capacity(num_columns);
+
+        for col_idx in 0..num_columns {
+            let mut global_min = None;
+            let mut global_max = None;
+
+            for partition_bounds in &all_bounds {
+                if let Some((min_val, max_val)) = partition_bounds.get(col_idx) {
+                    global_min = match global_min {
+                        None => Some(min_val.clone()),
+                        Some(current_min) => Some(if min_val < &current_min {
+                            min_val.clone()
+                        } else {
+                            current_min
+                        }),
+                    };
+                    global_max = match global_max {
+                        None => Some(max_val.clone()),
+                        Some(current_max) => Some(if max_val > &current_max {
+                            max_val.clone()
+                        } else {
+                            current_max
+                        }),
+                    };
+                }
+            }
+
+            if let (Some(min), Some(max)) = (global_min, global_max) {
+                merged.push((min, max));
+            }
+        }
+
+        if merged.is_empty() {
+            None
+        } else {
+            Some(merged)
+        }
+    }
+}
+
+impl fmt::Debug for SharedBoundsAccumulator {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "SharedBoundsAccumulator")
+    }
+}
+
 /// HashTable and input data for the left (build side) of a join
 struct JoinLeftData {
     /// The hash table with indices into `batch`
@@ -116,6 +261,8 @@ struct JoinLeftData {
     /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
     /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
_reservation: MemoryReservation, + /// Bounds computed from the build side for dynamic filter pushdown + bounds: Option>, } impl JoinLeftData { @@ -127,6 +274,7 @@ impl JoinLeftData { visited_indices_bitmap: SharedBitmapBuilder, probe_threads_counter: AtomicUsize, reservation: MemoryReservation, + bounds: Option>, ) -> Self { Self { hash_map, @@ -135,6 +283,7 @@ impl JoinLeftData { visited_indices_bitmap, probe_threads_counter, _reservation: reservation, + bounds, } } @@ -365,7 +514,10 @@ pub struct HashJoinExec { /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, /// Dynamic filter for pushing down to the probe side - dynamic_filter: Option>, + dynamic_filter: Arc, + /// Shared bounds accumulator for coordinating dynamic filter updates across partitions + /// Lazily initialized at execution time to use actual runtime partition counts + bounds_accumulator: Arc>>, } impl fmt::Debug for HashJoinExec { @@ -434,6 +586,10 @@ impl HashJoinExec { projection.as_ref(), )?; + let dynamic_filter = Self::create_dynamic_filter(&on); + + let bounds_accumulator = Arc::new(OnceLock::new()); + Ok(HashJoinExec { left, right, @@ -449,7 +605,8 @@ impl HashJoinExec { column_indices, null_equality, cache, - dynamic_filter: None, + dynamic_filter, + bounds_accumulator, }) } @@ -706,14 +863,11 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); - let dynamic_filter_display = match self.dynamic_filter.as_ref() { - Some(dynamic_filter) => match dynamic_filter.current() { - Ok(current) if current != lit(true) => { - format!(", filter=[{current}]") - } - _ => "".to_string(), - }, - None => "".to_string(), + let dynamic_filter_display = match self.dynamic_filter.current() { + Ok(current) if current != lit(true) => { + format!(", filter=[{current}]") + } + _ => "".to_string(), }; write!( f, @@ -813,6 +967,11 @@ impl ExecutionPlan for HashJoinExec { vec![&self.left, &self.right] } + /// Creates a new HashJoinExec with different children while preserving configuration. + /// + /// This method is called during query optimization when the optimizer creates new + /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new` + /// rather than cloning the existing one because partitioning may have changed. 
fn with_new_children( self: Arc, children: Vec>, @@ -847,7 +1006,8 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), - dynamic_filter: None, + dynamic_filter: Self::create_dynamic_filter(&self.on), + bounds_accumulator: Arc::new(OnceLock::new()), })) } @@ -906,10 +1066,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), - enable_dynamic_filter_pushdown - .then_some(self.dynamic_filter.clone()) - .flatten(), - on_right.clone(), + enable_dynamic_filter_pushdown, )) })?, PartitionMode::Partitioned => { @@ -927,10 +1084,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), 1, - enable_dynamic_filter_pushdown - .then_some(self.dynamic_filter.clone()) - .flatten(), - on_right.clone(), + enable_dynamic_filter_pushdown, )) } PartitionMode::Auto => { @@ -943,6 +1097,15 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); + // Initialize bounds_accumulator lazily with runtime partition counts + let bounds_accumulator = Arc::clone(self.bounds_accumulator.get_or_init(|| { + Arc::new(SharedBoundsAccumulator::new_from_partition_mode( + self.mode, + self.left.as_ref(), + self.right.as_ref(), + )) + })); + // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. let right_stream = self.right.execute(partition, context)?; @@ -971,6 +1134,9 @@ impl ExecutionPlan for HashJoinExec { batch_size, hashes_buffer: vec![], right_side_ordered: self.right.output_ordering().is_some(), + bounds_accumulator, + dynamic_filter: enable_dynamic_filter_pushdown + .then_some(Arc::clone(&self.dynamic_filter)), })) } @@ -1128,7 +1294,8 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), - dynamic_filter: Some(dynamic_filter), + dynamic_filter: dynamic_filter, + bounds_accumulator: Arc::new(OnceLock::new()), }); result = result.with_updated_node(new_node as Arc); } @@ -1159,9 +1326,35 @@ fn compute_bounds(arrays: &[ArrayRef]) -> Result .collect() } -/// Reads the left (build) side of the input, buffering it in memory, to build a -/// hash table (`LeftJoinData`) #[expect(clippy::too_many_arguments)] +/// Collects all batches from the left (build) side stream and creates a hash map for joining. +/// +/// This function is responsible for: +/// 1. Consuming the entire left stream and collecting all batches into memory +/// 2. Building a hash map from the join key columns for efficient probe operations +/// 3. Computing bounds for dynamic filter pushdown (if enabled) +/// 4. 
Preparing visited indices bitmap for certain join types +/// +/// # Parameters +/// * `random_state` - Random state for consistent hashing across partitions +/// * `left_stream` - Stream of record batches from the build side +/// * `on_left` - Physical expressions for the left side join keys +/// * `metrics` - Metrics collector for tracking memory usage and row counts +/// * `reservation` - Memory reservation tracker for the hash table and data +/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins) +/// * `probe_threads_count` - Number of threads that will probe this hash table +/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering +/// +/// # Dynamic Filter Coordination +/// When `should_compute_bounds` is true, this function computes the min/max bounds +/// for each join key column but does NOT update the dynamic filter. Instead, the +/// bounds are stored in the returned `JoinLeftData` and later coordinated by +/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds +/// before updating the filter exactly once. +/// +/// # Returns +/// `JoinLeftData` containing the hash map, consolidated batch, join key values, +/// visited indices bitmap, and computed bounds (if requested). async fn collect_left_input( random_state: RandomState, left_stream: SendableRecordBatchStream, @@ -1170,8 +1363,7 @@ async fn collect_left_input( reservation: MemoryReservation, with_visited_indices_bitmap: bool, probe_threads_count: usize, - dynamic_filter: Option>, - on_right: Vec, + should_compute_bounds: bool, ) -> Result { let schema = left_stream.schema(); @@ -1261,6 +1453,13 @@ async fn collect_left_input( }) .collect::>>()?; + // Compute bounds for dynamic filter if enabled + let bounds = if should_compute_bounds && num_rows > 0 { + Some(compute_bounds(&left_values)?) 
+ } else { + None + }; + let data = JoinLeftData::new( hashmap, single_batch, @@ -1268,49 +1467,9 @@ async fn collect_left_input( Mutex::new(visited_indices_bitmap), AtomicUsize::new(probe_threads_count), reservation, + bounds, ); - // Update dynamic filter with min/max bounds if provided - if let Some(dynamic_filter) = dynamic_filter { - if num_rows > 0 { - let bounds = compute_bounds(&left_values)?; - - // Create range predicates for each join key - let mut predicates = Vec::with_capacity(bounds.len()); - for ((min_val, max_val), right_expr) in bounds.iter().zip(on_right.iter()) { - // Create predicate: col >= min AND col <= max - let min_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::GtEq, - lit(min_val.clone()), - )) as Arc; - - let max_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::LtEq, - lit(max_val.clone()), - )) as Arc; - - let range_expr = - Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) - as Arc; - - predicates.push(range_expr); - } - - // Combine all predicates with AND - let combined_predicate = predicates - .into_iter() - .reduce(|acc, pred| { - Arc::new(BinaryExpr::new(acc, Operator::And, pred)) - as Arc - }) - .unwrap_or_else(|| lit(true)); - - dynamic_filter.update(combined_predicate)?; - } - } - Ok(data) } @@ -1506,6 +1665,10 @@ struct HashJoinStream { hashes_buffer: Vec, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, + /// Shared bounds accumulator for coordinating dynamic filter updates + bounds_accumulator: Arc, + /// Dynamic filter for pushdown to probe side + dynamic_filter: Option>, } impl RecordBatchStream for HashJoinStream { @@ -1695,12 +1858,91 @@ impl HashJoinStream { .get_shared(cx))?; build_timer.done(); + // Handle dynamic filter bounds accumulation + // + // This coordination ensures the dynamic filter contains complete bounds information + // from all relevant partitions before being applied to probe-side scans. + // + // Process: + // 1. Store this partition's bounds in the shared accumulator + // 2. Atomically increment the completion counter + // 3. If we're the last partition to complete, merge all bounds and update the filter + // + // Note: In CollectLeft mode, multiple partitions may access the SAME build data + // (shared via OnceFut), but each partition must report separately to ensure proper + // coordination across all output partitions. + // + // The consequences of not doing this synchronization properly would be that a filter + // with incomplete bounds would be pushed down resulting in incorrect results (missing rows). 
+ if let Some(dynamic_filter) = &self.dynamic_filter { + // Store bounds in the accumulator - this runs once per partition + if let Some(bounds) = &left_data.bounds { + // Only push actual bounds if they exist + self.bounds_accumulator.bounds.lock().push(bounds.clone()); + } + + // Atomically increment the completion counter + // Even empty partitions must report to ensure proper termination + let completed = self + .bounds_accumulator + .completed_partitions + .fetch_add(1, Ordering::SeqCst) + + 1; + let total_partitions = self.bounds_accumulator.total_partitions; + + // Critical synchronization point: Only the last partition updates the filter + // Troubleshooting: If you see "completed > total_partitions", check partition + // count calculation in try_new() - it may not match actual execution calls + if completed == total_partitions { + if let Some(merged_bounds) = self.bounds_accumulator.merge_bounds() { + let filter_expr = self.create_filter_from_bounds(merged_bounds)?; + dynamic_filter.update(filter_expr)?; + } + } + } + self.state = HashJoinStreamState::FetchProbeBatch; self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); Poll::Ready(Ok(StatefulStreamResult::Continue)) } + /// Create a filter expression from merged bounds + fn create_filter_from_bounds( + &self, + bounds: Vec<(ScalarValue, ScalarValue)>, + ) -> Result> { + // Create range predicates for each join key + let mut predicates = Vec::with_capacity(bounds.len()); + for ((min_val, max_val), right_expr) in bounds.iter().zip(self.on_right.iter()) { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(min_val.clone()), + )) as Arc; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(max_val.clone()), + )) as Arc; + let range_expr = Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc; + predicates.push(range_expr); + } + + // Combine all predicates with AND + let combined_predicate = predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + as Arc + }) + .unwrap_or_else(|| lit(true)); + + Ok(combined_predicate) + } + /// Fetches next batch from probe-side /// /// If non-empty batch has been fetched, updates state to `ProcessProbeBatchState`, diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index c999aa71fe5b..a7e3338df367 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -320,3 +320,78 @@ drop table large_table; statement ok drop table t; + +# Regression test for https://github.com/apache/datafusion/issues/17188 +query I +COPY (select i as k from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter/t1.parquet' +STORED AS PARQUET; +---- +10000000 + +query I +COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter/t2.parquet' +STORED AS PARQUET; +---- +10000000 + +statement ok +create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet'; + +statement ok +create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet'; + +# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random +# So we'll run the same query a couple of times just to have more certainty it's 
fixed +# Sorry about the spam in this slt test... + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 From 51a3d310cee23869ae42840334e7f3faf5684c17 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 19 Aug 2025 08:30:13 -0500 Subject: [PATCH 02/16] cleanup --- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index b37f1ce817e1..8ecec2ca87c1 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1294,7 +1294,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), - dynamic_filter: dynamic_filter, + dynamic_filter, bounds_accumulator: Arc::new(OnceLock::new()), }); result = result.with_updated_node(new_node as Arc); From c93530b8f84095fce82852a6e3a7a481f1c3bde4 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 19 Aug 2025 17:42:21 -0500 Subject: [PATCH 03/16] pre-allocate --- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 8ecec2ca87c1..2fef63b4841f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -176,7 +176,7 @@ impl SharedBoundsAccumulator { PartitionMode::Auto => 1, }; Self { - bounds: Mutex::new(Vec::new()), + bounds: Mutex::new(Vec::with_capacity(expected_calls)), completed_partitions: AtomicUsize::new(0), total_partitions: expected_calls, } From e4baf7b01b3330e74dc421827011f020b340eb9b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 19 Aug 2025 18:27:03 -0500 Subject: [PATCH 04/16] make scope unreachable --- datafusion/physical-plan/src/joins/hash_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 2fef63b4841f..5d9fc2fce871 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -173,7 +173,7 @@ impl SharedBoundsAccumulator { left_child.output_partitioning().partition_count() } // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two) - PartitionMode::Auto => 1, + PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. 
This is a bug in DataFusion, please report it!"), }; Self { bounds: Mutex::new(Vec::with_capacity(expected_calls)), From 52cfc85d5fe792e22cc2cdc3c3702ff439170f47 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 09:19:57 -0500 Subject: [PATCH 05/16] refactor --- .../physical-plan/src/joins/hash_join.rs | 356 +++++++++++------- 1 file changed, 218 insertions(+), 138 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 5d9fc2fce871..c4bde62c33f2 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -98,6 +98,45 @@ use parking_lot::Mutex; const HASH_JOIN_SEED: RandomState = RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); +/// Represents the minimum and maximum values for a specific column. +/// Used in dynamic filter pushdown to establish value boundaries. +#[derive(Debug, Clone, PartialEq)] +struct ColumnBounds { + /// The minimum value observed for this column + min: ScalarValue, + /// The maximum value observed for this column + max: ScalarValue, +} + +impl ColumnBounds { + fn new(min: ScalarValue, max: ScalarValue) -> Self { + Self { min, max } + } +} + +/// Represents the bounds for all join key columns from a single partition. +/// This contains the min/max values computed from one partition's build-side data. +#[derive(Debug, Clone)] +struct PartitionBounds { + /// Min/max bounds for each join key column in this partition. + /// Index corresponds to the join key expression index. + column_bounds: Vec, +} + +impl PartitionBounds { + fn new(column_bounds: Vec) -> Self { + Self { column_bounds } + } + + fn len(&self) -> usize { + self.column_bounds.len() + } + + fn get(&self, index: usize) -> Option<&ColumnBounds> { + self.column_bounds.get(index) + } +} + /// Coordinates dynamic filter bounds collection across multiple partitions /// /// This structure ensures that dynamic filters are built with complete information from all @@ -123,13 +162,18 @@ const HASH_JOIN_SEED: RandomState = /// partition executions. struct SharedBoundsAccumulator { /// Bounds from completed partitions. - bounds: Mutex>>, + /// Each element represents the column bounds computed by one partition. + bounds: Mutex>, /// Number of partitions that have reported completion. completed_partitions: AtomicUsize, /// Total number of partitions. /// Need to know this so that we can update the dynamic filter once we are done /// building *all* of the hash tables. total_partitions: usize, + /// Dynamic filter for pushdown to probe side + dynamic_filter: Arc, + /// Right side join expressions needed for creating filter bounds + on_right: Vec, } impl SharedBoundsAccumulator { @@ -160,6 +204,8 @@ impl SharedBoundsAccumulator { partition_mode: PartitionMode, left_child: &dyn ExecutionPlan, right_child: &dyn ExecutionPlan, + dynamic_filter: Arc, + on_right: Vec, ) -> Self { // Troubleshooting: If partition counts are incorrect, verify this logic matches // the actual execution pattern in collect_build_side() @@ -179,44 +225,48 @@ impl SharedBoundsAccumulator { bounds: Mutex::new(Vec::with_capacity(expected_calls)), completed_partitions: AtomicUsize::new(0), total_partitions: expected_calls, + dynamic_filter, + on_right, } } /// Merge all bounds from completed partitions into global min/max. 
/// + /// This combines bounds from all partitions by computing the global minimum and maximum + /// for each join key column across all partitions. + /// /// Troubleshooting: If this returns None when you expect bounds, check: /// 1. All partitions called collect_build_side with bounds data /// 2. collect_left_input was called with should_compute_bounds=true /// 3. The build side had at least one non-empty batch - fn merge_bounds(&self) -> Option> { + fn merge_bounds(&self) -> Option> { let bounds = self.bounds.lock(); - let all_bounds: Vec<_> = bounds.iter().collect(); - if all_bounds.is_empty() { + if bounds.is_empty() { return None; } - let num_columns = all_bounds[0].len(); + let num_columns = bounds[0].len(); let mut merged = Vec::with_capacity(num_columns); for col_idx in 0..num_columns { let mut global_min = None; let mut global_max = None; - for partition_bounds in &all_bounds { - if let Some((min_val, max_val)) = partition_bounds.get(col_idx) { + for partition_bounds in bounds.iter() { + if let Some(column_bounds) = partition_bounds.get(col_idx) { global_min = match global_min { - None => Some(min_val.clone()), - Some(current_min) => Some(if min_val < ¤t_min { - min_val.clone() + None => Some(column_bounds.min.clone()), + Some(current_min) => Some(if column_bounds.min < current_min { + column_bounds.min.clone() } else { current_min }), }; global_max = match global_max { - None => Some(max_val.clone()), - Some(current_max) => Some(if max_val > ¤t_max { - max_val.clone() + None => Some(column_bounds.max.clone()), + Some(current_max) => Some(if column_bounds.max > current_max { + column_bounds.max.clone() } else { current_max }), @@ -225,7 +275,7 @@ impl SharedBoundsAccumulator { } if let (Some(min), Some(max)) = (global_min, global_max) { - merged.push((min, max)); + merged.push(ColumnBounds::new(min, max)); } } @@ -235,6 +285,81 @@ impl SharedBoundsAccumulator { Some(merged) } } + + /// Create a filter expression from merged bounds + fn create_filter_from_bounds( + &self, + bounds: Vec, + ) -> Result> { + // Create range predicates for each join key + let mut predicates = Vec::with_capacity(bounds.len()); + for (column_bounds, right_expr) in bounds.iter().zip(self.on_right.iter()) { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(column_bounds.min.clone()), + )) as Arc; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(column_bounds.max.clone()), + )) as Arc; + let range_expr = Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc; + predicates.push(range_expr); + } + + // Combine all predicates with AND + let combined_predicate = predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + as Arc + }) + .unwrap_or_else(|| lit(true)); + + Ok(combined_predicate) + } + + /// Report bounds from a completed partition and update dynamic filter if all partitions are done + /// + /// This method coordinates the dynamic filter updates across all partitions. It stores the + /// bounds from the current partition, increments the completion counter, and when all + /// partitions have reported, merges their bounds and updates the dynamic filter. 
+ /// + /// # Arguments + /// * `partition_bounds` - The bounds computed by this partition (if any) + /// + /// # Returns + /// * `Result<()>` - Ok if successful, Err if filter update failed + fn report_partition_bounds( + &self, + partition_bounds: Option>, + ) -> Result<()> { + // Store bounds in the accumulator - this runs once per partition + if let Some(bounds) = partition_bounds { + // Only push actual bounds if they exist + self.bounds.lock().push(PartitionBounds::new(bounds)); + } + + // Atomically increment the completion counter + // Even empty partitions must report to ensure proper termination + let completed = self.completed_partitions.fetch_add(1, Ordering::SeqCst) + 1; + let total_partitions = self.total_partitions; + + // Critical synchronization point: Only update the filter when ALL partitions are complete + // Troubleshooting: If you see "completed > total_partitions", check partition + // count calculation in new_from_partition_mode() - it may not match actual execution calls + if completed == total_partitions { + if let Some(merged_bounds) = self.merge_bounds() { + let filter_expr = self.create_filter_from_bounds(merged_bounds)?; + self.dynamic_filter.update(filter_expr)?; + } + } + + Ok(()) + } } impl fmt::Debug for SharedBoundsAccumulator { @@ -262,7 +387,7 @@ struct JoinLeftData { /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle. _reservation: MemoryReservation, /// Bounds computed from the build side for dynamic filter pushdown - bounds: Option>, + bounds: Option>, } impl JoinLeftData { @@ -274,7 +399,7 @@ impl JoinLeftData { visited_indices_bitmap: SharedBitmapBuilder, probe_threads_counter: AtomicUsize, reservation: MemoryReservation, - bounds: Option>, + bounds: Option>, ) -> Self { Self { hash_map, @@ -514,10 +639,12 @@ pub struct HashJoinExec { /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, /// Dynamic filter for pushing down to the probe side - dynamic_filter: Arc, + /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result + dynamic_filter: Option>, /// Shared bounds accumulator for coordinating dynamic filter updates across partitions + /// Only created when dynamic filter pushdown is enabled. 
/// Lazily initialized at execution time to use actual runtime partition counts - bounds_accumulator: Arc>>, + bounds_accumulator: Option>>, } impl fmt::Debug for HashJoinExec { @@ -586,9 +713,8 @@ impl HashJoinExec { projection.as_ref(), )?; - let dynamic_filter = Self::create_dynamic_filter(&on); - - let bounds_accumulator = Arc::new(OnceLock::new()); + // Initialize both dynamic filter and bounds accumulator to None + // They will be set later if dynamic filtering is enabled Ok(HashJoinExec { left, @@ -605,8 +731,8 @@ impl HashJoinExec { column_indices, null_equality, cache, - dynamic_filter, - bounds_accumulator, + dynamic_filter: None, + bounds_accumulator: None, }) } @@ -863,12 +989,7 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); - let dynamic_filter_display = match self.dynamic_filter.current() { - Ok(current) if current != lit(true) => { - format!(", filter=[{current}]") - } - _ => "".to_string(), - }; + let dynamic_filter_display = "".to_string(); write!( f, "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", @@ -976,21 +1097,28 @@ impl ExecutionPlan for HashJoinExec { self: Arc, children: Vec>, ) -> Result> { - let new_join = HashJoinExec::try_new( - Arc::clone(&children[0]), - Arc::clone(&children[1]), - self.on.clone(), - self.filter.clone(), - &self.join_type, - self.projection.clone(), - self.mode, - self.null_equality, - )?; - Ok(Arc::new(new_join)) + Ok(Arc::new(HashJoinExec { + left: Arc::clone(&children[0]), + right: Arc::clone(&children[1]), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: Arc::clone(&self.left_fut), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + // Keep the dynamic filter, bounds accumulator will be reset + dynamic_filter: self.dynamic_filter.clone(), + bounds_accumulator: None, + })) } fn reset_state(self: Arc) -> Result> { - // Reset the left_fut to allow re-execution Ok(Arc::new(HashJoinExec { left: Arc::clone(&self.left), right: Arc::clone(&self.right), @@ -998,6 +1126,7 @@ impl ExecutionPlan for HashJoinExec { filter: self.filter.clone(), join_type: self.join_type, join_schema: Arc::clone(&self.join_schema), + // Reset the left_fut to allow re-execution left_fut: Arc::new(OnceAsync::default()), random_state: self.random_state.clone(), mode: self.mode, @@ -1006,8 +1135,9 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), - dynamic_filter: Self::create_dynamic_filter(&self.on), - bounds_accumulator: Arc::new(OnceLock::new()), + // Reset dynamic filter and bounds accumulator to initial state + dynamic_filter: None, + bounds_accumulator: None, })) } @@ -1021,7 +1151,7 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| Arc::clone(&on.0)) .collect::>(); - let on_right = self + let _on_right = self .on .iter() .map(|on| Arc::clone(&on.1)) @@ -1097,14 +1227,33 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); - // Initialize bounds_accumulator lazily with runtime partition counts - let bounds_accumulator = Arc::clone(self.bounds_accumulator.get_or_init(|| { - Arc::new(SharedBoundsAccumulator::new_from_partition_mode( - self.mode, - 
self.left.as_ref(), - self.right.as_ref(), - )) - })); + // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled) + let bounds_accumulator = if enable_dynamic_filter_pushdown + && self.dynamic_filter.is_some() + { + if let Some(ref bounds_accumulator_oncelock) = self.bounds_accumulator { + let dynamic_filter = Arc::clone(self.dynamic_filter.as_ref().unwrap()); + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + + Some(Arc::clone(bounds_accumulator_oncelock.get_or_init(|| { + Arc::new(SharedBoundsAccumulator::new_from_partition_mode( + self.mode, + self.left.as_ref(), + self.right.as_ref(), + dynamic_filter, + on_right, + )) + }))) + } else { + None + } + } else { + None + }; // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. @@ -1119,6 +1268,12 @@ impl ExecutionPlan for HashJoinExec { None => self.column_indices.clone(), }; + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + Ok(Box::pin(HashJoinStream { schema: self.schema(), on_right, @@ -1135,8 +1290,6 @@ impl ExecutionPlan for HashJoinExec { hashes_buffer: vec![], right_side_ordered: self.right.output_ordering().is_some(), bounds_accumulator, - dynamic_filter: enable_dynamic_filter_pushdown - .then_some(Arc::clone(&self.dynamic_filter)), })) } @@ -1294,8 +1447,8 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), - dynamic_filter, - bounds_accumulator: Arc::new(OnceLock::new()), + dynamic_filter: Some(dynamic_filter), + bounds_accumulator: Some(OnceLock::new()), }); result = result.with_updated_node(new_node as Arc); } @@ -1305,13 +1458,13 @@ impl ExecutionPlan for HashJoinExec { } /// Compute min/max bounds for each column in the given arrays -fn compute_bounds(arrays: &[ArrayRef]) -> Result> { +fn compute_bounds(arrays: &[ArrayRef]) -> Result> { arrays .iter() .map(|array| { if array.is_empty() { // Return NULL values for empty arrays - return Ok(( + return Ok(ColumnBounds::new( ScalarValue::try_from(array.data_type())?, ScalarValue::try_from(array.data_type())?, )); @@ -1321,7 +1474,7 @@ fn compute_bounds(arrays: &[ArrayRef]) -> Result let min_val = min_batch(array)?; let max_val = max_batch(array)?; - Ok((min_val, max_val)) + Ok(ColumnBounds::new(min_val, max_val)) }) .collect() } @@ -1665,10 +1818,8 @@ struct HashJoinStream { hashes_buffer: Vec, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, - /// Shared bounds accumulator for coordinating dynamic filter updates - bounds_accumulator: Arc, - /// Dynamic filter for pushdown to probe side - dynamic_filter: Option>, + /// Shared bounds accumulator for coordinating dynamic filter updates (optional) + bounds_accumulator: Option>, } impl RecordBatchStream for HashJoinStream { @@ -1860,45 +2011,10 @@ impl HashJoinStream { // Handle dynamic filter bounds accumulation // - // This coordination ensures the dynamic filter contains complete bounds information - // from all relevant partitions before being applied to probe-side scans. - // - // Process: - // 1. Store this partition's bounds in the shared accumulator - // 2. Atomically increment the completion counter - // 3. 
If we're the last partition to complete, merge all bounds and update the filter - // - // Note: In CollectLeft mode, multiple partitions may access the SAME build data - // (shared via OnceFut), but each partition must report separately to ensure proper - // coordination across all output partitions. - // - // The consequences of not doing this synchronization properly would be that a filter - // with incomplete bounds would be pushed down resulting in incorrect results (missing rows). - if let Some(dynamic_filter) = &self.dynamic_filter { - // Store bounds in the accumulator - this runs once per partition - if let Some(bounds) = &left_data.bounds { - // Only push actual bounds if they exist - self.bounds_accumulator.bounds.lock().push(bounds.clone()); - } - - // Atomically increment the completion counter - // Even empty partitions must report to ensure proper termination - let completed = self - .bounds_accumulator - .completed_partitions - .fetch_add(1, Ordering::SeqCst) - + 1; - let total_partitions = self.bounds_accumulator.total_partitions; - - // Critical synchronization point: Only the last partition updates the filter - // Troubleshooting: If you see "completed > total_partitions", check partition - // count calculation in try_new() - it may not match actual execution calls - if completed == total_partitions { - if let Some(merged_bounds) = self.bounds_accumulator.merge_bounds() { - let filter_expr = self.create_filter_from_bounds(merged_bounds)?; - dynamic_filter.update(filter_expr)?; - } - } + // Dynamic filter coordination between partitions: + // Report bounds to the accumulator which will handle synchronization and filter updates + if let Some(ref bounds_accumulator) = self.bounds_accumulator { + bounds_accumulator.report_partition_bounds(left_data.bounds.clone())?; } self.state = HashJoinStreamState::FetchProbeBatch; @@ -1907,42 +2023,6 @@ impl HashJoinStream { Poll::Ready(Ok(StatefulStreamResult::Continue)) } - /// Create a filter expression from merged bounds - fn create_filter_from_bounds( - &self, - bounds: Vec<(ScalarValue, ScalarValue)>, - ) -> Result> { - // Create range predicates for each join key - let mut predicates = Vec::with_capacity(bounds.len()); - for ((min_val, max_val), right_expr) in bounds.iter().zip(self.on_right.iter()) { - // Create predicate: col >= min AND col <= max - let min_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::GtEq, - lit(min_val.clone()), - )) as Arc; - let max_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::LtEq, - lit(max_val.clone()), - )) as Arc; - let range_expr = Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) - as Arc; - predicates.push(range_expr); - } - - // Combine all predicates with AND - let combined_predicate = predicates - .into_iter() - .reduce(|acc, pred| { - Arc::new(BinaryExpr::new(acc, Operator::And, pred)) - as Arc - }) - .unwrap_or_else(|| lit(true)); - - Ok(combined_predicate) - } - /// Fetches next batch from probe-side /// /// If non-empty batch has been fetched, updates state to `ProcessProbeBatchState`, From 212d2a650178944341329f2cde782fe27cdb4cb1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 09:21:20 -0500 Subject: [PATCH 06/16] Rename method --- datafusion/physical-plan/src/joins/hash_join.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 
c4bde62c33f2..45dcc72e1315 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -132,7 +132,7 @@ impl PartitionBounds { self.column_bounds.len() } - fn get(&self, index: usize) -> Option<&ColumnBounds> { + fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> { self.column_bounds.get(index) } } @@ -254,7 +254,7 @@ impl SharedBoundsAccumulator { let mut global_max = None; for partition_bounds in bounds.iter() { - if let Some(column_bounds) = partition_bounds.get(col_idx) { + if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) { global_min = match global_min { None => Some(column_bounds.min.clone()), Some(current_min) => Some(if column_bounds.min < current_min { From a00a84d00a8dfa4f3710303abc72723fc12b432a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 09:28:50 -0500 Subject: [PATCH 07/16] fix locking --- .../physical_optimizer/filter_pushdown/mod.rs | 8 ++-- .../physical-plan/src/joins/hash_join.rs | 39 ++++++++++++------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 1a04753966a2..6b08dfb2e29b 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -267,7 +267,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { format_plan_for_test(&plan), @r" - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] " @@ -936,7 +936,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] " @@ -1082,9 +1082,9 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce] + - HashJoinExec: mode=Partitioned, 
join_type=Inner, on=[(c@1, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ] " diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 45dcc72e1315..3def97a78681 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -158,14 +158,11 @@ impl PartitionBounds { /// /// ## Thread Safety /// -/// All fields use atomic operations or mutexes to ensure correct coordination between concurrent +/// All fields use a single mutex to ensure correct coordination between concurrent /// partition executions. struct SharedBoundsAccumulator { - /// Bounds from completed partitions. - /// Each element represents the column bounds computed by one partition. - bounds: Mutex>, - /// Number of partitions that have reported completion. - completed_partitions: AtomicUsize, + /// Shared state protected by a single mutex to avoid ordering concerns + inner: Mutex, /// Total number of partitions. /// Need to know this so that we can update the dynamic filter once we are done /// building *all* of the hash tables. @@ -176,6 +173,15 @@ struct SharedBoundsAccumulator { on_right: Vec, } +/// State protected by SharedBoundsAccumulator's mutex +struct SharedBoundsState { + /// Bounds from completed partitions. + /// Each element represents the column bounds computed by one partition. + bounds: Vec, + /// Number of partitions that have reported completion. + completed_partitions: usize, +} + impl SharedBoundsAccumulator { /// Creates a new SharedBoundsAccumulator configured for the given partition mode /// @@ -222,8 +228,10 @@ impl SharedBoundsAccumulator { PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"), }; Self { - bounds: Mutex::new(Vec::with_capacity(expected_calls)), - completed_partitions: AtomicUsize::new(0), + inner: Mutex::new(SharedBoundsState { + bounds: Vec::with_capacity(expected_calls), + completed_partitions: 0, + }), total_partitions: expected_calls, dynamic_filter, on_right, @@ -239,9 +247,7 @@ impl SharedBoundsAccumulator { /// 1. All partitions called collect_build_side with bounds data /// 2. collect_left_input was called with should_compute_bounds=true /// 3. 
The build side had at least one non-empty batch - fn merge_bounds(&self) -> Option> { - let bounds = self.bounds.lock(); - + fn merge_bounds(&self, bounds: &[PartitionBounds]) -> Option> { if bounds.is_empty() { return None; } @@ -337,22 +343,25 @@ impl SharedBoundsAccumulator { &self, partition_bounds: Option>, ) -> Result<()> { + let mut inner = self.inner.lock(); + // Store bounds in the accumulator - this runs once per partition if let Some(bounds) = partition_bounds { // Only push actual bounds if they exist - self.bounds.lock().push(PartitionBounds::new(bounds)); + inner.bounds.push(PartitionBounds::new(bounds)); } - // Atomically increment the completion counter + // Increment the completion counter // Even empty partitions must report to ensure proper termination - let completed = self.completed_partitions.fetch_add(1, Ordering::SeqCst) + 1; + inner.completed_partitions += 1; + let completed = inner.completed_partitions; let total_partitions = self.total_partitions; // Critical synchronization point: Only update the filter when ALL partitions are complete // Troubleshooting: If you see "completed > total_partitions", check partition // count calculation in new_from_partition_mode() - it may not match actual execution calls if completed == total_partitions { - if let Some(merged_bounds) = self.merge_bounds() { + if let Some(merged_bounds) = self.merge_bounds(&inner.bounds) { let filter_expr = self.create_filter_from_bounds(merged_bounds)?; self.dynamic_filter.update(filter_expr)?; } From 6d2563224057fc9f59f49d925363c2f395c0d4a3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 10:19:33 -0500 Subject: [PATCH 08/16] cleanup --- datafusion/core/tests/dataframe/mod.rs | 53 +++---- .../physical_optimizer/filter_pushdown/mod.rs | 8 +- .../physical-plan/src/joins/hash_join.rs | 131 +++++++----------- 3 files changed, 82 insertions(+), 110 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 38dc0dc73569..0cbe0f19c68c 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2935,32 +2935,33 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> { assert_snapshot!( pretty_format_batches(&sql_results).unwrap(), @r" - +---------------+---------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+---------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | Projection: t1.a, t1.b | - | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | - | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | - | | Left Join: t1.a = __scalar_sq_1.a | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __scalar_sq_1 | - | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true | - | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] | - | | TableScan: t2 projection=[a] | - | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | - | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] | 
- | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] | - | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 | - | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | - | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+---------------------------------------------------------------------------------------------------------------------------+ + +---------------+-----------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+-----------------------------------------------------------------------------------------------------------------------------+ + | logical_plan | Projection: t1.a, t1.b | + | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | + | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | + | | Left Join: t1.a = __scalar_sq_1.a | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __scalar_sq_1 | + | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true | + | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] | + | | TableScan: t2 projection=[a] | + | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | + | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | + | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=4 | + | | CoalesceBatchesExec: target_batch_size=8192 | + | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] | + | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] | + | | CoalesceBatchesExec: target_batch_size=8192 | + | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 | + | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | + | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+-----------------------------------------------------------------------------------------------------------------------------+ " ); diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 6b08dfb2e29b..52e69e4b012a 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -890,7 +890,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { None, &JoinType::Inner, None, - PartitionMode::Partitioned, + PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, ) .unwrap(), @@ -902,12 +902,12 @@ async fn test_hashjoin_dynamic_filter_pushdown() { @r" OptimizationTest: input: - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), 
(b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true output: Ok: - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] ", @@ -936,7 +936,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] " diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 3def97a78681..d728caf9aa18 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -238,89 +238,68 @@ impl SharedBoundsAccumulator { } } - /// Merge all bounds from completed partitions into global min/max. + /// Create a filter expression from individual partition bounds using OR logic. /// - /// This combines bounds from all partitions by computing the global minimum and maximum - /// for each join key column across all partitions. + /// This creates a filter where each partition's bounds form a conjunction (AND) + /// of column range predicates, and all partitions are combined with OR. /// - /// Troubleshooting: If this returns None when you expect bounds, check: - /// 1. All partitions called collect_build_side with bounds data - /// 2. collect_left_input was called with should_compute_bounds=true - /// 3. 
The build side had at least one non-empty batch - fn merge_bounds(&self, bounds: &[PartitionBounds]) -> Option> { + /// For example, with 2 partitions and 2 columns: + /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1) + /// OR + /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1)) + fn create_filter_from_partition_bounds( + &self, + bounds: &[PartitionBounds], + ) -> Result> { if bounds.is_empty() { - return None; + return Ok(lit(true)); } - let num_columns = bounds[0].len(); - let mut merged = Vec::with_capacity(num_columns); + // Create a predicate for each partition + let mut partition_predicates = Vec::with_capacity(bounds.len()); - for col_idx in 0..num_columns { - let mut global_min = None; - let mut global_max = None; + for partition_bounds in bounds.iter() { + // Create range predicates for each join key in this partition + let mut column_predicates = Vec::with_capacity(partition_bounds.len()); - for partition_bounds in bounds.iter() { + for (col_idx, right_expr) in self.on_right.iter().enumerate() { if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) { - global_min = match global_min { - None => Some(column_bounds.min.clone()), - Some(current_min) => Some(if column_bounds.min < current_min { - column_bounds.min.clone() - } else { - current_min - }), - }; - global_max = match global_max { - None => Some(column_bounds.max.clone()), - Some(current_max) => Some(if column_bounds.max > current_max { - column_bounds.max.clone() - } else { - current_max - }), - }; + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(column_bounds.min.clone()), + )) as Arc; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(column_bounds.max.clone()), + )) as Arc; + let range_expr = + Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc; + column_predicates.push(range_expr); } } - if let (Some(min), Some(max)) = (global_min, global_max) { - merged.push(ColumnBounds::new(min, max)); + // Combine all column predicates for this partition with AND + if !column_predicates.is_empty() { + let partition_predicate = column_predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + as Arc + }) + .unwrap(); + partition_predicates.push(partition_predicate); } } - if merged.is_empty() { - None - } else { - Some(merged) - } - } - - /// Create a filter expression from merged bounds - fn create_filter_from_bounds( - &self, - bounds: Vec, - ) -> Result> { - // Create range predicates for each join key - let mut predicates = Vec::with_capacity(bounds.len()); - for (column_bounds, right_expr) in bounds.iter().zip(self.on_right.iter()) { - // Create predicate: col >= min AND col <= max - let min_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::GtEq, - lit(column_bounds.min.clone()), - )) as Arc; - let max_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::LtEq, - lit(column_bounds.max.clone()), - )) as Arc; - let range_expr = Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) - as Arc; - predicates.push(range_expr); - } - - // Combine all predicates with AND - let combined_predicate = predicates + // Combine all partition predicates with OR + let combined_predicate = partition_predicates .into_iter() .reduce(|acc, pred| { - Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + 
Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) as Arc }) .unwrap_or_else(|| lit(true)); @@ -332,7 +311,7 @@ impl SharedBoundsAccumulator { /// /// This method coordinates the dynamic filter updates across all partitions. It stores the /// bounds from the current partition, increments the completion counter, and when all - /// partitions have reported, merges their bounds and updates the dynamic filter. + /// partitions have reported, creates an OR'd filter from individual partition bounds. /// /// # Arguments /// * `partition_bounds` - The bounds computed by this partition (if any) @@ -361,8 +340,9 @@ impl SharedBoundsAccumulator { // Troubleshooting: If you see "completed > total_partitions", check partition // count calculation in new_from_partition_mode() - it may not match actual execution calls if completed == total_partitions { - if let Some(merged_bounds) = self.merge_bounds(&inner.bounds) { - let filter_expr = self.create_filter_from_bounds(merged_bounds)?; + if !inner.bounds.is_empty() { + let filter_expr = + self.create_filter_from_partition_bounds(&inner.bounds)?; self.dynamic_filter.update(filter_expr)?; } } @@ -1160,11 +1140,6 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| Arc::clone(&on.0)) .collect::>(); - let _on_right = self - .on - .iter() - .map(|on| Arc::clone(&on.1)) - .collect::>(); let left_partitions = self.left.output_partitioning().partition_count(); let right_partitions = self.right.output_partitioning().partition_count(); @@ -1183,11 +1158,7 @@ impl ExecutionPlan for HashJoinExec { ); } - let enable_dynamic_filter_pushdown = context - .session_config() - .options() - .optimizer - .enable_dynamic_filter_pushdown; + let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { From 6e8e9f061c9cd7888fd7227593a404e0b8bff55a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 12:07:14 -0500 Subject: [PATCH 09/16] fixes --- datafusion/datasource-parquet/src/opener.rs | 2 +- datafusion/physical-plan/src/joins/hash_join.rs | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 709fcc5c1fc2..ccc29860023d 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -308,7 +308,7 @@ impl FileOpener for ParquetOpener { let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&physical_file_schema)?; - let mask = ProjectionMask::roots( + let mask = ProjectionMask::leaves( builder.parquet_schema(), adapted_projections.iter().cloned(), ); diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d728caf9aa18..d177de178108 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1100,7 +1100,15 @@ impl ExecutionPlan for HashJoinExec { projection: self.projection.clone(), column_indices: self.column_indices.clone(), null_equality: self.null_equality, - cache: self.cache.clone(), + cache: Self::compute_properties( + &children[0], + &children[1], + Arc::clone(&self.join_schema), + self.join_type, + &self.on, + self.mode, + self.projection.as_ref(), + )?, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), bounds_accumulator: None, From 
c58cf60bc7d3e5f1e69ad4718732a0233d05f402 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 12:16:25 -0500 Subject: [PATCH 10/16] lint --- datafusion/physical-plan/src/joins/hash_join.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d177de178108..6c8b4844a253 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -339,13 +339,12 @@ impl SharedBoundsAccumulator { // Critical synchronization point: Only update the filter when ALL partitions are complete // Troubleshooting: If you see "completed > total_partitions", check partition // count calculation in new_from_partition_mode() - it may not match actual execution calls - if completed == total_partitions { - if !inner.bounds.is_empty() { + if completed == total_partitions + && !inner.bounds.is_empty() { let filter_expr = self.create_filter_from_partition_bounds(&inner.bounds)?; self.dynamic_filter.update(filter_expr)?; } - } Ok(()) } From bb485101462fbe6b648ef6e2090cbb8da239d022 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 12:28:20 -0500 Subject: [PATCH 11/16] revert upsie --- datafusion/datasource-parquet/src/opener.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index ccc29860023d..709fcc5c1fc2 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -308,7 +308,7 @@ impl FileOpener for ParquetOpener { let (schema_mapping, adapted_projections) = schema_adapter.map_schema(&physical_file_schema)?; - let mask = ProjectionMask::leaves( + let mask = ProjectionMask::roots( builder.parquet_schema(), adapted_projections.iter().cloned(), ); From 409540f0f28f37f9e4aa941da901d4c3a67ecf62 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:00:31 -0500 Subject: [PATCH 12/16] update snap --- datafusion/core/tests/dataframe/mod.rs | 53 +++++++++++++------------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 0cbe0f19c68c..38dc0dc73569 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2935,33 +2935,32 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> { assert_snapshot!( pretty_format_batches(&sql_results).unwrap(), @r" - +---------------+-----------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+-----------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | Projection: t1.a, t1.b | - | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | - | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | - | | Left Join: t1.a = __scalar_sq_1.a | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __scalar_sq_1 | - | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true | - | | Aggregate: groupBy=[[t2.a]], 
aggr=[[count(Int64(1))]] | - | | TableScan: t2 projection=[a] | - | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | - | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | - | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=4 | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] | - | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 | - | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | - | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+-----------------------------------------------------------------------------------------------------------------------------+ + +---------------+---------------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+---------------------------------------------------------------------------------------------------------------------------+ + | logical_plan | Projection: t1.a, t1.b | + | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | + | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | + | | Left Join: t1.a = __scalar_sq_1.a | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __scalar_sq_1 | + | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true | + | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] | + | | TableScan: t2 projection=[a] | + | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | + | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | + | | CoalesceBatchesExec: target_batch_size=8192 | + | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] | + | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] | + | | CoalesceBatchesExec: target_batch_size=8192 | + | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 | + | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | + | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+---------------------------------------------------------------------------------------------------------------------------+ " ); From 81b321ffe815cb86e89445a1c980025d586e12b6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:02:11 -0500 Subject: [PATCH 13/16] fmt --- datafusion/physical-plan/src/joins/hash_join.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs 
b/datafusion/physical-plan/src/joins/hash_join.rs index 6c8b4844a253..3b48aba4e3de 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -339,12 +339,10 @@ impl SharedBoundsAccumulator { // Critical synchronization point: Only update the filter when ALL partitions are complete // Troubleshooting: If you see "completed > total_partitions", check partition // count calculation in new_from_partition_mode() - it may not match actual execution calls - if completed == total_partitions - && !inner.bounds.is_empty() { - let filter_expr = - self.create_filter_from_partition_bounds(&inner.bounds)?; - self.dynamic_filter.update(filter_expr)?; - } + if completed == total_partitions && !inner.bounds.is_empty() { + let filter_expr = self.create_filter_from_partition_bounds(&inner.bounds)?; + self.dynamic_filter.update(filter_expr)?; + } Ok(()) } From 879bae3dc06e9ef44e272e47a50406dad8c8ae5f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:28:20 -0500 Subject: [PATCH 14/16] add new test --- .../physical_optimizer/filter_pushdown/mod.rs | 221 ++++++++++++++++++ 1 file changed, 221 insertions(+) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 52e69e4b012a..90c03cf93563 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -48,6 +48,7 @@ use datafusion_physical_optimizer::{ use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, + coalesce_partitions::CoalescePartitionsExec, filter::FilterExec, repartition::RepartitionExec, sorts::sort::SortExec, @@ -943,6 +944,226 @@ async fn test_hashjoin_dynamic_filter_pushdown() { ); } +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Rouugh plan we're trying to recreate: + // COPY (select i as k from generate_series(1, 10000000) as t(i)) + // TO 'test_files/scratch/push_down_filter/t1.parquet' + // STORED AS PARQUET; + // COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) + // TO 'test_files/scratch/push_down_filter/t2.parquet' + // STORED AS PARQUET; + // create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet'; + // create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet'; + // explain + // select * + // from t1 + // join t2 on t1.k = t2.k; + // +---------------+------------------------------------------------------------+ + // | plan_type | plan | + // +---------------+------------------------------------------------------------+ + // | physical_plan | ┌───────────────────────────┐ | + // | | │ CoalesceBatchesExec │ | + // | | │ -------------------- │ | + // | | │ target_batch_size: │ | + // | | │ 8192 │ | + // | | └─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐ | + // | | │ HashJoinExec │ | + // | | │ -------------------- ├──────────────┐ | + // | | │ on: (k = k) │ │ | + // | | └─────────────┬─────────────┘ │ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ CoalesceBatchesExec ││ CoalesceBatchesExec │ | + // | | │ -------------------- ││ -------------------- 
│ | + // | | │ target_batch_size: ││ target_batch_size: │ | + // | | │ 8192 ││ 8192 │ | + // | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ RepartitionExec ││ RepartitionExec │ | + // | | │ -------------------- ││ -------------------- │ | + // | | │ partition_count(in->out): ││ partition_count(in->out): │ | + // | | │ 12 -> 12 ││ 12 -> 12 │ | + // | | │ ││ │ | + // | | │ partitioning_scheme: ││ partitioning_scheme: │ | + // | | │ Hash([k@0], 12) ││ Hash([k@0], 12) │ | + // | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ DataSourceExec ││ DataSourceExec │ | + // | | │ -------------------- ││ -------------------- │ | + // | | │ files: 12 ││ files: 12 │ | + // | | │ format: parquet ││ format: parquet │ | + // | | │ ││ predicate: true │ | + // | | └───────────────────────────┘└───────────────────────────┘ | + // | | | + // +---------------+------------------------------------------------------------+ + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) // Extra column not used in join + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create RepartitionExec nodes for both sides with hash partitioning on join keys + let partition_count = 12; + + // Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + let build_hash_exprs = vec![ + col("a", &build_side_schema).unwrap(), + col("b", &build_side_schema).unwrap(), + ]; + let build_repartition = Arc::new( + RepartitionExec::try_new( + build_scan, + Partitioning::Hash(build_hash_exprs, partition_count), + ) + .unwrap(), + ); + let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192)); + + // Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + let probe_hash_exprs = vec![ + col("a", &probe_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ]; + let probe_repartition = Arc::new( + RepartitionExec::try_new( + probe_scan, + Partitioning::Hash(probe_hash_exprs, partition_count), + ) + .unwrap(), + ); + let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); + + // Create HashJoinExec with partitioned inputs + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let hash_join = Arc::new( + HashJoinExec::try_new( + 
build_coalesce, + probe_coalesce, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Top-level CoalesceBatchesExec + let cb = + Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; + // Top-level CoalesceParititionsExec + let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + + // expect the predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Actually apply the optimization to the plan and execute to see the filter in action + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + if let Some(batch_result) = stream.next().await { + batch_result.unwrap(); + } + + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], 
file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ] + " + ); +} + #[tokio::test] async fn test_nested_hashjoin_dynamic_filter_pushdown() { use datafusion_common::JoinType; From 5b8c6a5af6fdf8c227530647e4e2b270e55ea35b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:34:16 -0500 Subject: [PATCH 15/16] remove unused debug field --- datafusion/physical-plan/src/joins/hash_join.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 3b48aba4e3de..1f8861cdf005 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -975,16 +975,14 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); - let dynamic_filter_display = "".to_string(); write!( f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}", self.mode, self.join_type, on, display_filter, display_projections, - dynamic_filter_display ) } DisplayFormatType::TreeRender => { From 57dbec920e1d9e787fe52516b6aa5226a2cdf87e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:35:52 -0500 Subject: [PATCH 16/16] fmt --- datafusion/physical-plan/src/joins/hash_join.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 1f8861cdf005..e7494113775e 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -978,11 +978,7 @@ impl DisplayAs for HashJoinExec { write!( f, "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}", - self.mode, - self.join_type, - on, - display_filter, - display_projections, + self.mode, self.join_type, on, display_filter, display_projections, ) } DisplayFormatType::TreeRender => {
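
// ----------------------------------------------------------------------------
// Illustrative sketch (not part of the patches above): the coordination pattern
// these changes implement, reduced to a standalone program. Assumptions for
// illustration only: a single i64 join key and a string-built predicate. Each
// partition reports the min/max bounds of its build-side join keys, a counter
// tracks how many partitions have reported, and only the last partition to
// report assembles the dynamic filter as an OR of per-partition AND-range
// predicates. The real code builds PhysicalExpr trees with BinaryExpr/lit in
// create_filter_from_partition_bounds and then calls dynamic_filter.update(...).
// ----------------------------------------------------------------------------

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Min/max bounds of one join-key column for a single partition.
type Bounds = (i64, i64);

struct BoundsAccumulator {
    /// Bounds reported so far, one entry per partition that had build-side rows.
    bounds: Mutex<Vec<Bounds>>,
    /// How many partitions have reported (with or without bounds).
    completed: AtomicUsize,
    /// How many reports to expect before the filter may be built.
    total_partitions: usize,
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Self {
        Self {
            bounds: Mutex::new(Vec::new()),
            completed: AtomicUsize::new(0),
            total_partitions,
        }
    }

    /// Called once per partition after its build side is collected. Returns the
    /// assembled filter only when the final partition reports; publishing a filter
    /// earlier could incorrectly eliminate valid join results.
    fn report(&self, partition_bounds: Option<Bounds>) -> Option<String> {
        if let Some(b) = partition_bounds {
            self.bounds.lock().unwrap().push(b);
        }
        let completed = self.completed.fetch_add(1, Ordering::SeqCst) + 1;
        if completed < self.total_partitions {
            return None;
        }
        let bounds = self.bounds.lock().unwrap();
        if bounds.is_empty() {
            return None;
        }
        // OR together one AND-range per partition instead of merging everything
        // into a single global min/max range.
        Some(
            bounds
                .iter()
                .map(|(min, max)| format!("(k >= {min} AND k <= {max})"))
                .collect::<Vec<_>>()
                .join(" OR "),
        )
    }
}

fn main() {
    let acc = BoundsAccumulator::new(2);
    // First partition reports; no filter yet.
    assert_eq!(acc.report(Some((1, 10))), None);
    // Last partition reports; the OR'd per-partition filter is produced.
    let filter = acc.report(Some((100, 200))).unwrap();
    assert_eq!(filter, "(k >= 1 AND k <= 10) OR (k >= 100 AND k <= 200)");
}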