Convert RunEndEncoded to Parquet #8069
Conversation
can_cast_types(from_key.data_type(), to_key.data_type()) && can_cast_types(from_value.data_type(), to_value.data_type()),
_ => false
},
// TODO: RunEndEncoded here?
This is handled in #7713
writer.write(&batch).unwrap();
writer.close().unwrap();

// Schema of output is plain, not dictionary or REE encoded!!
I have a feeling this could be because we're not handling REE when applying the arrow type hint here:
fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
Because there's no direct 1:1 mapping from arrow types to parquet types, the parquet writer writes the encoded arrow schema into the parquet file metadata, and the reader uses this to convert the parquet types back to the arrow types when it reads.
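For context, here's a rough sketch of how one could inspect the Arrow schema the reader reconstructs from that file metadata (the column name and in-memory buffer are just illustrative, and a plain Utf8 column is used since the REE hint is exactly the part in question):

use std::sync::Arc;
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use parquet::arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter};

fn roundtrip_schema() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec!["a", "a", "b"]))],
    )?;

    // Write: the Arrow schema is serialized into the Parquet file metadata.
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Read: the builder reconstructs the Arrow schema from that metadata;
    // this is where an REE hint would currently get dropped.
    let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf))?;
    println!("round-tripped schema: {:?}", builder.schema());
    Ok(())
}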
We might have to add something like this in the schema::primitive::apply_hint function:
(_, DataType::RunEndEncoded(_, value)) => {
    let hinted = apply_hint(parquet, value.data_type().clone());
    match &hinted == value.data_type() {
        true => hint,
        false => hinted,
    }
},
Also, if this has been added (^^^), I suspect it might cause an error. If I'm not mistaken, once the arrow type is known, when we construct the reader we'll end up in this builder code:
arrow-rs/parquet/src/arrow/array_reader/builder.rs, lines 381 to 394 in 1dacecb:
PhysicalType::BYTE_ARRAY => match arrow_type {
    Some(DataType::Dictionary(_, _)) => {
        make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
    }
    Some(DataType::Utf8View | DataType::BinaryView) => {
        make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
    }
    _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
    Some(DataType::Dictionary(_, _)) => {
        make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
    }
    _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
and because there's no match for the arrow datatype DataType::RunEndEncoded, we'll fall into this branch, which may error:
_ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
The quick fix would be to add something like this, which will still cause the array to read back as a plain-encoded array.
Some(DataType::RunEndEncoded(_, val)) => {
    make_byte_array_reader(page_iterator, column_desc, Some(val.data_type().clone()))?
}
The more involved (and probably more correct) thing to do would be to add a reader implementation for REE, similarly to what's been done for dictionary arrays:
https://github.com/apache/arrow-rs/blob/main/parquet/src/arrow/array_reader/byte_array_dictionary.rs
You're absolutely right on both counts! By adding the hint, the schema is now REE, and it then fails with invalid data type for byte array reader - RunEndEncoded, but adding the quick fix gets past that. I will commit that now so we at least have that.
I agree that adding a reader implementation for REE sounds more correct.
I've pushed an update with your suggestions now -- thank you! It makes sense to add the reader implementation in this PR as well, right?
It makes sense to add the reader implementation in this PR as well, right?
Sure, if you have time!
If you don't have time to add it in this PR, maybe we could add it as a follow-up (I'd be happy to help with this as well, if needed).
The temporary workaround to get the Parquet column to decode to the correct Arrow type would be to decode the column to the native Arrow type, and then convert it to a RunArray by casting. This is less efficient because we materialize a full-length array before encoding it to REE.
For non-byte types (numeric types), this would already be happening. We follow this branch when creating the reader:
arrow-rs/parquet/src/arrow/array_reader/builder.rs, lines 354 to 358 in 1dacecb:
Box::new(PrimitiveArrayReader::<Int32Type>::new(
    page_iterator,
    column_desc,
    arrow_type,
)?) as _
And then in the PrimitiveArrayReader we do the cast here:
_ => arrow_cast::cast(&array, target_type)?,
For byte types (String, Binary, etc), we'd need to do something like this:
albertlockett@f3d072f
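To make that concrete, here's a rough standalone sketch (not the linked commit) of run-end encoding a materialized string array by hand; the helper name is made up for illustration:

use arrow_array::types::Int32Type;
use arrow_array::{Array, Int32Array, RunArray, StringArray};
use arrow_schema::ArrowError;

// Collapse consecutive equal values (including nulls) into runs and build a RunArray.
fn run_end_encode(values: &StringArray) -> Result<RunArray<Int32Type>, ArrowError> {
    let mut run_ends: Vec<i32> = Vec::new();
    let mut run_values: Vec<Option<&str>> = Vec::new();
    for i in 0..values.len() {
        let v = values.is_valid(i).then(|| values.value(i));
        match run_values.last() {
            // Same value as the current run: just extend the run end.
            Some(last) if *last == v => *run_ends.last_mut().unwrap() = (i + 1) as i32,
            // New run: record its value and its (exclusive) end offset.
            _ => {
                run_values.push(v);
                run_ends.push((i + 1) as i32);
            }
        }
    }
    RunArray::try_new(&Int32Array::from(run_ends), &StringArray::from(run_values))
}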
Yeah, let's do the reader in another PR after all!
I've applied your diff -- thank you! The test I added (arrow_writer_run_end_encoded_string) is now failing with Casting from Utf8 to RunEndEncoded here in cast_with_options when the source type is Utf8:
arrow-rs/arrow-cast/src/cast/mod.rs, lines 1248 to 1250 in 4506998:
_ => Err(ArrowError::CastError(format!(
    "Casting from {from_type:?} to {to_type:?} not supported",
))),
Similarly, I added another test, arrow_writer_run_end_encoded_int, which also fails on the cast in cast_with_options.
I know there's work on casting in progress here: #7713 (comment)
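For reference, a minimal sketch of the call that currently fails (field names are illustrative):

use std::sync::Arc;
use arrow_array::StringArray;
use arrow_cast::cast;
use arrow_schema::{DataType, Field};

fn main() {
    let array = StringArray::from(vec!["a", "a", "b"]);
    let ree_type = DataType::RunEndEncoded(
        Arc::new(Field::new("run_ends", DataType::Int32, false)),
        Arc::new(Field::new("values", DataType::Utf8, true)),
    );
    // Today this hits the fallback branch quoted above and returns a CastError.
    let result = cast(&array, &ree_type);
    assert!(result.is_err());
}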
Sweet!
@alamb related to your comment here: #3520 (comment)
In the discussion above, we were tentatively thinking that the shortest path to getting REE support would be to decode the column to the native Arrow array using the existing readers, and then cast it to a RunArray. Then, maybe add the dedicated REE reader as a follow-up. Does that sound like a workable approach?
Yes, I think that sounds like a very good approach -- especially if we focus on the tests for reading/writing REE arrays first, so that when we go to implement the optimized version we already have good test coverage.
part of #863

Because some OTAP fields are optional, in a stream of record batches we may receive subsequent batches with different schemas. Parquet doesn't support having row groups with different sets of column chunks, which means we need to know the schema a priori when the writer is created. This PR adds code to normalize the schema of the record batch before writing by:
- putting all the fields in the same order
- creating all-null/default-value columns for any missing column

The missing columns should have a small overhead when written to disk: Parquet will either write an entirely empty column chunk for an all-null column (only a null count, no data), or, for default-value columns, use dictionary and RLE encoding by default, leading to a small column chunk with a single value in the dictionary and a single run for the key. What's unfortunate is that we still materialize an all-null column, with the length of the record batch, before writing. This can be optimized when run-end encoded arrays are supported in Parquet, because we could just create a run array with a single run of the null/default value. The Arrow community is currently working on adding support (see apache/arrow-rs#7713 & apache/arrow-rs#8069).

---------

Co-authored-by: Laurent Quérel <[email protected]>
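(For illustration only, a rough sketch of the normalization step described above, using the arrow umbrella crate; the function name is made up:)

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Reorder columns to match the target schema and materialize an all-null
// column for any field the incoming batch doesn't have.
fn normalize(batch: &RecordBatch, target: SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = target
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            Some(col) => col.clone(),
            // Missing optional field: all-null column of the batch's length.
            // With REE support this could instead be a single null run.
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(target, columns)
}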
Thanks so much for your advice @albertlockett! I hope to address this soon
Which issue does this PR close?