Fix HashJoinExec sideways information passing for partitioned queries #17197
Conversation
I think we can support this but need to do some thinking / writing code
@nuno-faria @jonathanc-n would you mind reviewing this PR?
In the case of hash join spilling this might be a bit difficult. I'm planning on putting out a proposal for hash join spilling in the next few days. To give you a quick rundown, the idea is to essentially partition the data; when there is a need to spill we can write some partitions to disk, and after the first hash join exec is done, it will read the spilled batches from disk and run again. Since we can't use the entire input to compute the bounds upfront, I was thinking we could call min_batch/max_batch for every batch as the batches are being loaded in. For this reason I think the first hash join spilling pull request won't include support for filter pushdown, or we can discuss this once I open the proposal.
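To make the incremental-bounds idea concrete, here is a minimal sketch in plain Rust (hypothetical types, not the actual spilling code): instead of computing min/max over the entire build side up front, each incoming batch of join keys is folded into a running (min, max) pair.

/// Running (min, max) for one join-key column, updated batch by batch.
/// Hypothetical helper: the real code would operate on Arrow arrays / ScalarValue.
#[derive(Debug, Clone, Copy)]
struct RunningBounds {
    min: i64,
    max: i64,
}

impl RunningBounds {
    /// Fold one batch of key values into the running bounds.
    fn update(mut self, batch: &[i64]) -> Self {
        for &k in batch {
            self.min = self.min.min(k);
            self.max = self.max.max(k);
        }
        self
    }
}

fn main() {
    // Batches arrive one at a time, e.g. while spilled partitions are read back in.
    let batches = vec![vec![5_i64, 9, 2], vec![7, 1], vec![4]];
    let mut bounds: Option<RunningBounds> = None;
    for batch in &batches {
        let start = bounds.unwrap_or(RunningBounds { min: batch[0], max: batch[0] });
        bounds = Some(start.update(batch));
    }
    assert_eq!((bounds.unwrap().min, bounds.unwrap().max), (1, 9));
}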
It's a bit late for me so I'll take a look at the code tomorrow. It'd be interesting if we could somehow have partition-aware expressions; even though it doesn't quite make sense, maybe we can instead make the filter behave differently with certain execution plans (like hash join) -> create a runtime filter 🤷. I'll give it some more thought tomorrow.
query III rowsort
select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 10000000;
---- | ||
1 1 1 | ||
10000000 10000000 10000000 |
I think we should add a few retries of this query just to be sure, since it could randomly work before the fix.
Thanks @adriangb for looking into it. I found some issues with the fix.
copy (select i as k from generate_series(1, 10000000) as t(i)) to 't1.parquet';
copy (select i as k, i as v from generate_series(1, 10000000) as t(i)) to 't2.parquet';
create external table t1 stored as parquet location 't1.parquet';
create external table t2 stored as parquet location 't2.parquet';
explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1;
-- in each run, the number of returned rows by t1 is different
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=6854272
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=8951424
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=7902848
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=8951424
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=7902848
On main, the number of rows returned by t1 is the minimum expected:
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=20480
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=20480
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=20480
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=20480
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ]
output_rows=20480
explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 2;
-- all rows are returned
DataSourceExec t1.parquet predicate=DynamicFilterPhysicalExpr [ true ]
output_rows=10000000
I think in this case it should be k@0 >= 1 AND k@0 <= 2 rather than true. However, if we had disjoint values like v = 1 or v = 10000000, a single min/max range would cover almost the entire table.
@nuno-faria did you review 5d0d6a4 by chance? I've completely changed the approach since then and I can't reproduce the issues you describe.
@kosiew since you're working on these bits as well, could you help review this PR?
@adriangb I tested again and still see the issues. I'm testing with:
❯ git log -1
commit 481f7f9382dd83d7d5416cdfb1bf52d26d7cec40 (HEAD -> fix-hash-join-partitioned)
Author: Adrian Garcia Badaracco <[email protected]>
Date: Fri Aug 15 13:44:49 2025 -0500
remove field, run test multiple times
❯ cargo run
DataFusion CLI v49.0.0
> copy (select i as k from generate_series(1, 10000000) as t(i)) to 't1.parquet';
+----------+
| count |
+----------+
| 10000000 |
+----------+
1 row(s) fetched.
Elapsed 4.200 seconds.
> copy (select i as k, i as v from generate_series(1, 10000000) as t(i)) to 't2.parquet';
+----------+
| count |
+----------+
| 10000000 |
+----------+
1 row(s) fetched.
Elapsed 4.345 seconds.
> create external table t1 stored as parquet location 't1.parquet';
0 row(s) fetched.
Elapsed 0.005 seconds.
> create external table t2 stored as parquet location 't2.parquet';
0 row(s) fetched.
Elapsed 0.006 seconds.
> explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1;
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=6.711µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=2.824202ms] |
| | HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=1.043652101s, build_input_batches=1, build_input_rows=1, input_batches=709, input_rows=5805696, output_batches=709, build_mem_used=89, build_time=839.3µs, join_time=1.0427621s] |
| | CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=63.8µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=43.9µs] |
| | FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=569.911µs] |
| | DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
| | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=252.512µs, metadata_load_time=3.801512ms, page_index_eval_time=329.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.223112ms, time_elapsed_opening=11.4854ms, time_elapsed_processing=46.3598ms, time_elapsed_scanning_total=35.9475ms, time_elapsed_scanning_until_data=32.8674ms] |
| | DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here -> | , metrics=[output_rows=5805696, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=7485222, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=1048576, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=3, bloom_filter_eval_time=3.62µs, metadata_load_time=3.269812ms, page_index_eval_time=218.712µs, row_pushdown_eval_time=24ns, statistics_eval_time=443.22µs, time_elapsed_opening=6.0144ms, time_elapsed_processing=814.7442ms, time_elapsed_scanning_total=2.0536666s, time_elapsed_scanning_until_data=119.6913ms] |
| | |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.434 seconds.
> explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1;
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=8.611µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=3.987404ms] |
| | HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=1.458327901s, build_input_batches=1, build_input_rows=1, input_batches=899, input_rows=7360512, output_batches=899, build_mem_used=89, build_time=974µs, join_time=1.4572807s] |
| | CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=52.6µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=29.1µs] |
| | FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=586.111µs] |
| | DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
| | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=130.412µs, metadata_load_time=4.664712ms, page_index_eval_time=198.612µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.181212ms, time_elapsed_opening=12.7488ms, time_elapsed_processing=46.6822ms, time_elapsed_scanning_total=35.0437ms, time_elapsed_scanning_until_data=31.9659ms] |
| | DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here -> | , metrics=[output_rows=7360512, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=9474601, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=2076672, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=1, bloom_filter_eval_time=1.023µs, metadata_load_time=2.509312ms, page_index_eval_time=578.312µs, row_pushdown_eval_time=24ns, statistics_eval_time=164.423µs, time_elapsed_opening=5.3009ms, time_elapsed_processing=1.1821071s, time_elapsed_scanning_total=2.937489s, time_elapsed_scanning_until_data=188.8519ms] |
| | |
+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.496 seconds.
> explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1;
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=6.711µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=6.169003ms] |
| | HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=2.217662901s, build_input_batches=1, build_input_rows=1, input_batches=1221, input_rows=10000000, output_batches=1221, build_mem_used=89, build_time=711.7µs, join_time=2.2169098s] |
| | CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=51.6µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=26.6µs] |
| | FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=585.411µs] |
| | DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
| | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=129.712µs, metadata_load_time=4.275612ms, page_index_eval_time=206.112µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.155312ms, time_elapsed_opening=13.5415ms, time_elapsed_processing=47.6242ms, time_elapsed_scanning_total=35.1671ms, time_elapsed_scanning_until_data=32.096ms] |
| | DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here -> | , metrics=[output_rows=10000000, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=12781310, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=1.123µs, metadata_load_time=2.642712ms, page_index_eval_time=4.312µs, row_pushdown_eval_time=24ns, statistics_eval_time=131.323µs, time_elapsed_opening=4.6513ms, time_elapsed_processing=1.8171676s, time_elapsed_scanning_total=4.5157559s, time_elapsed_scanning_until_data=260.5798ms] |
| | |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.549 seconds.
On main it always returns 20480 rows for t1:
| plan_type | plan |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=6.611µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=31.7µs] |
| | HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=4.247301ms, build_input_batches=1, build_input_rows=1, input_batches=3, input_rows=20480, output_batches=3, build_mem_used=89, build_time=673.7µs, join_time=3.555ms] |
| | CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=66.4µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=28.7µs] |
| | FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=601.011µs] |
| | DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
| | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=91.312µs, metadata_load_time=5.554412ms, page_index_eval_time=173.912µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.118912ms, time_elapsed_opening=15.7229ms, time_elapsed_processing=49.715ms, time_elapsed_scanning_total=35.0119ms, time_elapsed_scanning_until_data=31.9417ms] |
| | DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here -> | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=206447, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=134.212µs, metadata_load_time=5.683512ms, page_index_eval_time=172.312µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.148112ms, time_elapsed_opening=12.28ms, time_elapsed_processing=29.6494ms, time_elapsed_scanning_total=21.961ms, time_elapsed_scanning_until_data=16.4331ms] |
| | |
And there is the second issue, where v = 1 or v = 2 still results in a true dynamic filter:
> explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 2;
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=2, elapsed_compute=11.31µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=7.552604ms] |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=431.141012ms, build_input_batches=2, build_input_rows=2, input_batches=1175, input_rows=10000000, output_batches=1368, build_mem_used=908, build_time=11.9206ms, join_time=408.0587ms] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=50.702µs] |
| | RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=51.8786ms, repartition_time=75.211µs, send_time=53.142µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=28.3µs] |
| | FilterExec: v@1 = 1 OR v@1 = 2, metrics=[output_rows=2, elapsed_compute=1.082711ms] |
| | DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1 OR v@1 = 2, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1 OR v_null_count@2 != row_count@3 AND v_min@0 <= 2 AND 2 <= v_max@1, required_guarantees=[v in (1, 2)] |
| | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=167.712µs, metadata_load_time=7.794612ms, page_index_eval_time=234.612µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.390112ms, time_elapsed_opening=15.7157ms, time_elapsed_processing=49.7691ms, time_elapsed_scanning_total=35.6062ms, time_elapsed_scanning_until_data=32.055ms] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=10000000, elapsed_compute=88.506601ms] |
| | RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=1.961983s, repartition_time=1.462319702s, send_time=193.006324ms] |
| | DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ], metrics=[output_rows=10000000, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=12781310, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=24ns, metadata_load_time=2.152412ms, page_index_eval_time=3.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=24ns, time_elapsed_opening=3.8613ms, time_elapsed_processing=1.9387351s, time_elapsed_scanning_total=3.9634373s, time_elapsed_scanning_until_data=251.6241ms] |
| | |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.501 seconds.
@nuno-faria thank you for your patience. I can reproduce now, I must have been on the wrong commit or something. My laptop started having issues a couple days ago so I've been resetting it, will probably need to get a new one, and must have gotten confused about what commit I was on or something. In any case, I think I found the root cause of both issues: empty partitions (with no rows in the hash table / build side) do not report bounds -> were not being counted towards cross partition synchronization -> dynamic filters were not being built at the right time / at all. I think d2b8da5 should fix this. Now with the following setup:
copy (select i as k from generate_series(1, 10000000) as t(i)) to 't1.parquet';
copy (select i as k, i as v from generate_series(1, 10000000) as t(i)) to 't2.parquet';
create external table t1 stored as parquet location 't1.parquet';
create external table t2 stored as parquet location 't2.parquet';
explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1;
When I run this repeatedly I now get the expected number of rows from t1.
And with this query:
explain analyze select *
from t1
join t2 on t1.k = t2.k
where v = 1 or v = 2;
I consistently get the expected k@0 >= 1 AND k@0 <= 2 filter instead of true.
@adriangb It appears issue 2 is fixed, it has the correct filter now. Issue 1 still appears on my end; the number of rows collected from t1 still changes between runs:
DataSourceExec t2.parquet, output_rows=20480
DataSourceExec t1.parquet, output_rows=7902848 -- should be 20480
DataSourceExec t2.parquet, output_rows=20480
DataSourceExec t1.parquet, output_rows=10000000 -- should be 20480
DataSourceExec t2.parquet, output_rows=20480
DataSourceExec t1.parquet, output_rows=9437184 -- should be 20480
Note that filtering t1 directly with k = 1 returns only 20480 rows:
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=22.8µs] |
| | FilterExec: k@0 = 1, metrics=[output_rows=1, elapsed_compute=542.011µs] |
| | DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=k@0 = 1, pruning_predicate=k_null_count@2 != row_count@3 AND k_min@0 <= 1 AND 1 <= k_max@1, required_guarantees=[k in (1)] |
| | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=206447, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=132.912µs, metadata_load_time=4.262112ms, page_index_eval_time=188.312µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.258112ms, time_elapsed_opening=10.551ms, time_elapsed_processing=27.7574ms, time_elapsed_scanning_total=18.0588ms, time_elapsed_scanning_until_data=16.2569ms] |
| | |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |
Ah I see, I was looking at the wrong DataSourceExec. I wonder if this is just a consequence of synchronization: there are no guarantees about the order in which the build side hash tables are built, and we don't push down the filter until they're all done, so e.g. one partition may stream several batches before the filter even kicks in, and that's going to be non-deterministic. I think it's a non-issue in the real world because small queries are already going to be fast, and large queries will stream enough batches that the filter will eventually kick in and be pushed down. But I need to confirm this hypothesis.
Good tackle of a tricky problem!
Left a question and a typo fix suggestion.
let expected_calls = match partition_mode {
    // Each output partition accesses shared build data
    PartitionMode::CollectLeft => {
        right_child.output_partitioning().partition_count()
Isn't this fragile?
right_child.output_partitioning().partition_count() reflects the plan at planning time. If any Repartition/Coalesce/EnforceDistribution/etc. gets inserted between planning and execution (or an upstream operator behaves differently at runtime), the actual number of streams created at execute() can differ. That can break coordination that relies on a compile-time partition count (like your SharedBoundsAccumulator.total_partitions).
We could defer creation of this up to the dynamic filter pushdown optimizer pass (the last one), or we could try and defer it into ExecutionPlan::execute. Does that resolve the concern?
Does 9fe1bf4 work you think?
I agree, the change so that the method is only called from HashJoinExec::execute, after any optimizer passes are run, should resolve the concern.
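A hedged sketch of that deferral (the names and the runtime_partition_count parameter are stand-ins, not the PR's exact code): the accumulator is created lazily the first time execute() runs, via OnceLock, so it is sized from what execute() actually observes rather than from planning-time partitioning.

use std::sync::{Arc, OnceLock};

/// Hypothetical shared state sized by the runtime partition count.
struct SharedBoundsAccumulator {
    total_partitions: usize,
}

/// Stand-in for the join operator holding a lazily initialized accumulator.
struct HashJoin {
    bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
}

impl HashJoin {
    /// Called from execute(): the partition count observed here is the one that matters,
    /// regardless of what the plan looked like at planning time.
    fn execute(&self, runtime_partition_count: usize) -> Arc<SharedBoundsAccumulator> {
        Arc::clone(self.bounds_accumulator.get_or_init(|| {
            Arc::new(SharedBoundsAccumulator {
                total_partitions: runtime_partition_count,
            })
        }))
    }
}

fn main() {
    let join = HashJoin { bounds_accumulator: OnceLock::new() };
    // Every partition that calls execute() observes the same accumulator instance.
    let a = join.execute(12);
    let b = join.execute(12);
    assert_eq!(a.total_partitions, 12);
    assert!(Arc::ptr_eq(&a, &b));
}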
@nuno-faria I was able to confirm this is just a race between updating the filter and starting work on the probe side: https://github.com/pydantic/datafusion/compare/fix-hash-join-partitioned...pydantic:datafusion:demo-race?expand=1
On that branch I consistently get the minimum number of rows from t1. But I don't think that is a good approach long term. The code is more complex and there is potential for deadlocks. The way the current code is structured, even if there are bugs it should never be slower than not having dynamic filters. The filters just may take a couple of batches to kick in; they won't help small queries on a local SSD (like we've been testing here) much, but will help massively for large queries on slower storage, etc.
I see, that makes sense. I think in theory we would need to have something like what Postgres does and determine at plan time that following the parameterized path would be the best approach, which would be quite complex.
Agreed. I did some tests and found that the number of rows is kept to the minimum when the number of partitions is set to 1. On a simple join query this makes it more than 20x faster than DuckDB.
That's crazy! I think all we need now is an approval to merge this. @kosiew not sure if you're a committer, otherwise @xudong963?
Sorry for the delay, this LGTM. nice catch on that race
    PartitionMode::Auto => 1,
};
Self {
    bounds: Mutex::new(Vec::new()),
We could preallocate bounds with Vec::with_capacity().
I plan to merge this once CI passes
Observation:
The fix collects every partition’s local (min, max) pairs and merges them into a single “global” (min, max) per join column. In SharedBoundsAccumulator::merge_bounds, the loop walks all stored bounds and computes global_min/global_max for each column, discarding the original partition-specific ranges.
Later, create_filter_from_bounds converts each global pair into col ≥ min AND col ≤ max, then conjoins these ranges across columns.
Because the merged predicate is always a single contiguous interval, any disjoint conditions (e.g. v = 1 OR v = 10000000) get collapsed into one broad range. In that example, bounds (1, 1) and (10000000, 10000000) become (1, 10000000), yielding the filter k ≥ 1 AND k ≤ 10000000—a superset containing nearly all rows. While this fixes the earlier correctness bug (the filter won’t exclude valid results), the pushdown loses selectivity for highly selective OR predicates, making it less effective for pruning.
If we tracked multiple ranges per column, the accumulator could union all build-side information into a single expression like
k BETWEEN 1 AND 1 OR k BETWEEN 10000000 AND 10000000.
This remains a single dynamic filter; it simply captures non-contiguous values more precisely than a broad min/max interval.
Not suggesting for the above to be included in this PR.
Do you think a follow up issue is warranted to explore further refining this?
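As an illustration of that follow-up idea, here is a hedged sketch using simplified i64 ranges and a string predicate rather than DataFusion's PhysicalExpr API: track a list of ranges per column and union them into an OR of BETWEENs instead of collapsing them into one wide interval.

/// One contiguous range of join-key values observed on the build side.
#[derive(Debug, Clone, Copy)]
struct Range {
    min: i64,
    max: i64,
}

/// Render a per-column filter that preserves disjoint ranges,
/// e.g. "k BETWEEN 1 AND 1 OR k BETWEEN 10000000 AND 10000000".
fn ranges_to_predicate(column: &str, ranges: &[Range]) -> String {
    ranges
        .iter()
        .map(|r| format!("{column} BETWEEN {} AND {}", r.min, r.max))
        .collect::<Vec<_>>()
        .join(" OR ")
}

fn main() {
    let ranges = vec![Range { min: 1, max: 1 }, Range { min: 10_000_000, max: 10_000_000 }];
    // Much more selective than the single merged interval k >= 1 AND k <= 10000000.
    println!("{}", ranges_to_predicate("k", &ranges));
}

In practice the number of tracked ranges would likely need a cap, falling back to a single min/max interval once the build side contains too many disjoint values.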
let bounds = self.bounds.lock();
let all_bounds: Vec<_> = bounds.iter().collect();

if all_bounds.is_empty() {
    return None;
}
Can we operate directly on the locked bounds and eliminate the unnecessary intermediate collection (all_bounds) when computing column-wise minima and maxima?
I've refactored this to also address #17197 (review); that's a great point.
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
/// Shared bounds accumulator for coordinating dynamic filter updates across partitions
/// Lazily initialized at execution time to use actual runtime partition counts
bounds_accumulator: Arc<OnceLock<Arc<SharedBoundsAccumulator>>>,
The double Arc layering in Arc<OnceLock<Arc<SharedBoundsAccumulator>>> increases complexity and obscures ownership expectations. Can we replace it with OnceLock<Arc<SharedBoundsAccumulator>> to eliminate the outer Arc wrapper?
I've refactored this away 😄
if completed == total_partitions {
    if let Some(merged_bounds) = self.bounds_accumulator.merge_bounds() {
        let filter_expr = self.create_filter_from_bounds(merged_bounds)?;
        dynamic_filter.update(filter_expr)?;
    }
}
The plan can be rerun whenever the same ExecutionPlan object is executed more than once—e.g., if a caller issues multiple collect/execute invocations on a prepared physical plan or reuses the plan across query runs.
I think we should reset the bounds_accumulator to clear stale data before exiting this if block.
reset_state is already implemented for HashJoinExec and it does reset HashJoinExec::bounds_accumulator. This bit of code we are commenting on is within HashJoinStream, which is created inside of HashJoinExec::execute. 🤖
Thank you @adriangb @kosiew @nuno-faria and @jonathanc-n for your work on this PR 🙏
I think @kosiew's comment about resetting the state should be addressed prior to merge.
The rest of the comments I think could be done as a follow on PR (or never)
I also noticed that this module is getting huge, so I filed a suggestion for a follow up.
Thank you all for your work to push DataFusion's joins along
/// partition executions.
struct SharedBoundsAccumulator {
    /// Bounds from completed partitions.
    bounds: Mutex<Vec<Vec<(ScalarValue, ScalarValue)>>>,
As a follow on, it would be nice to put this structure into its own struct or something -- figuring out what Vec<Vec<(ScalarValue, ScalarValue)>> represents is a bit 🤯
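For example, a hedged sketch of what such a wrapper might look like (names are hypothetical; only the ScalarValue type is assumed from datafusion_common):

use datafusion_common::ScalarValue;

/// (min, max) of one join-key column on the build side.
#[derive(Debug, Clone)]
struct ColumnBounds {
    min: ScalarValue,
    max: ScalarValue,
}

/// Bounds reported by a single build partition: one entry per join-key column.
#[derive(Debug, Clone)]
struct PartitionBounds(Vec<ColumnBounds>);

/// What Vec<Vec<(ScalarValue, ScalarValue)>> encodes: bounds from all completed partitions.
#[derive(Debug, Default)]
struct AllPartitionsBounds(Vec<PartitionBounds>);

fn main() {
    let partition = PartitionBounds(vec![ColumnBounds {
        min: ScalarValue::Int64(Some(1)),
        max: ScalarValue::Int64(Some(10)),
    }]);
    let all = AllPartitionsBounds(vec![partition]);
    println!("{all:?}");
}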
// Store bounds in the accumulator - this runs once per partition
if let Some(bounds) = &left_data.bounds {
    // Only push actual bounds if they exist
    self.bounds_accumulator.bounds.lock().push(bounds.clone());
I found the mixed use of the fields directly from self.bounds_accumulator and methods (try_merge) somewhat unclear. As a follow on it might make the code easier to understand if we moved more of the logic for the shared bounds into the BoundsAccumulator structure.
Let's see how 01757fa looks
bounds: Vec<(ScalarValue, ScalarValue)>,
) -> Result<Arc<dyn PhysicalExpr>> {
    // Create range predicates for each join key
    let mut predicates = Vec::with_capacity(bounds.len());
this is another good candidate to move into the BoundsAccumulator
# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random
# So we'll run the same query a couple of times just to have more certainty it's fixed
# Sorry about the spam in this slt test...
BTW it would also be amazing if we could figure out how to test this more thoroughly (maybe unit tests?)
🤖: Benchmark completed
///
/// ## Thread Safety
///
/// All fields use atomic operations or mutexes to ensure correct coordination between concurrent
Guarding individual fields with fine-grained mutexes/atomics looks a bit dangerous: the implementation also has to ensure the correct update order (in this case, first update bounds, then update completed_partitions). It's correct now, but a future change might forget this very subtle restriction.
How about wrapping the whole struct with a big lock? I don't think it would cause a performance issue.
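A hedged sketch of the coarse-locking shape being suggested (hypothetical names, bounds simplified to i64 pairs): the reported bounds and the completion count live behind a single Mutex, so they can only ever be updated together and the ordering subtlety disappears.

use std::sync::Mutex;

/// Everything that must stay consistent is grouped under one lock.
struct SharedBoundsState {
    /// Per-partition, per-column (min, max) bounds (simplified to i64 here).
    bounds: Vec<Vec<(i64, i64)>>,
    completed_partitions: usize,
}

struct SharedBoundsAccumulator {
    inner: Mutex<SharedBoundsState>,
    total_partitions: usize,
}

impl SharedBoundsAccumulator {
    /// Record one partition's bounds; returns true if this was the last partition,
    /// i.e. the caller should now merge the bounds and update the dynamic filter.
    fn report_partition_bounds(&self, partition_bounds: Vec<(i64, i64)>) -> bool {
        let mut state = self.inner.lock().unwrap();
        state.bounds.push(partition_bounds);
        state.completed_partitions += 1;
        // No ordering subtleties: both fields were updated under the same lock.
        state.completed_partitions == self.total_partitions
    }
}

fn main() {
    let acc = SharedBoundsAccumulator {
        inner: Mutex::new(SharedBoundsState { bounds: Vec::new(), completed_partitions: 0 }),
        total_partitions: 2,
    };
    assert!(!acc.report_partition_bounds(vec![(1, 5)]));
    assert!(acc.report_partition_bounds(vec![(3, 9)]));
}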
Good call, 360253e
@alamb just out of curiosity - would you be able to kick off some more benches? I wonder if the change in how we structure the dynamic filter makes a difference.
Here are the results from my local machine:
🚀
Looks like extended tests are failing: https://github.com/apache/datafusion/actions/runs/17108179889/job/48522491130
I've put a fix up here: #17265
…apache#17197) (cherry picked from commit 64bc58d)
* Enable physical filter pushdown for hash joins (apache#16954) (cherry picked from commit b10f453)
* Add ExecutionPlan::reset_state (apache#17028)
* Add ExecutionPlan::reset_state
Co-authored-by: Robert Ream <[email protected]>
* Update datafusion/sqllogictest/test_files/cte.slt
* Add reference
* fmt
* add to upgrade guide
* add explain plan, implement in more plans
* fmt
* only explain
---------
Co-authored-by: Robert Ream <[email protected]>
* Add dynamic filter (bounds) pushdown to HashJoinExec (apache#16445) (cherry picked from commit ff77b70)
* Push dynamic pushdown through CooperativeExec and ProjectionExec (apache#17238) (cherry picked from commit 4bc0696)
* Fix dynamic filter pushdown in HashJoinExec (apache#17201) (cherry picked from commit 1d4d74b)
* Fix HashJoinExec sideways information passing for partitioned queries (apache#17197) (cherry picked from commit 64bc58d)
* disallow pushdown of volatile functions (apache#16861)
* dissallow pushdown of volatile PhysicalExprs
* fix
* add FilteredVec helper to handle filter / remap pattern (#34)
* checkpoint: Address PR feedback in https://github.com/apach...
* add FilteredVec to consolidate handling of filter / remap pattern
* lint
* Add slt test for pushing volatile predicates down (#35)
---------
Co-authored-by: Andrew Lamb <[email protected]> (cherry picked from commit 94e8548)
* fix bounds accumulator reset in HashJoinExec dynamic filter pushdown (apache#17371)
---------
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
Co-authored-by: Robert Ream <[email protected]>
Co-authored-by: Jack Kleeman <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Fixes #17188.
The bug originated because I did not account for each partition building a build side hash table independently.
Having grokked the code in more detail I now understand how this happens.
The approach I've taken here to fix the bug is to wait until all partitions have finished their build side (keeping track of the bounds for each partition) and only then calculate the overall bounds and push them down as filters to the scans.
I don't see how we could do much better: since we only have 1 filter to push down (expressions are not aware of partitions) we have to make that filter represent the entire build side (i.e. we can't push down a filter per partition).
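A hedged sketch of that overall approach (simplified i64 bounds and a string predicate instead of DataFusion's real expression types): collect per-partition bounds, merge them column-wise once every partition has reported, and emit a single conjunctive range filter that covers the entire build side.

/// Per-partition bounds: one (min, max) pair per join-key column.
type PartitionBounds = Vec<(i64, i64)>;

/// Merge bounds from all partitions into one global (min, max) per column.
fn merge_bounds(partitions: &[PartitionBounds]) -> Option<Vec<(i64, i64)>> {
    let mut merged = partitions.first()?.clone();
    for partition in &partitions[1..] {
        for (col, (min, max)) in partition.iter().enumerate() {
            merged[col].0 = merged[col].0.min(*min);
            merged[col].1 = merged[col].1.max(*max);
        }
    }
    Some(merged)
}

/// Render the single filter pushed down to the probe-side scan.
fn filter_for(columns: &[&str], merged: &[(i64, i64)]) -> String {
    columns
        .iter()
        .zip(merged)
        .map(|(c, (min, max))| format!("{c} >= {min} AND {c} <= {max}"))
        .collect::<Vec<_>>()
        .join(" AND ")
}

fn main() {
    // Two build partitions, one join key column `k`.
    let partitions = vec![vec![(1, 1)], vec![(2, 2)]];
    let merged = merge_bounds(&partitions).unwrap();
    // One filter for the whole build side: "k >= 1 AND k <= 2".
    println!("{}", filter_for(&["k"], &merged));
}

For the v = 1 or v = 2 example discussed earlier, partitions reporting (1, 1) and (2, 2) would merge to (1, 2), giving the k >= 1 AND k <= 2 filter rather than true.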