177 changes: 177 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -1219,6 +1219,183 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

let build_batches = vec![record_batch!(
("a", Utf8, ["aa", "ab"]),
("b", Utf8, ["ba", "bb"]),
("c", Float64, [1.0, 2.0]) // Extra column not used in join
)
.unwrap()];
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(build_batches)
.build();

// Create probe side with more values
let probe_batches = vec![record_batch!(
("a", Utf8, ["aa", "ab", "ac", "ad"]),
("b", Utf8, ["ba", "bb", "bc", "bd"]),
("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
)
.unwrap()];
let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("e", DataType::Float64, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.with_batches(probe_batches)
.build();

// Create RepartitionExec nodes for both sides with hash partitioning on join keys
let partition_count = 12;

// Probe side: DataSource -> RepartitionExec(Hash) -> CoalesceBatchesExec
let probe_hash_exprs = vec![
col("a", &probe_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
];
let probe_repartition = Arc::new(
RepartitionExec::try_new(
Arc::clone(&probe_scan),
Partitioning::Hash(probe_hash_exprs, partition_count), // create multiple partitions on the probe side
)
.unwrap(),
);
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));

let on = vec![
(
col("a", &build_side_schema).unwrap(),
col("a", &probe_side_schema).unwrap(),
),
(
col("b", &build_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
),
];
let hash_join = Arc::new(
HashJoinExec::try_new(
build_scan,
probe_coalesce,
on,
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
);

// Top-level CoalesceBatchesExec
let cb =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
// Top-level CoalescePartitionsExec
let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
// Add a sort for deterministic output
let plan = Arc::new(SortExec::new(
LexOrdering::new(vec![PhysicalSortExpr::new(
col("a", &probe_side_schema).unwrap(),
SortOptions::new(true, false), // descending, nulls last
)])
.unwrap(),
cp,
)) as Arc<dyn ExecutionPlan>;

// expect the predicate to be pushed down into the probe side DataSource
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- CoalescePartitionsExec
- CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
output:
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- CoalescePartitionsExec
- CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
"
);

// Actually apply the optimization to the plan and execute to see the filter in action
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();
let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
.await
.unwrap();

// Now check what our filter looks like
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- CoalescePartitionsExec
- CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
"
);

let result = format!("{}", pretty_format_batches(&batches).unwrap());

let probe_scan_metrics = probe_scan.metrics().unwrap();

// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
// The number of output rows from the probe side scan should stay consistent across executions.
// Issue: https://github.com/apache/datafusion/issues/17451
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);

insta::assert_snapshot!(
result,
@r"
+----+----+-----+----+----+-----+
| a | b | c | a | b | e |
+----+----+-----+----+----+-----+
| ab | bb | 2.0 | ab | bb | 2.0 |
| aa | ba | 1.0 | aa | ba | 1.0 |
+----+----+-----+----+----+-----+
",
);
}

#[tokio::test]
async fn test_nested_hashjoin_dynamic_filter_pushdown() {
use datafusion_common::JoinType;
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1020,6 +1020,7 @@ impl ExecutionPlan for HashJoinExec {
vec![],
self.right.output_ordering().is_some(),
bounds_accumulator,
self.mode,
)))
}

30 changes: 22 additions & 8 deletions datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -128,8 +128,10 @@ impl SharedBoundsAccumulator {
/// ## Partition Mode Execution Patterns
///
/// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
/// across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
/// Although this results in multiple invocations, the `report_partition_bounds` function contains deduplication logic to handle them safely.
/// Expected calls = number of output partitions.
///
/// - **Partitioned**: Each partition independently builds its own hash table by calling
/// `collect_build_side` once. Expected calls = number of build partitions.
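///
/// For example, with `CollectLeft` and three output partitions,
/// `report_partition_bounds` is invoked three times, each passing
/// `left_side_partition_id = 0`; the deduplication check against the last
/// recorded bound ensures the build-side bounds are recorded only once.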
Expand Down Expand Up @@ -260,22 +262,34 @@ impl SharedBoundsAccumulator {
/// consider making the resulting future shared so the ready result can be reused.
///
/// # Arguments
/// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
/// * `partition_bounds` - The bounds computed by this partition (if any)
///
/// # Returns
/// * `Result<()>` - Ok if successful, Err if filter update failed
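///
/// Callers derive `left_side_partition_id` from the partition mode: in
/// `PartitionMode::Partitioned` it is the stream's own partition index, while
/// in `PartitionMode::CollectLeft` every stream reports `0` (see the
/// corresponding change in `stream.rs` below).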
pub(crate) async fn report_partition_bounds(
&self,
left_side_partition_id: usize,
partition_bounds: Option<Vec<ColumnBounds>>,
) -> Result<()> {
// Store bounds in the accumulator - in CollectLeft mode this may run multiple
// times for the same left-side partition, so deduplicate below
if let Some(bounds) = partition_bounds {
let mut guard = self.inner.lock();

let should_push = if let Some(last_bound) = guard.bounds.last() {
// In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
// Since this function can be called multiple times for that same partition, we must deduplicate
// by checking against the last recorded bound.
last_bound.partition != left_side_partition_id
} else {
true
};

if should_push {
guard
.bounds
.push(PartitionBounds::new(left_side_partition_id, bounds));
}
}

if self.barrier.wait().await.is_leader() {
16 changes: 14 additions & 2 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -28,6 +28,7 @@ use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
use crate::joins::utils::{
equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
};
use crate::joins::PartitionMode;
use crate::{
handle_state,
hash_utils::create_hashes,
@@ -210,6 +211,9 @@ pub(super) struct HashJoinStream {
/// Optional future to signal when bounds have been reported by all partitions
/// and the dynamic filter has been updated
bounds_waiter: Option<OnceFut<()>>,

/// Partitioning mode to use
mode: PartitionMode,
}

impl RecordBatchStream for HashJoinStream {
@@ -312,6 +316,7 @@ impl HashJoinStream {
hashes_buffer: Vec<u64>,
right_side_ordered: bool,
bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
mode: PartitionMode,
) -> Self {
Self {
partition,
@@ -331,6 +336,7 @@
right_side_ordered,
bounds_accumulator,
bounds_waiter: None,
mode,
}
}

@@ -406,11 +412,17 @@
// Report bounds to the accumulator which will handle synchronization and filter updates
if let Some(ref bounds_accumulator) = self.bounds_accumulator {
let bounds_accumulator = Arc::clone(bounds_accumulator);

let left_side_partition_id = match self.mode {
PartitionMode::Partitioned => self.partition,
PartitionMode::CollectLeft => 0,
PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
};

let left_data_bounds = left_data.bounds.clone();
self.bounds_waiter = Some(OnceFut::new(async move {
bounds_accumulator
.report_partition_bounds(left_side_partition_id, left_data_bounds)
.await
}));
self.state = HashJoinStreamState::WaitPartitionBoundsReport;