add test for multi-column topk dynamic filter pushdown #17162

Merged · 4 commits · Aug 14, 2025
96 changes: 96 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -24,6 +24,7 @@ use arrow::{
};
use arrow_schema::SortOptions;
use datafusion::{
assert_batches_eq,
logical_expr::Operator,
physical_plan::{
expressions::{BinaryExpr, Column, Literal},
@@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() {
);
}

#[tokio::test]
async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
let batches = vec![
// We are going to do ORDER BY b ASC NULLS LAST, a DESC.
// The values are arranged so that the first batch fills the TopK
// and the second batch can be skipped entirely.
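// With fetch=2 the first batch alone fills the TopK heap; every b value in
// the second batch ("bc", "bd") sorts after the heap's worst b ("bb"), so
// the dynamic filter can prune the whole second batch.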
record_batch!(
("a", Utf8, ["ac", "ad"]),
("b", Utf8, ["bb", "ba"]),
("c", Float64, [2.0, 1.0])
)
.unwrap(),
record_batch!(
("a", Utf8, ["aa", "ab"]),
("b", Utf8, ["bc", "bd"]),
("c", Float64, [1.0, 2.0])
)
.unwrap(),
];
let scan = TestScanBuilder::new(schema())
.with_support(true)
.with_batches(batches)
.build();
let plan = Arc::new(
SortExec::new(
LexOrdering::new(vec![
PhysicalSortExpr::new(
col("b", &schema()).unwrap(),
SortOptions::default().asc().nulls_last(),
),
PhysicalSortExpr::new(
col("a", &schema()).unwrap(),
SortOptions::default().desc().nulls_first(),
),
])
.unwrap(),
Arc::clone(&scan),
)
.with_fetch(Some(2)),
) as Arc<dyn ExecutionPlan>;

// expect the predicate to be pushed down into the DataSource
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
@r"
OptimizationTest:
input:
- SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
output:
Ok:
- SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
"
);

// Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
// Pull the first batch through the plan
let res = stream.next().await.unwrap().unwrap();
#[rustfmt::skip]
let expected = [
"+----+----+-----+",
"| a | b | c |",
"+----+----+-----+",
"| ad | ba | 1.0 |",
"| ac | bb | 2.0 |",
"+----+----+-----+",
];
assert_batches_eq!(expected, &[res]);
// Now check what our filter looks like
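// After the first batch the TopK's worst entry is (b = "bb", a = "ac"); a row
// can only displace it if it sorts strictly earlier: b < 'bb', or b = 'bb'
// with a sorting first under DESC NULLS FIRST (i.e. a IS NULL OR a > 'ac').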
insta::assert_snapshot!(
[Review comment · Contributor]: this is pretty cool (that you can see the predicate after the plan runs)

format!("{}", format_plan_for_test(&plan)),
@r"
- SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false], filter=[b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ]
"
);
// There should be no more batches
assert!(stream.next().await.is_none());
}
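
For intuition, the predicate in the snapshot above is the usual lexicographic "could this row displace the current worst TopK entry?" test. The following is a minimal, self-contained Rust sketch of that logic under this test's data; the `Row` struct and `beats_threshold` function are illustrative stand-ins, not DataFusion APIs:

/// Illustrative row type for the sort key (b ASC NULLS LAST, a DESC NULLS FIRST).
struct Row<'a> {
    a: Option<&'a str>,
    b: Option<&'a str>,
}

/// True when `row` sorts strictly before the current TopK threshold
/// (b = "bb", a = "ac"), mirroring the snapshot predicate
/// `b < 'bb' OR b = 'bb' AND (a IS NULL OR a > 'ac')`.
fn beats_threshold(row: &Row) -> bool {
    match row.b {
        // b is ASC NULLS LAST: a NULL b sorts after every non-null threshold.
        None => false,
        Some(b) => {
            b < "bb"
                // a is DESC NULLS FIRST: NULL sorts before everything,
                // and larger values sort earlier.
                || (b == "bb" && row.a.map_or(true, |a| a > "ac"))
        }
    }
}

fn main() {
    // The second batch from the test: both rows fail, so the batch is pruned.
    assert!(!beats_threshold(&Row { a: Some("aa"), b: Some("bc") }));
    assert!(!beats_threshold(&Row { a: Some("ab"), b: Some("bd") }));
    // A tie on b with a NULL a would pass, since NULL sorts first under DESC NULLS FIRST.
    assert!(beats_threshold(&Row { a: None, b: Some("bb") }));
}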

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown() {
use datafusion_common::JoinType;