diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 118b860c5b18..9f588519ecac 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -1219,6 +1219,183 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     );
 }
 
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create RepartitionExec nodes for both sides with hash partitioning on join keys
+    let partition_count = 12;
+
+    // Probe side: DataSource -> RepartitionExec(Hash) -> CoalesceBatchesExec
+    let probe_hash_exprs = vec![
+        col("a", &probe_side_schema).unwrap(),
+        col("b", &probe_side_schema).unwrap(),
+    ];
+    let probe_repartition = Arc::new(
+        RepartitionExec::try_new(
+            Arc::clone(&probe_scan),
+            Partitioning::Hash(probe_hash_exprs, partition_count), // create multiple partitions on the probe side
+        )
+        .unwrap(),
+    );
+    let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
+
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_coalesce,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    // Top-level CoalesceBatchesExec
+    let cb =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+    // Top-level CoalescePartitionsExec
+    let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
+    // Add a sort for deterministic output
+    let plan = Arc::new(SortExec::new(
+        LexOrdering::new(vec![PhysicalSortExpr::new(
+            col("a", &probe_side_schema).unwrap(),
+            SortOptions::new(true, false), // descending, nulls last
+        )])
+        .unwrap(),
+        cp,
+    )) as Arc<dyn ExecutionPlan>;
+
+    // expect the predicate to be pushed down into the probe side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+        -   CoalescePartitionsExec
+        -     CoalesceBatchesExec: target_batch_size=8192
+        -       HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -         CoalesceBatchesExec: target_batch_size=8192
+        -           RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+        -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+          -   CoalescePartitionsExec
+          -     CoalesceBatchesExec: target_batch_size=8192
+          -       HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+          -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          -         CoalesceBatchesExec: target_batch_size=8192
+          -           RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+          -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Actually apply the optimization to the plan and execute to see the filter in action
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Now check what our filter looks like
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+    -   CoalescePartitionsExec
+    -     CoalesceBatchesExec: target_batch_size=8192
+    -       HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -         CoalesceBatchesExec: target_batch_size=8192
+    -           RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+    -             DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+    "
+    );
+
+    let result = format!("{}", pretty_format_batches(&batches).unwrap());
+
+    let probe_scan_metrics = probe_scan.metrics().unwrap();
+
+    // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
+    // The number of output rows from the probe side scan should stay consistent across executions.
+    // Issue: https://github.com/apache/datafusion/issues/17451
+    assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
+
+    insta::assert_snapshot!(
+        result,
+        @r"
+    +----+----+-----+----+----+-----+
+    | a  | b  | c   | a  | b  | e   |
+    +----+----+-----+----+----+-----+
+    | ab | bb | 2.0 | ab | bb | 2.0 |
+    | aa | ba | 1.0 | aa | ba | 1.0 |
+    +----+----+-----+----+----+-----+
+    ",
+    );
+}
+
 #[tokio::test]
 async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     use datafusion_common::JoinType;
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index cb697d460995..c8ed1960393c 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1020,6 +1020,7 @@ impl ExecutionPlan for HashJoinExec {
             vec![],
             self.right.output_ordering().is_some(),
             bounds_accumulator,
+            self.mode,
         )))
     }
 
diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
index 40dc4ac2e5d1..25f7a0de31ac 100644
--- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -128,8 +128,10 @@ impl SharedBoundsAccumulator {
     /// ## Partition Mode Execution Patterns
     ///
     /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
-    ///   across all output partitions. Each output partition calls `collect_build_side` to access
-    ///   the shared build data. Expected calls = number of output partitions.
+    ///   across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
+    ///   Although this results in multiple calls, `report_partition_bounds` deduplicates them, so the repeated reports are handled safely.
+    ///   Expected calls = number of output partitions.
+    ///
     ///
     /// - **Partitioned**: Each partition independently builds its own hash table by calling
     ///   `collect_build_side` once. Expected calls = number of build partitions.
@@ -260,22 +262,34 @@ impl SharedBoundsAccumulator {
     /// consider making the resulting future shared so the ready result can be reused.
     ///
     /// # Arguments
-    /// * `partition` - The partition identifier reporting its bounds
+    /// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
     /// * `partition_bounds` - The bounds computed by this partition (if any)
     ///
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed
     pub(crate) async fn report_partition_bounds(
         &self,
-        partition: usize,
+        left_side_partition_id: usize,
         partition_bounds: Option<Vec<ColumnBounds>>,
     ) -> Result<()> {
         // Store bounds in the accumulator - this runs once per partition
         if let Some(bounds) = partition_bounds {
-            self.inner
-                .lock()
-                .bounds
-                .push(PartitionBounds::new(partition, bounds));
+            let mut guard = self.inner.lock();
+
+            let should_push = if let Some(last_bound) = guard.bounds.last() {
+                // In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
+                // Since this function can be called multiple times for that same partition, we must deduplicate
+                // by checking against the last recorded bound.
+                last_bound.partition != left_side_partition_id
+            } else {
+                true
+            };
+
+            if should_push {
+                guard
+                    .bounds
+                    .push(PartitionBounds::new(left_side_partition_id, bounds));
+            }
         }
 
         if self.barrier.wait().await.is_leader() {
diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index 4484eeabd326..adc00d9fe75e 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -28,6 +28,7 @@ use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
 use crate::joins::utils::{
     equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
 };
+use crate::joins::PartitionMode;
 use crate::{
     handle_state,
     hash_utils::create_hashes,
@@ -210,6 +211,9 @@ pub(super) struct HashJoinStream {
     /// Optional future to signal when bounds have been reported by all partitions
     /// and the dynamic filter has been updated
     bounds_waiter: Option<OnceFut<()>>,
+
+    /// Partition mode of the join; used to determine the build-side partition id when reporting bounds
+    mode: PartitionMode,
 }
 
 impl RecordBatchStream for HashJoinStream {
@@ -312,6 +316,7 @@ impl HashJoinStream {
         hashes_buffer: Vec<u64>,
         right_side_ordered: bool,
         bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
+        mode: PartitionMode,
     ) -> Self {
         Self {
             partition,
@@ -331,6 +336,7 @@ impl HashJoinStream {
             right_side_ordered,
             bounds_accumulator,
             bounds_waiter: None,
+            mode,
         }
     }
 
@@ -406,11 +412,17 @@ impl HashJoinStream {
         // Report bounds to the accumulator which will handle synchronization and filter updates
         if let Some(ref bounds_accumulator) = self.bounds_accumulator {
             let bounds_accumulator = Arc::clone(bounds_accumulator);
-            let partition = self.partition;
+
+            let left_side_partition_id = match self.mode {
+                PartitionMode::Partitioned => self.partition,
+                PartitionMode::CollectLeft => 0,
+                PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
+            };
+
             let left_data_bounds = left_data.bounds.clone();
             self.bounds_waiter = Some(OnceFut::new(async move {
                 bounds_accumulator
-                    .report_partition_bounds(partition, left_data_bounds)
+                    .report_partition_bounds(left_side_partition_id, left_data_bounds)
                     .await
             }));
             self.state = HashJoinStreamState::WaitPartitionBoundsReport;