235 changes: 228 additions & 7 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -48,6 +48,7 @@ use datafusion_physical_optimizer::{
use datafusion_physical_plan::{
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
coalesce_batches::CoalesceBatchesExec,
coalesce_partitions::CoalescePartitionsExec,
filter::FilterExec,
repartition::RepartitionExec,
sorts::sort::SortExec,
@@ -267,7 +268,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
format_plan_for_test(&plan),
@r"
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
Contributor comment:

To other reviewers, the filters are still pushed down to the DataSourceExec (it just isn't reflected on the HashJoinExec itself).

- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
"
@@ -890,7 +891,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
PartitionMode::CollectLeft,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
@@ -902,12 +903,12 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
@r"
OptimizationTest:
input:
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
output:
Ok:
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
",
@@ -936,13 +937,233 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb]
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
"
);
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Rough plan we're trying to recreate:
// COPY (select i as k from generate_series(1, 10000000) as t(i))
// TO 'test_files/scratch/push_down_filter/t1.parquet'
// STORED AS PARQUET;
// COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
// TO 'test_files/scratch/push_down_filter/t2.parquet'
// STORED AS PARQUET;
// create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet';
// create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet';
// explain
// select *
// from t1
// join t2 on t1.k = t2.k;
// +---------------+------------------------------------------------------------+
// | plan_type | plan |
// +---------------+------------------------------------------------------------+
// | physical_plan | ┌───────────────────────────┐ |
// | | │ CoalesceBatchesExec │ |
// | | │ -------------------- │ |
// | | │ target_batch_size: │ |
// | | │ 8192 │ |
// | | └─────────────┬─────────────┘ |
// | | ┌─────────────┴─────────────┐ |
// | | │ HashJoinExec │ |
// | | │ -------------------- ├──────────────┐ |
// | | │ on: (k = k) │ │ |
// | | └─────────────┬─────────────┘ │ |
// | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
// | | │ CoalesceBatchesExec ││ CoalesceBatchesExec │ |
// | | │ -------------------- ││ -------------------- │ |
// | | │ target_batch_size: ││ target_batch_size: │ |
// | | │ 8192 ││ 8192 │ |
// | | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
// | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
// | | │ RepartitionExec ││ RepartitionExec │ |
// | | │ -------------------- ││ -------------------- │ |
// | | │ partition_count(in->out): ││ partition_count(in->out): │ |
// | | │ 12 -> 12 ││ 12 -> 12 │ |
// | | │ ││ │ |
// | | │ partitioning_scheme: ││ partitioning_scheme: │ |
// | | │ Hash([k@0], 12) ││ Hash([k@0], 12) │ |
// | | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
// | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
// | | │ DataSourceExec ││ DataSourceExec │ |
// | | │ -------------------- ││ -------------------- │ |
// | | │ files: 12 ││ files: 12 │ |
// | | │ format: parquet ││ format: parquet │ |
// | | │ ││ predicate: true │ |
// | | └───────────────────────────┘└───────────────────────────┘ |
// | | |
// +---------------+------------------------------------------------------------+
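// The code below reproduces that plan shape using the in-memory test sources in place of the
// parquet files on disk.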

// Create build side with limited values
let build_batches = vec![record_batch!(
("a", Utf8, ["aa", "ab"]),
("b", Utf8, ["ba", "bb"]),
("c", Float64, [1.0, 2.0]) // Extra column not used in join
)
.unwrap()];
let build_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
.with_batches(build_batches)
.build();

// Create probe side with more values
let probe_batches = vec![record_batch!(
("a", Utf8, ["aa", "ab", "ac", "ad"]),
("b", Utf8, ["ba", "bb", "bc", "bd"]),
("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
)
.unwrap()];
let probe_side_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Utf8, false),
Field::new("e", DataType::Float64, false),
]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.with_batches(probe_batches)
.build();

// Create RepartitionExec nodes for both sides with hash partitioning on join keys
let partition_count = 12;
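// (12 matches the `partition_count(in->out): 12 -> 12` repartitioning in the reference plan above)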

// Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
let build_hash_exprs = vec![
col("a", &build_side_schema).unwrap(),
col("b", &build_side_schema).unwrap(),
];
let build_repartition = Arc::new(
RepartitionExec::try_new(
build_scan,
Partitioning::Hash(build_hash_exprs, partition_count),
)
.unwrap(),
);
let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));

// Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
let probe_hash_exprs = vec![
col("a", &probe_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
];
let probe_repartition = Arc::new(
RepartitionExec::try_new(
probe_scan,
Partitioning::Hash(probe_hash_exprs, partition_count),
)
.unwrap(),
);
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));

// Create HashJoinExec with partitioned inputs
let on = vec![
(
col("a", &build_side_schema).unwrap(),
col("a", &probe_side_schema).unwrap(),
),
(
col("b", &build_side_schema).unwrap(),
col("b", &probe_side_schema).unwrap(),
),
];
let hash_join = Arc::new(
HashJoinExec::try_new(
build_coalesce,
probe_coalesce,
on,
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
);

// Top-level CoalesceBatchesExec
let cb =
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
// Top-level CoalescePartitionsExec
let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;

// Expect the predicate to be pushed down into the probe side DataSource
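// Note: before execution the pushed-down dynamic filter is still the `true` placeholder; it is
// only tightened to real bounds after the build side has been consumed (checked further below).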
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
@r"
OptimizationTest:
input:
- CoalescePartitionsExec
- CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
output:
Ok:
- CoalescePartitionsExec
- CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
"
);

// Actually apply the optimization to the plan and execute to see the filter in action
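// Both parquet filter pushdown and dynamic filter pushdown have to be enabled for the dynamic
// predicate to make it into the scan.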
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();
let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
// Iterate one batch
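// Pulling output forces the build side hash table to be built, which in turn populates the
// dynamic filter with the join key bounds observed on the build side.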
if let Some(batch_result) = stream.next().await {
batch_result.unwrap();
}

// Now check what our filter looks like
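// The `true` placeholder should now be replaced with concrete bounds derived from the build side values.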
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- CoalescePartitionsExec
- CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- CoalesceBatchesExec: target_batch_size=8192
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
"
);
}

#[tokio::test]
async fn test_nested_hashjoin_dynamic_filter_pushdown() {
use datafusion_common::JoinType;
@@ -1082,9 +1303,9 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
"