Skip to content

Conversation

adriangb
Copy link
Contributor

Fixes #17196

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Aug 15, 2025
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Aug 15, 2025
Comment on lines 808 to 830
// Create a new dynamic filter with swapped keys after inputs are swapped
new_join.dynamic_filter = Self::create_dynamic_filter(&new_join.on);
Ok(Arc::new(new_join))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the fix, I was thinking @nuno-faria was saying we didn't consider PartitionMode::Partitioned before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xudong963 The partitioned mode is still considered, but the filter that is pushed is not waiting for all partitions to finish. In this case, the filter (right or wrong) is pushed to the wrong side.

Copy link
Member

@xudong963 xudong963 Aug 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I got it, I was thinking the create_dynamic_filter is called during the hash join execution (after join_selection).

After checking the code, it'll be created during logicalplan -> physicalplan (the physical plan construction phase).

So for me, it makes more sense to delay the create_dynamic_filter creation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xudong963 you inspired me to see if I could take a completely different approach: e7c5d16

I actually think this approach should also work for and simplify things in SortExec and other places. Let me know what you think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think this approach should also work for and simplify things in SortExec and other places. Let me know what you think.

Yeah, I think it would be good

@github-actions github-actions bot added the physical-expr Changes to the physical-expr crates label Aug 15, 2025
@adriangb adriangb changed the title Fix dynamic filter pushdown in HashJoinExec::swap_inputs Fix dynamic filter pushdown in HashJoinExec Aug 15, 2025
@adriangb
Copy link
Contributor Author

@kosiew since you're working on these bits as well, could you help review this PR?

@adriangb
Copy link
Contributor Author

hmm msvr is failing for 1.85.1 but if try locally:

❯ cargo "+1.85.1" check         
    Blocking waiting for file lock on package cache
error: rustc 1.85.1 is not supported by the following packages:
  [email protected] requires rustc 1.86.0
  [email protected] requires rustc 1.86.0
  [email protected] requires rustc 1.86.0
Either upgrade rustc or select compatible dependency versions with
`cargo update <name>@<current-ver> --precise <compatible-ver>`
where `<compatible-ver>` is the latest version supporting rustc 1.85.1

Comment on lines +309 to +313
01)CoalesceBatchesExec: target_batch_size=8192
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the DynamicFilterPhysicalExpr be applied to the left table here, i.e., small_table?

Copy link
Contributor Author

@adriangb adriangb Aug 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that at runtime (i.e. when ExecutionPlan::execute is called and work actually begins) the left side is always the build side and the right side is always the probe side. During optimizer passes which one is left and right may be swapped / re-arranged but all of the dynamic filter stuff happens after this so we can always push the filters into the right side. At least for inner joins, other join types may be more complex and I haven't even begun to wrap my head around those cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tree looks like this:

copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+---------------+------------------------------------------------------------+
| plan_type     | plan                                                       |
+---------------+------------------------------------------------------------+
| physical_plan | ┌───────────────────────────┐                              |
|               | │    CoalesceBatchesExec    │                              |
|               | │    --------------------   │                              |
|               | │     target_batch_size:    │                              |
|               | │            8192           │                              |
|               | └─────────────┬─────────────┘                              |
|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   ├──────────────┐               |
|               | │        on: (k = k)        │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       DataSourceExec      ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │          files: 1         ││     target_batch_size:    │ |
|               | │      format: parquet      ││            8192           │ |
|               | └───────────────────────────┘└─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │         FilterExec        │ |
|               |                              │    --------------------   │ |
|               |                              │     predicate: v >= 50    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │      RepartitionExec      │ |
|               |                              │    --------------------   │ |
|               |                              │ partition_count(in->out): │ |
|               |                              │          1 -> 12          │ |
|               |                              │                           │ |
|               |                              │    partitioning_scheme:   │ |
|               |                              │    RoundRobinBatch(12)    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │       DataSourceExec      │ |
|               |                              │    --------------------   │ |
|               |                              │          files: 1         │ |
|               |                              │      format: parquet      │ |
|               |                              │                           │ |
|               |                              │         predicate:        │ |
|               |                              │      v >= 50 AND true     │ |
|               |                              └───────────────────────────┘ |
|               |                                                            |
+---------------+------------------------------------------------------------+

This makes sense: you always want the small table to be the build side and the large table to be the probe side.

If I change the query to:

explain select * from large_table join small_table on small_table.k = large_table.k where large_table.v >= 50;

Then we'll make a different plan but we swap the join around so that the large table continues to be the probe side:

+---------------+------------------------------------------------------------+
| plan_type     | plan                                                       |
+---------------+------------------------------------------------------------+
| physical_plan | ┌───────────────────────────┐                              |
|               | │       ProjectionExec      │                              |
|               | │    --------------------   │                              |
|               | │            k: k           │                              |
|               | │            v: v           │                              |
|               | └─────────────┬─────────────┘                              |
|               | ┌─────────────┴─────────────┐                              |
|               | │    CoalesceBatchesExec    │                              |
|               | │    --------------------   │                              |
|               | │     target_batch_size:    │                              |
|               | │            8192           │                              |
|               | └─────────────┬─────────────┘                              |
|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   ├──────────────┐               |
|               | │        on: (k = k)        │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       DataSourceExec      ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │          files: 1         ││     target_batch_size:    │ |
|               | │      format: parquet      ││            8192           │ |
|               | └───────────────────────────┘└─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │         FilterExec        │ |
|               |                              │    --------------------   │ |
|               |                              │     predicate: v >= 50    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │      RepartitionExec      │ |
|               |                              │    --------------------   │ |
|               |                              │ partition_count(in->out): │ |
|               |                              │          1 -> 12          │ |
|               |                              │                           │ |
|               |                              │    partitioning_scheme:   │ |
|               |                              │    RoundRobinBatch(12)    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │       DataSourceExec      │ |
|               |                              │    --------------------   │ |
|               |                              │          files: 1         │ |
|               |                              │      format: parquet      │ |
|               |                              │                           │ |
|               |                              │         predicate:        │ |
|               |                              │      v >= 50 AND true     │ |
|               |                              └───────────────────────────┘ |
|               |                                                            |
+---------------+------------------------------------------------------------+
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@1 as k, v@2 as v, k@0 as k], metrics=[output_rows=51, elapsed_compute=3.428µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=51, elapsed_compute=4.584µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 100], metrics=[output_rows=51, elapsed_compute=1.336377ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=951, output_batches=1, build_mem_used=3032, build_time=983.25µs, join_time=347.668µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   |       DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=41.043µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=102.5µs, time_elapsed_processing=742.668µs, time_elapsed_scanning_total=731.75µs, time_elapsed_scanning_until_data=702.584µs] |
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=951, elapsed_compute=22.585µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|                   |         FilterExec: v@1 >= 50, metrics=[output_rows=951, elapsed_compute=152.886µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |           RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=2.19975ms, repartition_time=1ns, send_time=5.47µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                   |             DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 100 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50 AND k_null_count@4 != row_count@2 AND k_max@3 >= 1 AND k_null_count@4 != row_count@2 AND k_min@5 <= 100, required_guarantees=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=226.251µs, metadata_load_time=204.001µs, page_index_eval_time=143.126µs, row_pushdown_eval_time=2ns, statistics_eval_time=109.96µs, time_elapsed_opening=1.627167ms, time_elapsed_processing=2.125167ms, time_elapsed_scanning_total=581.5µs, time_elapsed_scanning_until_data=546.208µs]                                                                                                                                                       |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataSourceExec small_table.parquet
DataSourceExec large_table.parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ]

But in this case large_table is filtered by both v >= 50 and the dynamic filter. Shouldn't the dynamic filter be applied to small_table instead?

If where v = 50 is executed instead, the dynamic filter now appears in the small_table table:

DataSourceExec large_table.parquet, predicate=v@1 = 50
DataSourceExec small_table.parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 50 AND k@0 <= 50 ]

Copy link
Contributor Author

@adriangb adriangb Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dynamic filter is always applied to the probe side. In those examples above it seems the join side changes, probably because v = 50 is highly selective so the large table effectively becomes ~ 1 row and thus can be used for the build side. You can see that even without dynamic filters changing the query like that flips the build/probe sides:

copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
set datafusion.optimizer.enable_dynamic_filter_pushdown = false;
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
explain analyze select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
explain analyze select * from small_table join large_table on small_table.k = large_table.k where large_table.v = 50;
DataFusion CLI v49.0.1
+-------+
| count |
+-------+
| 100   |
+-------+
1 row(s) fetched. 
Elapsed 0.027 seconds.

+-------+
| count |
+-------+
| 1000  |
+-------+
1 row(s) fetched. 
Elapsed 0.006 seconds.

0 row(s) fetched. 
Elapsed 0.001 seconds.

0 row(s) fetched. 
Elapsed 0.002 seconds.

0 row(s) fetched. 
Elapsed 0.002 seconds.

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=51, elapsed_compute=5.546µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                   |   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=51, elapsed_compute=1.976173ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=951, output_batches=1, build_mem_used=3032, build_time=1.452836ms, join_time=520.127µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                   |     DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=56.793µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=136.292µs, time_elapsed_processing=1.070917ms, time_elapsed_scanning_total=978.833µs, time_elapsed_scanning_until_data=940.417µs] |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=951, elapsed_compute=24.711µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |       FilterExec: v@1 >= 50, metrics=[output_rows=951, elapsed_compute=263.386µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=2.629708ms, repartition_time=1ns, send_time=8.553µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |           DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50, pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=292.459µs, metadata_load_time=146.834µs, page_index_eval_time=123.543µs, row_pushdown_eval_time=2ns, statistics_eval_time=135.417µs, time_elapsed_opening=1.7685ms, time_elapsed_processing=2.520792ms, time_elapsed_scanning_total=854.999µs, time_elapsed_scanning_until_data=810.208µs]                                                                                                                                                        |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.013 seconds.

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=3.75µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=6.459µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=1, elapsed_compute=194.128µs, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=100, output_batches=1, build_mem_used=89, build_time=95.668µs, join_time=54.624µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |       CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=32.583µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=14.793µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                   |           FilterExec: v@1 = 50, metrics=[output_rows=1, elapsed_compute=111.469µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=1.516833ms, repartition_time=1ns, send_time=5.594µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |               DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 = 50, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 50 AND 50 <= v_max@1, required_guarantees=[v in (50)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=146.792µs, metadata_load_time=189.126µs, page_index_eval_time=66.084µs, row_pushdown_eval_time=2ns, statistics_eval_time=67.584µs, time_elapsed_opening=850.75µs, time_elapsed_processing=1.408834ms, time_elapsed_scanning_total=641.791µs, time_elapsed_scanning_until_data=616.5µs]                                                                                                                                                            |
|                   |       DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=45.376µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=92.333µs, time_elapsed_processing=260.126µs, time_elapsed_scanning_total=290.167µs, time_elapsed_scanning_until_data=214.041µs] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.008 seconds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so the filter is always applied from the build side to the probe side, independently of the query. Maybe I misunderstood but I though that was the original issue (#17196), that the filter was always applied to the same side, even when it made sense to do the opposite.

For example:

copy (select i as k from generate_series(1, 1000000) t(i)) to 't1.parquet';
copy (select i as k, i as v from generate_series(1, 10000000) t(i)) to 't2.parquet';
create external table t1 stored as parquet location 't1.parquet';
create external table t2 stored as parquet location 't2.parquet';

explain analyze select *
from t1
join t2 on t1.k = t2.k
where t2.v >= 1000000;

+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=71.205µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 14 AND k@0 <= 999997], metrics=[output_rows=1, elapsed_compute=740.259412ms, build_input_batches=120, build_input_rows=1000000, input_batches=12, input_rows=11713, output_batches=12, build_mem_used=34742816, build_time=703.6063ms, join_time=7.0188ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1000000, elapsed_compute=15.225303ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |       RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=1.6347235s, repartition_time=105.8625ms, send_time=8.7999ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=135.86ms, repartition_time=1ns, send_time=1.4348ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |           DataSourceExec: file_groups={1 group: [[t1.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=1000000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=1310405, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=120.801µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=232.8µs, time_elapsed_processing=134.8576ms, time_elapsed_scanning_total=145.4359ms, time_elapsed_scanning_until_data=17.3163ms]                                            |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=244.4µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                   |       RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=55.1884ms, repartition_time=1.360111ms, send_time=123.432µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=107.3µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |           FilterExec: v@1 >= 1000000, metrics=[output_rows=11713, elapsed_compute=681.411µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |             DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 1000000 AND DynamicFilterPhysicalExpr [ k@0 >= 14 AND k@0 <= 999997 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 1000000 AND k_null_count@4 != row_count@2 AND k_max@3 >= 14 AND k_null_count@4 != row_count@2 AND k_min@5 <= 999997, required_guarantees=[]                                                                                                                                                                                                                                                                                                                                                                         |
|                   | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=379500, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=180.612µs, metadata_load_time=7.630712ms, page_index_eval_time=317.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.603512ms, time_elapsed_opening=16.959ms, time_elapsed_processing=53.3003ms, time_elapsed_scanning_total=39.3134ms, time_elapsed_scanning_until_data=36.7037ms]                                                                                                                  |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

In this example t2 is filtered by v and by a dynamic filter on k, while in theory it would be faster to apply a dynamic filter from t2 to t1 (maybe we would need some heuristic to determine which would be the best approach?).

adriangb added a commit to pydantic/datafusion that referenced this pull request Aug 18, 2025
According to our MSRV policy we are able to do this because the last 4 minor versions are 1.86, 1.87, 1.88 and 1.89. 1.85 was released > 4 months ago. So it is fair game to do this bump. I would like to do the bump for apache#17201
@adriangb adriangb mentioned this pull request Aug 18, 2025
@adriangb
Copy link
Contributor Author

hmm msvr is failing for 1.85.1 but if try locally:

MSRV issue is resolved now 😄

@adriangb adriangb requested a review from nuno-faria August 18, 2025 19:24
Copy link
Member

@xudong963 xudong963 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to file a ticket for this

@adriangb adriangb merged commit 1d4d74b into apache:main Aug 19, 2025
27 checks passed
@adriangb adriangb deleted the fix-join-order branch August 19, 2025 12:20
@adriangb
Copy link
Contributor Author

We may need to file a ticket for this

#17237

LiaCastaneda pushed a commit to DataDog/datafusion that referenced this pull request Sep 2, 2025
LiaCastaneda added a commit to DataDog/datafusion that referenced this pull request Sep 9, 2025
* Enable physical filter pushdown for hash joins (apache#16954)

(cherry picked from commit b10f453)

* Add ExecutionPlan::reset_state (apache#17028)

* Add ExecutionPlan::reset_state

Co-authored-by: Robert Ream <[email protected]>

* Update datafusion/sqllogictest/test_files/cte.slt

* Add reference

* fmt

* add to upgrade guide

* add explain plan, implement in more plans

* fmt

* only explain

---------

Co-authored-by: Robert Ream <[email protected]>

* Add dynamic filter (bounds) pushdown to HashJoinExec (apache#16445)

(cherry picked from commit ff77b70)

* Push dynamic pushdown through CooperativeExec and ProjectionExec (apache#17238)

(cherry picked from commit 4bc0696)

* Fix dynamic filter pushdown in HashJoinExec (apache#17201)

(cherry picked from commit 1d4d74b)

* Fix HashJoinExec sideways information passing for partitioned queries (apache#17197)

(cherry picked from commit 64bc58d)

* disallow pushdown of volatile functions (apache#16861)

* dissallow pushdown of volatile PhysicalExprs

* fix

* add FilteredVec helper to handle filter / remap pattern (#34)

* checkpoint: Address PR feedback in https://github.com/apach...

* add FilteredVec to consolidate handling of filter / remap pattern

* lint

* Add slt test for pushing volatile predicates down (#35)

---------

Co-authored-by: Andrew Lamb <[email protected]>
(cherry picked from commit 94e8548)

* fix bounds accumulator reset in HashJoinExec dynamic filter pushdown (apache#17371)

---------

Co-authored-by: Adrian Garcia Badaracco <[email protected]>
Co-authored-by: Robert Ream <[email protected]>
Co-authored-by: Jack Kleeman <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
physical-expr Changes to the physical-expr crates physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Dynamic Filter Pushdown is being applied to the wrong table
3 participants