@@ -122,7 +122,123 @@ fn test_pushdown_into_scan_with_config_options() {
122
122
}
123
123
124
124
#[ tokio:: test]
125
- async fn test_hashjoin_parent_filter_pushdown ( ) {
125
+ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk ( ) {
126
+ use datafusion_common:: JoinType ;
127
+ use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
128
+
129
+ // Create build side with limited values
130
+ let build_batches = vec ! [ record_batch!(
131
+ ( "a" , Utf8 , [ "aa" , "ab" ] ) ,
132
+ ( "b" , Utf8 , [ "ba" , "bb" ] ) ,
133
+ ( "c" , Float64 , [ 1.0 , 2.0 ] )
134
+ )
135
+ . unwrap( ) ] ;
136
+ let build_side_schema = Arc :: new ( Schema :: new ( vec ! [
137
+ Field :: new( "a" , DataType :: Utf8 , false ) ,
138
+ Field :: new( "b" , DataType :: Utf8 , false ) ,
139
+ Field :: new( "c" , DataType :: Float64 , false ) ,
140
+ ] ) ) ;
141
+ let build_scan = TestScanBuilder :: new ( Arc :: clone ( & build_side_schema) )
142
+ . with_support ( true )
143
+ . with_batches ( build_batches)
144
+ . build ( ) ;
145
+
146
+ // Create probe side with more values
147
+ let probe_batches = vec ! [ record_batch!(
148
+ ( "d" , Utf8 , [ "aa" , "ab" , "ac" , "ad" ] ) ,
149
+ ( "e" , Utf8 , [ "ba" , "bb" , "bc" , "bd" ] ) ,
150
+ ( "f" , Float64 , [ 1.0 , 2.0 , 3.0 , 4.0 ] )
151
+ )
152
+ . unwrap( ) ] ;
153
+ let probe_side_schema = Arc :: new ( Schema :: new ( vec ! [
154
+ Field :: new( "d" , DataType :: Utf8 , false ) ,
155
+ Field :: new( "e" , DataType :: Utf8 , false ) ,
156
+ Field :: new( "f" , DataType :: Float64 , false ) ,
157
+ ] ) ) ;
158
+ let probe_scan = TestScanBuilder :: new ( Arc :: clone ( & probe_side_schema) )
159
+ . with_support ( true )
160
+ . with_batches ( probe_batches)
161
+ . build ( ) ;
162
+
163
+ // Create HashJoinExec
164
+ let on = vec ! [ (
165
+ col( "a" , & build_side_schema) . unwrap( ) ,
166
+ col( "d" , & probe_side_schema) . unwrap( ) ,
167
+ ) ] ;
168
+ let join = Arc :: new (
169
+ HashJoinExec :: try_new (
170
+ build_scan,
171
+ probe_scan,
172
+ on,
173
+ None ,
174
+ & JoinType :: Inner ,
175
+ None ,
176
+ PartitionMode :: Partitioned ,
177
+ datafusion_common:: NullEquality :: NullEqualsNothing ,
178
+ )
179
+ . unwrap ( ) ,
180
+ ) ;
181
+
182
+ let join_schema = join. schema ( ) ;
183
+
184
+ // Finally let's add a SortExec on the outside to test pushdown of dynamic filters
185
+ let sort_expr =
186
+ PhysicalSortExpr :: new ( col ( "e" , & join_schema) . unwrap ( ) , SortOptions :: default ( ) ) ;
187
+ let plan = Arc :: new (
188
+ SortExec :: new ( LexOrdering :: new ( vec ! [ sort_expr] ) . unwrap ( ) , join)
189
+ . with_fetch ( Some ( 2 ) ) ,
190
+ ) as Arc < dyn ExecutionPlan > ;
191
+
192
+ let mut config = ConfigOptions :: default ( ) ;
193
+ config. optimizer . enable_dynamic_filter_pushdown = true ;
194
+ config. execution . parquet . pushdown_filters = true ;
195
+
196
+ // Appy the FilterPushdown optimizer rule
197
+ let plan = FilterPushdown :: new_post_optimization ( )
198
+ . optimize ( Arc :: clone ( & plan) , & config)
199
+ . unwrap ( ) ;
200
+
201
+ // Test that filters are pushed down correctly to each side of the join
202
+ insta:: assert_snapshot!(
203
+ format_plan_for_test( & plan) ,
204
+ @r"
205
+ - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
206
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
207
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
208
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
209
+ "
210
+ ) ;
211
+
212
+ // Put some data through the plan to check that the filter is updated to reflect the TopK state
213
+ let session_ctx = SessionContext :: new_with_config ( SessionConfig :: new ( ) ) ;
214
+ session_ctx. register_object_store (
215
+ ObjectStoreUrl :: parse ( "test://" ) . unwrap ( ) . as_ref ( ) ,
216
+ Arc :: new ( InMemory :: new ( ) ) ,
217
+ ) ;
218
+ let state = session_ctx. state ( ) ;
219
+ let task_ctx = state. task_ctx ( ) ;
220
+ let mut stream = plan. execute ( 0 , Arc :: clone ( & task_ctx) ) . unwrap ( ) ;
221
+ // Iterate one batch
222
+ stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
223
+
224
+ // Test that filters are pushed down correctly to each side of the join
225
+ insta:: assert_snapshot!(
226
+ format_plan_for_test( & plan) ,
227
+ @r"
228
+ - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
229
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
230
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
231
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
232
+ "
233
+ ) ;
234
+ }
235
+
236
+ // Test both static and dynamic filter pushdown in HashJoinExec.
237
+ // Note that static filter pushdown is rare: it should have already happened in the logical optimizer phase.
238
+ // However users may manually construct plans that could result in a FilterExec -> HashJoinExec -> Scan setup.
239
+ // Dynamic filters arise in cases such as nested inner joins or TopK -> HashJoinExec -> Scan setups.
240
+ #[ tokio:: test]
241
+ async fn test_static_filter_pushdown_through_hash_join ( ) {
126
242
use datafusion_common:: JoinType ;
127
243
use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
128
244
@@ -245,7 +361,8 @@ async fn test_hashjoin_parent_filter_pushdown() {
245
361
246
362
let join_schema = join. schema ( ) ;
247
363
let filter = col_lit_predicate ( "a" , "aa" , & join_schema) ;
248
- let plan = Arc :: new ( FilterExec :: try_new ( filter, join) . unwrap ( ) ) ;
364
+ let plan =
365
+ Arc :: new ( FilterExec :: try_new ( filter, join) . unwrap ( ) ) as Arc < dyn ExecutionPlan > ;
249
366
250
367
// Test that filters are NOT pushed down for left join
251
368
insta:: assert_snapshot!(
0 commit comments