Merged

31 commits
4890faa
Added arrow-avro `SchemaStore` and fingerprint support to `schema.rs`…
jecsand838 Jul 25, 2025
f8040c6
Update arrow-avro/src/reader/mod.rs
jecsand838 Jul 27, 2025
f2f34d0
Update arrow-avro/src/reader/mod.rs
jecsand838 Jul 27, 2025
3db9aed
Update arrow-avro/src/reader/mod.rs
jecsand838 Jul 28, 2025
da7b1b9
Address PR Comments
jecsand838 Jul 28, 2025
ed7fb49
Update arrow-avro/src/reader/mod.rs
jecsand838 Jul 28, 2025
8e6face
Update arrow-avro/src/reader/mod.rs
jecsand838 Jul 28, 2025
25c3899
Address Remaining PR Comments
jecsand838 Jul 28, 2025
bf55dba
Update arrow-avro/src/reader/mod.rs
jecsand838 Jul 31, 2025
a113399
Update arrow-avro/src/reader/mod.rs
jecsand838 Jul 31, 2025
5c8e045
Address Follow-up PR Comments
jecsand838 Jul 31, 2025
c8fce60
Update arrow-avro/src/codec.rs
jecsand838 Aug 1, 2025
c631218
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 1, 2025
d014bfb
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 1, 2025
7bffdb6
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 1, 2025
72afc79
Update arrow-avro/src/schema.rs
jecsand838 Aug 1, 2025
6aac06c
Address Follow-up PR Comments
jecsand838 Aug 2, 2025
81fae32
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 4, 2025
6fd3234
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 4, 2025
6a84a05
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 4, 2025
24cb8a2
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 4, 2025
07b55c4
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 4, 2025
4f734e2
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 4, 2025
9c828c6
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 4, 2025
dc56c70
Remove LRU cache feature and refactor SchemaStore + AvroSchema handlin…
jecsand838 Aug 5, 2025
5a1b7e4
Merge branch 'main' into avro-reader-schema-store
jecsand838 Aug 5, 2025
fc6a5cb
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 5, 2025
eca7ca3
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 5, 2025
ef9051e
Update arrow-avro/src/reader/mod.rs
jecsand838 Aug 5, 2025
da0cfc2
Merge branch 'apache:main' into avro-reader-schema-store
jecsand838 Aug 7, 2025
c450401
Updated decoder benchmarks to include Avro single-object prefix gener…
jecsand838 Aug 7, 2025
4 changes: 3 additions & 1 deletion arrow-avro/Cargo.toml
@@ -55,8 +55,10 @@ zstd = { version = "0.13", default-features = false, optional = true }
bzip2 = { version = "0.6.0", optional = true }
xz = { version = "0.1", default-features = false, optional = true }
crc = { version = "3.0", optional = true }
uuid = "1.17"
strum_macros = "0.27"
uuid = "1.17"
indexmap = "2.10"


[dev-dependencies]
arrow-data = { workspace = true }
159 changes: 101 additions & 58 deletions arrow-avro/benches/decoder.rs
@@ -27,68 +27,89 @@ extern crate uuid;

use apache_avro::types::Value;
use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema};
use arrow_avro::{reader::ReaderBuilder, schema::Schema as AvroSchema};
use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};
use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use once_cell::sync::Lazy;
use std::{hint::black_box, io, time::Duration};
use std::{hint::black_box, time::Duration};
use uuid::Uuid;

fn encode_records(schema: &ApacheSchema, rows: impl Iterator<Item = Value>) -> Vec<u8> {
fn make_prefix(fp: Fingerprint) -> [u8; 10] {
let Fingerprint::Rabin(val) = fp;
let mut buf = [0u8; 10];
buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
buf[2..].copy_from_slice(&val.to_le_bytes()); // little‑endian 64‑bit
buf
}

fn encode_records_with_prefix(
schema: &ApacheSchema,
prefix: &[u8],
rows: impl Iterator<Item = Value>,
) -> Vec<u8> {
let mut out = Vec::new();
for v in rows {
out.extend_from_slice(prefix);
out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum failed"));
}
out
}

fn gen_int(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_int(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as i32))])),
)
}

fn gen_long(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_long(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long(i as i64))])),
)
}

fn gen_float(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_float(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Float(i as f32 + 0.5678))])),
)
}

fn gen_bool(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_bool(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Boolean(i % 2 == 0))])),
)
}

fn gen_double(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_double(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Double(i as f64 + 0.1234))])),
)
}

fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_bytes(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let payload = vec![(i & 0xFF) as u8; 16];
Value::Record(vec![("field1".into(), Value::Bytes(payload))])
}),
)
}

fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_string(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let s = if i % 3 == 0 {
format!("value-{i}")
@@ -100,30 +121,34 @@ fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_date(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_date(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as i32))])),
)
}

fn gen_timemillis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_timemillis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int((i * 37) as i32))])),
)
}

fn gen_timemicros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_timemicros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long((i * 1_001) as i64))])),
)
}

fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_ts_millis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
Value::Record(vec![(
"field1".into(),
@@ -133,9 +158,10 @@ fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_ts_micros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
Value::Record(vec![(
"field1".into(),
@@ -145,10 +171,11 @@ fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
fn gen_map(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
use std::collections::HashMap;
encode_records(
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let mut m = HashMap::new();
let int_val = |v: i32| Value::Union(0, Box::new(Value::Int(v)));
@@ -165,9 +192,10 @@ fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_array(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_array(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let items = (0..5).map(|j| Value::Int(i as i32 + j)).collect();
Value::Record(vec![("field1".into(), Value::Array(items))])
@@ -189,9 +217,10 @@ fn trim_i128_be(v: i128) -> Vec<u8> {
full[first..].to_vec()
}

fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_decimal(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let unscaled = if i % 2 == 0 { i as i128 } else { -(i as i128) };
Value::Record(vec![(
@@ -202,9 +231,10 @@ fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_uuid(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let mut raw = (i as u128).to_be_bytes();
raw[6] = (raw[6] & 0x0F) | 0x40;
@@ -214,9 +244,10 @@ fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_fixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let mut buf = vec![0u8; 16];
buf[..8].copy_from_slice(&(i as u64).to_be_bytes());
@@ -225,9 +256,10 @@ fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_interval(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let months = (i % 24) as u32;
let days = (i % 32) as u32;
@@ -241,10 +273,11 @@ fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
fn gen_enum(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
const SYMBOLS: [&str; 3] = ["A", "B", "C"];
encode_records(
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let idx = i % 3;
Value::Record(vec![(
@@ -255,9 +288,10 @@ fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_mixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
Value::Record(vec![
("f1".into(), Value::Int(i as i32)),
@@ -269,9 +303,10 @@ fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}

fn gen_nested(sc: &ApacheSchema, n: usize) -> Vec<u8> {
encode_records(
fn gen_nested(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
encode_records_with_prefix(
sc,
prefix,
(0..n).map(|i| {
let sub = Value::Record(vec![
("x".into(), Value::Int(i as i32)),
@@ -290,12 +325,14 @@ fn new_decoder(
batch_size: usize,
utf8view: bool,
) -> arrow_avro::reader::Decoder {
let schema: AvroSchema<'static> = serde_json::from_str(schema_json).unwrap();
let schema = AvroSchema::new(schema_json.parse().unwrap());
let mut store = arrow_avro::schema::SchemaStore::new();
store.register(schema.clone()).unwrap();
ReaderBuilder::new()
.with_schema(schema)
.with_writer_schema_store(store)
.with_batch_size(batch_size)
.with_utf8_view(utf8view)
.build_decoder(io::empty())
.build_decoder()
.expect("failed to build decoder")
}

@@ -325,8 +362,8 @@ const ARRAY_SCHEMA: &str = r#"{"type":"record","name":"ArrRec","fields":[{"name"
const DECIMAL_SCHEMA: &str = r#"{"type":"record","name":"DecRec","fields":[{"name":"field1","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":3}}]}"#;
const UUID_SCHEMA: &str = r#"{"type":"record","name":"UuidRec","fields":[{"name":"field1","type":{"type":"string","logicalType":"uuid"}}]}"#;
const FIXED_SCHEMA: &str = r#"{"type":"record","name":"FixRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Fixed16","size":16}}]}"#;
const INTERVAL_SCHEMA_ENCODE: &str = r#"{"type":"record","name":"DurRecEnc","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
const INTERVAL_SCHEMA: &str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12,"logicalType":"duration"}}]}"#;
const INTERVAL_SCHEMA_ENCODE: &str = r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
const ENUM_SCHEMA: &str = r#"{"type":"record","name":"EnumRec","fields":[{"name":"field1","type":{"type":"enum","name":"MyEnum","symbols":["A","B","C"]}}]}"#;
const MIX_SCHEMA: &str = r#"{"type":"record","name":"MixRec","fields":[{"name":"f1","type":"int"},{"name":"f2","type":"long"},{"name":"f3","type":"string"},{"name":"f4","type":"double"}]}"#;
const NEST_SCHEMA: &str = r#"{"type":"record","name":"NestRec","fields":[{"name":"sub","type":{"type":"record","name":"Sub","fields":[{"name":"x","type":"int"},{"name":"y","type":"string"}]}}]}"#;
@@ -336,7 +373,13 @@ macro_rules! dataset {
static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
let schema =
ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
SIZES.iter().map(|&n| $gen_fn(&schema, n)).collect()
let arrow_schema = AvroSchema::new($schema_json.to_string());
let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed");
let prefix = make_prefix(fingerprint);
SIZES
.iter()
.map(|&n| $gen_fn(&schema, n, &prefix))
.collect()
});
};
}
@@ -406,6 +449,14 @@ fn bench_scenario(

fn criterion_benches(c: &mut Criterion) {
for &batch_size in &[SMALL_BATCH, LARGE_BATCH] {
bench_scenario(
c,
"Interval",
INTERVAL_SCHEMA,
&INTERVAL_DATA,
false,
batch_size,
);
bench_scenario(c, "Int32", INT_SCHEMA, &INT_DATA, false, batch_size);
bench_scenario(c, "Int64", LONG_SCHEMA, &LONG_DATA, false, batch_size);
bench_scenario(c, "Float32", FLOAT_SCHEMA, &FLOAT_DATA, false, batch_size);
@@ -480,14 +531,6 @@ fn criterion_benches(c: &mut Criterion) {
false,
batch_size,
);
bench_scenario(
c,
"Interval",
INTERVAL_SCHEMA,
&INTERVAL_DATA,
false,
batch_size,
);
bench_scenario(
c,
"Enum(Dictionary)",
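The collapsed `bench_scenario` body above is what ultimately feeds these single-object-framed buffers into the decoder returned by `new_decoder`. For orientation only, here is a minimal sketch of such a consumption loop; `Decoder::decode` and `Decoder::flush` are assumed from the streaming-decoder pattern used elsewhere in arrow-rs (e.g. arrow-json) and are not shown in this diff, so treat the exact signatures as hypothetical.

```rust
// Hypothetical helper: pushes framed bytes through an arrow-avro Decoder and
// collects the resulting RecordBatches. `decode`/`flush` are assumed APIs.
fn decode_all(
    mut decoder: arrow_avro::reader::Decoder,
    framed: &[u8],
) -> Vec<arrow_array::RecordBatch> {
    let mut batches = Vec::new();
    let mut offset = 0;
    while offset < framed.len() {
        // Feed the remaining framed bytes; `decode` reports how many it consumed.
        let consumed = decoder.decode(&framed[offset..]).expect("decode failed");
        if consumed == 0 {
            break; // avoid spinning if no further progress is possible
        }
        offset += consumed;
        // Drain any fully decoded rows into a RecordBatch.
        if let Some(batch) = decoder.flush().expect("flush failed") {
            batches.push(batch);
        }
    }
    batches
}
```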