From b5f9bf9ec36059dad4da80b6987449427ce1ac35 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 12 Mar 2024 15:32:43 -0700 Subject: [PATCH 1/7] feat: Implement the conversion from ArrowSchema to iceberg Schema --- crates/iceberg/src/spec/arrow.rs | 553 +++++++++++++++++++++++++++++++ crates/iceberg/src/spec/mod.rs | 1 + 2 files changed, 554 insertions(+) create mode 100644 crates/iceberg/src/spec/arrow.rs diff --git a/crates/iceberg/src/spec/arrow.rs b/crates/iceberg/src/spec/arrow.rs new file mode 100644 index 0000000000..68c46ab60c --- /dev/null +++ b/crates/iceberg/src/spec/arrow.rs @@ -0,0 +1,553 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use crate::spec::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, +}; +use crate::{Error, ErrorKind}; +use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; +use std::sync::Arc; + +/// A post order arrow schema visitor. +/// +/// For order of methods called, please refer to [`visit_schema`]. +pub trait ArrowSchemaVisitor { + /// Return type of this visitor. + type T; + type U; + + /// Called before struct/list/map field. + fn before_field(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called after struct/list/map field. + fn after_field(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called after schema's type visited. + fn schema(&mut self, schema: &ArrowSchema, values: Vec) -> Result; + + /// Called after struct's fields visited. + fn r#struct(&mut self, fields: &Fields, results: Vec) -> Result; + + /// Called after list fields visited. + fn list(&mut self, list: &DataType, value: Self::T) -> Result; + + /// Called after map's key and value fields visited. + fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result; + + /// Called when see a primitive type. + fn primitive(&mut self, p: &DataType) -> Result; +} + +/// Visiting a type in post order. +fn visit_type(r#type: &DataType, visitor: &mut V) -> Result { + match r#type { + p if p.is_primitive() + || matches!( + p, + DataType::Boolean + | DataType::Utf8 + | DataType::Binary + | DataType::FixedSizeBinary(_) + ) => + { + visitor.primitive(p) + } + DataType::List(element_field) => { + visitor.before_field(element_field)?; + let value = visit_type(element_field.data_type(), visitor)?; + visitor.after_field(element_field)?; + visitor.list(r#type, value) + } + DataType::Map(field, _) => match field.data_type() { + DataType::Struct(fields) => { + if fields.len() != 2 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have exactly 2 fields", + )); + } + + let key_field = &fields[0]; + let value_field = &fields[1]; + + let key_result = { + visitor.before_field(key_field)?; + let ret = visit_type(key_field.data_type(), visitor)?; + visitor.after_field(key_field)?; + ret + }; + + let value_result = { + visitor.before_field(value_field)?; + let ret = visit_type(value_field.data_type(), visitor)?; + visitor.after_field(value_field)?; + ret + }; + + visitor.map(r#type, key_result, value_result) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have struct type", + )), + }, + DataType::Struct(fields) => visit_struct(fields, visitor), + other => Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot visit Arrow data type: {other}"), + )), + } +} + +/// Visit struct type in post order. +#[allow(dead_code)] +fn visit_struct(fields: &Fields, visitor: &mut V) -> Result { + let mut results = Vec::with_capacity(fields.len()); + for field in fields { + visitor.before_field(field)?; + let result = visit_type(field.data_type(), visitor)?; + visitor.after_field(field)?; + results.push(result); + } + + visitor.r#struct(fields, results) +} + +/// Visit schema in post order. +#[allow(dead_code)] +pub fn visit_schema(schema: &ArrowSchema, visitor: &mut V) -> Result { + let mut results = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + visitor.before_field(field)?; + let result = visit_type(field.data_type(), visitor)?; + visitor.after_field(field)?; + results.push(result); + } + visitor.schema(schema, results) +} + +const ARROW_FIELD_ID_KEYS: [&str; 2] = ["PARQUET:field_id", "field_id"]; +const ARROW_FIELD_DOC_KEYS: [&str; 3] = ["PARQUET:field_doc", "field_doc", "doc"]; + +fn get_field_id(field: &Field) -> Option { + for key in ARROW_FIELD_ID_KEYS { + if let Some(value) = field.metadata().get(key) { + return value.parse::().ok(); + } + } + None +} + +fn get_field_doc(field: &Field) -> Option { + for key in ARROW_FIELD_DOC_KEYS { + if let Some(value) = field.metadata().get(key) { + return Some(value.clone()); + } + } + None +} + +struct ArrowSchemaConverter {} + +impl ArrowSchemaConverter { + #[allow(dead_code)] + fn new() -> Self { + Self {} + } + + fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result> { + let mut results = Vec::with_capacity(fields.len()); + for i in 0..fields.len() { + let field = &fields[i]; + let field_type = &field_results[i]; + let id = get_field_id(field).ok_or(Error::new( + ErrorKind::DataInvalid, + "Field id not found in metadata", + ))?; + let doc = get_field_doc(field); + let nested_field = NestedField { + id, + doc, + name: field.name().clone(), + required: !field.is_nullable(), + field_type: Box::new(field_type.clone()), + initial_default: None, + write_default: None, + }; + results.push(Arc::new(nested_field)); + } + Ok(results) + } +} + +impl ArrowSchemaVisitor for ArrowSchemaConverter { + type T = Type; + type U = Schema; + + fn schema(&mut self, schema: &ArrowSchema, values: Vec) -> Result { + let fields = Self::convert_fields(schema.fields(), &values)?; + let builder = Schema::builder().with_fields(fields); + builder.build() + } + + fn r#struct(&mut self, fields: &Fields, results: Vec) -> Result { + let fields = Self::convert_fields(fields, &results)?; + Ok(Type::Struct(StructType::new(fields))) + } + + fn list(&mut self, list: &DataType, value: Self::T) -> Result { + match list { + DataType::List(element_field) => { + let id = get_field_id(element_field).ok_or(Error::new( + ErrorKind::DataInvalid, + "Field id not found in metadata", + ))?; + let doc = get_field_doc(element_field); + let element_field = Arc::new(NestedField { + id, + doc, + name: element_field.name().clone(), + required: !element_field.is_nullable(), + field_type: Box::new(value.clone()), + initial_default: None, + write_default: None, + }); + Ok(Type::List(ListType { element_field })) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + "List type must have list data type", + )), + } + } + + fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result { + match map { + DataType::Map(field, _) => match field.data_type() { + DataType::Struct(fields) => { + if fields.len() != 2 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have exactly 2 fields", + )); + } + + let key_field = &fields[0]; + let value_field = &fields[1]; + + let key_id = get_field_id(key_field).ok_or(Error::new( + ErrorKind::DataInvalid, + "Field id not found in metadata", + ))?; + let key_doc = get_field_doc(key_field); + let key_field = Arc::new(NestedField { + id: key_id, + doc: key_doc, + name: key_field.name().clone(), + required: !key_field.is_nullable(), + field_type: Box::new(key_value.clone()), + initial_default: None, + write_default: None, + }); + + let value_id = get_field_id(value_field).ok_or(Error::new( + ErrorKind::DataInvalid, + "Field id not found in metadata", + ))?; + let value_doc = get_field_doc(value_field); + let value_field = Arc::new(NestedField { + id: value_id, + doc: value_doc, + name: value_field.name().clone(), + required: !value_field.is_nullable(), + field_type: Box::new(value.clone()), + initial_default: None, + write_default: None, + }); + + Ok(Type::Map(MapType { + key_field, + value_field, + })) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have struct type", + )), + }, + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Map type must have map data type", + )), + } + } + + fn primitive(&mut self, p: &DataType) -> Result { + match p { + DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)), + DataType::Int32 => Ok(Type::Primitive(PrimitiveType::Int)), + DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)), + DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)), + DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)), + DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)), + DataType::Time64(unit) if unit == &TimeUnit::Microsecond => { + Ok(Type::Primitive(PrimitiveType::Time)) + } + DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => { + Ok(Type::Primitive(PrimitiveType::Timestamp)) + } + DataType::Timestamp(unit, Some(zone)) + if unit == &TimeUnit::Microsecond + && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") => + { + Ok(Type::Primitive(PrimitiveType::Timestamptz)) + } + DataType::Binary => Ok(Type::Primitive(PrimitiveType::Binary)), + DataType::FixedSizeBinary(width) => { + Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64))) + } + DataType::Utf8 => Ok(Type::Primitive(PrimitiveType::String)), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported Arrow data type: {p}"), + )), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Schema as ArrowSchema; + use arrow_schema::TimeUnit; + use std::collections::HashMap; + use std::sync::Arc; + + #[test] + fn test_arrow_primitive() { + let schema = ArrowSchema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "2".to_string(), + )])), + Field::new("b", DataType::Utf8, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "0".to_string(), + )])), + Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "1".to_string())]), + ), + ]); + let schema = Arc::new(schema); + let mut visitor = ArrowSchemaConverter::new(); + let result = visit_schema(&schema, &mut visitor).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 3); + + assert_eq!(schema_struct.fields()[0].name, "a"); + assert_eq!(schema_struct.fields()[0].id, 2); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::Primitive(PrimitiveType::Int)) + ); + + assert_eq!(schema_struct.fields()[1].name, "b"); + assert_eq!(schema_struct.fields()[1].id, 0); + assert_eq!( + schema_struct.fields()[1].field_type, + Box::new(Type::Primitive(PrimitiveType::String)) + ); + + assert_eq!(schema_struct.fields()[2].name, "c"); + assert_eq!(schema_struct.fields()[2].id, 1); + assert_eq!( + schema_struct.fields()[2].field_type, + Box::new(Type::Primitive(PrimitiveType::Timestamp)) + ); + } + + #[test] + fn test_arrow_list() { + let schema = ArrowSchema::new(vec![Field::new( + "a", + DataType::List(Arc::new( + Field::new("item", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "0".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "1".to_string(), + )]))]); + let schema = Arc::new(schema); + let mut visitor = ArrowSchemaConverter::new(); + let result = visit_schema(&schema, &mut visitor).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 1); + + assert_eq!(schema_struct.fields()[0].name, "a"); + assert_eq!(schema_struct.fields()[0].id, 1); + assert!(!schema_struct.fields()[0].required); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::List(ListType { + element_field: Arc::new(NestedField { + id: 0, + doc: None, + name: "item".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + initial_default: None, + write_default: None, + }) + })) + ); + } + + #[test] + fn test_arrow_map() { + let fields = Fields::from(vec![ + Field::new("key", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "2".to_string(), + )])), + Field::new("value", DataType::Utf8, true).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "0".to_string(), + )])), + ]); + + let r#struct = DataType::Struct(fields); + let map = DataType::Map( + Arc::new( + Field::new("entries", r#struct, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "1".to_string(), + )])), + ), + false, + ); + + let schema = ArrowSchema::new(vec![Field::new("m", map, false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "4".to_string())]), + )]); + let schema = Arc::new(schema); + let mut visitor = ArrowSchemaConverter::new(); + let result = visit_schema(&schema, &mut visitor).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 1); + + assert_eq!(schema_struct.fields()[0].name, "m"); + assert_eq!(schema_struct.fields()[0].id, 4); + assert!(schema_struct.fields()[0].required); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::Map(MapType { + key_field: Arc::new(NestedField { + id: 2, + doc: None, + name: "key".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + initial_default: None, + write_default: None, + }), + value_field: Arc::new(NestedField { + id: 0, + doc: None, + name: "value".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + initial_default: None, + write_default: None, + }) + })) + ); + } + + #[test] + fn test_arrow_struct() { + let fields = Fields::from(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "2".to_string(), + )])), + Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEYS[0].to_string(), + "0".to_string(), + )])), + Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "1".to_string())]), + ), + ]); + + let r#struct = DataType::Struct(fields); + let schema = ArrowSchema::new(vec![Field::new("s", r#struct, false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "2".to_string())]), + )]); + let schema = Arc::new(schema); + let mut visitor = ArrowSchemaConverter::new(); + let result = visit_schema(&schema, &mut visitor).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 1); + + assert_eq!(schema_struct.fields()[0].name, "s"); + assert_eq!(schema_struct.fields()[0].id, 2); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::Struct(StructType::new(vec![ + Arc::new(NestedField { + id: 2, + doc: None, + name: "a".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + initial_default: None, + write_default: None, + }), + Arc::new(NestedField { + id: 0, + doc: None, + name: "b".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + initial_default: None, + write_default: None, + }), + Arc::new(NestedField { + id: 1, + doc: None, + name: "c".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Timestamp)), + initial_default: None, + write_default: None, + }), + ]))) + ); + } +} diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 199fc4a160..088c08e8c8 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -17,6 +17,7 @@ //! Spec for Iceberg. +mod arrow; mod datatypes; mod manifest; mod manifest_list; From 73e8c7893cc8f63a8bd8e62ba7f8695dc483b5c2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 13 Mar 2024 19:02:22 -0700 Subject: [PATCH 2/7] For review --- crates/iceberg/src/arrow.rs | 565 +++++++++++++++++++++++++++++++ crates/iceberg/src/spec/arrow.rs | 553 ------------------------------ crates/iceberg/src/spec/mod.rs | 1 - 3 files changed, 565 insertions(+), 554 deletions(-) delete mode 100644 crates/iceberg/src/spec/arrow.rs diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs index 6de9b6ad2e..373662d741 100644 --- a/crates/iceberg/src/arrow.rs +++ b/crates/iceberg/src/arrow.rs @@ -25,6 +25,14 @@ use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::SchemaRef; +use crate::error::Result; +use crate::spec::{ + ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, +}; +use crate::{Error, ErrorKind}; +use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; +use std::sync::Arc; + /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, @@ -106,3 +114,560 @@ impl ArrowReader { ProjectionMask::all() } } + +/// A post order arrow schema visitor. +/// +/// For order of methods called, please refer to [`visit_schema`]. +pub trait ArrowSchemaVisitor { + /// Return type of this visitor on arrow field. + type T; + + /// Return type of this visitor on arrow schema. + type U; + + /// Called before struct/list/map field. + fn before_field(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called after struct/list/map field. + fn after_field(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called before list element. + fn before_list_element(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called after list element. + fn after_list_element(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called before map key. + fn before_map_key(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called after map key. + fn after_map_key(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called before map value. + fn before_map_value(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called after map value. + fn after_map_value(&mut self, _field: &Field) -> Result<()> { + Ok(()) + } + + /// Called after schema's type visited. + fn schema(&mut self, schema: &ArrowSchema, values: Vec) -> Result; + + /// Called after struct's fields visited. + fn r#struct(&mut self, fields: &Fields, results: Vec) -> Result; + + /// Called after list fields visited. + fn list(&mut self, list: &DataType, value: Self::T) -> Result; + + /// Called after map's key and value fields visited. + fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result; + + /// Called when see a primitive type. + fn primitive(&mut self, p: &DataType) -> Result; +} + +/// Visiting a type in post order. +fn visit_type(r#type: &DataType, visitor: &mut V) -> Result { + match r#type { + p if p.is_primitive() + || matches!( + p, + DataType::Boolean + | DataType::Utf8 + | DataType::Binary + | DataType::FixedSizeBinary(_) + ) => + { + visitor.primitive(p) + } + DataType::List(element_field) => { + visitor.before_list_element(element_field)?; + let value = visit_type(element_field.data_type(), visitor)?; + visitor.after_list_element(element_field)?; + visitor.list(r#type, value) + } + DataType::Map(field, _) => match field.data_type() { + DataType::Struct(fields) => { + if fields.len() != 2 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have exactly 2 fields", + )); + } + + let key_field = &fields[0]; + let value_field = &fields[1]; + + let key_result = { + visitor.before_map_key(key_field)?; + let ret = visit_type(key_field.data_type(), visitor)?; + visitor.after_map_key(key_field)?; + ret + }; + + let value_result = { + visitor.before_map_value(value_field)?; + let ret = visit_type(value_field.data_type(), visitor)?; + visitor.after_map_value(value_field)?; + ret + }; + + visitor.map(r#type, key_result, value_result) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have struct type", + )), + }, + DataType::Struct(fields) => visit_struct(fields, visitor), + other => Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot visit Arrow data type: {other}"), + )), + } +} + +/// Visit struct type in post order. +#[allow(dead_code)] +fn visit_struct(fields: &Fields, visitor: &mut V) -> Result { + let mut results = Vec::with_capacity(fields.len()); + for field in fields { + visitor.before_field(field)?; + let result = visit_type(field.data_type(), visitor)?; + visitor.after_field(field)?; + results.push(result); + } + + visitor.r#struct(fields, results) +} + +/// Visit schema in post order. +#[allow(dead_code)] +fn visit_schema(schema: &ArrowSchema, visitor: &mut V) -> Result { + let mut results = Vec::with_capacity(schema.fields().len()); + for field in schema.fields() { + visitor.before_field(field)?; + let result = visit_type(field.data_type(), visitor)?; + visitor.after_field(field)?; + results.push(result); + } + visitor.schema(schema, results) +} + +/// Convert Arrow schema to ceberg schema. +#[allow(dead_code)] +pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result { + let mut visitor = ArrowSchemaConverter::new(); + visit_schema(schema, &mut visitor) +} + +const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id"; +const ARROW_FIELD_DOC_KEY: &str = "doc"; + +fn get_field_id(field: &Field) -> Result { + if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) { + return value.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to parse field id: {e}"), + ) + }); + } + Err(Error::new( + ErrorKind::DataInvalid, + "Field id not found in metadata", + )) +} + +fn get_field_doc(field: &Field) -> Option { + if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) { + return Some(value.clone()); + } + None +} + +struct ArrowSchemaConverter {} + +impl ArrowSchemaConverter { + #[allow(dead_code)] + fn new() -> Self { + Self {} + } + + fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result> { + let mut results = Vec::with_capacity(fields.len()); + for i in 0..fields.len() { + let field = &fields[i]; + let field_type = &field_results[i]; + let id = get_field_id(field)?; + let doc = get_field_doc(field); + let nested_field = NestedField { + id, + doc, + name: field.name().clone(), + required: !field.is_nullable(), + field_type: Box::new(field_type.clone()), + initial_default: None, + write_default: None, + }; + results.push(Arc::new(nested_field)); + } + Ok(results) + } +} + +impl ArrowSchemaVisitor for ArrowSchemaConverter { + type T = Type; + type U = Schema; + + fn schema(&mut self, schema: &ArrowSchema, values: Vec) -> Result { + let fields = Self::convert_fields(schema.fields(), &values)?; + let builder = Schema::builder().with_fields(fields); + builder.build() + } + + fn r#struct(&mut self, fields: &Fields, results: Vec) -> Result { + let fields = Self::convert_fields(fields, &results)?; + Ok(Type::Struct(StructType::new(fields))) + } + + fn list(&mut self, list: &DataType, value: Self::T) -> Result { + match list { + DataType::List(element_field) => { + let id = get_field_id(element_field)?; + let doc = get_field_doc(element_field); + let element_field = Arc::new(NestedField { + id, + doc, + name: "element".to_string(), + required: !element_field.is_nullable(), + field_type: Box::new(value.clone()), + initial_default: None, + write_default: None, + }); + Ok(Type::List(ListType { element_field })) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + "List type must have list data type", + )), + } + } + + fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result { + match map { + DataType::Map(field, _) => match field.data_type() { + DataType::Struct(fields) => { + if fields.len() != 2 { + return Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have exactly 2 fields", + )); + } + + let key_field = &fields[0]; + let value_field = &fields[1]; + + let key_id = get_field_id(key_field)?; + let key_doc = get_field_doc(key_field); + let key_field = Arc::new(NestedField { + id: key_id, + doc: key_doc, + name: "key".to_string(), + required: !key_field.is_nullable(), + field_type: Box::new(key_value.clone()), + initial_default: None, + write_default: None, + }); + + let value_id = get_field_id(value_field)?; + let value_doc = get_field_doc(value_field); + let value_field = Arc::new(NestedField { + id: value_id, + doc: value_doc, + name: "value".to_string(), + required: !value_field.is_nullable(), + field_type: Box::new(value.clone()), + initial_default: None, + write_default: None, + }); + + Ok(Type::Map(MapType { + key_field, + value_field, + })) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Map field must have struct type", + )), + }, + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Map type must have map data type", + )), + } + } + + fn primitive(&mut self, p: &DataType) -> Result { + match p { + DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)), + DataType::Int32 => Ok(Type::Primitive(PrimitiveType::Int)), + DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)), + DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)), + DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)), + DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)), + DataType::Time64(unit) if unit == &TimeUnit::Microsecond => { + Ok(Type::Primitive(PrimitiveType::Time)) + } + DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => { + Ok(Type::Primitive(PrimitiveType::Timestamp)) + } + DataType::Timestamp(unit, Some(zone)) + if unit == &TimeUnit::Microsecond + && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") => + { + Ok(Type::Primitive(PrimitiveType::Timestamptz)) + } + DataType::Binary => Ok(Type::Primitive(PrimitiveType::Binary)), + DataType::FixedSizeBinary(width) => { + Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64))) + } + DataType::Utf8 => Ok(Type::Primitive(PrimitiveType::String)), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported Arrow data type: {p}"), + )), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_schema::DataType; + use arrow_schema::Field; + use arrow_schema::Schema as ArrowSchema; + use arrow_schema::TimeUnit; + use std::collections::HashMap; + use std::sync::Arc; + + #[test] + fn test_arrow_primitive() { + let schema = ArrowSchema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "2".to_string(), + )])), + Field::new("b", DataType::Utf8, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "0".to_string(), + )])), + Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "1".to_string())]), + ), + ]); + let schema = Arc::new(schema); + let result = arrow_schema_to_schema(&schema).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 3); + + assert_eq!(schema_struct.fields()[0].name, "a"); + assert_eq!(schema_struct.fields()[0].id, 2); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::Primitive(PrimitiveType::Int)) + ); + + assert_eq!(schema_struct.fields()[1].name, "b"); + assert_eq!(schema_struct.fields()[1].id, 0); + assert_eq!( + schema_struct.fields()[1].field_type, + Box::new(Type::Primitive(PrimitiveType::String)) + ); + + assert_eq!(schema_struct.fields()[2].name, "c"); + assert_eq!(schema_struct.fields()[2].id, 1); + assert_eq!( + schema_struct.fields()[2].field_type, + Box::new(Type::Primitive(PrimitiveType::Timestamp)) + ); + } + + #[test] + fn test_arrow_list() { + let schema = ArrowSchema::new(vec![Field::new( + "a", + DataType::List(Arc::new( + Field::new("item", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "0".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "1".to_string(), + )]))]); + let schema = Arc::new(schema); + let mut visitor = ArrowSchemaConverter::new(); + let result = visit_schema(&schema, &mut visitor).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 1); + + assert_eq!(schema_struct.fields()[0].name, "a"); + assert_eq!(schema_struct.fields()[0].id, 1); + assert!(!schema_struct.fields()[0].required); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::List(ListType { + element_field: Arc::new(NestedField { + id: 0, + doc: None, + name: "element".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + initial_default: None, + write_default: None, + }) + })) + ); + } + + #[test] + fn test_arrow_map() { + let fields = Fields::from(vec![ + Field::new("key", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "2".to_string(), + )])), + Field::new("value", DataType::Utf8, true).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "0".to_string(), + )])), + ]); + + let r#struct = DataType::Struct(fields); + let map = DataType::Map( + Arc::new( + Field::new("entries", r#struct, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "1".to_string(), + )])), + ), + false, + ); + + let schema = ArrowSchema::new(vec![Field::new("m", map, false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "4".to_string())]), + )]); + let schema = Arc::new(schema); + let result = arrow_schema_to_schema(&schema).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 1); + + assert_eq!(schema_struct.fields()[0].name, "m"); + assert_eq!(schema_struct.fields()[0].id, 4); + assert!(schema_struct.fields()[0].required); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::Map(MapType { + key_field: Arc::new(NestedField { + id: 2, + doc: None, + name: "key".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + initial_default: None, + write_default: None, + }), + value_field: Arc::new(NestedField { + id: 0, + doc: None, + name: "value".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + initial_default: None, + write_default: None, + }) + })) + ); + } + + #[test] + fn test_arrow_struct() { + let fields = Fields::from(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "2".to_string(), + )])), + Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "0".to_string(), + )])), + Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "1".to_string())]), + ), + ]); + + let r#struct = DataType::Struct(fields); + let schema = ArrowSchema::new(vec![Field::new("s", r#struct, false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "2".to_string())]), + )]); + let schema = Arc::new(schema); + let result = arrow_schema_to_schema(&schema).unwrap(); + let schema_struct = result.as_struct(); + assert_eq!(schema_struct.fields().len(), 1); + + assert_eq!(schema_struct.fields()[0].name, "s"); + assert_eq!(schema_struct.fields()[0].id, 2); + assert_eq!( + schema_struct.fields()[0].field_type, + Box::new(Type::Struct(StructType::new(vec![ + Arc::new(NestedField { + id: 2, + doc: None, + name: "a".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Int)), + initial_default: None, + write_default: None, + }), + Arc::new(NestedField { + id: 0, + doc: None, + name: "b".to_string(), + required: false, + field_type: Box::new(Type::Primitive(PrimitiveType::String)), + initial_default: None, + write_default: None, + }), + Arc::new(NestedField { + id: 1, + doc: None, + name: "c".to_string(), + required: true, + field_type: Box::new(Type::Primitive(PrimitiveType::Timestamp)), + initial_default: None, + write_default: None, + }), + ]))) + ); + } +} diff --git a/crates/iceberg/src/spec/arrow.rs b/crates/iceberg/src/spec/arrow.rs deleted file mode 100644 index 68c46ab60c..0000000000 --- a/crates/iceberg/src/spec/arrow.rs +++ /dev/null @@ -1,553 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::error::Result; -use crate::spec::{ - ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, StructType, Type, -}; -use crate::{Error, ErrorKind}; -use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; -use std::sync::Arc; - -/// A post order arrow schema visitor. -/// -/// For order of methods called, please refer to [`visit_schema`]. -pub trait ArrowSchemaVisitor { - /// Return type of this visitor. - type T; - type U; - - /// Called before struct/list/map field. - fn before_field(&mut self, _field: &Field) -> Result<()> { - Ok(()) - } - - /// Called after struct/list/map field. - fn after_field(&mut self, _field: &Field) -> Result<()> { - Ok(()) - } - - /// Called after schema's type visited. - fn schema(&mut self, schema: &ArrowSchema, values: Vec) -> Result; - - /// Called after struct's fields visited. - fn r#struct(&mut self, fields: &Fields, results: Vec) -> Result; - - /// Called after list fields visited. - fn list(&mut self, list: &DataType, value: Self::T) -> Result; - - /// Called after map's key and value fields visited. - fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result; - - /// Called when see a primitive type. - fn primitive(&mut self, p: &DataType) -> Result; -} - -/// Visiting a type in post order. -fn visit_type(r#type: &DataType, visitor: &mut V) -> Result { - match r#type { - p if p.is_primitive() - || matches!( - p, - DataType::Boolean - | DataType::Utf8 - | DataType::Binary - | DataType::FixedSizeBinary(_) - ) => - { - visitor.primitive(p) - } - DataType::List(element_field) => { - visitor.before_field(element_field)?; - let value = visit_type(element_field.data_type(), visitor)?; - visitor.after_field(element_field)?; - visitor.list(r#type, value) - } - DataType::Map(field, _) => match field.data_type() { - DataType::Struct(fields) => { - if fields.len() != 2 { - return Err(Error::new( - ErrorKind::DataInvalid, - "Map field must have exactly 2 fields", - )); - } - - let key_field = &fields[0]; - let value_field = &fields[1]; - - let key_result = { - visitor.before_field(key_field)?; - let ret = visit_type(key_field.data_type(), visitor)?; - visitor.after_field(key_field)?; - ret - }; - - let value_result = { - visitor.before_field(value_field)?; - let ret = visit_type(value_field.data_type(), visitor)?; - visitor.after_field(value_field)?; - ret - }; - - visitor.map(r#type, key_result, value_result) - } - _ => Err(Error::new( - ErrorKind::DataInvalid, - "Map field must have struct type", - )), - }, - DataType::Struct(fields) => visit_struct(fields, visitor), - other => Err(Error::new( - ErrorKind::DataInvalid, - format!("Cannot visit Arrow data type: {other}"), - )), - } -} - -/// Visit struct type in post order. -#[allow(dead_code)] -fn visit_struct(fields: &Fields, visitor: &mut V) -> Result { - let mut results = Vec::with_capacity(fields.len()); - for field in fields { - visitor.before_field(field)?; - let result = visit_type(field.data_type(), visitor)?; - visitor.after_field(field)?; - results.push(result); - } - - visitor.r#struct(fields, results) -} - -/// Visit schema in post order. -#[allow(dead_code)] -pub fn visit_schema(schema: &ArrowSchema, visitor: &mut V) -> Result { - let mut results = Vec::with_capacity(schema.fields().len()); - for field in schema.fields() { - visitor.before_field(field)?; - let result = visit_type(field.data_type(), visitor)?; - visitor.after_field(field)?; - results.push(result); - } - visitor.schema(schema, results) -} - -const ARROW_FIELD_ID_KEYS: [&str; 2] = ["PARQUET:field_id", "field_id"]; -const ARROW_FIELD_DOC_KEYS: [&str; 3] = ["PARQUET:field_doc", "field_doc", "doc"]; - -fn get_field_id(field: &Field) -> Option { - for key in ARROW_FIELD_ID_KEYS { - if let Some(value) = field.metadata().get(key) { - return value.parse::().ok(); - } - } - None -} - -fn get_field_doc(field: &Field) -> Option { - for key in ARROW_FIELD_DOC_KEYS { - if let Some(value) = field.metadata().get(key) { - return Some(value.clone()); - } - } - None -} - -struct ArrowSchemaConverter {} - -impl ArrowSchemaConverter { - #[allow(dead_code)] - fn new() -> Self { - Self {} - } - - fn convert_fields(fields: &Fields, field_results: &[Type]) -> Result> { - let mut results = Vec::with_capacity(fields.len()); - for i in 0..fields.len() { - let field = &fields[i]; - let field_type = &field_results[i]; - let id = get_field_id(field).ok_or(Error::new( - ErrorKind::DataInvalid, - "Field id not found in metadata", - ))?; - let doc = get_field_doc(field); - let nested_field = NestedField { - id, - doc, - name: field.name().clone(), - required: !field.is_nullable(), - field_type: Box::new(field_type.clone()), - initial_default: None, - write_default: None, - }; - results.push(Arc::new(nested_field)); - } - Ok(results) - } -} - -impl ArrowSchemaVisitor for ArrowSchemaConverter { - type T = Type; - type U = Schema; - - fn schema(&mut self, schema: &ArrowSchema, values: Vec) -> Result { - let fields = Self::convert_fields(schema.fields(), &values)?; - let builder = Schema::builder().with_fields(fields); - builder.build() - } - - fn r#struct(&mut self, fields: &Fields, results: Vec) -> Result { - let fields = Self::convert_fields(fields, &results)?; - Ok(Type::Struct(StructType::new(fields))) - } - - fn list(&mut self, list: &DataType, value: Self::T) -> Result { - match list { - DataType::List(element_field) => { - let id = get_field_id(element_field).ok_or(Error::new( - ErrorKind::DataInvalid, - "Field id not found in metadata", - ))?; - let doc = get_field_doc(element_field); - let element_field = Arc::new(NestedField { - id, - doc, - name: element_field.name().clone(), - required: !element_field.is_nullable(), - field_type: Box::new(value.clone()), - initial_default: None, - write_default: None, - }); - Ok(Type::List(ListType { element_field })) - } - _ => Err(Error::new( - ErrorKind::DataInvalid, - "List type must have list data type", - )), - } - } - - fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result { - match map { - DataType::Map(field, _) => match field.data_type() { - DataType::Struct(fields) => { - if fields.len() != 2 { - return Err(Error::new( - ErrorKind::DataInvalid, - "Map field must have exactly 2 fields", - )); - } - - let key_field = &fields[0]; - let value_field = &fields[1]; - - let key_id = get_field_id(key_field).ok_or(Error::new( - ErrorKind::DataInvalid, - "Field id not found in metadata", - ))?; - let key_doc = get_field_doc(key_field); - let key_field = Arc::new(NestedField { - id: key_id, - doc: key_doc, - name: key_field.name().clone(), - required: !key_field.is_nullable(), - field_type: Box::new(key_value.clone()), - initial_default: None, - write_default: None, - }); - - let value_id = get_field_id(value_field).ok_or(Error::new( - ErrorKind::DataInvalid, - "Field id not found in metadata", - ))?; - let value_doc = get_field_doc(value_field); - let value_field = Arc::new(NestedField { - id: value_id, - doc: value_doc, - name: value_field.name().clone(), - required: !value_field.is_nullable(), - field_type: Box::new(value.clone()), - initial_default: None, - write_default: None, - }); - - Ok(Type::Map(MapType { - key_field, - value_field, - })) - } - _ => Err(Error::new( - ErrorKind::DataInvalid, - "Map field must have struct type", - )), - }, - _ => Err(Error::new( - ErrorKind::DataInvalid, - "Map type must have map data type", - )), - } - } - - fn primitive(&mut self, p: &DataType) -> Result { - match p { - DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)), - DataType::Int32 => Ok(Type::Primitive(PrimitiveType::Int)), - DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)), - DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)), - DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)), - DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)), - DataType::Time64(unit) if unit == &TimeUnit::Microsecond => { - Ok(Type::Primitive(PrimitiveType::Time)) - } - DataType::Timestamp(unit, None) if unit == &TimeUnit::Microsecond => { - Ok(Type::Primitive(PrimitiveType::Timestamp)) - } - DataType::Timestamp(unit, Some(zone)) - if unit == &TimeUnit::Microsecond - && (zone.as_ref() == "UTC" || zone.as_ref() == "+00:00") => - { - Ok(Type::Primitive(PrimitiveType::Timestamptz)) - } - DataType::Binary => Ok(Type::Primitive(PrimitiveType::Binary)), - DataType::FixedSizeBinary(width) => { - Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64))) - } - DataType::Utf8 => Ok(Type::Primitive(PrimitiveType::String)), - _ => Err(Error::new( - ErrorKind::DataInvalid, - format!("Unsupported Arrow data type: {p}"), - )), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow_schema::DataType; - use arrow_schema::Field; - use arrow_schema::Schema as ArrowSchema; - use arrow_schema::TimeUnit; - use std::collections::HashMap; - use std::sync::Arc; - - #[test] - fn test_arrow_primitive() { - let schema = ArrowSchema::new(vec![ - Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "2".to_string(), - )])), - Field::new("b", DataType::Utf8, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "0".to_string(), - )])), - Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "1".to_string())]), - ), - ]); - let schema = Arc::new(schema); - let mut visitor = ArrowSchemaConverter::new(); - let result = visit_schema(&schema, &mut visitor).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 3); - - assert_eq!(schema_struct.fields()[0].name, "a"); - assert_eq!(schema_struct.fields()[0].id, 2); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::Primitive(PrimitiveType::Int)) - ); - - assert_eq!(schema_struct.fields()[1].name, "b"); - assert_eq!(schema_struct.fields()[1].id, 0); - assert_eq!( - schema_struct.fields()[1].field_type, - Box::new(Type::Primitive(PrimitiveType::String)) - ); - - assert_eq!(schema_struct.fields()[2].name, "c"); - assert_eq!(schema_struct.fields()[2].id, 1); - assert_eq!( - schema_struct.fields()[2].field_type, - Box::new(Type::Primitive(PrimitiveType::Timestamp)) - ); - } - - #[test] - fn test_arrow_list() { - let schema = ArrowSchema::new(vec![Field::new( - "a", - DataType::List(Arc::new( - Field::new("item", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "0".to_string(), - )])), - )), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "1".to_string(), - )]))]); - let schema = Arc::new(schema); - let mut visitor = ArrowSchemaConverter::new(); - let result = visit_schema(&schema, &mut visitor).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 1); - - assert_eq!(schema_struct.fields()[0].name, "a"); - assert_eq!(schema_struct.fields()[0].id, 1); - assert!(!schema_struct.fields()[0].required); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::List(ListType { - element_field: Arc::new(NestedField { - id: 0, - doc: None, - name: "item".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - initial_default: None, - write_default: None, - }) - })) - ); - } - - #[test] - fn test_arrow_map() { - let fields = Fields::from(vec![ - Field::new("key", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "2".to_string(), - )])), - Field::new("value", DataType::Utf8, true).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "0".to_string(), - )])), - ]); - - let r#struct = DataType::Struct(fields); - let map = DataType::Map( - Arc::new( - Field::new("entries", r#struct, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "1".to_string(), - )])), - ), - false, - ); - - let schema = ArrowSchema::new(vec![Field::new("m", map, false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "4".to_string())]), - )]); - let schema = Arc::new(schema); - let mut visitor = ArrowSchemaConverter::new(); - let result = visit_schema(&schema, &mut visitor).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 1); - - assert_eq!(schema_struct.fields()[0].name, "m"); - assert_eq!(schema_struct.fields()[0].id, 4); - assert!(schema_struct.fields()[0].required); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::Map(MapType { - key_field: Arc::new(NestedField { - id: 2, - doc: None, - name: "key".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - initial_default: None, - write_default: None, - }), - value_field: Arc::new(NestedField { - id: 0, - doc: None, - name: "value".to_string(), - required: false, - field_type: Box::new(Type::Primitive(PrimitiveType::String)), - initial_default: None, - write_default: None, - }) - })) - ); - } - - #[test] - fn test_arrow_struct() { - let fields = Fields::from(vec![ - Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "2".to_string(), - )])), - Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEYS[0].to_string(), - "0".to_string(), - )])), - Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "1".to_string())]), - ), - ]); - - let r#struct = DataType::Struct(fields); - let schema = ArrowSchema::new(vec![Field::new("s", r#struct, false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEYS[0].to_string(), "2".to_string())]), - )]); - let schema = Arc::new(schema); - let mut visitor = ArrowSchemaConverter::new(); - let result = visit_schema(&schema, &mut visitor).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 1); - - assert_eq!(schema_struct.fields()[0].name, "s"); - assert_eq!(schema_struct.fields()[0].id, 2); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::Struct(StructType::new(vec![ - Arc::new(NestedField { - id: 2, - doc: None, - name: "a".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - initial_default: None, - write_default: None, - }), - Arc::new(NestedField { - id: 0, - doc: None, - name: "b".to_string(), - required: false, - field_type: Box::new(Type::Primitive(PrimitiveType::String)), - initial_default: None, - write_default: None, - }), - Arc::new(NestedField { - id: 1, - doc: None, - name: "c".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Timestamp)), - initial_default: None, - write_default: None, - }), - ]))) - ); - } -} diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 088c08e8c8..199fc4a160 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -17,7 +17,6 @@ //! Spec for Iceberg. -mod arrow; mod datatypes; mod manifest; mod manifest_list; From b4a1b7137f1ade416f0b8bd82a2dfc5655a6fdaf Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 13 Mar 2024 21:38:08 -0700 Subject: [PATCH 3/7] Update test --- crates/iceberg/src/arrow.rs | 413 +++++++++++++++++++++--------------- 1 file changed, 242 insertions(+), 171 deletions(-) diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs index 373662d741..49ae1a1cb8 100644 --- a/crates/iceberg/src/arrow.rs +++ b/crates/iceberg/src/arrow.rs @@ -468,98 +468,15 @@ mod tests { use std::sync::Arc; #[test] - fn test_arrow_primitive() { - let schema = ArrowSchema::new(vec![ - Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "2".to_string(), - )])), - Field::new("b", DataType::Utf8, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "0".to_string(), - )])), - Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "1".to_string())]), - ), - ]); - let schema = Arc::new(schema); - let result = arrow_schema_to_schema(&schema).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 3); - - assert_eq!(schema_struct.fields()[0].name, "a"); - assert_eq!(schema_struct.fields()[0].id, 2); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::Primitive(PrimitiveType::Int)) - ); - - assert_eq!(schema_struct.fields()[1].name, "b"); - assert_eq!(schema_struct.fields()[1].id, 0); - assert_eq!( - schema_struct.fields()[1].field_type, - Box::new(Type::Primitive(PrimitiveType::String)) - ); - - assert_eq!(schema_struct.fields()[2].name, "c"); - assert_eq!(schema_struct.fields()[2].id, 1); - assert_eq!( - schema_struct.fields()[2].field_type, - Box::new(Type::Primitive(PrimitiveType::Timestamp)) - ); - } - - #[test] - fn test_arrow_list() { - let schema = ArrowSchema::new(vec![Field::new( - "a", - DataType::List(Arc::new( - Field::new("item", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "0".to_string(), - )])), - )), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "1".to_string(), - )]))]); - let schema = Arc::new(schema); - let mut visitor = ArrowSchemaConverter::new(); - let result = visit_schema(&schema, &mut visitor).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 1); - - assert_eq!(schema_struct.fields()[0].name, "a"); - assert_eq!(schema_struct.fields()[0].id, 1); - assert!(!schema_struct.fields()[0].required); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::List(ListType { - element_field: Arc::new(NestedField { - id: 0, - doc: None, - name: "element".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - initial_default: None, - write_default: None, - }) - })) - ); - } - - #[test] - fn test_arrow_map() { + fn test_arrow_schema_to_schema() { let fields = Fields::from(vec![ Field::new("key", DataType::Int32, false).with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), - "2".to_string(), + "17".to_string(), )])), Field::new("value", DataType::Utf8, true).with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), - "0".to_string(), + "18".to_string(), )])), ]); @@ -568,106 +485,260 @@ mod tests { Arc::new( Field::new("entries", r#struct, false).with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), - "1".to_string(), + "19".to_string(), )])), ), false, ); - let schema = ArrowSchema::new(vec![Field::new("m", map, false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "4".to_string())]), - )]); - let schema = Arc::new(schema); - let result = arrow_schema_to_schema(&schema).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 1); - - assert_eq!(schema_struct.fields()[0].name, "m"); - assert_eq!(schema_struct.fields()[0].id, 4); - assert!(schema_struct.fields()[0].required); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::Map(MapType { - key_field: Arc::new(NestedField { - id: 2, - doc: None, - name: "key".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - initial_default: None, - write_default: None, - }), - value_field: Arc::new(NestedField { - id: 0, - doc: None, - name: "value".to_string(), - required: false, - field_type: Box::new(Type::Primitive(PrimitiveType::String)), - initial_default: None, - write_default: None, - }) - })) - ); - } - - #[test] - fn test_arrow_struct() { let fields = Fields::from(vec![ + Field::new("aa", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "18".to_string(), + )])), + Field::new("bb", DataType::Utf8, true).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "19".to_string(), + )])), + Field::new( + "cc", + DataType::Timestamp(TimeUnit::Microsecond, None), + false, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "20".to_string(), + )])), + ]); + + let r#struct = DataType::Struct(fields); + + let schema = ArrowSchema::new(vec![ Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), "2".to_string(), )])), - Field::new("b", DataType::Utf8, true).with_metadata(HashMap::from([( + Field::new("b", DataType::Int64, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "1".to_string(), + )])), + Field::new("c", DataType::Utf8, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "3".to_string(), + )])), + Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None), true).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "4".to_string())]), + ), + Field::new("e", DataType::Boolean, true).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "6".to_string(), + )])), + Field::new("f", DataType::Float32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "5".to_string(), + )])), + Field::new("g", DataType::Float64, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "7".to_string(), + )])), + Field::new("h", DataType::Date32, false).with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), - "0".to_string(), + "8".to_string(), )])), - Field::new("c", DataType::Timestamp(TimeUnit::Microsecond, None), false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "1".to_string())]), + Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "9".to_string())]), ), + Field::new( + "j", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + false, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "10".to_string(), + )])), + Field::new( + "k", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + false, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "12".to_string(), + )])), + Field::new("l", DataType::Binary, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "13".to_string(), + )])), + Field::new("m", DataType::FixedSizeBinary(10), false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "11".to_string(), + )])), + Field::new( + "list", + DataType::List(Arc::new( + Field::new("element", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "15".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "14".to_string(), + )])), + Field::new("map", map, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "16".to_string(), + )])), + Field::new("struct", r#struct, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "17".to_string(), + )])), ]); - - let r#struct = DataType::Struct(fields); - let schema = ArrowSchema::new(vec![Field::new("s", r#struct, false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "2".to_string())]), - )]); let schema = Arc::new(schema); let result = arrow_schema_to_schema(&schema).unwrap(); - let schema_struct = result.as_struct(); - assert_eq!(schema_struct.fields().len(), 1); - - assert_eq!(schema_struct.fields()[0].name, "s"); - assert_eq!(schema_struct.fields()[0].id, 2); - assert_eq!( - schema_struct.fields()[0].field_type, - Box::new(Type::Struct(StructType::new(vec![ - Arc::new(NestedField { - id: 2, - doc: None, - name: "a".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Int)), - initial_default: None, - write_default: None, - }), - Arc::new(NestedField { - id: 0, - doc: None, - name: "b".to_string(), - required: false, - field_type: Box::new(Type::Primitive(PrimitiveType::String)), - initial_default: None, - write_default: None, - }), - Arc::new(NestedField { - id: 1, - doc: None, - name: "c".to_string(), - required: true, - field_type: Box::new(Type::Primitive(PrimitiveType::Timestamp)), - initial_default: None, - write_default: None, - }), - ]))) - ); + + let schema_json = r#"{ + "type":"struct", + "schema-id":0, + "fields":[ + { + "id":2, + "name":"a", + "required":true, + "type":"int" + }, + { + "id":1, + "name":"b", + "required":true, + "type":"long" + }, + { + "id":3, + "name":"c", + "required":true, + "type":"string" + }, + { + "id":4, + "name":"d", + "required":false, + "type":"timestamp" + }, + { + "id":6, + "name":"e", + "required":false, + "type":"boolean" + }, + { + "id":5, + "name":"f", + "required":true, + "type":"float" + }, + { + "id":7, + "name":"g", + "required":true, + "type":"double" + }, + { + "id":8, + "name":"h", + "required":true, + "type":"date" + }, + { + "id":9, + "name":"i", + "required":true, + "type":"time" + }, + { + "id":10, + "name":"j", + "required":true, + "type":"timestamptz" + }, + { + "id":12, + "name":"k", + "required":true, + "type":"timestamptz" + }, + { + "id":13, + "name":"l", + "required":true, + "type":"binary" + }, + { + "id":11, + "name":"m", + "required":true, + "type":"fixed[10]" + }, + { + "id":14, + "name":"list", + "required": false, + "type": { + "type": "list", + "element-id": 15, + "element-required": true, + "element": "int" + } + }, + { + "id":16, + "name":"map", + "required": true, + "type": { + "type": "map", + "key-id": 17, + "key": "int", + "value-id": 18, + "value-required": false, + "value": "string" + } + }, + { + "id":17, + "name":"struct", + "required": true, + "type": { + "type": "struct", + "fields": [ + { + "id":18, + "name":"aa", + "required":true, + "type":"int" + }, + { + "id":19, + "name":"bb", + "required":false, + "type":"string" + }, + { + "id":20, + "name":"cc", + "required":true, + "type":"timestamp" + } + ] + } + } + ], + "identifier-field-ids":[] + }"#; + + let expected_type: Schema = serde_json::from_str(schema_json).unwrap(); + assert_eq!(result, expected_type); } } From f61af5c10ceec4e672e0a9da602f64c241e5fdf5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 14 Mar 2024 09:39:37 -0700 Subject: [PATCH 4/7] Add LargeString, LargeBinary, LargeList and FixedSizeList --- crates/iceberg/src/arrow.rs | 142 +++++++++++++++++++++++++++++------- 1 file changed, 115 insertions(+), 27 deletions(-) diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs index 49ae1a1cb8..581bba4a24 100644 --- a/crates/iceberg/src/arrow.rs +++ b/crates/iceberg/src/arrow.rs @@ -189,18 +189,17 @@ fn visit_type(r#type: &DataType, visitor: &mut V) -> Resu p, DataType::Boolean | DataType::Utf8 + | DataType::LargeUtf8 | DataType::Binary + | DataType::LargeBinary | DataType::FixedSizeBinary(_) ) => { visitor.primitive(p) } - DataType::List(element_field) => { - visitor.before_list_element(element_field)?; - let value = visit_type(element_field.data_type(), visitor)?; - visitor.after_list_element(element_field)?; - visitor.list(r#type, value) - } + DataType::List(element_field) => visit_list(r#type, element_field, visitor), + DataType::LargeList(element_field) => visit_list(r#type, element_field, visitor), + DataType::FixedSizeList(element_field, _) => visit_list(r#type, element_field, visitor), DataType::Map(field, _) => match field.data_type() { DataType::Struct(fields) => { if fields.len() != 2 { @@ -242,6 +241,19 @@ fn visit_type(r#type: &DataType, visitor: &mut V) -> Resu } } +/// Visit list types in post order. +#[allow(dead_code)] +fn visit_list( + data_type: &DataType, + element_field: &Field, + visitor: &mut V, +) -> Result { + visitor.before_list_element(element_field)?; + let value = visit_type(element_field.data_type(), visitor)?; + visitor.after_list_element(element_field)?; + visitor.list(data_type, value) +} + /// Visit struct type in post order. #[allow(dead_code)] fn visit_struct(fields: &Fields, visitor: &mut V) -> Result { @@ -347,26 +359,30 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { } fn list(&mut self, list: &DataType, value: Self::T) -> Result { - match list { - DataType::List(element_field) => { - let id = get_field_id(element_field)?; - let doc = get_field_doc(element_field); - let element_field = Arc::new(NestedField { - id, - doc, - name: "element".to_string(), - required: !element_field.is_nullable(), - field_type: Box::new(value.clone()), - initial_default: None, - write_default: None, - }); - Ok(Type::List(ListType { element_field })) + let element_field = match list { + DataType::List(element_field) => element_field, + DataType::LargeList(element_field) => element_field, + DataType::FixedSizeList(element_field, _) => element_field, + _ => { + return Err(Error::new( + ErrorKind::DataInvalid, + "List type must have list data type", + )) } - _ => Err(Error::new( - ErrorKind::DataInvalid, - "List type must have list data type", - )), - } + }; + + let id = get_field_id(element_field)?; + let doc = get_field_doc(element_field); + let element_field = Arc::new(NestedField { + id, + doc, + name: "element".to_string(), + required: !element_field.is_nullable(), + field_type: Box::new(value.clone()), + initial_default: None, + write_default: None, + }); + Ok(Type::List(ListType { element_field })) } fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> Result { @@ -444,11 +460,11 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { { Ok(Type::Primitive(PrimitiveType::Timestamptz)) } - DataType::Binary => Ok(Type::Primitive(PrimitiveType::Binary)), + DataType::Binary | DataType::LargeBinary => Ok(Type::Primitive(PrimitiveType::Binary)), DataType::FixedSizeBinary(width) => { Ok(Type::Primitive(PrimitiveType::Fixed(*width as u64))) } - DataType::Utf8 => Ok(Type::Primitive(PrimitiveType::String)), + DataType::Utf8 | DataType::LargeUtf8 => Ok(Type::Primitive(PrimitiveType::String)), _ => Err(Error::new( ErrorKind::DataInvalid, format!("Unsupported Arrow data type: {p}"), @@ -526,6 +542,10 @@ mod tests { ARROW_FIELD_ID_KEY.to_string(), "3".to_string(), )])), + Field::new("n", DataType::LargeUtf8, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "21".to_string(), + )])), Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None), true).with_metadata( HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "4".to_string())]), ), @@ -570,6 +590,10 @@ mod tests { ARROW_FIELD_ID_KEY.to_string(), "13".to_string(), )])), + Field::new("o", DataType::LargeBinary, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "22".to_string(), + )])), Field::new("m", DataType::FixedSizeBinary(10), false).with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), "11".to_string(), @@ -588,6 +612,36 @@ mod tests { ARROW_FIELD_ID_KEY.to_string(), "14".to_string(), )])), + Field::new( + "large_list", + DataType::LargeList(Arc::new( + Field::new("element", DataType::Utf8, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "23".to_string(), + )])), + )), + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "24".to_string(), + )])), + Field::new( + "fixed_list", + DataType::FixedSizeList( + Arc::new( + Field::new("element", DataType::Binary, false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "26".to_string())]), + ), + ), + 10, + ), + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "25".to_string(), + )])), Field::new("map", map, false).with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), "16".to_string(), @@ -622,6 +676,12 @@ mod tests { "required":true, "type":"string" }, + { + "id":21, + "name":"n", + "required":true, + "type":"string" + }, { "id":4, "name":"d", @@ -676,6 +736,12 @@ mod tests { "required":true, "type":"binary" }, + { + "id":22, + "name":"o", + "required":true, + "type":"binary" + }, { "id":11, "name":"m", @@ -693,6 +759,28 @@ mod tests { "element": "int" } }, + { + "id":24, + "name":"large_list", + "required": false, + "type": { + "type": "list", + "element-id": 23, + "element-required": true, + "element": "string" + } + }, + { + "id":25, + "name":"fixed_list", + "required": false, + "type": { + "type": "list", + "element-id": 26, + "element-required": true, + "element": "binary" + } + }, { "id":16, "name":"map", From 1b955b368d32c9702c4f577f888fa1728b51ccf2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 18 Mar 2024 00:46:55 -0700 Subject: [PATCH 5/7] Add decimal type --- crates/iceberg/src/arrow.rs | 247 +++++++++++++++++++----------------- 1 file changed, 130 insertions(+), 117 deletions(-) diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs index 581bba4a24..63607aa6a8 100644 --- a/crates/iceberg/src/arrow.rs +++ b/crates/iceberg/src/arrow.rs @@ -447,6 +447,10 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)), DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)), DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)), + DataType::Decimal128(p, s) => Ok(Type::Primitive(PrimitiveType::Decimal { + precision: *p as u32, + scale: *s as u32, + })), DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)), DataType::Time64(unit) if unit == &TimeUnit::Microsecond => { Ok(Type::Primitive(PrimitiveType::Time)) @@ -529,128 +533,131 @@ mod tests { let r#struct = DataType::Struct(fields); - let schema = ArrowSchema::new(vec![ - Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "2".to_string(), - )])), - Field::new("b", DataType::Int64, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "1".to_string(), - )])), - Field::new("c", DataType::Utf8, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "3".to_string(), - )])), - Field::new("n", DataType::LargeUtf8, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "21".to_string(), - )])), - Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None), true).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "4".to_string())]), - ), - Field::new("e", DataType::Boolean, true).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "6".to_string(), - )])), - Field::new("f", DataType::Float32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "5".to_string(), - )])), - Field::new("g", DataType::Float64, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "7".to_string(), - )])), - Field::new("h", DataType::Date32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "8".to_string(), - )])), - Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "9".to_string())]), - ), - Field::new( - "j", - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), - false, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "10".to_string(), - )])), - Field::new( - "k", - DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), - false, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "12".to_string(), - )])), - Field::new("l", DataType::Binary, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "13".to_string(), - )])), - Field::new("o", DataType::LargeBinary, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "22".to_string(), - )])), - Field::new("m", DataType::FixedSizeBinary(10), false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "11".to_string(), - )])), - Field::new( - "list", - DataType::List(Arc::new( - Field::new("element", DataType::Int32, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "15".to_string(), - )])), - )), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "14".to_string(), - )])), - Field::new( - "large_list", - DataType::LargeList(Arc::new( - Field::new("element", DataType::Utf8, false).with_metadata(HashMap::from([( + let schema = + ArrowSchema::new(vec![ + Field::new("a", DataType::Int32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "2".to_string(), + )])), + Field::new("b", DataType::Int64, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "1".to_string(), + )])), + Field::new("c", DataType::Utf8, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "3".to_string(), + )])), + Field::new("n", DataType::LargeUtf8, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "21".to_string(), + )])), + Field::new("d", DataType::Timestamp(TimeUnit::Microsecond, None), true) + .with_metadata(HashMap::from([( ARROW_FIELD_ID_KEY.to_string(), - "23".to_string(), + "4".to_string(), )])), + Field::new("e", DataType::Boolean, true).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "6".to_string(), + )])), + Field::new("f", DataType::Float32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "5".to_string(), + )])), + Field::new("g", DataType::Float64, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "7".to_string(), + )])), + Field::new("p", DataType::Decimal128(10, 2), false).with_metadata(HashMap::from([ + (ARROW_FIELD_ID_KEY.to_string(), "27".to_string()), + ])), + Field::new("h", DataType::Date32, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "8".to_string(), + )])), + Field::new("i", DataType::Time64(TimeUnit::Microsecond), false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "9".to_string())]), + ), + Field::new( + "j", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + false, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "10".to_string(), + )])), + Field::new( + "k", + DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())), + false, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "12".to_string(), + )])), + Field::new("l", DataType::Binary, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "13".to_string(), + )])), + Field::new("o", DataType::LargeBinary, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "22".to_string(), + )])), + Field::new("m", DataType::FixedSizeBinary(10), false).with_metadata(HashMap::from( + [(ARROW_FIELD_ID_KEY.to_string(), "11".to_string())], )), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "24".to_string(), - )])), - Field::new( - "fixed_list", - DataType::FixedSizeList( - Arc::new( - Field::new("element", DataType::Binary, false).with_metadata( - HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "26".to_string())]), + Field::new( + "list", + DataType::List(Arc::new( + Field::new("element", DataType::Int32, false).with_metadata(HashMap::from( + [(ARROW_FIELD_ID_KEY.to_string(), "15".to_string())], + )), + )), + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "14".to_string(), + )])), + Field::new( + "large_list", + DataType::LargeList(Arc::new( + Field::new("element", DataType::Utf8, false).with_metadata(HashMap::from( + [(ARROW_FIELD_ID_KEY.to_string(), "23".to_string())], + )), + )), + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "24".to_string(), + )])), + Field::new( + "fixed_list", + DataType::FixedSizeList( + Arc::new( + Field::new("element", DataType::Binary, false).with_metadata( + HashMap::from([(ARROW_FIELD_ID_KEY.to_string(), "26".to_string())]), + ), ), + 10, ), - 10, - ), - true, - ) - .with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "25".to_string(), - )])), - Field::new("map", map, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "16".to_string(), - )])), - Field::new("struct", r#struct, false).with_metadata(HashMap::from([( - ARROW_FIELD_ID_KEY.to_string(), - "17".to_string(), - )])), - ]); + true, + ) + .with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "25".to_string(), + )])), + Field::new("map", map, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "16".to_string(), + )])), + Field::new("struct", r#struct, false).with_metadata(HashMap::from([( + ARROW_FIELD_ID_KEY.to_string(), + "17".to_string(), + )])), + ]); let schema = Arc::new(schema); let result = arrow_schema_to_schema(&schema).unwrap(); @@ -706,6 +713,12 @@ mod tests { "required":true, "type":"double" }, + { + "id":27, + "name":"p", + "required":true, + "type":"decimal(10,2)" + }, { "id":8, "name":"h", From e7984191647780f9501aa76da6ecf34956b3cf10 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 20 Mar 2024 22:40:13 -0700 Subject: [PATCH 6/7] For review --- crates/iceberg/src/arrow.rs | 67 +++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs index 63607aa6a8..e812091e40 100644 --- a/crates/iceberg/src/arrow.rs +++ b/crates/iceberg/src/arrow.rs @@ -294,10 +294,9 @@ const ARROW_FIELD_DOC_KEY: &str = "doc"; fn get_field_id(field: &Field) -> Result { if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) { return value.parse::().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!("Failed to parse field id: {e}"), - ) + Error::new(ErrorKind::DataInvalid, format!("Failed to parse field id")) + .with_context("value", value) + .with_source(e) }); } Err(Error::new( @@ -313,7 +312,7 @@ fn get_field_doc(field: &Field) -> Option { None } -struct ArrowSchemaConverter {} +struct ArrowSchemaConverter; impl ArrowSchemaConverter { #[allow(dead_code)] @@ -373,15 +372,12 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { let id = get_field_id(element_field)?; let doc = get_field_doc(element_field); - let element_field = Arc::new(NestedField { - id, - doc, - name: "element".to_string(), - required: !element_field.is_nullable(), - field_type: Box::new(value.clone()), - initial_default: None, - write_default: None, - }); + let mut element_field = + NestedField::list_element(id, value.clone(), !element_field.is_nullable()); + if let Some(doc) = doc { + element_field = element_field.with_doc(doc); + } + let element_field = Arc::new(element_field); Ok(Type::List(ListType { element_field })) } @@ -401,27 +397,23 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { let key_id = get_field_id(key_field)?; let key_doc = get_field_doc(key_field); - let key_field = Arc::new(NestedField { - id: key_id, - doc: key_doc, - name: "key".to_string(), - required: !key_field.is_nullable(), - field_type: Box::new(key_value.clone()), - initial_default: None, - write_default: None, - }); + let mut key_field = NestedField::map_key_element(key_id, key_value.clone()); + if let Some(doc) = key_doc { + key_field = key_field.with_doc(doc); + } + let key_field = Arc::new(key_field); let value_id = get_field_id(value_field)?; let value_doc = get_field_doc(value_field); - let value_field = Arc::new(NestedField { - id: value_id, - doc: value_doc, - name: "value".to_string(), - required: !value_field.is_nullable(), - field_type: Box::new(value.clone()), - initial_default: None, - write_default: None, - }); + let mut value_field = NestedField::map_value_element( + value_id, + value.clone(), + !value_field.is_nullable(), + ); + if let Some(doc) = value_doc { + value_field = value_field.with_doc(doc); + } + let value_field = Arc::new(value_field); Ok(Type::Map(MapType { key_field, @@ -447,10 +439,13 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)), DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)), DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)), - DataType::Decimal128(p, s) => Ok(Type::Primitive(PrimitiveType::Decimal { - precision: *p as u32, - scale: *s as u32, - })), + DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to create decimal type"), + ) + .with_source(e) + }), DataType::Date32 => Ok(Type::Primitive(PrimitiveType::Date)), DataType::Time64(unit) if unit == &TimeUnit::Microsecond => { Ok(Type::Primitive(PrimitiveType::Time)) From 752ea8e3f42d8241b39efa9b85e4930749dab492 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 20 Mar 2024 23:00:01 -0700 Subject: [PATCH 7/7] Fix clippy --- crates/iceberg/src/arrow.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow.rs b/crates/iceberg/src/arrow.rs index e812091e40..527fb1917c 100644 --- a/crates/iceberg/src/arrow.rs +++ b/crates/iceberg/src/arrow.rs @@ -294,9 +294,12 @@ const ARROW_FIELD_DOC_KEY: &str = "doc"; fn get_field_id(field: &Field) -> Result { if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) { return value.parse::().map_err(|e| { - Error::new(ErrorKind::DataInvalid, format!("Failed to parse field id")) - .with_context("value", value) - .with_source(e) + Error::new( + ErrorKind::DataInvalid, + "Failed to parse field id".to_string(), + ) + .with_context("value", value) + .with_source(e) }); } Err(Error::new( @@ -442,7 +445,7 @@ impl ArrowSchemaVisitor for ArrowSchemaConverter { DataType::Decimal128(p, s) => Type::decimal(*p as u32, *s as u32).map_err(|e| { Error::new( ErrorKind::DataInvalid, - format!("Failed to create decimal type"), + "Failed to create decimal type".to_string(), ) .with_source(e) }),