diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 1639960fdeac..1a04753966a2 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -24,6 +24,7 @@ use arrow::{
 };
 use arrow_schema::SortOptions;
 use datafusion::{
+    assert_batches_eq,
     logical_expr::Operator,
     physical_plan::{
         expressions::{BinaryExpr, Column, Literal},
@@ -736,6 +737,101 @@ async fn test_topk_dynamic_filter_pushdown() {
     );
 }
 
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown_multi_column_sort() {
+    let batches = vec![
+        // We are going to do ORDER BY b ASC NULLS LAST, a DESC
+        // And we put the values in such a way that the first batch will fill the TopK
+        // and we skip the second batch.
+        record_batch!(
+            ("a", Utf8, ["ac", "ad"]),
+            ("b", Utf8, ["bb", "ba"]),
+            ("c", Float64, [2.0, 1.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["bc", "bd"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let scan = TestScanBuilder::new(schema())
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+    let plan = Arc::new(
+        SortExec::new(
+            LexOrdering::new(vec![
+                PhysicalSortExpr::new(
+                    col("b", &schema()).unwrap(),
+                    SortOptions::default().asc().nulls_last(),
+                ),
+                PhysicalSortExpr::new(
+                    col("a", &schema()).unwrap(),
+                    SortOptions::default().desc().nulls_first(),
+                ),
+            ])
+            .unwrap(),
+            Arc::clone(&scan),
+        )
+        .with_fetch(Some(2)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(2);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    let res = stream.next().await.unwrap().unwrap();
+    #[rustfmt::skip]
+    let expected = [
+        "+----+----+-----+",
+        "| a  | b  | c   |",
+        "+----+----+-----+",
+        "| ad | ba | 1.0 |",
+        "| ac | bb | 2.0 |",
+        "+----+----+-----+",
+    ];
+    assert_batches_eq!(expected, &[res]);
+    // Now check what our filter looks like
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - SortExec: TopK(fetch=2), expr=[b@1 ASC NULLS LAST, a@0 DESC], preserve_partitioning=[false], filter=[b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac)]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 < bb OR b@1 = bb AND (a@0 IS NULL OR a@0 > ac) ]
+    "
+    );
+    // There should be no more batches
+    assert!(stream.next().await.is_none());
+}
+
 #[tokio::test]
 async fn test_hashjoin_dynamic_filter_pushdown() {
     use datafusion_common::JoinType;