Upgrade to arrow 56.1.0 #17275

Conversation
```diff
 +-----------------------------------+-----------------+---------------------+------+------------------+
 | alltypes_plain.parquet            | 1851            | 10181               | 2    | page_index=false |
-| alltypes_tiny_pages.parquet       | 454233          | 881634              | 2    | page_index=true  |
+| alltypes_tiny_pages.parquet       | 454233          | 881418              | 2    | page_index=true  |
```
I don't really know why the in-memory size of the ParquetMetadata has decreased, but it seems like a good improvement to me
```diff
         .unwrap_or_else(|e| e.as_ref().clone());
-    let mut reader =
-        ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+    let mut reader = ParquetMetaDataReader::new_with_metadata(m)
```
Due to this change from @kczimm
Cargo.toml (outdated)
```diff
 datafusion-spark = { path = "datafusion/spark", version = "49.0.0" }
 datafusion-sql = { path = "datafusion/sql", version = "49.0.0" }
 datafusion-substrait = { path = "datafusion/substrait", version = "49.0.0" }
+datafusion = { path = "datafusion/core", version = "49.0.1", default-features = false }
```
Drive-by change to update all versions in Cargo.toml to the latest.
```diff
 12)│ DataSourceExec               ││ DataSourceExec               │
 13)│ --------------------         ││ --------------------         │
-14)│ bytes: 6040                  ││ bytes: 6040                  │
+14)│ bytes: 5932                  ││ bytes: 5932                  │
```
I believe the in-memory size may have improved due to
And the `Vec` doesn't have the same minimum alignment / size that the builders had.
```diff
     let size = get_record_batch_memory_size(&batch);
-    assert_eq!(size, 8320);
+    assert_eq!(size, 8208);
```
- Also due to Use `Vec` directly in builders arrow-rs#7984
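The size drops above can be pictured with a small standalone sketch (illustrative only, not arrow-rs code — `padded_to_64` is a hypothetical helper mirroring the alignment padding arrow builders applied to their buffers): a plain `Vec` keeps the capacity it was asked for, while an alignment-padded buffer rounds every allocation up to a multiple of 64 bytes.

```rust
// Illustrative sketch: the 64-byte rounding mimics alignment-padded
// builder buffers; a plain Vec requests exactly the capacity asked for.
fn padded_to_64(len: usize) -> usize {
    len.div_ceil(64) * 64
}

fn main() {
    let requested = 100usize;
    let v: Vec<u8> = Vec::with_capacity(requested);
    // The Vec allocates at least (and in practice exactly) what was requested.
    assert!(v.capacity() >= requested);
    // The padded buffer rounds 100 bytes up to the next multiple of 64.
    assert_eq!(padded_to_64(requested), 128);
    println!("vec: {} bytes, padded buffer: {} bytes", v.capacity(), padded_to_64(requested));
}
```

Across the many small buffers in a `RecordBatch` or `ParquetMetaData`, dropping that per-buffer padding plausibly accounts for the few-percent reductions seen in the tests.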
🤖: Benchmark completed
Cargo.toml (outdated)

```diff
 ] }
 unused_qualifications = "deny"
+
+## Temporary arrow-rs patch until 56.1.0 is released
```
arrow-rs 56.1.0 has been released, so this can now be updated.
Thanks @nuno-faria -- I'll try and polish this over the next few days if no one beats me to it.
I found a potential performance regression with:

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new().with_target_partitions(1);
    let ctx = SessionContext::new_with_config(config);
    ctx.sql("set datafusion.execution.parquet.pushdown_filters = true")
        .await?
        .collect()
        .await?;
    ctx.sql(
        "
        copy (
            select i as k
            from generate_series(1, 1000000) as t(i)
            order by k
        ) to 't.parquet'
        options (MAX_ROW_GROUP_SIZE 100000, DATA_PAGE_ROW_COUNT_LIMIT 1000, WRITE_BATCH_SIZE 1000, DICTIONARY_ENABLED FALSE);",
    )
    .await?
    .collect()
    .await?;
    ctx.register_parquet("t", "t.parquet", ParquetReadOptions::new())
        .await?;
    ctx.sql("explain analyze select k from t where k = 123456")
        .await?
        .show()
        .await?;
    Ok(())
}
```

With 56.0.0:

With this branch:

I think this is a consequence of apache/arrow-rs#7850, more specifically https://github.com/apache/arrow-rs/blame/0c7cb2ac3f3132216a08fd557f9b1edc7f90060f/parquet/src/arrow/arrow_reader/selection.rs#L445.
I saw this, @nuno-faria. I hope to look at it tomorrow.
Thanks @nuno-faria -- this is a great find. @XiangpengHao and I purposely added a setting that allows disabling the cache for precisely this reason. So what I think is needed here is a way to turn this setting off via a DataFusion setting as well, which is what I was trying to say with
Let me give this a try and see if we can get it working better.
Thanks @alamb, a config in datafusion would be ideal.
I have one more test to write / fix and then this will be ready. I will get it done tomorrow.
@nuno-faria -- I added a config flag. Can you possibly test that if you set `set datafusion.execution.parquet.max_predicate_cache_size = 0`, the I/O goes back to what it was like in 56.0.0?
```rust
// final output
expected_inner_records: 16,
// Expect this to be 0 records read as the cache is disabled. However, it is
// non-zero due to https://github.com/apache/arrow-rs/issues/8307
```
I did verify that the cache is not being used via the debugger. However, this metric is very confusing. I filed a ticket to track:
Thanks. I tried with the latest commit but still see the same behavior. Here is a full session:

```
DataFusion CLI v50.0.0
> set datafusion.execution.parquet.pushdown_filters = true;
0 row(s) fetched.
Elapsed 0.003 seconds.

> set datafusion.execution.parquet.max_predicate_cache_size = 0;
0 row(s) fetched.
Elapsed 0.001 seconds.

> copy (
    select i as k
    from generate_series(1, 1000000) as t(i)
    order by k
  ) to 't.parquet'
  options (MAX_ROW_GROUP_SIZE 100000, DATA_PAGE_ROW_COUNT_LIMIT 1000, WRITE_BATCH_SIZE 1000, DICTIONARY_ENABLED FALSE);
+---------+
| count   |
+---------+
| 1000000 |
+---------+
1 row(s) fetched.
Elapsed 0.861 seconds.

> create external table t stored as parquet location 't.parquet';
0 row(s) fetched.
Elapsed 0.007 seconds.

> explain analyze select k from t where k = 123456;
total=9929
ranges=[125400..126482, 126482..127564, 127564..128646, 128646..129728, 129728..130810, 130810..131892, 131892..132974, 132974..134247, 134247..135329]
total=0
ranges=[]
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | DataSourceExec: file_groups={1 group: [[/t.parquet]]}, projection=[k], file_type=parquet, predicate=k@0 = 123456, pruning_predicate=k_null_count@2 != row_count@3 AND k_min@0 <= 123456 AND 123456 <= k_max@1, required_guarantees=[k in (123456)], metrics=[output_rows=1, elapsed_compute=1ns, batches_split=0, bytes_scanned=9929, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1192, page_index_rows_pruned=98808, predicate_cache_inner_records=16384, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=1, pushdown_rows_pruned=1191, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=195.801µs, metadata_load_time=340.301µs, page_index_eval_time=233.801µs, row_pushdown_eval_time=57.201µs, statistics_eval_time=387.401µs, time_elapsed_opening=2.1128ms, time_elapsed_processing=6.9853ms, time_elapsed_scanning_total=5.324ms, time_elapsed_scanning_until_data=5.2613ms] |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.016 seconds.
```
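As a quick sanity check on the scan log (a standalone sketch, not DataFusion code — `range_total` is a hypothetical helper), the byte ranges listed sum exactly to the reported `total=9929` / `bytes_scanned=9929`:

```rust
// Sum the lengths of the half-open byte ranges printed in the scan log;
// the result matches the reported total of 9929 bytes fetched.
fn range_total(ranges: &[(u64, u64)]) -> u64 {
    ranges.iter().map(|(start, end)| end - start).sum()
}

fn main() {
    let ranges: [(u64, u64); 9] = [
        (125400, 126482), (126482, 127564), (127564, 128646),
        (128646, 129728), (129728, 130810), (130810, 131892),
        (131892, 132974), (132974, 134247), (134247, 135329),
    ];
    assert_eq!(range_total(&ranges), 9929);
    println!("total bytes fetched: {}", range_total(&ranges));
}
```

Eight of the nine pages are 1082 bytes and one is 1273, so nine whole data pages are being fetched even though only a single row matches.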
I think we should proceed to reviewing and merging this PR, since @AdamGS was already looking at this. I will file a follow-on ticket to track the inability to extend data pages, and we will have until DataFusion 51 to resolve the issue. Also, since arrow-rs 56.1.0 is marked as compatible with previous versions of arrow, people can (and probably will) start using this release with DataFusion 50 anyway.
non-binding LGTM
👍
Which issue does this PR close?

- 56.1.0 (August 2025) arrow-rs#7837

Rationale for this change

Upgrade to the latest arrow release.

What changes are included in this PR?

- 56.1.0 release (arrow-rs#8202)
- `ArrowReaderMetrics` to DataFusion's parquet metrics

Are these changes tested?

Functionally by CI. I will also run benchmarks against this branch.

Follow on Issues:

- parquet 56.1.0 / data ranges #17575

Are there any user-facing changes?