From 5b7d6b7483ced26d9c72e4d92828f78833c503a3 Mon Sep 17 00:00:00 2001
From: zhangfengcdt
Date: Sat, 12 Jul 2025 07:47:38 -0700
Subject: [PATCH 1/5] feat(encoding): Add Parquet encoding support

This commit introduces Parquet encoding support to the Prolly Tree library.

Changes include:
- Added the `parquet` crate as a dependency.
- Updated the `EncodingType` enum to include `Parquet`.
- Implemented the `encode_parquet` function.
- Added a test case for Parquet encoding and made tests order-agnostic.
- Bumped the version to `0.2.0-alpha.1`.
---
 Cargo.toml      |  11 ++---
 src/encoding.rs | 107 +++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 109 insertions(+), 9 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 7e0c70b..a1406e7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,7 +2,7 @@
 name = "prollytree"
 description = "A prolly (probabilistic) tree for efficient storage, retrieval, and modification of ordered data."
 authors = ["Feng Zhang "]
-version = "0.1.0-beta.1"
+version = "0.2.0-alpha.1"
 edition = "2021"
 license = "Apache-2.0"
 
@@ -17,22 +17,17 @@
 base64 = { version = "0.22.0", optional = true }
 sha2 = "0.10"
 tracing = { version = "0.1.37", optional = true }
 rand = "0.9.0"
-lazy_static = "1.4.0"
 serde = { version = "1.0", features = ["derive"] }
-hex = "0.4.3"
 bincode = "1.3.3"
 thiserror = "2.0.3"
 twox-hash = "2.0"
 serde_json = "1.0.117"
 arrow = "54.2.1"
 schemars = "0.8"
+parquet = { version = "54.0.0", features = ["arrow"] }
 
 [dev-dependencies]
-assert_matches = "1.5.0"
-criterion = "0.5.1"
-insta = "1.31.0"
-paste = "1.0.14"
-proptest = "1.2.0"
+bytes = "1.10.1"
 tracing = "0.1.37"
 tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

diff --git a/src/encoding.rs b/src/encoding.rs
index 7fd0cf4..e0c6eff 100644
--- a/src/encoding.rs
+++ b/src/encoding.rs
@@ -20,6 +20,7 @@ use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray};
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::ipc::writer::StreamWriter;
 use arrow::record_batch::RecordBatch;
+use parquet::arrow::arrow_writer::ArrowWriter;
 use schemars::schema::RootSchema;
 use schemars::schema::SchemaObject;
 use serde::{Deserialize, Serialize};
@@ -29,6 +30,7 @@
 pub enum EncodingType {
     Json,
     Arrow,
+    Parquet,
 }
 
 impl<const N: usize> ProllyNode<N> {
@@ -36,6 +38,7 @@
         let encoded_value = match self.encode_types[encoding_index] {
             EncodingType::Json => self.encode_json(),
             EncodingType::Arrow => self.encode_arrow(),
+            EncodingType::Parquet => self.encode_parquet(),
         };
         self.encode_values[encoding_index] = encoded_value;
     }
@@ -67,6 +70,24 @@ impl<const N: usize> ProllyNode<N> {
         encoded_data
     }
+    fn encode_parquet(&self) -> Vec<u8> {
+        // Convert keys and values to arrays based on their schemas
+        let key_batch = self.convert_to_arrow_array(&self.keys, &self.key_schema);
+        let value_batch = self.convert_to_arrow_array(&self.values, &self.value_schema);
+
+        // Combine the two RecordBatches into one
+        let combined_batch = self.combine_record_batches(key_batch, value_batch);
+        let schema = combined_batch.schema();
+
+        // Encode to Parquet format
+        let mut encoded_data = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut encoded_data, schema, None).unwrap();
+        writer.write(&combined_batch).unwrap();
+        writer.close().unwrap();
+
+        encoded_data
+    }
+
     fn combine_record_batches(
         &self,
         key_batch: RecordBatch,
         value_batch: RecordBatch,
@@ -195,6 +216,7 @@
 mod tests {
     use super::*;
     use arrow::ipc::reader::StreamReader;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
     use schemars::{schema_for, JsonSchema};
 
     #[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq)]
@@ -350,7 +372,90 @@
 balance: 100, -50
 description: value1, value2
 name: name1, name2
 "#;
-            assert_eq!(batch_string, expected_output);
+            // Sort the lines of both strings to compare them
+            let mut actual_lines: Vec<&str> = batch_string.trim().lines().collect();
+            actual_lines.sort_unstable();
+            let mut expected_lines: Vec<&str> = expected_output.trim().lines().collect();
+            expected_lines.sort_unstable();
+
+            assert_eq!(actual_lines, expected_lines);
+        }
+    }
+
+    #[test]
+    fn test_encode_parquet() {
+        let mut node: ProllyNode<1024> = ProllyNode::default();
+
+        let keys = [
+            ComplexKey {
+                id: 1,
+                uuid: "guid-key1".to_string(),
+            },
+            ComplexKey {
+                id: 2,
+                uuid: "guid-key2".to_string(),
+            },
+        ];
+        let values = [
+            ComplexValue {
+                name: "name1".to_string(),
+                age: 30,
+                description: "value1".to_string(),
+                active: true,
+                balance: 100.0,
+            },
+            ComplexValue {
+                name: "name2".to_string(),
+                age: 55,
+                description: "value2".to_string(),
+                active: false,
+                balance: -50.0,
+            },
+        ];
+
+        node.keys = keys
+            .iter()
+            .map(|k| serde_json::to_vec(k).unwrap())
+            .collect();
+        node.values = values
+            .iter()
+            .map(|v| serde_json::to_vec(v).unwrap())
+            .collect();
+        node.encode_types = vec![EncodingType::Parquet];
+
+        let key_schema = schema_for!(ComplexKey);
+        let value_schema = schema_for!(ComplexValue);
+        node.key_schema = Some(key_schema);
+        node.value_schema = Some(value_schema);
+
+        node.encode_all_pairs();
+
+        for encoded_value in &node.encode_values {
+            // Decode the Parquet format
+            let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(encoded_value.clone())).unwrap();
+            let mut reader = builder.build().unwrap();
+            let batch = reader.next().unwrap().unwrap();
+
+            // Convert the RecordBatch to a string for comparison
+            let batch_string = record_batch_to_string(&batch);
+            assert_eq!(batch.num_rows(), 2);
+            println!("{}", batch_string);
+            // Define the expected output
+            let expected_output = r#"id: 1, 2
+uuid: guid-key1, guid-key2
+name: name1, name2
+age: 30, 55
+description: value1, value2
+active: true, false
+balance: 100, -50
+"#;
+            // Sort the lines of both strings to compare them
+            let mut actual_lines: Vec<&str> = batch_string.trim().lines().collect();
+            actual_lines.sort_unstable();
+            let mut expected_lines: Vec<&str> = expected_output.trim().lines().collect();
+            expected_lines.sort_unstable();
+
+            assert_eq!(actual_lines, expected_lines);
         }
     }

From e67fefd0695a5268ae726fba46a35b401d1123ee Mon Sep 17 00:00:00 2001
From: zhangfengcdt
Date: Sat, 12 Jul 2025 07:49:11 -0700
Subject: [PATCH 2/5] style: rustfmt

---
 src/encoding.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/encoding.rs b/src/encoding.rs
index e0c6eff..1088a7b 100644
--- a/src/encoding.rs
+++ b/src/encoding.rs
@@ -432,7 +432,9 @@ name: name1, name2
 
         for encoded_value in &node.encode_values {
             // Decode the Parquet format
-            let builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(encoded_value.clone())).unwrap();
+            let builder =
+                ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(encoded_value.clone()))
+                    .unwrap();
             let mut reader = builder.build().unwrap();
             let batch = reader.next().unwrap().unwrap();
 

From bf87a0a4751249007968a3ec8947a76c75c73432 Mon Sep 17 00:00:00 2001
From: zhangfengcdt
Date: Sat, 12 Jul 2025 07:55:49 -0700
Subject: [PATCH 3/5] refactor(encoding): Remove unwrap() calls and propagate errors

---
 Cargo.toml      |   2 +-
 src/encoding.rs | 168 ++++++++++++++++++++++++++++++++++-------------------
 src/errors.rs   |  20 +++++-
 3 files changed, 122 insertions(+), 68 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index a1406e7..91e629c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,7 @@ tracing = { version = "0.1.37", optional = true }
 rand = "0.9.0"
 serde = { version = "1.0", features = ["derive"] }
 bincode = "1.3.3"
-thiserror = "2.0.3"
+thiserror = "1.0"
 twox-hash = "2.0"
 serde_json = "1.0.117"
 arrow = "54.2.1"

diff --git a/src/encoding.rs b/src/encoding.rs
index 1088a7b..9781d12 100644
--- a/src/encoding.rs
+++ b/src/encoding.rs
@@ -14,6 +14,7 @@ limitations under the License.
 
 #![allow(unused_imports)]
 
+use crate::errors::ProllyTreeError;
 use crate::node::ProllyNode;
 use arrow::array::{Array, Float64Array};
 use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray};
@@ -34,27 +35,28 @@ pub enum EncodingType {
 }
 
 impl<const N: usize> ProllyNode<N> {
-    pub fn encode_pairs(&mut self, encoding_index: usize) {
+    pub fn encode_pairs(&mut self, encoding_index: usize) -> Result<(), ProllyTreeError> {
         let encoded_value = match self.encode_types[encoding_index] {
-            EncodingType::Json => self.encode_json(),
-            EncodingType::Arrow => self.encode_arrow(),
-            EncodingType::Parquet => self.encode_parquet(),
+            EncodingType::Json => self.encode_json()?,
+            EncodingType::Arrow => self.encode_arrow()?,
+            EncodingType::Parquet => self.encode_parquet()?,
         };
         self.encode_values[encoding_index] = encoded_value;
+        Ok(())
     }
 
-    fn encode_json(&self) -> Vec<u8> {
+    fn encode_json(&self) -> Result<Vec<u8>, ProllyTreeError> {
         let pairs: Vec<(&Vec<u8>, &Vec<u8>)> = self.keys.iter().zip(self.values.iter()).collect();
-        serde_json::to_vec(&pairs).unwrap_or_else(|_| Vec::new())
+        Ok(serde_json::to_vec(&pairs)?)
     }
 
-    fn encode_arrow(&self) -> Vec<u8> {
+    fn encode_arrow(&self) -> Result<Vec<u8>, ProllyTreeError> {
         // Convert keys and values to arrays based on their schemas
-        let key_batch = self.convert_to_arrow_array(&self.keys, &self.key_schema);
-        let value_batch = self.convert_to_arrow_array(&self.values, &self.value_schema);
+        let key_batch = self.convert_to_arrow_array(&self.keys, &self.key_schema)?;
+        let value_batch = self.convert_to_arrow_array(&self.values, &self.value_schema)?;
 
         // Combine the two RecordBatches into one
-        let combined_batch = self.combine_record_batches(key_batch, value_batch);
+        let combined_batch = self.combine_record_batches(key_batch, value_batch)?;
 
         // Define the schema
         let schema = combined_batch.schema();
@@ -62,37 +64,37 @@ impl<const N: usize> ProllyNode<N> {
         // Encode to Arrow IPC format
         let mut encoded_data = Vec::new();
         {
-            let mut writer = StreamWriter::try_new(&mut encoded_data, &schema).unwrap();
-            writer.write(&combined_batch).unwrap();
-            writer.finish().unwrap();
+            let mut writer = StreamWriter::try_new(&mut encoded_data, &schema)?;
+            writer.write(&combined_batch)?;
+            writer.finish()?;
         }
 
-        encoded_data
+        Ok(encoded_data)
     }
 
-    fn encode_parquet(&self) -> Vec<u8> {
+    fn encode_parquet(&self) -> Result<Vec<u8>, ProllyTreeError> {
         // Convert keys and values to arrays based on their schemas
-        let key_batch = self.convert_to_arrow_array(&self.keys, &self.key_schema);
-        let value_batch = self.convert_to_arrow_array(&self.values, &self.value_schema);
+        let key_batch = self.convert_to_arrow_array(&self.keys, &self.key_schema)?;
+        let value_batch = self.convert_to_arrow_array(&self.values, &self.value_schema)?;
 
         // Combine the two RecordBatches into one
-        let combined_batch = self.combine_record_batches(key_batch, value_batch);
+        let combined_batch = self.combine_record_batches(key_batch, value_batch)?;
         let schema = combined_batch.schema();
 
         // Encode to Parquet format
         let mut encoded_data = Vec::new();
-        let mut writer = ArrowWriter::try_new(&mut encoded_data, schema, None).unwrap();
-        writer.write(&combined_batch).unwrap();
-        writer.close().unwrap();
+        let mut writer = ArrowWriter::try_new(&mut encoded_data, schema, None)?;
+        writer.write(&combined_batch)?;
+        writer.close()?;
 
-        encoded_data
+        Ok(encoded_data)
     }
 
     fn combine_record_batches(
         &self,
         key_batch: RecordBatch,
         value_batch: RecordBatch,
-    ) -> RecordBatch {
+    ) -> Result<RecordBatch, ProllyTreeError> {
         // Extract columns from both batches
         let mut columns = Vec::new();
         let mut fields = Vec::new();
@@ -117,11 +119,15 @@ impl<const N: usize> ProllyNode<N> {
         let schema = Arc::new(Schema::new(fields));
 
         // Create a new RecordBatch with combined columns and schema
-        RecordBatch::try_new(schema, columns).unwrap()
+        Ok(RecordBatch::try_new(schema, columns)?)
     }
 
-    fn convert_to_arrow_array(&self, data: &[Vec<u8>], schema: &Option<RootSchema>) -> RecordBatch {
-        let schema = schema.as_ref().unwrap();
+    fn convert_to_arrow_array(
+        &self,
+        data: &[Vec<u8>],
+        schema: &Option<RootSchema>,
+    ) -> Result<RecordBatch, ProllyTreeError> {
+        let schema = schema.as_ref().ok_or(ProllyTreeError::SchemaNotFound)?;
 
         if let Some(object) = &schema.schema.object {
             let fields: Vec<Field> = object
@@ -158,57 +164,87 @@ impl<const N: usize> ProllyNode<N> {
                 })
                 .collect();
 
-            let values: Vec<serde_json::Value> = data
+            let values: Result<Vec<serde_json::Value>, _> = data
                 .iter()
-                .map(|v| serde_json::from_slice(v).unwrap())
+                .map(|v| serde_json::from_slice(v))
                 .collect();
+            let values = values?;
 
-            let arrays: Vec<ArrayRef> = fields
+            let arrays: Result<Vec<ArrayRef>, _> = fields
                 .iter()
-                .map(|field| match field.data_type() {
-                    DataType::Utf8 => {
-                        let string_values: Vec<&str> = values
-                            .iter()
-                            .map(|value| value.get(field.name()).unwrap().as_str().unwrap())
-                            .collect();
-                        Arc::new(StringArray::from(string_values)) as ArrayRef
-                    }
-                    DataType::Int32 => {
-                        let int_values: Vec<i32> = values
-                            .iter()
-                            .map(|value| value.get(field.name()).unwrap().as_i64().unwrap() as i32)
-                            .collect();
-                        Arc::new(Int32Array::from(int_values)) as ArrayRef
-                    }
-                    DataType::Boolean => {
-                        let bool_values: Vec<bool> = values
-                            .iter()
-                            .map(|value| value.get(field.name()).unwrap().as_bool().unwrap())
-                            .collect();
-                        Arc::new(BooleanArray::from(bool_values)) as ArrayRef
-                    }
-                    DataType::Float64 => {
-                        let float_values: Vec<f64> = values
-                            .iter()
-                            .map(|value| value.get(field.name()).unwrap().as_f64().unwrap())
-                            .collect();
-                        Arc::new(Float64Array::from(float_values)) as ArrayRef
+                .map(|field| -> Result<ArrayRef, ProllyTreeError> {
+                    match field.data_type() {
+                        DataType::Utf8 => {
+                            let string_values: Result<Vec<&str>, _> = values
+                                .iter()
+                                .map(|value| {
+                                    value
+                                        .get(field.name())
+                                        .and_then(|v| v.as_str())
+                                        .ok_or(ProllyTreeError::InvalidJsonValue)
+                                })
+                                .collect();
+                            Ok(Arc::new(StringArray::from(string_values?)) as ArrayRef)
+                        }
+                        DataType::Int32 => {
+                            let int_values: Result<Vec<i32>, _> = values
+                                .iter()
+                                .map(|value| {
+                                    value
+                                        .get(field.name())
+                                        .and_then(|v| v.as_i64())
+                                        .map(|v| v as i32)
+                                        .ok_or(ProllyTreeError::InvalidJsonValue)
+                                })
+                                .collect();
+                            Ok(Arc::new(Int32Array::from(int_values?)) as ArrayRef)
+                        }
+                        DataType::Boolean => {
+                            let bool_values: Result<Vec<bool>, _> = values
+                                .iter()
+                                .map(|value| {
+                                    value
+                                        .get(field.name())
+                                        .and_then(|v| v.as_bool())
+                                        .ok_or(ProllyTreeError::InvalidJsonValue)
+                                })
+                                .collect();
+                            Ok(Arc::new(BooleanArray::from(bool_values?)) as ArrayRef)
+                        }
+                        DataType::Float64 => {
+                            let float_values: Result<Vec<f64>, _> = values
+                                .iter()
+                                .map(|value| {
+                                    value
+                                        .get(field.name())
+                                        .and_then(|v| v.as_f64())
+                                        .ok_or(ProllyTreeError::InvalidJsonValue)
+                                })
+                                .collect();
+                            Ok(Arc::new(Float64Array::from(float_values?)) as ArrayRef)
+                        }
+                        _ => panic!("Unsupported data type"),
                     }
-                    _ => panic!("Unsupported data type"),
                 })
                 .collect();
 
             // Create a RecordBatch to return
-            return RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap();
+            Ok(RecordBatch::try_new(
+                Arc::new(Schema::new(fields)),
+                arrays?,
+            )?)
+        }
+        else {
+            panic!("Unsupported schema")
         }
-        panic!("Unsupported schema");
     }
 
-    pub fn encode_all_pairs(&mut self) {
+    pub fn encode_all_pairs(&mut self) -> Result<(), ProllyTreeError> {
         self.encode_values = vec![Vec::new(); self.encode_types.len()];
         for i in 0..self.encode_types.len() {
-            self.encode_pairs(i);
+            self.encode_pairs(i)?;
         }
+        Ok(())
     }
 }
@@ -241,7 +277,7 @@ mod tests {
         node.values = vec![b"value1".to_vec(), b"value2".to_vec()];
         node.encode_types = vec![EncodingType::Json];
 
-        node.encode_all_pairs();
+        node.encode_all_pairs().unwrap();
 
         for encoded_value in &node.encode_values {
             let decoded: Vec<(Vec<u8>, Vec<u8>)> = serde_json::from_slice(encoded_value).unwrap();
@@ -293,7 +329,7 @@ mod tests {
             .collect();
         node.encode_types = vec![EncodingType::Json];
 
-        node.encode_all_pairs();
+        node.encode_all_pairs().unwrap();
 
         for encoded_value in &node.encode_values {
             let decoded: Vec<(Vec<u8>, Vec<u8>)> = serde_json::from_slice(encoded_value).unwrap();
@@ -352,7 +388,7 @@ mod tests {
         node.key_schema = Some(key_schema);
         node.value_schema = Some(value_schema);
 
-        node.encode_all_pairs();
+        node.encode_all_pairs().unwrap();
 
         for encoded_value in &node.encode_values {
             // Decode the Arrow IPC format
@@ -428,7 +464,7 @@ name: name1, name2
         node.key_schema = Some(key_schema);
         node.value_schema = Some(value_schema);
 
-        node.encode_all_pairs();
+        node.encode_all_pairs().unwrap();
 
         for encoded_value in &node.encode_values {
             // Decode the Parquet format

diff --git a/src/errors.rs b/src/errors.rs
index e0077de..738da33 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -15,7 +15,19 @@ limitations under the License.
 
 use thiserror::Error;
 
 #[derive(Error, Debug)]
-pub enum Error {
+pub enum ProllyTreeError {
+    #[error("Arrow error: {0}")]
+    Arrow(#[from] arrow::error::ArrowError),
+
+    #[error("Parquet error: {0}")]
+    Parquet(#[from] parquet::errors::ParquetError),
+
+    #[error("Serde JSON error: {0}")]
+    SerdeJson(#[from] serde_json::Error),
+
+    #[error("I/O error: {0}")]
+    Io(#[from] std::io::Error),
+
     #[error("Unsupported Value Type")]
     UnsupportedValueType,
@@ -30,4 +42,10 @@ pub enum Error {
 
     #[error("Serde Error")]
     Serde,
+
+    #[error("Schema not found")]
+    SchemaNotFound,
+
+    #[error("Invalid JSON value")]
+    InvalidJsonValue,
 }

From d1899213865be5c44cbf4acd0e70c8b63764e388 Mon Sep 17 00:00:00 2001
From: zhangfengcdt
Date: Sat, 12 Jul 2025 08:05:20 -0700
Subject: [PATCH 4/5] style: rustfmt

---
 src/encoding.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/encoding.rs b/src/encoding.rs
index 9781d12..6a81599 100644
--- a/src/encoding.rs
+++ b/src/encoding.rs
@@ -164,10 +164,8 @@ impl<const N: usize> ProllyNode<N> {
                 })
                 .collect();
 
-            let values: Result<Vec<serde_json::Value>, _> = data
-                .iter()
-                .map(|v| serde_json::from_slice(v))
-                .collect();
+            let values: Result<Vec<serde_json::Value>, _> =
+                data.iter().map(|v| serde_json::from_slice(v)).collect();
             let values = values?;
 
             let arrays: Result<Vec<ArrayRef>, _> = fields
@@ -233,8 +231,7 @@ impl<const N: usize> ProllyNode<N> {
                 Arc::new(Schema::new(fields)),
                 arrays?,
             )?)
-        }
-        else {
+        } else {
             panic!("Unsupported schema")
         }
     }

From 80ad414aeebe0aa0146455450bdddc10d99bb54c Mon Sep 17 00:00:00 2001
From: zhangfengcdt
Date: Sat, 12 Jul 2025 08:11:19 -0700
Subject: [PATCH 5/5] docs: Mark Parquet/Avro encoding as finished in roadmap

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index ad3d02b..901a146 100644
--- a/README.md
+++ b/README.md
@@ -185,7 +185,7 @@ The following features are for Prolly tree library for Version 0.1.0:
 
 The following features are for Prolly tree library for Version 0.2.0:
 
 - [X] Arrow block encoding and decoding
-- [ ] Parquet/Avro block encoding and decoding
+- [X] Parquet/Avro block encoding and decoding
 - [ ] Advanced probabilistic tree balancing
 
 ## Contributing
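
Supplementary usage sketch (editorial addition, not part of the patch series). The snippet below shows how the Parquet encoding from PATCH 1 and the error propagation from PATCH 3 might be exercised together, written as if it lived in this crate's test code. The names `Key`, `Value`, and `parquet_roundtrip` are hypothetical stand-ins for the tests' `ComplexKey`/`ComplexValue`; the module paths and the direct access to `ProllyNode` fields mirror what the in-crate tests do and may not be available outside the crate. Note that `encode_parquet` writes a complete Parquet file into the byte buffer (unlike `encode_arrow`, which writes an Arrow IPC stream), so the bytes can be handed straight to the Parquet reader.

    use crate::encoding::EncodingType;
    use crate::errors::ProllyTreeError;
    use crate::node::ProllyNode;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use schemars::{schema_for, JsonSchema};
    use serde::{Deserialize, Serialize};

    // Hypothetical key/value types; only field types handled by
    // convert_to_arrow_array (Utf8, Int32, Boolean, Float64) are used.
    #[derive(Serialize, Deserialize, JsonSchema)]
    struct Key {
        id: i32,
        uuid: String,
    }

    #[derive(Serialize, Deserialize, JsonSchema)]
    struct Value {
        name: String,
        active: bool,
        balance: f64,
    }

    fn parquet_roundtrip() -> Result<(), ProllyTreeError> {
        let mut node: ProllyNode<1024> = ProllyNode::default();

        // Keys and values are stored as JSON bytes; the schemars schemas drive
        // the JSON-to-Arrow conversion inside encode_parquet().
        node.keys = vec![serde_json::to_vec(&Key { id: 1, uuid: "k1".into() })?];
        node.values = vec![serde_json::to_vec(&Value {
            name: "name1".into(),
            active: true,
            balance: 100.0,
        })?];
        node.key_schema = Some(schema_for!(Key));
        node.value_schema = Some(schema_for!(Value));
        node.encode_types = vec![EncodingType::Parquet];

        // Since PATCH 3 this returns a Result instead of panicking on bad input.
        node.encode_all_pairs()?;

        // Read the encoded bytes back with the same arrow reader the tests use
        // (bytes is a dev-dependency added in PATCH 1).
        let data = bytes::Bytes::from(node.encode_values[0].clone());
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(data)?.build()?;
        let batch = reader.next().expect("expected one record batch")?;
        assert_eq!(batch.num_rows(), 1);
        Ok(())
    }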
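Error-handling sketch (also supplementary). PATCH 3 replaces the unwrap() calls with a `ProllyTreeError` that callers can inspect. A match like the one below, using the variant names from the new src/errors.rs, is one way a caller might report the different failure modes; the `describe` function and the message wording are illustrative only.

    use crate::errors::ProllyTreeError;

    fn describe(err: &ProllyTreeError) -> String {
        match err {
            // Returned when key_schema/value_schema is None but an Arrow or
            // Parquet encoding is requested.
            ProllyTreeError::SchemaNotFound => "node is missing a key or value schema".to_string(),
            // Returned when a stored JSON value does not match the field type
            // declared in the schema.
            ProllyTreeError::InvalidJsonValue => "stored JSON does not match the schema".to_string(),
            // Errors forwarded from the underlying encoders via #[from].
            ProllyTreeError::Arrow(e) => format!("arrow error: {e}"),
            ProllyTreeError::Parquet(e) => format!("parquet error: {e}"),
            ProllyTreeError::SerdeJson(e) => format!("serde_json error: {e}"),
            // Remaining variants (Io, UnsupportedValueType, ...) fall through here.
            other => format!("prolly tree error: {other}"),
        }
    }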