diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 90c03cf93563..0972283ffd85 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -49,6 +49,7 @@ use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, coalesce_partitions::CoalescePartitionsExec, + collect, filter::FilterExec, repartition::RepartitionExec, sorts::sort::SortExec, @@ -1095,7 +1096,16 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { let cb = Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; // Top-level CoalesceParititionsExec - let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + // Add a sort for determistic output + let plan = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new( + col("a", &probe_side_schema).unwrap(), + SortOptions::new(true, false), // descending, nulls_first + )]) + .unwrap(), + cp, + )) as Arc; // expect the predicate to be pushed down into the probe side DataSource insta::assert_snapshot!( @@ -1103,26 +1113,28 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { @r" OptimizationTest: input: - - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true output: Ok: - - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] " ); @@ -1141,27 +1153,58 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { ); let state = session_ctx.state(); let task_ctx = state.task_ctx(); - let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); - // Iterate one batch - if let Some(batch_result) = stream.next().await { - batch_result.unwrap(); - } + let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx)) + .await + .unwrap(); // Now check what our filter looks like + #[cfg(not(feature = "force_hash_collisions"))] insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - CoalescePartitionsExec - - CoalesceBatchesExec: target_batch_size=8192 - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - CoalesceBatchesExec: target_batch_size=8192 - - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ] + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ] " ); + + #[cfg(feature = "force_hash_collisions")] + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false] + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] + " + ); + + let result = format!("{}", pretty_format_batches(&batches).unwrap()); + + insta::assert_snapshot!( + result, + @r" + +----+----+-----+----+----+-----+ + | a | b | c | a | b | e | + +----+----+-----+----+----+-----+ + | ab | bb | 2.0 | ab | bb | 2.0 | + | aa | ba | 1.0 | aa | ba | 1.0 | + +----+----+-----+----+----+-----+ + ", + ); } #[tokio::test]