From 2a1052c87198df7a2ad8507919f69b965d1476cb Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 30 Jul 2025 15:14:42 -0500 Subject: [PATCH 1/8] Enable physical filter pushdown for hash joins (#16954) (cherry picked from commit b10f453e7b6b74443e0f1e8f0299817016661478) --- .../physical_optimizer/filter_pushdown/mod.rs | 263 ++++++++++++++++++ .../physical-plan/src/execution_plan.rs | 22 +- .../physical-plan/src/filter_pushdown.rs | 35 ++- .../physical-plan/src/joins/hash_join.rs | 48 +++- 4 files changed, 350 insertions(+), 18 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 68369bc9d906..c06026844301 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -121,6 +121,269 @@ fn test_pushdown_into_scan_with_config_options() { ); } +#[tokio::test] +async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("d", Utf8, ["aa", "ab", "ac", "ad"]), + ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("f", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("e", DataType::Utf8, false), + Field::new("f", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec + let on = vec![( + col("a", &build_side_schema).unwrap(), + col("d", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + + // Finally let's add a SortExec on the outside to test pushdown of dynamic filters + let sort_expr = + PhysicalSortExpr::new(col("e", &join_schema).unwrap(), SortOptions::default()); + let plan = Arc::new( + SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), join) + .with_fetch(Some(2)), + ) as Arc; + + let mut config = ConfigOptions::default(); + config.optimizer.enable_dynamic_filter_pushdown = true; + config.execution.parquet.pushdown_filters = true; + + // Appy the FilterPushdown optimizer rule + let plan = FilterPushdown::new_post_optimization() + .optimize(Arc::clone(&plan), &config) + .unwrap(); + + // Test that filters are pushed down correctly to each side of the join + insta::assert_snapshot!( + format_plan_for_test(&plan), + @r" + - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false] + - HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Put some data through the plan to check that the filter is updated to reflect the TopK state + let session_ctx = SessionContext::new_with_config(SessionConfig::new()); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + stream.next().await.unwrap().unwrap(); + + // Test that filters are pushed down correctly to each side of the join + insta::assert_snapshot!( + format_plan_for_test(&plan), + @r" + - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] + " + ); +} + +// Test both static and dynamic filter pushdown in HashJoinExec. +// Note that static filter pushdown is rare: it should have already happened in the logical optimizer phase. +// However users may manually construct plans that could result in a FilterExec -> HashJoinExec -> Scan setup. +// Dynamic filters arise in cases such as nested inner joins or TopK -> HashJoinExec -> Scan setups. 
+#[tokio::test] +async fn test_static_filter_pushdown_through_hash_join() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("d", Utf8, ["aa", "ab", "ac", "ad"]), + ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("f", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("e", DataType::Utf8, false), + Field::new("f", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec + let on = vec![( + col("a", &build_side_schema).unwrap(), + col("d", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Create filters that can be pushed down to different sides + // We need to create filters in the context of the join output schema + let join_schema = join.schema(); + + // Filter on build side column: a = 'aa' + let left_filter = col_lit_predicate("a", "aa", &join_schema); + // Filter on probe side column: e = 'ba' + let right_filter = col_lit_predicate("e", "ba", &join_schema); + // Filter that references both sides: a = d (should not be pushed down) + let cross_filter = Arc::new(BinaryExpr::new( + col("a", &join_schema).unwrap(), + Operator::Eq, + col("d", &join_schema).unwrap(), + )) as Arc; + + let filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap()); + let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap()) + as Arc; + + // Test that filters are pushed down correctly to each side of the join + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = d@3 + - FilterExec: e@4 = ba + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = d@3 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba + " + ); + + // Test left join - filters should NOT be pushed down + let join = 
Arc::new( + HashJoinExec::try_new( + TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .build(), + TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .build(), + vec![( + col("a", &build_side_schema).unwrap(), + col("d", &probe_side_schema).unwrap(), + )], + None, + &JoinType::Left, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + let join_schema = join.schema(); + let filter = col_lit_predicate("a", "aa", &join_schema); + let plan = + Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc; + + // Test that filters are NOT pushed down for left join + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + " + ); +} + #[test] fn test_filter_collapse() { // filter should be pushed down into the parquet scan with two filters diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 6d51bf195dc6..6b402528c1c8 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -17,8 +17,8 @@ pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; use crate::filter_pushdown::{ - ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, PushedDownPredicate, + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; @@ -33,7 +33,6 @@ pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ expressions, Distribution, Partitioning, PhysicalExpr, }; -use itertools::Itertools; use std::any::Any; use std::fmt::Debug; @@ -521,19 +520,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { parent_filters: Vec>, _config: &ConfigOptions, ) -> Result { - // Default implementation: mark all filters as unsupported for all children - let mut desc = FilterDescription::new(); - let child_filters = parent_filters - .iter() - .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) - .collect_vec(); - for _ in 0..self.children().len() { - desc = desc.with_child(ChildFilterDescription { - parent_filters: child_filters.clone(), - self_filters: vec![], - }); - } - Ok(desc) + Ok(FilterDescription::all_unsupported( + &parent_filters, + &self.children(), + )) } /// Handle the result of a child pushdown. 
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs index a3e94a75c8e7..d10bd7aea336 100644 --- a/datafusion/physical-plan/src/filter_pushdown.rs +++ b/datafusion/physical-plan/src/filter_pushdown.rs @@ -40,8 +40,9 @@ use std::sync::Arc; use datafusion_common::Result; use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use itertools::Itertools; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum FilterPushdownPhase { /// Pushdown that happens before most other optimizations. /// This pushdown allows static filters that do not reference any [`ExecutionPlan`]s to be pushed down. @@ -257,6 +258,19 @@ impl FilterPushdownPropagation { } } + /// Create a new [`FilterPushdownPropagation`] that tells the parent node that no filters were pushed down regardless of the child results. + pub fn all_unsupported(child_pushdown_result: ChildPushdownResult) -> Self { + let filters = child_pushdown_result + .parent_filters + .into_iter() + .map(|_| PushedDown::No) + .collect(); + Self { + filters, + updated_node: None, + } + } + /// Create a new [`FilterPushdownPropagation`] with the specified filter support. /// This transmits up to our parent node what the result of pushing down the filters into our node and possibly our subtree was. pub fn with_parent_pushdown_result(filters: Vec) -> Self { @@ -413,6 +427,25 @@ impl FilterDescription { Ok(desc) } + /// Mark all parent filters as unsupported for all children. + pub fn all_unsupported( + parent_filters: &[Arc], + children: &[&Arc], + ) -> Self { + let mut desc = Self::new(); + let child_filters = parent_filters + .iter() + .map(|f| PushedDownPredicate::unsupported(Arc::clone(f))) + .collect_vec(); + for _ in 0..children.len() { + desc = desc.with_child(ChildFilterDescription { + parent_filters: child_filters.clone(), + self_filters: vec![], + }); + } + desc + } + pub fn parent_filters(&self) -> Vec> { self.child_filter_descriptions .iter() diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index a7f28ede4408..0a26039462a4 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -34,6 +34,10 @@ use super::{ }; use super::{JoinOn, JoinOnRef}; use crate::execution_plan::{boundedness_from_children, EmissionType}; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64}; use crate::projection::{ try_embed_projection, try_pushdown_through_join, EmbeddedProjection, JoinData, @@ -68,6 +72,7 @@ use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; +use datafusion_common::config::ConfigOptions; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide, JoinType, @@ -79,7 +84,7 @@ use datafusion_expr::Operator; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; -use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use datafusion_physical_expr_common::datum::compare_op_for_nested; use ahash::RandomState; @@ -944,6 +949,47 @@ impl ExecutionPlan 
for HashJoinExec { try_embed_projection(projection, self) } } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &ConfigOptions, + ) -> Result { + // Other types of joins can support *some* filters, but restrictions are complex and error prone. + // For now we don't support them. + // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs + // See https://github.com/apache/datafusion/issues/16973 for tracking. + if self.join_type != JoinType::Inner { + return Ok(FilterDescription::all_unsupported( + &parent_filters, + &self.children(), + )); + } + FilterDescription::from_children(parent_filters, &self.children()) + // TODO: push down our self filters to children in the post optimization phase + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + // Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for + // non-inner joins in `gather_filters_for_pushdown`. + // However it's a cheap check and serves to inform future devs touching this function that they need to be really + // careful pushing down filters through non-inner joins. + if self.join_type != JoinType::Inner { + // Other types of joins can support *some* filters, but restrictions are complex and error prone. + // For now we don't support them. + // See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs + return Ok(FilterPushdownPropagation::all_unsupported( + child_pushdown_result, + )); + } + Ok(FilterPushdownPropagation::if_any(child_pushdown_result)) + } } /// Reads the left (build) side of the input, buffering it in memory, to build a From 79e84c578653e7b560f6ebe3709caf7a9a47e663 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 7 Aug 2025 16:26:34 -0500 Subject: [PATCH 2/8] Add ExecutionPlan::reset_state (#17028) * Add ExecutionPlan::reset_state Co-authored-by: Robert Ream * Update datafusion/sqllogictest/test_files/cte.slt * Add reference * fmt * add to upgrade guide * add explain plan, implement in more plans * fmt * only explain --------- Co-authored-by: Robert Ream --- .../src/expressions/dynamic_filters.rs | 8 ++ .../physical-plan/src/execution_plan.rs | 25 ++++ .../physical-plan/src/joins/cross_join.rs | 12 ++ .../physical-plan/src/joins/hash_join.rs | 20 +++ .../physical-plan/src/recursive_query.rs | 5 +- datafusion/physical-plan/src/sorts/sort.rs | 80 ++++++++--- datafusion/sqllogictest/test_files/cte.slt | 55 ++++++++ docs/source/library-user-guide/upgrading.md | 133 ++++++++++++++++++ 8 files changed, 313 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index ba30b916b9f8..ea10b1197b1d 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -32,6 +32,10 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. 
+/// +/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also +/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where +/// the same `ExecutionPlan` is reused with different data. #[derive(Debug)] pub struct DynamicFilterPhysicalExpr { /// The original children of this PhysicalExpr, if any. @@ -121,6 +125,10 @@ impl DynamicFilterPhysicalExpr { /// do not change* since those will be used to determine what columns need to read or projected /// when evaluating the expression. /// + /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also + /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where + /// the same `ExecutionPlan` is reused with different data. + /// /// [`collect_columns`]: crate::utils::collect_columns #[allow(dead_code)] // Only used in tests for now pub fn new( diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 6b402528c1c8..fe97ca1fc1db 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -194,6 +194,31 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { children: Vec>, ) -> Result>; + /// Reset any internal state within this [`ExecutionPlan`]. + /// + /// This method is called when an [`ExecutionPlan`] needs to be re-executed, + /// such as in recursive queries. Unlike [`ExecutionPlan::with_new_children`], this method + /// ensures that any stateful components (e.g., [`DynamicFilterPhysicalExpr`]) + /// are reset to their initial state. + /// + /// The default implementation simply calls [`ExecutionPlan::with_new_children`] with the existing children, + /// effectively creating a new instance of the [`ExecutionPlan`] with the same children but without + /// necessarily resetting any internal state. Implementations that require resetting of some + /// internal state should override this method to provide the necessary logic. + /// + /// This method should *not* reset state recursively for children, as it is expected that + /// it will be called from within a walk of the execution plan tree so that it will be called on each child later + /// or was already called on each child. + /// + /// Note to implementers: unlike [`ExecutionPlan::with_new_children`] this method does not accept new children as an argument, + /// thus it is expected that any cached plan properties will remain valid after the reset. + /// + /// [`DynamicFilterPhysicalExpr`]: datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr + fn reset_state(self: Arc) -> Result> { + let children = self.children().into_iter().cloned().collect(); + self.with_new_children(children) + } + /// If supported, attempt to increase the partitioning of this `ExecutionPlan` to /// produce `target_partitions` partitions. 
/// diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index a41e668ab4da..b8ea6330a1e2 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -270,6 +270,18 @@ impl ExecutionPlan for CrossJoinExec { ))) } + fn reset_state(self: Arc) -> Result> { + let new_exec = CrossJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + schema: Arc::clone(&self.schema), + left_fut: Default::default(), // reset the build side! + metrics: ExecutionPlanMetricsSet::default(), + cache: self.cache.clone(), + }; + Ok(Arc::new(new_exec)) + } + fn required_input_distribution(&self) -> Vec { vec![ Distribution::SinglePartition, diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 0a26039462a4..6058b7974e92 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -774,6 +774,26 @@ impl ExecutionPlan for HashJoinExec { )?)) } + fn reset_state(self: Arc) -> Result> { + // Reset the left_fut to allow re-execution + Ok(Arc::new(HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: OnceAsync::default(), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + })) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 99b460dfcfdc..700a9076fecf 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -372,7 +372,7 @@ fn assign_work_table( } /// Some plans will change their internal states after execution, making them unable to be executed again. -/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states. +/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan. /// /// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan. /// However, if the data of the left table is derived from the work table, it will become outdated @@ -383,8 +383,7 @@ fn reset_plan_states(plan: Arc) -> Result() { Ok(Transformed::no(plan)) } else { - let new_plan = Arc::clone(&plan) - .with_new_children(plan.children().into_iter().cloned().collect())?; + let new_plan = Arc::clone(&plan).reset_state()?; Ok(Transformed::yes(new_plan)) } }) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index bb572c4315fb..b82f1769d092 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -899,6 +899,29 @@ impl SortExec { self } + /// Add or reset `self.filter` to a new `DynamicFilterPhysicalExpr`. 
+ fn create_filter(&self) -> Arc { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) + } + + fn cloned(&self) -> Self { + SortExec { + input: Arc::clone(&self.input), + expr: self.expr.clone(), + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + common_sort_prefix: self.common_sort_prefix.clone(), + fetch: self.fetch, + cache: self.cache.clone(), + filter: self.filter.clone(), + } + } + /// Modify how many rows to include in the result /// /// If None, then all rows will be returned, in sorted order. @@ -920,25 +943,13 @@ impl SortExec { } let filter = fetch.is_some().then(|| { // If we already have a filter, keep it. Otherwise, create a new one. - self.filter.clone().unwrap_or_else(|| { - let children = self - .expr - .iter() - .map(|sort_expr| Arc::clone(&sort_expr.expr)) - .collect::>(); - Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true))) - }) + self.filter.clone().unwrap_or_else(|| self.create_filter()) }); - SortExec { - input: Arc::clone(&self.input), - expr: self.expr.clone(), - metrics_set: self.metrics_set.clone(), - preserve_partitioning: self.preserve_partitioning, - common_sort_prefix: self.common_sort_prefix.clone(), - fetch, - cache, - filter, - } + let mut new_sort = self.cloned(); + new_sort.fetch = fetch; + new_sort.cache = cache; + new_sort.filter = filter; + new_sort } /// Input schema @@ -1110,10 +1121,35 @@ impl ExecutionPlan for SortExec { self: Arc, children: Vec>, ) -> Result> { - let mut new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) - .with_fetch(self.fetch) - .with_preserve_partitioning(self.preserve_partitioning); - new_sort.filter = self.filter.clone(); + let mut new_sort = self.cloned(); + assert!( + children.len() == 1, + "SortExec should have exactly one child" + ); + new_sort.input = Arc::clone(&children[0]); + // Recompute the properties based on the new input since they may have changed + let (cache, sort_prefix) = Self::compute_properties( + &new_sort.input, + new_sort.expr.clone(), + new_sort.preserve_partitioning, + )?; + new_sort.cache = cache; + new_sort.common_sort_prefix = sort_prefix; + + Ok(Arc::new(new_sort)) + } + + fn reset_state(self: Arc) -> Result> { + let children = self.children().into_iter().cloned().collect(); + let new_sort = self.with_new_children(children)?; + let mut new_sort = new_sort + .as_any() + .downcast_ref::() + .expect("cloned 1 lines above this line, we know the type") + .clone(); + // Our dynamic filter and execution metrics are the state we need to reset. 
+ new_sort.filter = Some(new_sort.create_filter()); + new_sort.metrics_set = ExecutionPlanMetricsSet::new(); Ok(Arc::new(new_sort)) } diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 32320a06f4fb..5f8fd1a0b5ef 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -996,6 +996,61 @@ physical_plan 08)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 09)------------WorkTableExec: name=numbers +# Test for issue #16998: SortExec shares DynamicFilterPhysicalExpr across multiple executions +query II +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +0 0 +0 0 +0 0 +0 0 +0 0 + +query TT +explain +with recursive r as ( + select 0 as k, 0 as v + union all + ( + select * + from r + order by v + limit 1 + ) +) +select * +from r +limit 5; +---- +logical_plan +01)SubqueryAlias: r +02)--Limit: skip=0, fetch=5 +03)----RecursiveQuery: is_distinct=false +04)------Projection: Int64(0) AS k, Int64(0) AS v +05)--------EmptyRelation +06)------Sort: r.v ASC NULLS LAST, fetch=1 +07)--------Projection: r.k, r.v +08)----------TableScan: r +physical_plan +01)GlobalLimitExec: skip=0, fetch=5 +02)--RecursiveQueryExec: name=r, is_distinct=false +03)----ProjectionExec: expr=[0 as k, 0 as v] +04)------PlaceholderRowExec +05)----SortExec: TopK(fetch=1), expr=[v@1 ASC NULLS LAST], preserve_partitioning=[false] +06)------WorkTableExec: name=r + statement count 0 set datafusion.execution.enable_recursive_ctes = false; diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index e6a2f06305c1..1c85aa32198b 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -19,6 +19,139 @@ # Upgrade Guides +## DataFusion `50.0.0` + +**Note:** DataFusion `50.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. +You can see the current [status of the `50.0.0 `release here](https://github.com/apache/datafusion/issues/16799) + +### `AsyncScalarUDFImpl::invoke_async_with_args` returns `ColumnarValue` + +In order to enable single value optimizations and be consistent with other +user defined function APIs, the `AsyncScalarUDFImpl::invoke_async_with_args` method now +returns a `ColumnarValue` instead of a `ArrayRef`. + +To upgrade, change the return type of your implementation + +```rust +# /* comment to avoid running +impl AsyncScalarUDFImpl for AskLLM { + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + _option: &ConfigOptions, + ) -> Result { + .. + return array_ref; // old code + } +} +# */ +``` + +To return a `ColumnarValue` + +```rust +# /* comment to avoid running +impl AsyncScalarUDFImpl for AskLLM { + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + _option: &ConfigOptions, + ) -> Result { + .. + return ColumnarValue::from(array_ref); // new code + } +} +# */ +``` + +See [#16896](https://github.com/apache/datafusion/issues/16896) for more details. + +### `SessionState`, `SessionConfig`, and `OptimizerConfig` returns `&Arc` instead of `&ConfigOptions` + +To provide broader access to `ConfigOptions` and reduce required clones, some +APIs have been changed to return a `&Arc` instead of a +`&ConfigOptions`. 
This allows sharing the same `ConfigOptions` across multiple +threads without needing to clone the entire `ConfigOptions` structure unless it +is modified. + +Most users will not be impacted by this change since the Rust compiler typically +automatically dereference the `Arc` when needed. However, in some cases you may +have to change your code to explicitly call `as_ref()` for example, from + +```rust +# /* comment to avoid running +let optimizer_config: &ConfigOptions = state.options(); +# */ +``` + +To + +```rust +# /* comment to avoid running +let optimizer_config: &ConfigOptions = state.options().as_ref(); +# */ +``` + +See PR [#16970](https://github.com/apache/datafusion/pull/16970) + +### API Change to `AsyncScalarUDFImpl::invoke_async_with_args` + +The `invoke_async_with_args` method of the `AsyncScalarUDFImpl` trait has been +updated to remove the `_option: &ConfigOptions` parameter to simplify the API +now that the `ConfigOptions` can be accessed through the `ScalarFunctionArgs` +parameter. + +You can change your code like this + +```rust +# /* comment to avoid running +impl AsyncScalarUDFImpl for AskLLM { + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + _option: &ConfigOptions, + ) -> Result { + .. + } + ... +} +# */ +``` + +To this: + +```rust +# /* comment to avoid running + +impl AsyncScalarUDFImpl for AskLLM { + async fn invoke_async_with_args( + &self, + args: ScalarFunctionArgs, + ) -> Result { + let options = &args.config_options; + .. + } + ... +} +# */ +``` + +### Upgrade to arrow `56.0.0` and parquet `56.0.0` + +This version of DataFusion upgrades the underlying Apache Arrow implementation +to version `56.0.0`. See the [release notes](https://github.com/apache/arrow-rs/releases/tag/56.0.0) +for more details. + +### Added `ExecutionPlan::reset_state` + +In order to fix a bug in DataFusion `49.0.0` where dynamic filters (currently only generated in the precense of a query such as `ORDER BY ... LIMIT ...`) +produced incorrect results in recursive queries, a new method `reset_state` has been added to the `ExecutionPlan` trait. + +Any `ExecutionPlan` that needs to maintain internal state or references to other nodes in the execution plan tree should implement this method to reset that state. +See [#17028] for more details and an example implementation for `SortExec`. + +[#17028]: https://github.com/apache/datafusion/pull/17028 + ## DataFusion `49.0.0` **Note:** DataFusion `49.0.0` has not been released yet. The information provided in this section pertains to features and changes that have already been merged to the main branch and are awaiting release in this version. 
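
As a usage-level sketch of the new `ExecutionPlan::reset_state` hook described in the upgrade guide above: a caller that re-executes a cached plan (for example a custom recursive operator) can walk the tree and reset every node. This is a hypothetical standalone helper, not part of the patch; it assumes the `TreeNode` implementation for `Arc<dyn ExecutionPlan>` and the `Transformed`/`TransformedResult` utilities from `datafusion_common::tree_node`, and unlike the in-tree `reset_plan_states` it does not special-case `WorkTableExec`.

```rust
// Hypothetical helper illustrating ExecutionPlan::reset_state; not part of this patch.
// Assumes the datafusion-common and datafusion-physical-plan crates from this branch.
use std::sync::Arc;

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_plan::ExecutionPlan;

/// Walk a plan bottom-up and reset the internal state of every node so the
/// same plan can be executed again (e.g. build-side caches, dynamic filters).
fn reset_all(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|node| Ok(Transformed::yes(node.reset_state()?)))
        .data()
}
```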
From 06fe6874b50d8f69ca2ae6edaf81e9a0d6b13a0c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 11 Aug 2025 12:58:20 -0500 Subject: [PATCH 3/8] Add dynamic filter (bounds) pushdown to HashJoinExec (#16445) (cherry picked from commit ff77b702003b5bf6d9eaa759d37c4af3aefd9f4d) --- Cargo.lock | 1 + .../physical_optimizer/filter_pushdown/mod.rs | 382 +++++++++++++++++- datafusion/physical-plan/Cargo.toml | 1 + .../physical-plan/src/joins/hash_join.rs | 149 ++++++- 4 files changed, 512 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a088005a0f19..20b894864647 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2506,6 +2506,7 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate", + "datafusion-functions-aggregate-common", "datafusion-functions-window", "datafusion-functions-window-common", "datafusion-physical-expr", diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index c06026844301..5059e27ac9c5 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -129,13 +129,13 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { // Create build side with limited values let build_batches = vec![record_batch!( ("a", Utf8, ["aa", "ab"]), - ("b", Utf8, ["ba", "bb"]), + ("b", Utf8View, ["ba", "bb"]), ("c", Float64, [1.0, 2.0]) ) .unwrap()]; let build_side_schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), + Field::new("b", DataType::Utf8View, false), Field::new("c", DataType::Float64, false), ])); let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) @@ -146,13 +146,13 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { // Create probe side with more values let probe_batches = vec![record_batch!( ("d", Utf8, ["aa", "ab", "ac", "ad"]), - ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Utf8View, ["ba", "bb", "bc", "bd"]), ("f", Float64, [1.0, 2.0, 3.0, 4.0]) ) .unwrap()]; let probe_side_schema = Arc::new(Schema::new(vec![ Field::new("d", DataType::Utf8, false), - Field::new("e", DataType::Utf8, false), + Field::new("e", DataType::Utf8View, false), Field::new("f", DataType::Float64, false), ])); let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) @@ -205,7 +205,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false] - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] AND DynamicFilterPhysicalExpr [ true ] " ); @@ -226,9 +226,9 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { format_plan_for_test(&plan), @r" - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - 
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] " ); } @@ -245,13 +245,13 @@ async fn test_static_filter_pushdown_through_hash_join() { // Create build side with limited values let build_batches = vec![record_batch!( ("a", Utf8, ["aa", "ab"]), - ("b", Utf8, ["ba", "bb"]), + ("b", Utf8View, ["ba", "bb"]), ("c", Float64, [1.0, 2.0]) ) .unwrap()]; let build_side_schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), + Field::new("b", DataType::Utf8View, false), Field::new("c", DataType::Float64, false), ])); let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) @@ -262,13 +262,13 @@ async fn test_static_filter_pushdown_through_hash_join() { // Create probe side with more values let probe_batches = vec![record_batch!( ("d", Utf8, ["aa", "ab", "ac", "ad"]), - ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Utf8View, ["ba", "bb", "bc", "bd"]), ("f", Float64, [1.0, 2.0, 3.0, 4.0]) ) .unwrap()]; let probe_side_schema = Arc::new(Schema::new(vec![ Field::new("d", DataType::Utf8, false), - Field::new("e", DataType::Utf8, false), + Field::new("e", DataType::Utf8View, false), Field::new("f", DataType::Float64, false), ])); let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) @@ -696,6 +696,366 @@ async fn test_topk_dynamic_filter_pushdown() { ); } +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) // Extra column not used in join + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec with dynamic filter + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + 
HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ) as Arc; + + // expect the predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + ", + ); + + // Actually apply the optimization to the plan and execute to see the filter in action + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + stream.next().await.unwrap().unwrap(); + + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] + " + ); +} + +#[tokio::test] +async fn test_nested_hashjoin_dynamic_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create test data for three tables: t1, t2, t3 + // t1: small table with limited values (will be build side of outer join) + let t1_batches = + vec![ + record_batch!(("a", Utf8, ["aa", "ab"]), ("x", Float64, [1.0, 2.0])).unwrap(), + ]; + let t1_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("x", DataType::Float64, false), + ])); + let t1_scan = TestScanBuilder::new(Arc::clone(&t1_schema)) + .with_support(true) + .with_batches(t1_batches) + .build(); + + // t2: larger table (will be probe side of inner join, build side of outer join) + let t2_batches = vec![record_batch!( + ("b", Utf8, ["aa", "ab", "ac", "ad", "ae"]), + ("c", Utf8, ["ca", "cb", "cc", "cd", "ce"]), + ("y", Float64, [1.0, 2.0, 3.0, 4.0, 5.0]) + ) + .unwrap()]; + let t2_schema = 
Arc::new(Schema::new(vec![ + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Utf8, false), + Field::new("y", DataType::Float64, false), + ])); + let t2_scan = TestScanBuilder::new(Arc::clone(&t2_schema)) + .with_support(true) + .with_batches(t2_batches) + .build(); + + // t3: largest table (will be probe side of inner join) + let t3_batches = vec![record_batch!( + ("d", Utf8, ["ca", "cb", "cc", "cd", "ce", "cf", "cg", "ch"]), + ("z", Float64, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]) + ) + .unwrap()]; + let t3_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("z", DataType::Float64, false), + ])); + let t3_scan = TestScanBuilder::new(Arc::clone(&t3_schema)) + .with_support(true) + .with_batches(t3_batches) + .build(); + + // Create nested join structure: + // Join (t1.a = t2.b) + // / \ + // t1 Join(t2.c = t3.d) + // / \ + // t2 t3 + + // First create inner join: t2.c = t3.d + let inner_join_on = + vec![(col("c", &t2_schema).unwrap(), col("d", &t3_schema).unwrap())]; + let inner_join = Arc::new( + HashJoinExec::try_new( + t2_scan, + t3_scan, + inner_join_on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Then create outer join: t1.a = t2.b (from inner join result) + let outer_join_on = vec![( + col("a", &t1_schema).unwrap(), + col("b", &inner_join.schema()).unwrap(), + )]; + let outer_join = Arc::new( + HashJoinExec::try_new( + t1_scan, + inner_join as Arc, + outer_join_on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ) as Arc; + + // Test that dynamic filters are pushed down correctly through nested joins + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&outer_join), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + ", + ); + + // Execute the plan to verify the dynamic filters are properly updated + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(outer_join, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + 
ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Execute to populate the dynamic filters + stream.next().await.unwrap().unwrap(); + + // Verify that both the inner and outer join have updated dynamic filters + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ] + " + ); +} + +#[tokio::test] +async fn test_hashjoin_parent_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("d", Utf8, ["aa", "ab", "ac", "ad"]), + ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("f", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("e", DataType::Utf8, false), + Field::new("f", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec + let on = vec![( + col("a", &build_side_schema).unwrap(), + col("d", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Create filters that can be pushed down to different sides + // We need to create filters in the context of the join output schema + let join_schema = join.schema(); + + // Filter on build side column: a = 'aa' + let left_filter = col_lit_predicate("a", "aa", &join_schema); + // Filter on probe side column: e = 'ba' + let right_filter = col_lit_predicate("e", "ba", &join_schema); + // Filter that references both sides: a = d (should not be pushed down) + let cross_filter = Arc::new(BinaryExpr::new( + col("a", &join_schema).unwrap(), + Operator::Eq, + col("d", &join_schema).unwrap(), + )) as Arc; + + let filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let filter = Arc::new(FilterExec::try_new(right_filter, 
filter).unwrap()); + let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap()) + as Arc; + + // Test that filters are pushed down correctly to each side of the join + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = d@3 + - FilterExec: e@4 = ba + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = d@3 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba + " + ); +} + /// Integration test for dynamic filter pushdown with TopK. /// We use an integration test because there are complex interactions in the optimizer rules /// that the unit tests applying a single optimizer rule do not cover. diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 095ee78cd0d6..9889b45cc5a5 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -53,6 +53,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } datafusion-functions-window-common = { workspace = true } datafusion-physical-expr = { workspace = true, default-features = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 6058b7974e92..d3999c6cd824 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -76,14 +76,16 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide, JoinType, - NullEquality, Result, + NullEquality, Result, ScalarValue, }; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; +use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch}; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; +use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use datafusion_physical_expr_common::datum::compare_op_for_nested; @@ -363,6 +365,8 @@ pub struct HashJoinExec { pub null_equality: NullEquality, /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanProperties, + /// Dynamic filter for pushing down to the probe side + dynamic_filter: Arc, } impl HashJoinExec { @@ -409,6 +413,8 @@ impl HashJoinExec { projection.as_ref(), )?; + let dynamic_filter = Self::create_dynamic_filter(&on); + Ok(HashJoinExec { left, right, @@ -424,9 +430,17 @@ impl HashJoinExec { column_indices, null_equality, cache, + dynamic_filter, }) } + fn create_dynamic_filter(on: &JoinOn) -> Arc { + // Extract the right-side keys from the `on` clauses + let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); + // Initialize with a placeholder expression (true) that will be updated when the hash table is built + Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) + } + /// left (build) side which gets hashed pub fn left(&self) -> &Arc { &self.left @@ -672,10 +686,21 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); + let dynamic_filter_display = match self.dynamic_filter.current() { + Ok(current) if current != lit(true) => { + format!(", filter=[{current}]") + } + _ => "".to_string(), + }; write!( f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}", - self.mode, self.join_type, on, display_filter, display_projections + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", + self.mode, + self.join_type, + on, + display_filter, + display_projections, + dynamic_filter_display ) } DisplayFormatType::TreeRender => { @@ -762,7 +787,7 @@ impl ExecutionPlan for HashJoinExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(HashJoinExec::try_new( + let mut new_join = HashJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), self.on.clone(), @@ -771,7 +796,10 @@ impl ExecutionPlan for HashJoinExec { self.projection.clone(), self.mode, self.null_equality, - )?)) + )?; + // Preserve the dynamic filter if it exists + new_join.dynamic_filter = Arc::clone(&self.dynamic_filter); + Ok(Arc::new(new_join)) } fn reset_state(self: Arc) -> Result> { @@ -791,6 +819,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), + dynamic_filter: Self::create_dynamic_filter(&self.on), })) } @@ -827,6 +856,12 @@ impl ExecutionPlan for HashJoinExec { ); } + let enable_dynamic_filter_pushdown = context + .session_config() + .options() + .optimizer + .enable_dynamic_filter_pushdown; + let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { @@ -843,6 +878,9 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), + enable_dynamic_filter_pushdown + .then_some(Arc::clone(&self.dynamic_filter)), + on_right.clone(), )) })?, PartitionMode::Partitioned => { @@ -860,6 +898,9 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), 1, + enable_dynamic_filter_pushdown + .then_some(Arc::clone(&self.dynamic_filter)), + on_right.clone(), )) } PartitionMode::Auto => { @@ -972,9 +1013,9 @@ impl ExecutionPlan for HashJoinExec { fn gather_filters_for_pushdown( &self, - _phase: FilterPushdownPhase, + phase: FilterPushdownPhase, parent_filters: Vec>, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result { // Other types of joins can support *some* filters, but restrictions are complex and error prone. // For now we don't support them. 
@@ -986,8 +1027,30 @@ impl ExecutionPlan for HashJoinExec { &self.children(), )); } - FilterDescription::from_children(parent_filters, &self.children()) - // TODO: push down our self filters to children in the post optimization phase + + // Get basic filter descriptions for both children + let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.left(), + )?; + let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.right(), + )?; + + // Add dynamic filters in Post phase if enabled + if matches!(phase, FilterPushdownPhase::Post) + && config.optimizer.enable_dynamic_filter_pushdown + { + // Add actual dynamic filter to right side (probe side) + let dynamic_filter = + Arc::clone(&self.dynamic_filter) as Arc; + right_child = right_child.with_self_filter(dynamic_filter); + } + + Ok(FilterDescription::new() + .with_child(left_child) + .with_child(right_child)) } fn handle_child_pushdown_result( @@ -1012,8 +1075,31 @@ impl ExecutionPlan for HashJoinExec { } } +/// Compute min/max bounds for each column in the given arrays +fn compute_bounds(arrays: &[ArrayRef]) -> Result> { + arrays + .iter() + .map(|array| { + if array.is_empty() { + // Return NULL values for empty arrays + return Ok(( + ScalarValue::try_from(array.data_type())?, + ScalarValue::try_from(array.data_type())?, + )); + } + + // Use Arrow kernels for efficient min/max computation + let min_val = min_batch(array)?; + let max_val = max_batch(array)?; + + Ok((min_val, max_val)) + }) + .collect() +} + /// Reads the left (build) side of the input, buffering it in memory, to build a /// hash table (`LeftJoinData`) +#[expect(clippy::too_many_arguments)] async fn collect_left_input( random_state: RandomState, left_stream: SendableRecordBatchStream, @@ -1022,6 +1108,8 @@ async fn collect_left_input( reservation: MemoryReservation, with_visited_indices_bitmap: bool, probe_threads_count: usize, + dynamic_filter: Option>, + on_right: Vec, ) -> Result { let schema = left_stream.schema(); @@ -1114,12 +1202,53 @@ async fn collect_left_input( let data = JoinLeftData::new( hashmap, single_batch, - left_values, + left_values.clone(), Mutex::new(visited_indices_bitmap), AtomicUsize::new(probe_threads_count), reservation, ); + // Update dynamic filter with min/max bounds if provided + if let Some(dynamic_filter) = dynamic_filter { + if num_rows > 0 { + let bounds = compute_bounds(&left_values)?; + + // Create range predicates for each join key + let mut predicates = Vec::with_capacity(bounds.len()); + for ((min_val, max_val), right_expr) in bounds.iter().zip(on_right.iter()) { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(min_val.clone()), + )) as Arc; + + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(max_val.clone()), + )) as Arc; + + let range_expr = + Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc; + + predicates.push(range_expr); + } + + // Combine all predicates with AND + let combined_predicate = predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + as Arc + }) + .unwrap_or_else(|| lit(true)); + + dynamic_filter.update(combined_predicate)?; + } + } + Ok(data) } From 1a16ba9972edc58f9892dda4140417b876bed804 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Tue, 19 Aug 2025 22:43:19 +0100 Subject: [PATCH 4/8] Push dynamic pushdown through 
CooperativeExec and ProjectionExec (#17238) (cherry picked from commit 4bc069693dc374c1b52c6266ae7a01ef46e1fdf5) --- datafusion/physical-plan/src/coop.rs | 31 ++++++++++++++++--- datafusion/physical-plan/src/projection.rs | 26 ++++++++++++++++ .../sqllogictest/test_files/explain_tree.slt | 2 +- datafusion/sqllogictest/test_files/topk.slt | 2 +- 4 files changed, 55 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index be0afa07eac2..64dc32f0ab37 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -65,10 +65,9 @@ //! The optimizer rule currently checks the plan for exchange-like operators and leave operators //! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties). -#[cfg(any( - datafusion_coop = "tokio_fallback", - not(any(datafusion_coop = "tokio", datafusion_coop = "per_stream")) -))] +use datafusion_common::config::ConfigOptions; +use datafusion_physical_expr::PhysicalExpr; +#[cfg(datafusion_coop = "tokio_fallback")] use futures::Future; use std::any::Any; use std::pin::Pin; @@ -76,6 +75,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use crate::execution_plan::CardinalityEffect::{self, Equal}; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -164,6 +167,8 @@ where // after the work has been done and just assume that that succeeded. // The poll result is ignored because we don't want to discard // or buffer the Ready result we got from the inner stream. + + use std::future::Future; let consume = tokio::task::coop::consume_budget(); let consume_ref = std::pin::pin!(consume); let _ = consume_ref.poll(cx); @@ -291,6 +296,24 @@ impl ExecutionPlan for CooperativeExec { fn cardinality_effect(&self) -> CardinalityEffect { Equal } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &ConfigOptions, + ) -> Result { + FilterDescription::from_children(parent_filters, &self.children()) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) + } } /// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`]. 
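What these two pass-through methods buy is visible in the slt changes further down: once CooperativeExec (and ProjectionExec) forward filters unchanged, a dynamic filter created above them can reach the DataSourceExec underneath. The following toy model illustrates that effect with invented node names and no DataFusion types; it is a sketch of the idea, not the optimizer's actual traversal.

/// Toy plan: each node either lets filters through to its child or blocks them.
enum Node {
    Scan { pushed: Vec<String> },
    Transparent(Box<Node>), // e.g. CooperativeExec / ProjectionExec after this patch
    Opaque(Box<Node>),      // a node with no pushdown support
}

fn push_down(node: Node, filters: Vec<String>) -> Node {
    match node {
        Node::Scan { mut pushed } => {
            pushed.extend(filters);
            Node::Scan { pushed }
        }
        Node::Transparent(child) => Node::Transparent(Box::new(push_down(*child, filters))),
        // An opaque node drops the filters; in reality they stay in place above it.
        Node::Opaque(child) => Node::Opaque(Box::new(push_down(*child, Vec::new()))),
    }
}

fn scan_predicates(node: &Node) -> &[String] {
    match node {
        Node::Scan { pushed } => pushed,
        Node::Transparent(c) | Node::Opaque(c) => scan_predicates(c),
    }
}

fn main() {
    // A transparent wrapper lets the dynamic filter reach the scan...
    let plan = Node::Transparent(Box::new(Node::Scan { pushed: vec![] }));
    let plan = push_down(plan, vec!["DynamicFilterPhysicalExpr [ true ]".to_string()]);
    assert_eq!(scan_predicates(&plan)[0], "DynamicFilterPhysicalExpr [ true ]");

    // ...while an opaque wrapper (the situation before this patch) blocks it.
    let blocked = Node::Opaque(Box::new(Node::Scan { pushed: vec![] }));
    let blocked = push_down(blocked, vec!["DynamicFilterPhysicalExpr [ true ]".to_string()]);
    assert!(scan_predicates(&blocked).is_empty());
}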
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index a29f4aeb4090..bf2397a3d035 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -33,11 +33,16 @@ use super::{ SendableRecordBatchStream, Statistics, }; use crate::execution_plan::CardinalityEffect; +use crate::filter_pushdown::{ + ChildPushdownResult, FilterDescription, FilterPushdownPhase, + FilterPushdownPropagation, +}; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; use crate::{ColumnStatistics, DisplayFormatType, ExecutionPlan, PhysicalExpr}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; +use datafusion_common::config::ConfigOptions; use datafusion_common::stats::Precision; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, @@ -273,6 +278,27 @@ impl ExecutionPlan for ProjectionExec { Ok(Some(Arc::new(projection.clone()))) } } + + fn gather_filters_for_pushdown( + &self, + _phase: FilterPushdownPhase, + parent_filters: Vec>, + _config: &ConfigOptions, + ) -> Result { + // TODO: In future, we can try to handle inverting aliases here. + // For the time being, we pass through untransformed filters, so filters on aliases are not handled. + // https://github.com/apache/datafusion/issues/17246 + FilterDescription::from_children(parent_filters, &self.children()) + } + + fn handle_child_pushdown_result( + &self, + _phase: FilterPushdownPhase, + child_pushdown_result: ChildPushdownResult, + _config: &ConfigOptions, + ) -> Result>> { + Ok(FilterPushdownPropagation::if_all(child_pushdown_result)) + } } fn stats_projection( diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index f4188f4cb395..46672b8a8eb1 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -383,7 +383,7 @@ physical_plan 44)-----------------------------│ -------------------- ││ -------------------- │ 45)-----------------------------│ files: 1 ││ partition_count(in->out): │ 46)-----------------------------│ format: parquet ││ 1 -> 4 │ -47)-----------------------------│ ││ │ +47)-----------------------------│ predicate: true ││ │ 48)-----------------------------│ ││ partitioning_scheme: │ 49)-----------------------------│ ││ RoundRobinBatch(4) │ 50)-----------------------------└───────────────────────────┘└─────────────┬─────────────┘ diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index afa78e43de2b..ce59b0204616 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -372,7 +372,7 @@ explain select number, letter, age, number as column4, letter as column5 from pa physical_plan 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, letter@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[number@0 as number, letter@1 as letter, age@2 as age, number@0 as column4, letter@1 as column5] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet +03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]}, projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ] # Verify that the sort prefix is correctly computed over normalized, order-maintaining projections (number + 1, number, number + 1, age) query TT From 1369c45b26ccbc3147a10964f8f7d3ab4d5032de Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 19 Aug 2025 06:34:51 -0500 Subject: [PATCH 5/8] Fix dynamic filter pushdown in HashJoinExec (#17201) (cherry picked from commit 1d4d74bedafb73138237d8194d36778373d39a8e) --- .../physical-expr-common/src/physical_expr.rs | 2 +- .../physical-plan/src/joins/hash_join.rs | 99 ++++++++++++++----- .../test_files/push_down_filter.slt | 32 ++++++ 3 files changed, 110 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index b4cb08715f53..556fa84e25b0 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -68,7 +68,7 @@ pub type PhysicalExprRef = Arc; /// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html /// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html /// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html -pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { +pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash { /// Returns the physical expression as [`Any`] so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d3999c6cd824..90e55afe5aef 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -329,7 +329,6 @@ impl JoinLeftData { /// Note this structure includes a [`OnceAsync`] that is used to coordinate the /// loading of the left side with the processing in each output stream. /// Therefore it can not be [`Clone`] -#[derive(Debug)] pub struct HashJoinExec { /// left (build) side which gets hashed pub left: Arc, @@ -350,7 +349,7 @@ pub struct HashJoinExec { /// /// Each output stream waits on the `OnceAsync` to signal the completion of /// the hash table creation. - left_fut: OnceAsync, + left_fut: Arc>, /// Shared the `RandomState` for the hashing algorithm random_state: RandomState, /// Partitioning mode to use @@ -366,7 +365,29 @@ pub struct HashJoinExec { /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanProperties, /// Dynamic filter for pushing down to the probe side - dynamic_filter: Arc, + dynamic_filter: Option>, +} + +impl fmt::Debug for HashJoinExec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("HashJoinExec") + .field("left", &self.left) + .field("right", &self.right) + .field("on", &self.on) + .field("filter", &self.filter) + .field("join_type", &self.join_type) + .field("join_schema", &self.join_schema) + .field("left_fut", &self.left_fut) + .field("random_state", &self.random_state) + .field("mode", &self.mode) + .field("metrics", &self.metrics) + .field("projection", &self.projection) + .field("column_indices", &self.column_indices) + .field("null_equality", &self.null_equality) + .field("cache", &self.cache) + // Explicitly exclude dynamic_filter to avoid runtime state differences in tests + .finish() + } } impl HashJoinExec { @@ -413,8 +434,6 @@ impl HashJoinExec { projection.as_ref(), )?; - let dynamic_filter = Self::create_dynamic_filter(&on); - Ok(HashJoinExec { left, right, @@ -430,12 +449,13 @@ impl HashJoinExec { column_indices, null_equality, cache, - dynamic_filter, + dynamic_filter: None, }) } fn create_dynamic_filter(on: &JoinOn) -> Arc { - // Extract the right-side keys from the `on` clauses + // Extract the right-side keys (probe side keys) from the `on` clauses + // Dynamic filter will be created from build side values (left side) and applied to probe side (right side) let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); // Initialize with a placeholder expression (true) that will be updated when the hash table is built Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) @@ -686,11 +706,14 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); - let dynamic_filter_display = match self.dynamic_filter.current() { - Ok(current) if current != lit(true) => { - format!(", filter=[{current}]") - } - _ => "".to_string(), + let dynamic_filter_display = match self.dynamic_filter.as_ref() { + Some(dynamic_filter) => match dynamic_filter.current() { + Ok(current) if current != lit(true) => { + format!(", filter=[{current}]") + } + _ => "".to_string(), + }, + None => "".to_string(), }; write!( f, @@ -787,7 +810,7 @@ impl ExecutionPlan for HashJoinExec { self: Arc, children: Vec>, ) -> Result> { - let mut new_join = HashJoinExec::try_new( + let new_join = HashJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), self.on.clone(), @@ -797,8 +820,6 @@ impl ExecutionPlan for HashJoinExec { self.mode, self.null_equality, )?; - // Preserve the dynamic filter if it exists - new_join.dynamic_filter = Arc::clone(&self.dynamic_filter); Ok(Arc::new(new_join)) } @@ -811,7 +832,7 @@ impl ExecutionPlan for HashJoinExec { filter: self.filter.clone(), join_type: self.join_type, join_schema: Arc::clone(&self.join_schema), - left_fut: OnceAsync::default(), + left_fut: Arc::new(OnceAsync::default()), random_state: self.random_state.clone(), mode: self.mode, metrics: ExecutionPlanMetricsSet::new(), @@ -819,7 +840,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), - dynamic_filter: Self::create_dynamic_filter(&self.on), + dynamic_filter: None, })) } @@ -879,7 +900,8 @@ impl ExecutionPlan for HashJoinExec { need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), enable_dynamic_filter_pushdown - 
.then_some(Arc::clone(&self.dynamic_filter)), + .then_some(self.dynamic_filter.clone()) + .flatten(), on_right.clone(), )) })?, @@ -899,7 +921,8 @@ impl ExecutionPlan for HashJoinExec { need_produce_result_in_final(self.join_type), 1, enable_dynamic_filter_pushdown - .then_some(Arc::clone(&self.dynamic_filter)), + .then_some(self.dynamic_filter.clone()) + .flatten(), on_right.clone(), )) } @@ -1043,8 +1066,7 @@ impl ExecutionPlan for HashJoinExec { && config.optimizer.enable_dynamic_filter_pushdown { // Add actual dynamic filter to right side (probe side) - let dynamic_filter = - Arc::clone(&self.dynamic_filter) as Arc; + let dynamic_filter = Self::create_dynamic_filter(&self.on); right_child = right_child.with_self_filter(dynamic_filter); } @@ -1071,7 +1093,40 @@ impl ExecutionPlan for HashJoinExec { child_pushdown_result, )); } - Ok(FilterPushdownPropagation::if_any(child_pushdown_result)) + + let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone()); + assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children + let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child + // We expect 0 or 1 self filters + if let Some(filter) = right_child_self_filters.first() { + // Note that we don't check PushdDownPredicate::discrimnant because even if nothing said + // "yes, I can fully evaluate this filter" things might still use it for statistics -> it's worth updating + let predicate = Arc::clone(&filter.predicate); + if let Ok(dynamic_filter) = + Arc::downcast::(predicate) + { + // We successfully pushed down our self filter - we need to make a new node with the dynamic filter + let new_node = Arc::new(HashJoinExec { + left: Arc::clone(&self.left), + right: Arc::clone(&self.right), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: Arc::clone(&self.left_fut), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: self.cache.clone(), + dynamic_filter: Some(dynamic_filter), + }); + result = result.with_updated_node(new_node as Arc); + } + } + Ok(result) } } diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index f6d71cad60d4..0082da643b05 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -267,5 +267,37 @@ explain select a from t where CAST(a AS string) = '0123'; physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123 +# Test dynamic filter pushdown with swapped join inputs (issue #17196) +# Create tables with different sizes to force join input swapping +statement ok +copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet'; + +statement ok +copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet'; + +statement ok +create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet'; + +statement ok +create external 
table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet'; + +# Test that dynamic filter is applied to the correct table after join input swapping +# The small_table should be the build side, large_table should be the probe side with dynamic filter +query TT +explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50; +---- +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet +04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] + +statement ok +drop table small_table; + +statement ok +drop table large_table; + statement ok drop table t; From 2188d265ccab44852855e87b881a629a1051ba50 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 20 Aug 2025 14:21:38 -0500 Subject: [PATCH 6/8] Fix HashJoinExec sideways information passing for partitioned queries (#17197) (cherry picked from commit 64bc58d82ac7e63b97e07e9a639b021e3dd904bc) --- .../physical_optimizer/filter_pushdown/mod.rs | 235 ++++++++- .../physical-plan/src/joins/hash_join.rs | 491 ++++++++++++++---- .../test_files/push_down_filter.slt | 75 +++ 3 files changed, 699 insertions(+), 102 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 5059e27ac9c5..71635b929f84 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -43,6 +43,7 @@ use datafusion_physical_optimizer::{ use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, + coalesce_partitions::CoalescePartitionsExec, filter::FilterExec, repartition::RepartitionExec, sorts::sort::SortExec, @@ -226,7 +227,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { format_plan_for_test(&plan), @r" - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] " @@ -754,7 +755,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { None, &JoinType::Inner, None, - PartitionMode::Partitioned, + PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, ) .unwrap(), @@ -766,12 +767,12 @@ async fn test_hashjoin_dynamic_filter_pushdown() { @r" 
OptimizationTest: input: - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true output: Ok: - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] ", @@ -800,13 +801,233 @@ async fn test_hashjoin_dynamic_filter_pushdown() { insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] " ); } +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Rouugh plan we're trying to recreate: + // COPY (select i as k from generate_series(1, 10000000) as t(i)) + // TO 'test_files/scratch/push_down_filter/t1.parquet' + // STORED AS PARQUET; + // COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) + // TO 'test_files/scratch/push_down_filter/t2.parquet' + // STORED AS PARQUET; + // create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet'; + // create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet'; + // explain + // select * + // from t1 + // join t2 on t1.k = t2.k; + // +---------------+------------------------------------------------------------+ + // | plan_type | plan | + // +---------------+------------------------------------------------------------+ + // | physical_plan | ┌───────────────────────────┐ | + // | | │ CoalesceBatchesExec │ | + // | | │ -------------------- │ | + // | | │ target_batch_size: │ | + // | | │ 8192 │ | + // | | └─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐ | + // | | │ HashJoinExec │ | + // | | │ -------------------- ├──────────────┐ | + // | | │ on: (k = k) │ │ | + // | | └─────────────┬─────────────┘ │ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ CoalesceBatchesExec ││ CoalesceBatchesExec │ | + // | | │ -------------------- ││ -------------------- │ | + // | | │ target_batch_size: ││ target_batch_size: │ | + // | | │ 8192 ││ 8192 │ | + // | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ RepartitionExec ││ RepartitionExec │ | + // | | │ -------------------- ││ -------------------- │ | 
+ // | | │ partition_count(in->out): ││ partition_count(in->out): │ | + // | | │ 12 -> 12 ││ 12 -> 12 │ | + // | | │ ││ │ | + // | | │ partitioning_scheme: ││ partitioning_scheme: │ | + // | | │ Hash([k@0], 12) ││ Hash([k@0], 12) │ | + // | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ DataSourceExec ││ DataSourceExec │ | + // | | │ -------------------- ││ -------------------- │ | + // | | │ files: 12 ││ files: 12 │ | + // | | │ format: parquet ││ format: parquet │ | + // | | │ ││ predicate: true │ | + // | | └───────────────────────────┘└───────────────────────────┘ | + // | | | + // +---------------+------------------------------------------------------------+ + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) // Extra column not used in join + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create RepartitionExec nodes for both sides with hash partitioning on join keys + let partition_count = 12; + + // Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + let build_hash_exprs = vec![ + col("a", &build_side_schema).unwrap(), + col("b", &build_side_schema).unwrap(), + ]; + let build_repartition = Arc::new( + RepartitionExec::try_new( + build_scan, + Partitioning::Hash(build_hash_exprs, partition_count), + ) + .unwrap(), + ); + let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192)); + + // Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + let probe_hash_exprs = vec![ + col("a", &probe_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ]; + let probe_repartition = Arc::new( + RepartitionExec::try_new( + probe_scan, + Partitioning::Hash(probe_hash_exprs, partition_count), + ) + .unwrap(), + ); + let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); + + // Create HashJoinExec with partitioned inputs + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let hash_join = Arc::new( + HashJoinExec::try_new( + build_coalesce, + probe_coalesce, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Top-level CoalesceBatchesExec + let cb = + Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; + // Top-level CoalesceParititionsExec + let plan = 
Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + + // expect the predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Actually apply the optimization to the plan and execute to see the filter in action + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + if let Some(batch_result) = stream.next().await { + batch_result.unwrap(); + } + + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ] + " + ); +} + #[tokio::test] async fn test_nested_hashjoin_dynamic_filter_pushdown() { use datafusion_common::JoinType; @@ -946,9 +1167,9 @@ async fn 
test_nested_hashjoin_dynamic_filter_pushdown() { insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ] " diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 90e55afe5aef..e32d3c9a83a5 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -20,7 +20,7 @@ use std::fmt; use std::mem::size_of; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::task::Poll; use std::{any::Any, vec}; @@ -98,6 +98,262 @@ use parking_lot::Mutex; const HASH_JOIN_SEED: RandomState = RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); +/// Represents the minimum and maximum values for a specific column. +/// Used in dynamic filter pushdown to establish value boundaries. +#[derive(Debug, Clone, PartialEq)] +struct ColumnBounds { + /// The minimum value observed for this column + min: ScalarValue, + /// The maximum value observed for this column + max: ScalarValue, +} + +impl ColumnBounds { + fn new(min: ScalarValue, max: ScalarValue) -> Self { + Self { min, max } + } +} + +/// Represents the bounds for all join key columns from a single partition. +/// This contains the min/max values computed from one partition's build-side data. +#[derive(Debug, Clone)] +struct PartitionBounds { + /// Min/max bounds for each join key column in this partition. + /// Index corresponds to the join key expression index. + column_bounds: Vec, +} + +impl PartitionBounds { + fn new(column_bounds: Vec) -> Self { + Self { column_bounds } + } + + fn len(&self) -> usize { + self.column_bounds.len() + } + + fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> { + self.column_bounds.get(index) + } +} + +/// Coordinates dynamic filter bounds collection across multiple partitions +/// +/// This structure ensures that dynamic filters are built with complete information from all +/// relevant partitions before being applied to probe-side scans. Incomplete filters would +/// incorrectly eliminate valid join results. +/// +/// ## Synchronization Strategy +/// +/// 1. Each partition computes bounds from its build-side data +/// 2. Bounds are stored in the shared HashMap (indexed by partition_id) +/// 3. A counter tracks how many partitions have reported their bounds +/// 4. 
When the last partition reports (completed == total), bounds are merged and filter is updated +/// +/// ## Partition Counting +/// +/// The `total_partitions` count represents how many times `collect_build_side` will be called: +/// - **CollectLeft**: Number of output partitions (each accesses shared build data) +/// - **Partitioned**: Number of input partitions (each builds independently) +/// +/// ## Thread Safety +/// +/// All fields use a single mutex to ensure correct coordination between concurrent +/// partition executions. +struct SharedBoundsAccumulator { + /// Shared state protected by a single mutex to avoid ordering concerns + inner: Mutex, + /// Total number of partitions. + /// Need to know this so that we can update the dynamic filter once we are done + /// building *all* of the hash tables. + total_partitions: usize, + /// Dynamic filter for pushdown to probe side + dynamic_filter: Arc, + /// Right side join expressions needed for creating filter bounds + on_right: Vec, +} + +/// State protected by SharedBoundsAccumulator's mutex +struct SharedBoundsState { + /// Bounds from completed partitions. + /// Each element represents the column bounds computed by one partition. + bounds: Vec, + /// Number of partitions that have reported completion. + completed_partitions: usize, +} + +impl SharedBoundsAccumulator { + /// Creates a new SharedBoundsAccumulator configured for the given partition mode + /// + /// This method calculates how many times `collect_build_side` will be called based on the + /// partition mode's execution pattern. This count is critical for determining when we have + /// complete information from all partitions to build the dynamic filter. + /// + /// ## Partition Mode Execution Patterns + /// + /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut` + /// across all output partitions. Each output partition calls `collect_build_side` to access + /// the shared build data. Expected calls = number of output partitions. + /// + /// - **Partitioned**: Each partition independently builds its own hash table by calling + /// `collect_build_side` once. Expected calls = number of build partitions. + /// + /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since + /// the actual mode will be determined and a new bounds_accumulator created before execution. + /// + /// ## Why This Matters + /// + /// We cannot build a partial filter from some partitions - it would incorrectly eliminate + /// valid join results. We must wait until we have complete bounds information from ALL + /// relevant partitions before updating the dynamic filter. 
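A compact standalone sketch of the coordination strategy described above, using a single integer join key and strings in place of physical expressions; every partition reports, and only the call that completes the partition count publishes the merged OR-of-ranges filter. This is a simplified model, not the SharedBoundsAccumulator implementation itself.

use std::sync::{Arc, Mutex};

/// Per-partition min/max of a single join key (i64 keys for simplicity).
#[derive(Clone, Copy, Debug)]
struct Bounds { min: i64, max: i64 }

/// Simplified accumulator: partitions push their bounds, and only the call that
/// completes the set publishes a merged filter.
struct BoundsAccumulator {
    inner: Mutex<(Vec<Bounds>, usize)>, // (collected bounds, completed partitions)
    total_partitions: usize,
}

impl BoundsAccumulator {
    fn new(total_partitions: usize) -> Self {
        Self { inner: Mutex::new((Vec::new(), 0)), total_partitions }
    }

    /// Returns the OR-of-ranges predicate once every partition has reported.
    fn report(&self, bounds: Option<Bounds>) -> Option<String> {
        let mut inner = self.inner.lock().unwrap();
        if let Some(b) = bounds {
            inner.0.push(b);
        }
        inner.1 += 1; // even empty partitions must report
        if inner.1 == self.total_partitions && !inner.0.is_empty() {
            let parts: Vec<String> = inner.0.iter()
                .map(|b| format!("(k >= {} AND k <= {})", b.min, b.max))
                .collect();
            Some(parts.join(" OR "))
        } else {
            None
        }
    }
}

fn main() {
    let acc = Arc::new(BoundsAccumulator::new(2));
    // The first partition to finish must not publish a partial filter.
    assert_eq!(acc.report(Some(Bounds { min: 1, max: 10 })), None);
    // The last partition to report triggers the single filter update.
    assert_eq!(
        acc.report(Some(Bounds { min: 100, max: 120 })),
        Some("(k >= 1 AND k <= 10) OR (k >= 100 AND k <= 120)".to_string())
    );
}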
+ fn new_from_partition_mode( + partition_mode: PartitionMode, + left_child: &dyn ExecutionPlan, + right_child: &dyn ExecutionPlan, + dynamic_filter: Arc, + on_right: Vec, + ) -> Self { + // Troubleshooting: If partition counts are incorrect, verify this logic matches + // the actual execution pattern in collect_build_side() + let expected_calls = match partition_mode { + // Each output partition accesses shared build data + PartitionMode::CollectLeft => { + right_child.output_partitioning().partition_count() + } + // Each partition builds its own data + PartitionMode::Partitioned => { + left_child.output_partitioning().partition_count() + } + // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two) + PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"), + }; + Self { + inner: Mutex::new(SharedBoundsState { + bounds: Vec::with_capacity(expected_calls), + completed_partitions: 0, + }), + total_partitions: expected_calls, + dynamic_filter, + on_right, + } + } + + /// Create a filter expression from individual partition bounds using OR logic. + /// + /// This creates a filter where each partition's bounds form a conjunction (AND) + /// of column range predicates, and all partitions are combined with OR. + /// + /// For example, with 2 partitions and 2 columns: + /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1) + /// OR + /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1)) + fn create_filter_from_partition_bounds( + &self, + bounds: &[PartitionBounds], + ) -> Result> { + if bounds.is_empty() { + return Ok(lit(true)); + } + + // Create a predicate for each partition + let mut partition_predicates = Vec::with_capacity(bounds.len()); + + for partition_bounds in bounds.iter() { + // Create range predicates for each join key in this partition + let mut column_predicates = Vec::with_capacity(partition_bounds.len()); + + for (col_idx, right_expr) in self.on_right.iter().enumerate() { + if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(column_bounds.min.clone()), + )) as Arc; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(column_bounds.max.clone()), + )) as Arc; + let range_expr = + Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc; + column_predicates.push(range_expr); + } + } + + // Combine all column predicates for this partition with AND + if !column_predicates.is_empty() { + let partition_predicate = column_predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + as Arc + }) + .unwrap(); + partition_predicates.push(partition_predicate); + } + } + + // Combine all partition predicates with OR + let combined_predicate = partition_predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) + as Arc + }) + .unwrap_or_else(|| lit(true)); + + Ok(combined_predicate) + } + + /// Report bounds from a completed partition and update dynamic filter if all partitions are done + /// + /// This method coordinates the dynamic filter updates across all partitions. 
It stores the + /// bounds from the current partition, increments the completion counter, and when all + /// partitions have reported, creates an OR'd filter from individual partition bounds. + /// + /// # Arguments + /// * `partition_bounds` - The bounds computed by this partition (if any) + /// + /// # Returns + /// * `Result<()>` - Ok if successful, Err if filter update failed + fn report_partition_bounds( + &self, + partition_bounds: Option>, + ) -> Result<()> { + let mut inner = self.inner.lock(); + + // Store bounds in the accumulator - this runs once per partition + if let Some(bounds) = partition_bounds { + // Only push actual bounds if they exist + inner.bounds.push(PartitionBounds::new(bounds)); + } + + // Increment the completion counter + // Even empty partitions must report to ensure proper termination + inner.completed_partitions += 1; + let completed = inner.completed_partitions; + let total_partitions = self.total_partitions; + + // Critical synchronization point: Only update the filter when ALL partitions are complete + // Troubleshooting: If you see "completed > total_partitions", check partition + // count calculation in new_from_partition_mode() - it may not match actual execution calls + if completed == total_partitions && !inner.bounds.is_empty() { + let filter_expr = self.create_filter_from_partition_bounds(&inner.bounds)?; + self.dynamic_filter.update(filter_expr)?; + } + + Ok(()) + } +} + +impl fmt::Debug for SharedBoundsAccumulator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SharedBoundsAccumulator") + } +} + /// HashTable and input data for the left (build side) of a join struct JoinLeftData { /// The hash table with indices into `batch` @@ -116,6 +372,8 @@ struct JoinLeftData { /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption. /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle. _reservation: MemoryReservation, + /// Bounds computed from the build side for dynamic filter pushdown + bounds: Option>, } impl JoinLeftData { @@ -127,6 +385,7 @@ impl JoinLeftData { visited_indices_bitmap: SharedBitmapBuilder, probe_threads_counter: AtomicUsize, reservation: MemoryReservation, + bounds: Option>, ) -> Self { Self { hash_map, @@ -135,6 +394,7 @@ impl JoinLeftData { visited_indices_bitmap, probe_threads_counter, _reservation: reservation, + bounds, } } @@ -365,7 +625,12 @@ pub struct HashJoinExec { /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, /// Dynamic filter for pushing down to the probe side + /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result dynamic_filter: Option>, + /// Shared bounds accumulator for coordinating dynamic filter updates across partitions + /// Only created when dynamic filter pushdown is enabled. 
+ /// Lazily initialized at execution time to use actual runtime partition counts + bounds_accumulator: Option>>, } impl fmt::Debug for HashJoinExec { @@ -434,6 +699,9 @@ impl HashJoinExec { projection.as_ref(), )?; + // Initialize both dynamic filter and bounds accumulator to None + // They will be set later if dynamic filtering is enabled + Ok(HashJoinExec { left, right, @@ -450,6 +718,7 @@ impl HashJoinExec { null_equality, cache, dynamic_filter: None, + bounds_accumulator: None, }) } @@ -706,24 +975,10 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); - let dynamic_filter_display = match self.dynamic_filter.as_ref() { - Some(dynamic_filter) => match dynamic_filter.current() { - Ok(current) if current != lit(true) => { - format!(", filter=[{current}]") - } - _ => "".to_string(), - }, - None => "".to_string(), - }; write!( f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", - self.mode, - self.join_type, - on, - display_filter, - display_projections, - dynamic_filter_display + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}", + self.mode, self.join_type, on, display_filter, display_projections, ) } DisplayFormatType::TreeRender => { @@ -806,25 +1061,45 @@ impl ExecutionPlan for HashJoinExec { vec![&self.left, &self.right] } + /// Creates a new HashJoinExec with different children while preserving configuration. + /// + /// This method is called during query optimization when the optimizer creates new + /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new` + /// rather than cloning the existing one because partitioning may have changed. fn with_new_children( self: Arc, children: Vec>, ) -> Result> { - let new_join = HashJoinExec::try_new( - Arc::clone(&children[0]), - Arc::clone(&children[1]), - self.on.clone(), - self.filter.clone(), - &self.join_type, - self.projection.clone(), - self.mode, - self.null_equality, - )?; - Ok(Arc::new(new_join)) + Ok(Arc::new(HashJoinExec { + left: Arc::clone(&children[0]), + right: Arc::clone(&children[1]), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: Arc::clone(&self.left_fut), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: Self::compute_properties( + &children[0], + &children[1], + Arc::clone(&self.join_schema), + self.join_type, + &self.on, + self.mode, + self.projection.as_ref(), + )?, + // Keep the dynamic filter, bounds accumulator will be reset + dynamic_filter: self.dynamic_filter.clone(), + bounds_accumulator: None, + })) } fn reset_state(self: Arc) -> Result> { - // Reset the left_fut to allow re-execution Ok(Arc::new(HashJoinExec { left: Arc::clone(&self.left), right: Arc::clone(&self.right), @@ -832,6 +1107,7 @@ impl ExecutionPlan for HashJoinExec { filter: self.filter.clone(), join_type: self.join_type, join_schema: Arc::clone(&self.join_schema), + // Reset the left_fut to allow re-execution left_fut: Arc::new(OnceAsync::default()), random_state: self.random_state.clone(), mode: self.mode, @@ -840,7 +1116,9 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), + // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, + bounds_accumulator: 
None, })) } @@ -854,11 +1132,6 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| Arc::clone(&on.0)) .collect::>(); - let on_right = self - .on - .iter() - .map(|on| Arc::clone(&on.1)) - .collect::>(); let left_partitions = self.left.output_partitioning().partition_count(); let right_partitions = self.right.output_partitioning().partition_count(); @@ -877,11 +1150,7 @@ impl ExecutionPlan for HashJoinExec { ); } - let enable_dynamic_filter_pushdown = context - .session_config() - .options() - .optimizer - .enable_dynamic_filter_pushdown; + let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { @@ -899,10 +1168,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), - enable_dynamic_filter_pushdown - .then_some(self.dynamic_filter.clone()) - .flatten(), - on_right.clone(), + enable_dynamic_filter_pushdown, )) })?, PartitionMode::Partitioned => { @@ -920,10 +1186,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), 1, - enable_dynamic_filter_pushdown - .then_some(self.dynamic_filter.clone()) - .flatten(), - on_right.clone(), + enable_dynamic_filter_pushdown, )) } PartitionMode::Auto => { @@ -936,6 +1199,34 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); + // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled) + let bounds_accumulator = if enable_dynamic_filter_pushdown + && self.dynamic_filter.is_some() + { + if let Some(ref bounds_accumulator_oncelock) = self.bounds_accumulator { + let dynamic_filter = Arc::clone(self.dynamic_filter.as_ref().unwrap()); + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + + Some(Arc::clone(bounds_accumulator_oncelock.get_or_init(|| { + Arc::new(SharedBoundsAccumulator::new_from_partition_mode( + self.mode, + self.left.as_ref(), + self.right.as_ref(), + dynamic_filter, + on_right, + )) + }))) + } else { + None + } + } else { + None + }; + // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. 
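The get_or_init call above defers constructing the accumulator to the first execute() call, when the actual runtime partition count is available. The short self-contained illustration below shows that call-once initialization pattern; the names are placeholders, not DataFusion types.

use std::sync::{Arc, OnceLock};

/// Stand-in for the shared accumulator; created at most once per plan node.
struct Accumulator { total_partitions: usize }

fn main() {
    // The plan node stores the cell; execute() may run once per partition,
    // but only the first call actually constructs the accumulator.
    let cell: OnceLock<Arc<Accumulator>> = OnceLock::new();
    let runtime_partitions = 12; // known only at execution time

    for _partition in 0..3 {
        let acc = cell
            .get_or_init(|| Arc::new(Accumulator { total_partitions: runtime_partitions }));
        assert_eq!(acc.total_partitions, 12);
    }
}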
let right_stream = self.right.execute(partition, context)?; @@ -949,6 +1240,12 @@ impl ExecutionPlan for HashJoinExec { None => self.column_indices.clone(), }; + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + Ok(Box::pin(HashJoinStream { schema: self.schema(), on_right, @@ -964,6 +1261,7 @@ impl ExecutionPlan for HashJoinExec { batch_size, hashes_buffer: vec![], right_side_ordered: self.right.output_ordering().is_some(), + bounds_accumulator, })) } @@ -1122,6 +1420,7 @@ impl ExecutionPlan for HashJoinExec { null_equality: self.null_equality, cache: self.cache.clone(), dynamic_filter: Some(dynamic_filter), + bounds_accumulator: Some(OnceLock::new()), }); result = result.with_updated_node(new_node as Arc); } @@ -1131,13 +1430,13 @@ impl ExecutionPlan for HashJoinExec { } /// Compute min/max bounds for each column in the given arrays -fn compute_bounds(arrays: &[ArrayRef]) -> Result> { +fn compute_bounds(arrays: &[ArrayRef]) -> Result> { arrays .iter() .map(|array| { if array.is_empty() { // Return NULL values for empty arrays - return Ok(( + return Ok(ColumnBounds::new( ScalarValue::try_from(array.data_type())?, ScalarValue::try_from(array.data_type())?, )); @@ -1147,14 +1446,40 @@ fn compute_bounds(arrays: &[ArrayRef]) -> Result let min_val = min_batch(array)?; let max_val = max_batch(array)?; - Ok((min_val, max_val)) + Ok(ColumnBounds::new(min_val, max_val)) }) .collect() } -/// Reads the left (build) side of the input, buffering it in memory, to build a -/// hash table (`LeftJoinData`) #[expect(clippy::too_many_arguments)] +/// Collects all batches from the left (build) side stream and creates a hash map for joining. +/// +/// This function is responsible for: +/// 1. Consuming the entire left stream and collecting all batches into memory +/// 2. Building a hash map from the join key columns for efficient probe operations +/// 3. Computing bounds for dynamic filter pushdown (if enabled) +/// 4. Preparing visited indices bitmap for certain join types +/// +/// # Parameters +/// * `random_state` - Random state for consistent hashing across partitions +/// * `left_stream` - Stream of record batches from the build side +/// * `on_left` - Physical expressions for the left side join keys +/// * `metrics` - Metrics collector for tracking memory usage and row counts +/// * `reservation` - Memory reservation tracker for the hash table and data +/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins) +/// * `probe_threads_count` - Number of threads that will probe this hash table +/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering +/// +/// # Dynamic Filter Coordination +/// When `should_compute_bounds` is true, this function computes the min/max bounds +/// for each join key column but does NOT update the dynamic filter. Instead, the +/// bounds are stored in the returned `JoinLeftData` and later coordinated by +/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds +/// before updating the filter exactly once. +/// +/// # Returns +/// `JoinLeftData` containing the hash map, consolidated batch, join key values, +/// visited indices bitmap, and computed bounds (if requested). 
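A simplified sketch of the compute-but-do-not-publish contract this doc comment describes, with plain Vec<i64> columns standing in for Arrow arrays and a toy JoinLeftData; the bounds travel with the collected data and are only later handed to the shared accumulator. Assumes nothing beyond the comment above; it is not the real collect_left_input.

/// Per-column min/max computed from the build side, carried alongside the data
/// instead of being applied to the filter immediately.
#[derive(Debug, PartialEq)]
struct ColumnBounds { min: i64, max: i64 }

struct JoinLeftData {
    rows: Vec<Vec<i64>>,               // buffered build-side key columns
    bounds: Option<Vec<ColumnBounds>>, // None when not requested or no rows
}

fn collect_build_side(columns: Vec<Vec<i64>>, compute_bounds: bool) -> JoinLeftData {
    let bounds = if compute_bounds && columns.iter().all(|c| !c.is_empty()) {
        Some(
            columns
                .iter()
                .map(|c| ColumnBounds {
                    min: *c.iter().min().unwrap(),
                    max: *c.iter().max().unwrap(),
                })
                .collect(),
        )
    } else {
        None
    };
    JoinLeftData { rows: columns, bounds }
}

fn main() {
    let data = collect_build_side(vec![vec![3, 1, 7], vec![10, 20, 15]], true);
    assert_eq!(
        data.bounds,
        Some(vec![ColumnBounds { min: 1, max: 7 }, ColumnBounds { min: 10, max: 20 }])
    );
    // The bounds are later reported to the shared accumulator, not applied here.
    assert_eq!(data.rows.len(), 2);
}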
async fn collect_left_input( random_state: RandomState, left_stream: SendableRecordBatchStream, @@ -1163,8 +1488,7 @@ async fn collect_left_input( reservation: MemoryReservation, with_visited_indices_bitmap: bool, probe_threads_count: usize, - dynamic_filter: Option>, - on_right: Vec, + should_compute_bounds: bool, ) -> Result { let schema = left_stream.schema(); @@ -1254,6 +1578,13 @@ async fn collect_left_input( }) .collect::>>()?; + // Compute bounds for dynamic filter if enabled + let bounds = if should_compute_bounds && num_rows > 0 { + Some(compute_bounds(&left_values)?) + } else { + None + }; + let data = JoinLeftData::new( hashmap, single_batch, @@ -1261,49 +1592,9 @@ async fn collect_left_input( Mutex::new(visited_indices_bitmap), AtomicUsize::new(probe_threads_count), reservation, + bounds, ); - // Update dynamic filter with min/max bounds if provided - if let Some(dynamic_filter) = dynamic_filter { - if num_rows > 0 { - let bounds = compute_bounds(&left_values)?; - - // Create range predicates for each join key - let mut predicates = Vec::with_capacity(bounds.len()); - for ((min_val, max_val), right_expr) in bounds.iter().zip(on_right.iter()) { - // Create predicate: col >= min AND col <= max - let min_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::GtEq, - lit(min_val.clone()), - )) as Arc; - - let max_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::LtEq, - lit(max_val.clone()), - )) as Arc; - - let range_expr = - Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) - as Arc; - - predicates.push(range_expr); - } - - // Combine all predicates with AND - let combined_predicate = predicates - .into_iter() - .reduce(|acc, pred| { - Arc::new(BinaryExpr::new(acc, Operator::And, pred)) - as Arc - }) - .unwrap_or_else(|| lit(true)); - - dynamic_filter.update(combined_predicate)?; - } - } - Ok(data) } @@ -1499,6 +1790,8 @@ struct HashJoinStream { hashes_buffer: Vec, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, + /// Shared bounds accumulator for coordinating dynamic filter updates (optional) + bounds_accumulator: Option>, } impl RecordBatchStream for HashJoinStream { @@ -1688,6 +1981,14 @@ impl HashJoinStream { .get_shared(cx))?; build_timer.done(); + // Handle dynamic filter bounds accumulation + // + // Dynamic filter coordination between partitions: + // Report bounds to the accumulator which will handle synchronization and filter updates + if let Some(ref bounds_accumulator) = self.bounds_accumulator { + bounds_accumulator.report_partition_bounds(left_data.bounds.clone())?; + } + self.state = HashJoinStreamState::FetchProbeBatch; self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index 0082da643b05..8bbd47a07be4 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -301,3 +301,78 @@ drop table large_table; statement ok drop table t; + +# Regression test for https://github.com/apache/datafusion/issues/17188 +query I +COPY (select i as k from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter/t1.parquet' +STORED AS PARQUET; +---- +10000000 + +query I +COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter/t2.parquet' +STORED AS PARQUET; +---- +10000000 + +statement ok 
+create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet'; + +statement ok +create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet'; + +# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random +# So we'll run the same query a couple of times just to have more certainty it's fixed +# Sorry about the spam in this slt test... + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 From 43ec445e66f631111b546e395030563688679fb0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 29 Jul 2025 13:03:43 -0500 Subject: [PATCH 7/8] disallow pushdown of volatile functions (#16861) * dissallow pushdown of volatile PhysicalExprs * fix * add FilteredVec helper to handle filter / remap pattern (#34) * checkpoint: Address PR feedback in https://github.com/apach... * add FilteredVec to consolidate handling of filter / remap pattern * lint * Add slt test for pushing volatile predicates down (#35) --------- Co-authored-by: Andrew Lamb (cherry picked from commit 94e85488df31738b7c83d57015f51440e285feff) --- .../physical_optimizer/filter_pushdown/mod.rs | 40 +- .../filter_pushdown/util.rs | 5 +- .../physical-optimizer/src/filter_pushdown.rs | 358 ++++++++++++++++-- .../test_files/parquet_filter_pushdown.slt | 15 + 4 files changed, 380 insertions(+), 38 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 71635b929f84..6d3474924bda 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -34,8 +34,12 @@ use datafusion::{ }; use datafusion_common::config::ConfigOptions; use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_expr::ScalarUDF; +use datafusion_functions::math::random::RandomFunc; use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning}; +use datafusion_physical_expr::{ + aggregate::AggregateExprBuilder, Partitioning, ScalarFunctionExpr, +}; use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr}; use datafusion_physical_optimizer::{ filter_pushdown::FilterPushdown, PhysicalOptimizerRule, @@ -77,6 +81,40 @@ fn test_pushdown_into_scan() { ); } +#[test] +fn test_pushdown_volatile_functions_not_allowed() { + // Test that we do not push down filters with volatile functions + // Use random() as an example of a volatile function + let scan = TestScanBuilder::new(schema()).with_support(true).build(); + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("a", 
&schema()).unwrap()), + Operator::Eq, + Arc::new( + ScalarFunctionExpr::try_new( + Arc::new(ScalarUDF::from(RandomFunc::new())), + vec![], + &schema(), + ) + .unwrap(), + ), + )) as Arc; + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()); + // expect the filter to not be pushed down + insta::assert_snapshot!( + OptimizationTest::new(plan, FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = random() + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = random() + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + ", + ); +} + /// Show that we can use config options to determine how to do pushdown. #[test] fn test_pushdown_into_scan_with_config_options() { diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index ea12bea1cf89..acb2b808ef8f 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -26,7 +26,6 @@ use datafusion_datasource::{ file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile, }; -use datafusion_physical_expr::conjunction; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::filter_pushdown::{FilterPushdownPhase, PushedDown}; @@ -224,7 +223,9 @@ impl FileSource for TestSource { filters.push(Arc::clone(internal)); } let new_node = Arc::new(TestSource { - predicate: Some(conjunction(filters.clone())), + predicate: datafusion_physical_expr::utils::conjunction_opt( + filters.clone(), + ), ..self.clone() }); Ok(FilterPushdownPropagation::with_parent_pushdown_result( diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs index 66ccc1a79853..2838d1d1f243 100644 --- a/datafusion/physical-optimizer/src/filter_pushdown.rs +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -35,7 +35,9 @@ use std::sync::Arc; use crate::PhysicalOptimizerRule; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{config::ConfigOptions, Result}; +use datafusion_expr_common::signature::Volatility; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase, @@ -447,8 +449,15 @@ fn push_down_filters( let mut new_children = Vec::with_capacity(node.children().len()); let children = node.children(); - let filter_description = - node.gather_filters_for_pushdown(phase, parent_predicates.clone(), config)?; + + // Filter out expressions that are not allowed for pushdown + let parent_filtered = FilteredVec::new(&parent_predicates, allow_pushdown_for_expr); + + let filter_description = node.gather_filters_for_pushdown( + phase, + parent_filtered.items().to_vec(), + config, + )?; let filter_description_parent_filters = filter_description.parent_filters(); let filter_description_self_filters = filter_description.self_filters(); @@ -485,27 +494,21 @@ fn push_down_filters( // currently. 
`self_filters` are the predicates which are provided by the current node, // and tried to be pushed down over the child similarly. - let num_self_filters = self_filters.len(); - let mut all_predicates = self_filters.clone(); - - // Track which parent filters are supported for this child - let mut parent_filter_indices = vec![]; - - // Iterate over each predicate coming from the parent - for (parent_filter_idx, filter) in parent_filters.into_iter().enumerate() { - // Check if we can push this filter down to our child. - // These supports are defined in `gather_filters_for_pushdown()` - match filter.discriminant { - PushedDown::Yes => { - // Queue this filter up for pushdown to this child - all_predicates.push(filter.predicate); - parent_filter_indices.push(parent_filter_idx); - } - PushedDown::No => { - // This filter won't be pushed down to this child - // Will be marked as unsupported later in the initialization loop - } - } + // Filter out self_filters that contain volatile expressions and track indices + let self_filtered = FilteredVec::new(&self_filters, allow_pushdown_for_expr); + + let num_self_filters = self_filtered.len(); + let mut all_predicates = self_filtered.items().to_vec(); + + // Apply second filter pass: collect indices of parent filters that can be pushed down + let parent_filters_for_child = parent_filtered + .chain_filter_slice(&parent_filters, |filter| { + matches!(filter.discriminant, PushedDown::Yes) + }); + + // Add the filtered parent predicates to all_predicates + for filter in parent_filters_for_child.items() { + all_predicates.push(Arc::clone(&filter.predicate)); } let num_parent_filters = all_predicates.len() - num_self_filters; @@ -540,13 +543,18 @@ fn push_down_filters( .split_off(num_self_filters) .into_iter() .collect_vec(); - self_filters_pushdown_supports.push( - all_filters - .into_iter() - .zip(self_filters) - .map(|(s, f)| s.wrap_expression(f)) - .collect(), - ); + // Map the results from filtered self filters back to their original positions using FilteredVec + let mapped_self_results = + self_filtered.map_results_to_original(all_filters, PushedDown::No); + + // Wrap each result with its corresponding expression + let self_filter_results: Vec<_> = mapped_self_results + .into_iter() + .zip(self_filters) + .map(|(support, filter)| support.wrap_expression(filter)) + .collect(); + + self_filters_pushdown_supports.push(self_filter_results); // Start by marking all parent filters as unsupported for this child for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() { @@ -558,11 +566,13 @@ fn push_down_filters( ); } // Map results from pushed-down filters back to original parent filter indices - for (result_idx, parent_filter_support) in parent_filters.into_iter().enumerate() - { - let original_parent_idx = parent_filter_indices[result_idx]; - parent_filter_pushdown_supports[original_parent_idx][child_idx] = - parent_filter_support; + let mapped_parent_results = parent_filters_for_child + .map_results_to_original(parent_filters, PushedDown::No); + + // Update parent_filter_pushdown_supports with the mapped results + // mapped_parent_results already has the results at their original indices + for (idx, support) in parent_filter_pushdown_supports.iter_mut().enumerate() { + support[child_idx] = mapped_parent_results[idx]; } } @@ -597,3 +607,281 @@ fn push_down_filters( } Ok(res) } + +/// A helper structure for filtering elements from a vector through multiple passes while +/// tracking their original indices, allowing results to be 
mapped back to the original positions. +struct FilteredVec { + items: Vec, + // Chain of index mappings: each Vec maps from current level to previous level + // index_mappings[0] maps from first filter to original indices + // index_mappings[1] maps from second filter to first filter indices, etc. + index_mappings: Vec>, + original_len: usize, +} + +impl FilteredVec { + /// Creates a new FilteredVec by filtering items based on the given predicate + fn new(items: &[T], predicate: F) -> Self + where + F: Fn(&T) -> bool, + { + let mut filtered_items = Vec::new(); + let mut original_indices = Vec::new(); + + for (idx, item) in items.iter().enumerate() { + if predicate(item) { + filtered_items.push(item.clone()); + original_indices.push(idx); + } + } + + Self { + items: filtered_items, + index_mappings: vec![original_indices], + original_len: items.len(), + } + } + + /// Returns a reference to the filtered items + fn items(&self) -> &[T] { + &self.items + } + + /// Returns the number of filtered items + fn len(&self) -> usize { + self.items.len() + } + + /// Maps results from the filtered items back to their original positions + /// Returns a vector with the same length as the original input, filled with default_value + /// and updated with results at their original positions + fn map_results_to_original( + &self, + results: Vec, + default_value: R, + ) -> Vec { + let mut mapped_results = vec![default_value; self.original_len]; + + for (result_idx, result) in results.into_iter().enumerate() { + let original_idx = self.trace_to_original_index(result_idx); + mapped_results[original_idx] = result; + } + + mapped_results + } + + /// Traces a filtered index back to its original index through all filter passes + fn trace_to_original_index(&self, mut current_idx: usize) -> usize { + // Work backwards through the chain of index mappings + for mapping in self.index_mappings.iter().rev() { + current_idx = mapping[current_idx]; + } + current_idx + } + + /// Apply a filter to a new set of items while chaining the index mapping from self (parent) + /// This is useful when you have filtered items and then get a transformed slice + /// (e.g., from gather_filters_for_pushdown) that you need to filter again + fn chain_filter_slice(&self, items: &[U], predicate: F) -> FilteredVec + where + F: Fn(&U) -> bool, + { + let mut filtered_items = Vec::new(); + let mut filtered_indices = Vec::new(); + + for (idx, item) in items.iter().enumerate() { + if predicate(item) { + filtered_items.push(item.clone()); + filtered_indices.push(idx); + } + } + + // Chain the index mappings from parent (self) + let mut index_mappings = self.index_mappings.clone(); + index_mappings.push(filtered_indices); + + FilteredVec { + items: filtered_items, + index_mappings, + original_len: self.original_len, + } + } +} + +fn allow_pushdown_for_expr(expr: &Arc) -> bool { + let mut allow_pushdown = true; + expr.apply(|e| { + allow_pushdown = allow_pushdown && allow_pushdown_for_expr_inner(e); + if allow_pushdown { + Ok(TreeNodeRecursion::Continue) + } else { + Ok(TreeNodeRecursion::Stop) + } + }) + .expect("Infallible traversal of PhysicalExpr tree failed"); + allow_pushdown +} + +fn allow_pushdown_for_expr_inner(expr: &Arc) -> bool { + if let Some(scalar_function) = + expr.as_any() + .downcast_ref::() + { + // Check if the function is volatile using the proper volatility API + if scalar_function.fun().signature().volatility == Volatility::Volatile { + // Volatile functions should not be pushed down + return false; + } + } + true +} + 
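A minimal, standalone sketch of the filter/remap pattern that FilteredVec consolidates, reduced to plain functions over strings. The real helper also chains index mappings across multiple filter passes and operates on PhysicalExprs; the predicate strings and the "yes"/"no" answers below are hypothetical stand-ins for the PushedDown results.

fn filter_with_indices<T: Clone>(
    items: &[T],
    keep: impl Fn(&T) -> bool,
) -> (Vec<T>, Vec<usize>) {
    // Keep only the items that pass the predicate, remembering where each one
    // sat in the original slice (mirrors FilteredVec::new).
    let mut kept = Vec::new();
    let mut original_indices = Vec::new();
    for (idx, item) in items.iter().enumerate() {
        if keep(item) {
            kept.push(item.clone());
            original_indices.push(idx);
        }
    }
    (kept, original_indices)
}

fn scatter_back<R: Clone>(
    results: Vec<R>,
    original_indices: &[usize],
    original_len: usize,
    default: R,
) -> Vec<R> {
    // Results come back in "filtered" order; place each one at the position the
    // corresponding input occupied before filtering, defaulting everything else
    // (mirrors FilteredVec::map_results_to_original).
    let mut out = vec![default; original_len];
    for (result, &idx) in results.into_iter().zip(original_indices) {
        out[idx] = result;
    }
    out
}

fn main() {
    // Hypothetical predicates: the volatile one must not be pushed down.
    let predicates = ["a = 1", "b > random()", "c < 10"];
    let (pushable, indices) =
        filter_with_indices(&predicates, |p| !p.contains("random()"));
    assert_eq!(pushable, ["a = 1", "c < 10"]);

    // Suppose the child reports support only for the predicates it actually saw ...
    let child_results = vec!["yes", "yes"];
    // ... then the answers are scattered back to the original positions, with the
    // skipped (volatile) predicate defaulting to "no".
    let per_parent = scatter_back(child_results, &indices, predicates.len(), "no");
    assert_eq!(per_parent, ["yes", "no", "yes"]);
}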
+#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_filtered_vec_single_pass() { + let items = vec![1, 2, 3, 4, 5, 6]; + let filtered = FilteredVec::new(&items, |&x| x % 2 == 0); + + // Check filtered items + assert_eq!(filtered.items(), &[2, 4, 6]); + assert_eq!(filtered.len(), 3); + + // Check index mapping + let results = vec!["a", "b", "c"]; + let mapped = filtered.map_results_to_original(results, "default"); + assert_eq!(mapped, vec!["default", "a", "default", "b", "default", "c"]); + } + + #[test] + fn test_filtered_vec_empty_filter() { + let items = vec![1, 3, 5]; + let filtered = FilteredVec::new(&items, |&x| x % 2 == 0); + + assert_eq!(filtered.items(), &[] as &[i32]); + assert_eq!(filtered.len(), 0); + + let results: Vec<&str> = vec![]; + let mapped = filtered.map_results_to_original(results, "default"); + assert_eq!(mapped, vec!["default", "default", "default"]); + } + + #[test] + fn test_filtered_vec_all_pass() { + let items = vec![2, 4, 6]; + let filtered = FilteredVec::new(&items, |&x| x % 2 == 0); + + assert_eq!(filtered.items(), &[2, 4, 6]); + assert_eq!(filtered.len(), 3); + + let results = vec!["a", "b", "c"]; + let mapped = filtered.map_results_to_original(results, "default"); + assert_eq!(mapped, vec!["a", "b", "c"]); + } + + #[test] + fn test_chain_filter_slice_different_types() { + // First pass: filter numbers + let numbers = vec![1, 2, 3, 4, 5, 6]; + let first_pass = FilteredVec::new(&numbers, |&x| x > 3); + assert_eq!(first_pass.items(), &[4, 5, 6]); + + // Transform to strings (simulating gather_filters_for_pushdown transformation) + let strings = vec!["four", "five", "six"]; + + // Second pass: filter strings that contain 'i' + let second_pass = first_pass.chain_filter_slice(&strings, |s| s.contains('i')); + assert_eq!(second_pass.items(), &["five", "six"]); + + // Map results back to original indices + let results = vec![100, 200]; + let mapped = second_pass.map_results_to_original(results, 0); + // "five" was at index 4 (1-based: 5), "six" was at index 5 (1-based: 6) + assert_eq!(mapped, vec![0, 0, 0, 0, 100, 200]); + } + + #[test] + fn test_chain_filter_slice_complex_scenario() { + // Simulating the filter pushdown scenario + // Parent predicates: [A, B, C, D, E] + let parent_predicates = vec!["A", "B", "C", "D", "E"]; + + // First pass: filter out some predicates (simulating allow_pushdown_for_expr) + let first_pass = FilteredVec::new(&parent_predicates, |s| *s != "B" && *s != "D"); + assert_eq!(first_pass.items(), &["A", "C", "E"]); + + // After gather_filters_for_pushdown, we get transformed results for a specific child + // Let's say child gets [A_transformed, C_transformed, E_transformed] + // but only C and E can be pushed down + #[derive(Clone, Debug, PartialEq)] + struct TransformedPredicate { + name: String, + can_push: bool, + } + + let child_predicates = vec![ + TransformedPredicate { + name: "A_transformed".to_string(), + can_push: false, + }, + TransformedPredicate { + name: "C_transformed".to_string(), + can_push: true, + }, + TransformedPredicate { + name: "E_transformed".to_string(), + can_push: true, + }, + ]; + + // Second pass: filter based on can_push + let second_pass = + first_pass.chain_filter_slice(&child_predicates, |p| p.can_push); + assert_eq!(second_pass.len(), 2); + assert_eq!(second_pass.items()[0].name, "C_transformed"); + assert_eq!(second_pass.items()[1].name, "E_transformed"); + + // Simulate getting results back from child + let child_results = vec!["C_result", "E_result"]; + let mapped = 
second_pass.map_results_to_original(child_results, "no_result"); + + // Results should be at original positions: C was at index 2, E was at index 4 + assert_eq!( + mapped, + vec![ + "no_result", + "no_result", + "C_result", + "no_result", + "E_result" + ] + ); + } + + #[test] + fn test_trace_to_original_index() { + let items = vec![10, 20, 30, 40, 50]; + let filtered = FilteredVec::new(&items, |&x| x != 20 && x != 40); + + // filtered items are [10, 30, 50] at original indices [0, 2, 4] + assert_eq!(filtered.trace_to_original_index(0), 0); // 10 was at index 0 + assert_eq!(filtered.trace_to_original_index(1), 2); // 30 was at index 2 + assert_eq!(filtered.trace_to_original_index(2), 4); // 50 was at index 4 + } + + #[test] + fn test_chain_filter_preserves_original_len() { + let items = vec![1, 2, 3, 4, 5]; + let first = FilteredVec::new(&items, |&x| x > 2); + + let strings = vec!["three", "four", "five"]; + let second = first.chain_filter_slice(&strings, |s| s.len() == 4); + + // Original length should still be 5 + let results = vec!["x", "y"]; + let mapped = second.map_results_to_original(results, "-"); + assert_eq!(mapped.len(), 5); + } +} diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 61f4d6fc12a3..9c20941e211e 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -405,6 +405,21 @@ physical_plan 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)] + +# should not push down volatile predicates such as RANDOM +# expect that the random predicate is evaluated after the scan +query TT +EXPLAIN select a from t_pushdown where b > random(); +---- +logical_plan +01)Projection: t_pushdown.a +02)--Filter: CAST(t_pushdown.b AS Float64) > random() +03)----TableScan: t_pushdown projection=[a, b] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: CAST(b@1 AS Float64) > random(), projection=[a@0] +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet + ## cleanup statement ok DROP TABLE t; From 1650a566496e895bc033fccc711668272b1596b8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 5 Sep 2025 04:39:44 -0500 Subject: [PATCH 8/8] fix bounds accumulator reset in HashJoinExec dynamic filter pushdown (#17371) --- .../physical_optimizer/filter_pushdown/mod.rs | 5 ++ .../physical-plan/src/joins/hash_join.rs | 75 ++++++++++--------- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 6d3474924bda..8f237c24ae8f 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs 
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -823,6 +823,11 @@ async fn test_hashjoin_dynamic_filter_pushdown() { let plan = FilterPushdown::new_post_optimization() .optimize(plan, &config) .unwrap(); + + // Test for https://github.com/apache/datafusion/pull/17371: dynamic filter linking survives `with_new_children` + let children = plan.children().into_iter().map(Arc::clone).collect(); + let plan = plan.with_new_children(children).unwrap(); + let config = SessionConfig::new().with_batch_size(10); let session_ctx = SessionContext::new_with_config(config); session_ctx.register_object_store( diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e32d3c9a83a5..ec228d4b40b8 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -625,12 +625,18 @@ pub struct HashJoinExec { /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, /// Dynamic filter for pushing down to the probe side - /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result - dynamic_filter: Option>, - /// Shared bounds accumulator for coordinating dynamic filter updates across partitions - /// Only created when dynamic filter pushdown is enabled. - /// Lazily initialized at execution time to use actual runtime partition counts - bounds_accumulator: Option>>, + /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result. + /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates. + dynamic_filter: Option, +} + +#[derive(Clone)] +struct HashJoinExecDynamicFilter { + /// Dynamic filter that we'll update with the results of the build side once that is done. + filter: Arc, + /// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition. + /// It is lazily initialized during execution to make sure we use the actual execution time partition counts. 
+ bounds_accumulator: OnceLock>, } impl fmt::Debug for HashJoinExec { @@ -718,7 +724,6 @@ impl HashJoinExec { null_equality, cache, dynamic_filter: None, - bounds_accumulator: None, }) } @@ -1095,7 +1100,6 @@ impl ExecutionPlan for HashJoinExec { )?, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), - bounds_accumulator: None, })) } @@ -1118,7 +1122,6 @@ impl ExecutionPlan for HashJoinExec { cache: self.cache.clone(), // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, - bounds_accumulator: None, })) } @@ -1200,32 +1203,28 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled) - let bounds_accumulator = if enable_dynamic_filter_pushdown - && self.dynamic_filter.is_some() - { - if let Some(ref bounds_accumulator_oncelock) = self.bounds_accumulator { - let dynamic_filter = Arc::clone(self.dynamic_filter.as_ref().unwrap()); - let on_right = self - .on - .iter() - .map(|(_, right_expr)| Arc::clone(right_expr)) - .collect::>(); - - Some(Arc::clone(bounds_accumulator_oncelock.get_or_init(|| { - Arc::new(SharedBoundsAccumulator::new_from_partition_mode( - self.mode, - self.left.as_ref(), - self.right.as_ref(), - dynamic_filter, - on_right, - )) - }))) - } else { - None - } - } else { - None - }; + let bounds_accumulator = enable_dynamic_filter_pushdown + .then(|| { + self.dynamic_filter.as_ref().map(|df| { + let filter = Arc::clone(&df.filter); + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + Some(Arc::clone(df.bounds_accumulator.get_or_init(|| { + Arc::new(SharedBoundsAccumulator::new_from_partition_mode( + self.mode, + self.left.as_ref(), + self.right.as_ref(), + filter, + on_right, + )) + }))) + }) + }) + .flatten() + .flatten(); // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. @@ -1419,8 +1418,10 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), - dynamic_filter: Some(dynamic_filter), - bounds_accumulator: Some(OnceLock::new()), + dynamic_filter: Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + bounds_accumulator: OnceLock::new(), + }), }); result = result.with_updated_node(new_node as Arc); }
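For reference, a minimal standalone sketch of the lazy-initialization pattern the final commit relies on: the shared state carries a OnceLock so the expensive accumulator is built exactly once, at execution time, from the runtime partition count, and every partition afterwards observes the same instance. Names and the payload type are placeholders, not DataFusion APIs.

use std::sync::{Arc, OnceLock};

// Placeholder for the per-join shared state; the Vec payload stands in for the
// bounds accumulator, which must be sized from the runtime partition count.
#[derive(Default)]
struct SharedDynamicFilterState {
    accumulator: OnceLock<Arc<Vec<(i64, i64)>>>,
}

impl SharedDynamicFilterState {
    // Built lazily on the first call (i.e. at execution time), then reused.
    fn accumulator(&self, partition_count: usize) -> Arc<Vec<(i64, i64)>> {
        Arc::clone(
            self.accumulator
                .get_or_init(|| Arc::new(vec![(i64::MIN, i64::MAX); partition_count])),
        )
    }
}

fn main() {
    let state = SharedDynamicFilterState::default();
    // The first partition to execute initializes the accumulator ...
    let a = state.accumulator(4);
    // ... and later partitions get the same shared instance.
    let b = state.accumulator(4);
    assert!(Arc::ptr_eq(&a, &b));
    assert_eq!(a.len(), 4);
}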