datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs (76 additions, 33 deletions)
@@ -49,6 +49,7 @@ use datafusion_physical_plan::{
    aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
    coalesce_batches::CoalesceBatchesExec,
    coalesce_partitions::CoalescePartitionsExec,
+   collect,
    filter::FilterExec,
    repartition::RepartitionExec,
    sorts::sort::SortExec,
@@ -1095,34 +1096,45 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
    let cb =
        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
    // Top-level CoalescePartitionsExec
-   let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
+   let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
+   // Add a sort for deterministic output
+   let plan = Arc::new(SortExec::new(
+       LexOrdering::new(vec![PhysicalSortExpr::new(
+           col("a", &probe_side_schema).unwrap(),
+           SortOptions::new(true, false), // descending = true, nulls_first = false
+       )])
+       .unwrap(),
+       cp,
+   )) as Arc<dyn ExecutionPlan>;
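Aside: `SortOptions::new(descending, nulls_first)` with `(true, false)` is what prints as `DESC NULLS LAST` in the snapshots below. A minimal standalone check (assuming the `arrow_schema` crate, where this type lives):

```rust
use arrow_schema::SortOptions;

fn main() {
    // SortOptions::new(descending, nulls_first)
    let opts = SortOptions::new(true, false);
    assert!(opts.descending); // DESC
    assert!(!opts.nulls_first); // NULLS LAST
}
```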

    // expect the predicate to be pushed down into the probe side DataSource
    insta::assert_snapshot!(
        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
        @r"
    OptimizationTest:
      input:
-       - CoalescePartitionsExec
-         - CoalesceBatchesExec: target_batch_size=8192
-           - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
-             - CoalesceBatchesExec: target_batch_size=8192
-               - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-                 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-             - CoalesceBatchesExec: target_batch_size=8192
-               - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-                 - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+       - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+         - CoalescePartitionsExec
+           - CoalesceBatchesExec: target_batch_size=8192
+             - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+               - CoalesceBatchesExec: target_batch_size=8192
+                 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                   - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+               - CoalesceBatchesExec: target_batch_size=8192
+                 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                   - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
      output:
        Ok:
-         - CoalescePartitionsExec
-           - CoalesceBatchesExec: target_batch_size=8192
-             - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
-               - CoalesceBatchesExec: target_batch_size=8192
-                 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-                   - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-               - CoalesceBatchesExec: target_batch_size=8192
-                 - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-                   - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+         - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+           - CoalescePartitionsExec
+             - CoalesceBatchesExec: target_batch_size=8192
+               - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+                 - CoalesceBatchesExec: target_batch_size=8192
+                   - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+                 - CoalesceBatchesExec: target_batch_size=8192
+                   - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+                     - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
    "
    );
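Note what `predicate=DynamicFilterPhysicalExpr [ true ]` on the probe-side `DataSourceExec` means: at optimization time the pushed-down filter is only a placeholder that accepts every row, and the join tightens it during execution once the build side has been read, as the post-execution snapshots below show. A conceptual stand-in (hypothetical type and methods, not the actual DataFusion API) might look like:

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a dynamic filter: it starts out as the
/// always-true predicate and is tightened once build-side bounds are known.
#[derive(Clone, Default)]
struct DynamicPredicate {
    /// `None` means "true": nothing has been pushed down yet.
    bounds: Arc<RwLock<Option<(String, String)>>>,
}

impl DynamicPredicate {
    /// Called by the probe-side scan for each join-key value.
    fn evaluate(&self, key: &str) -> bool {
        match &*self.bounds.read().unwrap() {
            None => true, // placeholder phase: accept everything
            Some((min, max)) => key >= min.as_str() && key <= max.as_str(),
        }
    }

    /// Called by the join after the build side is fully consumed.
    fn update(&self, min: String, max: String) {
        *self.bounds.write().unwrap() = Some((min, max));
    }
}
```

This is why the plan printed before execution shows `[ true ]` while the plans printed after execution show concrete bounds on `a` and `b`.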

@@ -1141,27 +1153,58 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
    );
    let state = session_ctx.state();
    let task_ctx = state.task_ctx();
-   let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
-   // Iterate one batch
-   if let Some(batch_result) = stream.next().await {
-       batch_result.unwrap();
-   }
+   let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+       .await
+       .unwrap();
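Switching from pulling a single batch off partition 0 to `collect` drains every output partition to completion, which ensures the dynamic filter is fully populated before the plan is re-printed and also yields the batches asserted on at the end of the test. Roughly, `collect` behaves like the sketch below (simplified; `collect_all` is an illustrative stand-in, and the real helper in `datafusion_physical_plan` is the one the test imports):

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::ExecutionPlan;
use futures::StreamExt;

/// Illustrative version of `collect`: execute every output partition
/// and gather all record batches into one Vec.
async fn collect_all(
    plan: Arc<dyn ExecutionPlan>,
    ctx: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    let mut batches = Vec::new();
    let partitions = plan.properties().output_partitioning().partition_count();
    for partition in 0..partitions {
        let mut stream = plan.execute(partition, Arc::clone(&ctx))?;
        while let Some(batch) = stream.next().await {
            batches.push(batch?);
        }
    }
    Ok(batches)
}
```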

    // Now check what our filter looks like
+   #[cfg(not(feature = "force_hash_collisions"))]
    insta::assert_snapshot!(
        format!("{}", format_plan_for_test(&plan)),
        @r"
-   - CoalescePartitionsExec
-     - CoalesceBatchesExec: target_batch_size=8192
-       - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
-         - CoalesceBatchesExec: target_batch_size=8192
-           - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-             - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-         - CoalesceBatchesExec: target_batch_size=8192
-           - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
-             - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+   - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+     - CoalescePartitionsExec
+       - CoalesceBatchesExec: target_batch_size=8192
+         - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+           - CoalesceBatchesExec: target_batch_size=8192
+             - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+               - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+           - CoalesceBatchesExec: target_batch_size=8192
+             - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+               - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
    "
    );

+   #[cfg(feature = "force_hash_collisions")]
+   insta::assert_snapshot!(
+       format!("{}", format_plan_for_test(&plan)),
+       @r"
+   - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+     - CoalescePartitionsExec
+       - CoalesceBatchesExec: target_batch_size=8192
+         - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+           - CoalesceBatchesExec: target_batch_size=8192
+             - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+               - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+           - CoalesceBatchesExec: target_batch_size=8192
+             - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+               - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+   "
+   );
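The two `cfg` variants appear to differ only in the shape of the pushed-down bounds: the disjuncts in the default snapshot line up with per-partition min/max ranges OR'ed together, while `force_hash_collisions` sends every row to one partition, leaving a single merged range. A toy rendering of that shape (hypothetical helper, for illustration only):

```rust
/// Hypothetical: render per-partition (min, max) bounds on column `a`
/// the way the snapshots above display them.
fn render_filter(partition_bounds: &[(&str, &str)]) -> String {
    partition_bounds
        .iter()
        .map(|(min, max)| format!("a@0 >= {min} AND a@0 <= {max}"))
        .collect::<Vec<_>>()
        .join(" OR ")
}

fn main() {
    // Two build partitions -> OR of two per-partition ranges.
    assert_eq!(
        render_filter(&[("ab", "ab"), ("aa", "aa")]),
        "a@0 >= ab AND a@0 <= ab OR a@0 >= aa AND a@0 <= aa"
    );
    // force_hash_collisions: one partition -> a single merged range.
    assert_eq!(render_filter(&[("aa", "ab")]), "a@0 >= aa AND a@0 <= ab");
}
```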

+   let result = format!("{}", pretty_format_batches(&batches).unwrap());
+
+   insta::assert_snapshot!(
+       result,
+       @r"
+   +----+----+-----+----+----+-----+
+   | a  | b  | c   | a  | b  | e   |
+   +----+----+-----+----+----+-----+
+   | ab | bb | 2.0 | ab | bb | 2.0 |
+   | aa | ba | 1.0 | aa | ba | 1.0 |
+   +----+----+-----+----+----+-----+
+   ",
+   );
}

#[tokio::test]