diff --git a/arrow-array/src/array/run_array.rs b/arrow-array/src/array/run_array.rs
index 05cfa2d17135..0166a1a048b5 100644
--- a/arrow-array/src/array/run_array.rs
+++ b/arrow-array/src/array/run_array.rs
@@ -32,12 +32,12 @@ use crate::{
 /// An array of [run-end encoded values](https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout)
 ///
-/// This encoding is variation on [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding)
+/// This encoding is a variation on [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding)
 /// and is good for representing data containing same values repeated consecutively.
 ///
 /// [`RunArray`] contains `run_ends` array and `values` array of same length.
 /// The `run_ends` array stores the indexes at which the run ends. The `values` array
-/// stores the value of each run. Below example illustrates how a logical array is represented in
+/// stores the value of each run. The below example illustrates how a logical array is represented in
 /// [`RunArray`]
 ///
 ///
diff --git a/arrow-cast/src/cast/mod.rs b/arrow-cast/src/cast/mod.rs
index 43ad4b0c6f65..8a3e1cccb945 100644
--- a/arrow-cast/src/cast/mod.rs
+++ b/arrow-cast/src/cast/mod.rs
@@ -130,7 +130,8 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
             | FixedSizeList(_, _)
             | Struct(_)
             | Map(_, _)
-            | Dictionary(_, _),
+            | Dictionary(_, _)
+            | RunEndEncoded(_, _),
         ) => true,
         // Dictionary/List conditions should be put in front of others
         (Dictionary(_, from_value_type), Dictionary(_, to_value_type)) => {
@@ -179,6 +180,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
                 _ => false,
             }
         }
+        // TODO: RunEndEncoded here?
         // cast one decimal type to another decimal type
         (
             Decimal32(_, _) | Decimal64(_, _) | Decimal128(_, _) | Decimal256(_, _),
@@ -814,6 +816,7 @@ pub fn cast_with_options(
                 "Casting from type {from_type} to dictionary type {to_type} not supported",
             ))),
         },
+        // TODO: RunEndEncoded here?
        (List(_), List(to)) => cast_list_values::<i32>(array, to, cast_options),
        (LargeList(_), LargeList(to)) => cast_list_values::<i64>(array, to, cast_options),
        (List(_), LargeList(list_to)) => cast_list::<i32, i64>(array, list_to, cast_options),
diff --git a/arrow-schema/src/datatype.rs b/arrow-schema/src/datatype.rs
index 32bce3347404..a8514cb14a44 100644
--- a/arrow-schema/src/datatype.rs
+++ b/arrow-schema/src/datatype.rs
@@ -353,7 +353,7 @@ pub enum DataType {
     /// that contain many repeated values using less memory, but with
     /// a higher CPU overhead for some operations.
     ///
-    /// This type mostly used to represent low cardinality string
+    /// This type is mostly used to represent low cardinality string
    /// arrays or a limited set of primitive types as integers.
    Dictionary(Box<DataType>, Box<DataType>),
    /// Exact 32-bit width decimal value with precision and scale
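As context for the run-end encoded layout these changes target, here is a minimal sketch of how a logical array maps onto `run_ends` and `values`, using the existing `StringRunBuilder` from `arrow-array` (the sample values are illustrative):

    use arrow_array::{builder::StringRunBuilder, types::Int32Type, Array, RunArray};

    fn main() {
        // Logical array: ["a", "a", "a", "b", "b"]
        let mut builder = StringRunBuilder::<Int32Type>::new();
        builder.extend(["a", "a", "a", "b", "b"].into_iter().map(Some));
        let ree: RunArray<Int32Type> = builder.finish();

        // Stored as two runs: run_ends = [3, 5], values = ["a", "b"]
        assert_eq!(ree.run_ends().values(), &[3, 5]);
        // The logical length is still 5
        assert_eq!(ree.len(), 5);
    }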
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 92583155605b..aa2067c040f5 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -67,6 +67,28 @@ pub fn make_byte_array_reader(
                 pages, data_type, reader,
             )))
         }
+        // TODO eventually add a dedicated [`ArrayReader`] for REE
+        ArrowType::RunEndEncoded(_, ref val_field) => match val_field.data_type() {
+            ArrowType::Binary
+            | ArrowType::Utf8
+            | ArrowType::Decimal128(_, _)
+            | ArrowType::Decimal256(_, _) => {
+                let reader = GenericRecordReader::new(column_desc);
+                Ok(Box::new(ByteArrayReader::<i32>::new(
+                    pages, data_type, reader,
+                )))
+            }
+            ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
+                let reader = GenericRecordReader::new(column_desc);
+                Ok(Box::new(ByteArrayReader::<i64>::new(
+                    pages, data_type, reader,
+                )))
+            }
+            _ => Err(general_err!(
+                "invalid run end encoded value type for byte array reader - {}",
+                data_type
+            )),
+        },
         _ => Err(general_err!(
             "invalid data type for byte array reader - {}",
             data_type
@@ -147,6 +169,11 @@ impl ArrayReader for ByteArrayReader {
                     .with_precision_and_scale(p, s)?;
                 Arc::new(decimal)
             }
+            // TODO eventually add a dedicated [`ArrayReader`] for REE
+            ArrowType::RunEndEncoded(_, ref val_field) => {
+                let array = buffer.into_array(null_buffer, val_field.data_type().clone());
+                arrow_cast::cast(&array, &self.data_type)?
+            }
             _ => buffer.into_array(null_buffer, self.data_type.clone()),
         };
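The second hunk above decodes a REE column as its flat value type and then leans on `arrow_cast::cast` to materialize the run-end encoded target. A minimal sketch of that final step, assuming the cast kernel accepts a `RunEndEncoded` target type (which is exactly what the new match arm relies on):

    use std::sync::Arc;
    use arrow_array::{Array, ArrayRef, StringArray};
    use arrow_schema::{DataType, Field};

    fn main() -> Result<(), arrow_schema::ArrowError> {
        // Values as the byte array decoder produces them
        let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "y"]));

        // The REE target type carried in `self.data_type`
        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );

        // Same call the new match arm makes after decoding
        let ree = arrow_cast::cast(&flat, &ree_type)?;
        assert_eq!(ree.data_type(), &ree_type);
        Ok(())
    }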
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 2deb3c535a12..a991fdcdd201 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -28,7 +28,7 @@ use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
 use arrow_array::{
     Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
-    LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
+    LargeBinaryArray, LargeStringArray, RunArray, StringArray, StringViewArray,
 };
 use arrow_schema::DataType;
@@ -59,6 +59,28 @@ macro_rules! downcast_dict_op {
     };
 }
 
+macro_rules! downcast_ree_impl {
+    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
+        $op($array
+            .as_any()
+            .downcast_ref::<RunArray<$key>>()
+            .unwrap()
+            .downcast::<$val>()
+            .unwrap()$(, $arg)*)
+    }};
+}
+
+macro_rules! downcast_ree_op {
+    ($run_end_field:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
+        match $run_end_field.data_type() {
+            DataType::Int16 => downcast_ree_impl!($array, Int16Type, $val, $op$(, $arg)*),
+            DataType::Int32 => downcast_ree_impl!($array, Int32Type, $val, $op$(, $arg)*),
+            DataType::Int64 => downcast_ree_impl!($array, Int64Type, $val, $op$(, $arg)*),
+            _ => unreachable!(),
+        }
+    };
+}
+
 macro_rules! downcast_op {
     ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
         match $data_type {
@@ -90,6 +112,20 @@ macro_rules! downcast_op {
                 }
                 d => unreachable!("cannot downcast {} dictionary value to byte array", d),
             },
+            DataType::RunEndEncoded(run_end, value) => match value.data_type() {
+                DataType::Utf8 => downcast_ree_op!(run_end, StringArray, $array, $op$(, $arg)*),
+                DataType::LargeUtf8 => {
+                    downcast_ree_op!(run_end, LargeStringArray, $array, $op$(, $arg)*)
+                }
+                DataType::Binary => downcast_ree_op!(run_end, BinaryArray, $array, $op$(, $arg)*),
+                DataType::LargeBinary => {
+                    downcast_ree_op!(run_end, LargeBinaryArray, $array, $op$(, $arg)*)
+                }
+                DataType::FixedSizeBinary(_) => {
+                    downcast_ree_op!(run_end, FixedSizeBinaryArray, $array, $op$(, $arg)*)
+                }
+                d => unreachable!("cannot downcast {} run end encoded value to byte array", d),
+            },
             d => unreachable!("cannot downcast {} to byte array", d),
         }
     };
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 3c283bcbe3d2..93ef83da0f47 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -222,6 +222,10 @@ impl LevelInfoBuilder {
                     _ => unreachable!(),
                 })
             }
+            DataType::RunEndEncoded(_, v) if is_leaf(v.data_type()) => {
+                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
+                Ok(Self::Primitive(levels))
+            }
             d => Err(nyi_err!("Datatype {} is not yet supported", d)),
         }
     }
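To see what `downcast_ree_op!`/`downcast_ree_impl!` expand to, here is the equivalent hand-written form for an `Int32Type` run array with string values; `process` and `write_ree` are placeholder names standing in for the `$op` callback and its call site:

    use arrow_array::{types::Int32Type, Array, RunArray, StringArray};

    // Stands in for the `$op` the macro forwards to
    fn process<T>(_typed: T) {}

    fn write_ree(array: &dyn Array) {
        // This is the expansion of downcast_ree_op! for DataType::Int32 run ends
        let typed = array
            .as_any()
            .downcast_ref::<RunArray<Int32Type>>()
            .unwrap()
            .downcast::<StringArray>()
            .unwrap();
        process(typed);
    }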
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 25fd2396c190..589f8b23b789 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1062,15 +1062,15 @@ impl ArrowColumnWriterFactory {
         match data_type {
             _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
-            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
+            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
+                out.push(col(leaves.next().unwrap())?)
+            }
             ArrowDataType::LargeBinary
             | ArrowDataType::Binary
             | ArrowDataType::Utf8
             | ArrowDataType::LargeUtf8
             | ArrowDataType::BinaryView
-            | ArrowDataType::Utf8View => {
-                out.push(bytes(leaves.next().unwrap())?)
-            }
+            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
             ArrowDataType::List(f)
             | ArrowDataType::LargeList(f)
             | ArrowDataType::FixedSizeList(f, _) => {
@@ -1087,21 +1087,27 @@ impl ArrowColumnWriterFactory {
                     self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                 }
                 _ => unreachable!("invalid map type"),
-            }
+            },
             ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
-                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
-                    out.push(bytes(leaves.next().unwrap())?)
-                }
+                ArrowDataType::Utf8
+                | ArrowDataType::LargeUtf8
+                | ArrowDataType::Binary
+                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                     out.push(bytes(leaves.next().unwrap())?)
                 }
-                ArrowDataType::FixedSizeBinary(_) => {
-                    out.push(bytes(leaves.next().unwrap())?)
-                }
-                _ => {
-                    out.push(col(leaves.next().unwrap())?)
-                }
-            }
+                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
+                _ => out.push(col(leaves.next().unwrap())?),
+            },
+            ArrowDataType::RunEndEncoded(_, value_type) => match value_type.data_type() {
+                ArrowDataType::Utf8
+                | ArrowDataType::LargeUtf8
+                | ArrowDataType::Binary
+                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
+                ArrowDataType::Utf8View | ArrowDataType::BinaryView => out.push(bytes(leaves.next().unwrap())?),
+                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
+                _ => out.push(col(leaves.next().unwrap())?),
+            },
             _ => return Err(ParquetError::NYI(
                 format!(
                     "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
@@ -1195,6 +1201,41 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
+            ArrowDataType::RunEndEncoded(_, value_type) => match value_type.data_type() {
+                ArrowDataType::Decimal32(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal32Type>()
+                        .unary::<_, Int32Type>(|v| v);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Decimal64(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal64Type>()
+                        .unary::<_, Int32Type>(|v| v as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Decimal128(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal128Type>()
+                        .unary::<_, Int32Type>(|v| v as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Decimal256(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal256Type>()
+                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                _ => {
+                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
+                    let array = array.as_primitive::<Int32Type>();
+                    write_primitive(typed, array.values(), levels)
+                }
+            },
             _ => {
                 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                 let array = array.as_primitive::<Int32Type>();
@@ -1277,6 +1318,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
+            ArrowDataType::RunEndEncoded(_, _) => {
+                return Err(ParquetError::NYI(
+                    "Int64ColumnWriter: Attempting to write an Arrow REE type that is not yet implemented"
+                        .to_string(),
+                ));
+            }
             _ => {
                 let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                 let array = array.as_primitive::<Int64Type>();
@@ -1353,6 +1400,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usize> {
                 ArrowDataType::Float16 => {
                     let array = column.as_primitive::<Float16Type>();
                     get_float_16_array_slice(array, indices)
                 }
+                ArrowDataType::RunEndEncoded(_run_ends, _values) => {
+                    return Err(ParquetError::NYI(
+                        "FixedLenByteArrayColumnWriter: Attempting to write an Arrow REE type that is not yet implemented"
+                            .to_string(),
+                    ));
+                }
                 _ => {
                     return Err(ParquetError::NYI(
                         "Attempting to write an Arrow type that is not yet implemented".to_string(),
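The decimal arms above exist because a decimal whose precision fits in nine digits is stored as a parquet INT32, so after the cast strips the run-end encoding, the unscaled value is narrowed to `i32`. A small illustration of that narrowing, with illustrative values and `Decimal128Type` as in the new arm:

    use arrow_array::{
        types::{Decimal128Type, Int32Type},
        PrimitiveArray,
    };

    fn main() {
        // 123.45 with scale 2 is stored as the unscaled integer 12345
        let decimals = PrimitiveArray::<Decimal128Type>::from(vec![12345_i128])
            .with_precision_and_scale(9, 2)
            .unwrap();

        // Same narrowing the Int32ColumnWriter arm performs
        let ints = decimals.unary::<_, Int32Type>(|v| v as i32);
        assert_eq!(ints.value(0), 12345);
    }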
@@ -4391,4 +4444,132 @@ mod tests {
         assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_string() {
+        // Create a run array of strings
+        let mut builder = StringRunBuilder::<Int32Type>::new();
+        builder.extend(
+            vec![Some("alpha"); 100000]
+                .into_iter()
+                .chain(vec![Some("beta"); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+
+        // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+        // and have a dictionary page offset
+        let has_dict_encoding = col_meta.encodings().contains(&Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, false)),
+                Arc::new(Field::new("values", arrow_schema::DataType::Utf8, true)),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<Result<Vec<_>, _>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_int() {
+        // Create a run array of ints
+        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
+        builder.extend(
+            vec![Some(1); 100000]
+                .into_iter()
+                .chain(vec![Some(2); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+
+        // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+        // and have a dictionary page offset
+        let has_dict_encoding = col_meta.encodings().contains(&Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, false)),
+                Arc::new(Field::new("values", arrow_schema::DataType::Int32, true)),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<Result<Vec<_>, _>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
 }
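A note on the `196` asserted in both tests: `ParquetRecordBatchReader` defaults to 1024-row batches, so 200000 rows come back as 195 full batches (195 × 1024 = 199680) plus one final batch of the remaining 320 rows, 196 batches in total.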
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 5b079b66276a..da23252f1e40 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -129,7 +129,7 @@ pub fn parquet_to_arrow_field_levels(
     match complex::convert_schema(schema, mask, hint)? {
         Some(field) => match &field.arrow_type {
             DataType::Struct(fields) => Ok(FieldLevels {
-                fields: fields.clone(),
+                fields: fields.to_owned(),
                 levels: Some(field),
             }),
             _ => unreachable!(),
@@ -303,7 +303,7 @@ impl<'a> ArrowSchemaConverter<'a> {
     ///
     /// Setting this option to `true` will result in Parquet files that can be
     /// read by more readers, but may lose precision for Arrow types such as
-    /// [`DataType::Date64`] which have no direct [corresponding Parquet type].
+    /// [`DataType::Date64`] which have no direct corresponding Parquet type.
     ///
     /// By default, this converter does not coerce to native Parquet types. Enabling type
     /// coercion allows for meaningful representations that do not require
@@ -771,12 +771,17 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
         DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
         DataType::Dictionary(_, ref value) => {
             // Dictionary encoding not handled at the schema level
-            let dict_field = field.clone().with_data_type(value.as_ref().clone());
+            let dict_field = field.to_owned().with_data_type(value.as_ref().clone());
+            arrow_to_parquet_type(&dict_field, coerce_types)
+        }
+        DataType::RunEndEncoded(_run_end_type, value_type) => {
+            // We want to write REE data as dictionary encoded data,
+            // which is not handled at the schema level.
+            let dict_field = field
+                .to_owned()
+                .with_data_type(value_type.data_type().to_owned());
             arrow_to_parquet_type(&dict_field, coerce_types)
         }
-        DataType::RunEndEncoded(_, _) => Err(arrow_err!(
-            "Converting RunEndEncodedType to parquet not supported",
-        )),
     }
 }
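With the `arrow_to_parquet_type` change above, a run-end encoded field flattens to its value type in the parquet schema, exactly as a dictionary field does. Illustratively (the message and field names follow the usual arrow-rs conventions, and the `STRING` annotation is the standard Utf8 mapping), a nullable `RunEndEncoded(Int32, Utf8)` field named `ree` would produce a parquet leaf along the lines of:

    message arrow_schema {
      optional binary ree (STRING);
    }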
@@ -2272,4 +2277,22 @@ mod tests {
         Ok(())
     }
+
+    #[test]
+    fn test_run_end_encoded_conversion() {
+        use crate::basic::Type;
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int16, false));
+        let values_field = Arc::new(Field::new("values", DataType::Boolean, true));
+        let run_end_encoded_field = Field::new(
+            "run_end_encoded_16",
+            DataType::RunEndEncoded(run_ends_field, values_field),
+            false,
+        );
+
+        let result = arrow_to_parquet_type(&run_end_encoded_field, false).unwrap();
+        // Should convert to the underlying value type (Boolean in this case)
+        assert_eq!(result.get_physical_type(), Type::BOOLEAN);
+        assert_eq!(result.get_basic_info().repetition(), Repetition::REQUIRED); // field is not nullable
+        assert_eq!(result.name(), "run_end_encoded_16");
+    }
 }
diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs
index 1b3ab7d45c51..7c8167a2668d 100644
--- a/parquet/src/arrow/schema/primitive.rs
+++ b/parquet/src/arrow/schema/primitive.rs
@@ -102,6 +102,18 @@ fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
             false => hinted,
         }
     }
+
+        // Potentially preserve run end encoded encoding
+        (_, DataType::RunEndEncoded(_, value)) => {
+            // Apply hint to inner type
+            let hinted = apply_hint(parquet, value.data_type().clone());
+            // If the result matches the run end encoded value type, preserve the
+            // REE type; otherwise use the hinted inner type
+            match &hinted == value.data_type() {
+                true => hint,
+                false => hinted,
+            }
+        }
         _ => parquet,
     }
 }
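Because parquet has no native REE encoding, the Arrow type only survives a round trip via the Arrow schema carried in the file metadata, which this `apply_hint` arm honors; the same mechanism can also be driven explicitly. A sketch using the existing `ArrowReaderOptions::with_schema` API, assuming a file whose single column is named `ree` (a hypothetical name) and whose physical type is compatible with the hint:

    use std::sync::Arc;
    use arrow_schema::{DataType, Field, Schema};
    use bytes::Bytes;
    use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

    fn read_as_ree(bytes: Bytes) -> parquet::errors::Result<()> {
        // Ask for the column as REE, even if the embedded metadata were lost
        let hint = Arc::new(Schema::new(vec![Field::new(
            "ree",
            DataType::RunEndEncoded(
                Arc::new(Field::new("run_ends", DataType::Int32, false)),
                Arc::new(Field::new("values", DataType::Utf8, true)),
            ),
            false,
        )]));
        let options = ArrowReaderOptions::new().with_schema(hint.clone());
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, options)?;
        // The reader schema now reflects the hinted REE type
        assert_eq!(builder.schema(), &hint);
        Ok(())
    }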