From 8699c123c54d7cef94712d747789b19d25d3d9b3 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Thu, 22 May 2025 07:29:19 +0100 Subject: [PATCH 1/5] feat: add equality delete parsing --- .../src/arrow/caching_delete_file_loader.rs | 242 +++++++++++++++++- 1 file changed, 228 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index f0dece75a4..54d3b9f81d 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -15,19 +15,26 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::ops::Not; -use arrow_array::{Int64Array, StringArray}; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, + StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, +}; use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; use tokio::sync::oneshot::{Receiver, channel}; use super::delete_filter::DeleteFilter; +use crate::arrow::arrow_schema_to_schema; use crate::arrow::delete_file_loader::BasicDeleteFileLoader; use crate::delete_vector::DeleteVector; -use crate::expr::Predicate; +use crate::expr::Predicate::AlwaysTrue; +use crate::expr::{Predicate, Reference}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; -use crate::spec::{DataContentType, SchemaRef}; +use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, SchemaRef}; use crate::{Error, ErrorKind, Result}; #[derive(Clone, Debug)] @@ -43,6 +50,7 @@ enum DeleteFileContext { PosDels(ArrowRecordBatchStream), FreshEqDel { batch_stream: ArrowRecordBatchStream, + equality_ids: HashSet, sender: tokio::sync::oneshot::Sender, }, } @@ -224,6 +232,7 @@ impl CachingDeleteFileLoader { ) .await?, sender, + equality_ids: HashSet::from_iter(task.equality_ids.clone()), }) } @@ -247,9 +256,11 @@ impl CachingDeleteFileLoader { DeleteFileContext::FreshEqDel { sender, batch_stream, + equality_ids, } => { let predicate = - Self::parse_equality_deletes_record_batch_stream(batch_stream).await?; + Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids) + .await?; sender .send(predicate) @@ -308,28 +319,231 @@ impl CachingDeleteFileLoader { Ok(result) } - /// Parses record batch streams from individual equality delete files - /// - /// Returns an unbound Predicate for each batch stream async fn parse_equality_deletes_record_batch_stream( - streams: ArrowRecordBatchStream, + mut stream: ArrowRecordBatchStream, + equality_ids: HashSet, ) -> Result { - // TODO + let mut result_predicate = AlwaysTrue; + + while let Some(record_batch) = stream.next().await { + let record_batch = record_batch?; + + if record_batch.num_columns() == 0 { + return Ok(AlwaysTrue); + } + + let batch_schema_arrow = record_batch.schema(); + let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?; + + let mut datum_columns_with_names: Vec<_> = record_batch + .columns() + .iter() + .zip(batch_schema_iceberg.as_struct().fields()) + // only use columns that are in the set of equality_ids for this delete file + .filter(|(field, value)| equality_ids.contains(&value.id)) + .map(|(column, field)| { + let col_as_datum_vec = arrow_array_to_datum_iterator(column, field); + col_as_datum_vec.map(|c| (c, field.name.to_string())) + 
}) + .try_collect()?; + + // consume all the iterators in lockstep, creating per-row predicates that get combined + // into a single final predicate + + // (2025-06-12) can't use `is_empty` as it depends on unstable library feature `exact_size_is_empty` + #[allow(clippy::len_zero)] + while datum_columns_with_names[0].0.len() > 0 { + let mut row_predicate = AlwaysTrue; + for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names { + if let Some(item) = column.next() { + if let Some(datum) = item? { + row_predicate = row_predicate + .and(Reference::new(field_name.clone()).equal_to(datum.clone())); + } + } + } + result_predicate = result_predicate.and(row_predicate.not()); + } + } + Ok(result_predicate.rewrite_not()) + } +} + +macro_rules! prim_to_datum { + ($column:ident, $arr:ty, $dat:path) => {{ + let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new( + ErrorKind::Unexpected, + format!("could not downcast ArrayRef to {}", stringify!($arr)), + ))?; + Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat))))) + }}; +} + +fn eq_col_unsupported(ty: &str) -> Error { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Equality deletes where a predicate acts upon a {} column are not yet supported", + ty + ), + ) +} - Err(Error::new( - ErrorKind::FeatureUnsupported, - "parsing of equality deletes is not yet supported", - )) +fn arrow_array_to_datum_iterator<'a>( + column: &'a ArrayRef, + field: &NestedFieldRef, +) -> Result>> + 'a>> { + match field.field_type.as_primitive_type() { + Some(primitive_type) => match primitive_type { + PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int), + PrimitiveType::Boolean => { + prim_to_datum!(column, BooleanArray, Datum::bool) + } + PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long), + PrimitiveType::Float => { + prim_to_datum!(column, Float32Array, Datum::float) + } + PrimitiveType::Double => { + prim_to_datum!(column, Float64Array, Datum::double) + } + PrimitiveType::String => { + prim_to_datum!(column, StringArray, Datum::string) + } + PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date), + PrimitiveType::Timestamp => { + prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros) + } + PrimitiveType::Timestamptz => { + prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros) + } + PrimitiveType::TimestampNs => { + prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos) + } + PrimitiveType::TimestamptzNs => { + prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos) + } + PrimitiveType::Time => { + let arr = column + .as_any() + .downcast_ref::() + .ok_or(Error::new( + ErrorKind::Unexpected, + "could not downcast ArrayRef to Time64MicrosecondArray", + ))?; + Ok(Box::new(arr.iter().map(|val| match val { + None => Ok(None), + Some(val) => Datum::time_micros(val).map(Some), + }))) + } + PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")), + PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")), + PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")), + PrimitiveType::Binary => Err(eq_col_unsupported("Binary")), + }, + None => Err(eq_col_unsupported( + "non-primitive (i.e. 
Struct, List, or Map)", + )), } } #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::fs::File; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; + use parquet::basic::Compression; + use parquet::file::properties::WriterProperties; use tempfile::TempDir; use super::*; use crate::arrow::delete_filter::tests::setup; + #[tokio::test] + async fn test_delete_file_loader_parse_equality_deletes() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().as_os_str().to_str().unwrap(); + let file_io = FileIO::from_path(table_location).unwrap().build().unwrap(); + + let eq_delete_file_path = setup_write_equality_delete_file_1(table_location); + + let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); + let record_batch_stream = basic_delete_file_loader + .parquet_to_batch_stream(&eq_delete_file_path) + .await + .expect("could not get batch stream"); + + let eq_ids = HashSet::from_iter(vec![2, 3, 4]); + + let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream( + record_batch_stream, + eq_ids, + ) + .await + .expect("error parsing batch stream"); + println!("{}", parsed_eq_delete); + + let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string(); + + assert_eq!(parsed_eq_delete.to_string(), expected); + } + + fn setup_write_equality_delete_file_1(table_location: &str) -> String { + let col_y_vals = vec![1, 2]; + let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef; + + let col_z_vals = vec![Some(100), None]; + let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef; + + let col_a_vals = vec![Some("HELP"), None]; + let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef; + + let equality_delete_schema = { + let fields = vec![ + arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), + ), + arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), + ), + arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), + ), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + let equality_deletes_to_write = + RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a]) + .unwrap(); + + let path = format!("{}/equality-deletes-1.parquet", &table_location); + + let file = File::create(&path).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let mut writer = ArrowWriter::try_new( + file, + equality_deletes_to_write.schema(), + Some(props.clone()), + ) + .unwrap(); + + writer + .write(&equality_deletes_to_write) + .expect("Writing batch"); + + // writer must be closed to write footer + writer.close().unwrap(); + + path + } + #[tokio::test] async fn test_caching_delete_file_loader_load_deletes() { let tmp_dir = TempDir::new().unwrap(); From 37326081b9463bf54ad06b4835019cd7c9d16425 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 2 Jul 2025 07:55:54 +0100 Subject: [PATCH 2/5] fix: ensure that equality delete predicate builder correctly handles null values in eq del file --- .../iceberg/src/arrow/caching_delete_file_loader.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 
deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 54d3b9f81d..77be41cd9d 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -356,10 +356,12 @@ impl CachingDeleteFileLoader { let mut row_predicate = AlwaysTrue; for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names { if let Some(item) = column.next() { - if let Some(datum) = item? { - row_predicate = row_predicate - .and(Reference::new(field_name.clone()).equal_to(datum.clone())); - } + let cell_predicate = if let Some(datum) = item? { + Reference::new(field_name.clone()).equal_to(datum.clone()) + } else { + Reference::new(field_name.clone()).is_null() + }; + row_predicate = row_predicate.and(cell_predicate) } } result_predicate = result_predicate.and(row_predicate.not()); @@ -485,7 +487,7 @@ mod tests { .expect("error parsing batch stream"); println!("{}", parsed_eq_delete); - let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string(); + let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL))".to_string(); assert_eq!(parsed_eq_delete.to_string(), expected); } From 1f687c70bc378fec11e5f7bceab9165381af3528 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 2 Jul 2025 20:31:35 +0100 Subject: [PATCH 3/5] refactor: remove arrow_array_to_datum_iterator and add arrow_primitive_to_literal, which uses existing visitor --- .../src/arrow/caching_delete_file_loader.rs | 114 +++++------------- crates/iceberg/src/arrow/value.rs | 17 ++- 2 files changed, 44 insertions(+), 87 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 77be41cd9d..e0c83fb400 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -18,23 +18,20 @@ use std::collections::{HashMap, HashSet}; use std::ops::Not; -use arrow_array::{ - Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, - StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, -}; +use arrow_array::{Array, Int64Array, StringArray}; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use tokio::sync::oneshot::{Receiver, channel}; use super::delete_filter::DeleteFilter; -use crate::arrow::arrow_schema_to_schema; use crate::arrow::delete_file_loader::BasicDeleteFileLoader; +use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema}; use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Predicate, Reference}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; -use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, SchemaRef}; +use crate::spec::{DataContentType, Datum, SchemaRef}; use crate::{Error, ErrorKind, Result}; #[derive(Clone, Debug)] @@ -342,8 +339,30 @@ impl CachingDeleteFileLoader { // only use columns that are in the set of equality_ids for this delete file .filter(|(field, value)| equality_ids.contains(&value.id)) .map(|(column, field)| { - let col_as_datum_vec = arrow_array_to_datum_iterator(column, field); - col_as_datum_vec.map(|c| (c, field.name.to_string())) + let lit_vec = arrow_primitive_to_literal(column, &field.field_type)?; + + let primitive_type = 
field.field_type.as_primitive_type().ok_or(Error::new( + ErrorKind::Unexpected, + "field is not a primitive type", + ))?; + + let datum_iterator: Box>>> = + Box::new(lit_vec.into_iter().map(move |c| { + c.map(|literal| { + literal + .as_primitive_literal() + .map(|primitive_literal| { + Datum::new(primitive_type.clone(), primitive_literal) + }) + .ok_or(Error::new( + ErrorKind::Unexpected, + "failed to convert to primitive literal", + )) + }) + .transpose() + })); + + Ok::<_, Error>((datum_iterator, field.name.to_string())) }) .try_collect()?; @@ -371,90 +390,13 @@ impl CachingDeleteFileLoader { } } -macro_rules! prim_to_datum { - ($column:ident, $arr:ty, $dat:path) => {{ - let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new( - ErrorKind::Unexpected, - format!("could not downcast ArrayRef to {}", stringify!($arr)), - ))?; - Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat))))) - }}; -} - -fn eq_col_unsupported(ty: &str) -> Error { - Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Equality deletes where a predicate acts upon a {} column are not yet supported", - ty - ), - ) -} - -fn arrow_array_to_datum_iterator<'a>( - column: &'a ArrayRef, - field: &NestedFieldRef, -) -> Result>> + 'a>> { - match field.field_type.as_primitive_type() { - Some(primitive_type) => match primitive_type { - PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int), - PrimitiveType::Boolean => { - prim_to_datum!(column, BooleanArray, Datum::bool) - } - PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long), - PrimitiveType::Float => { - prim_to_datum!(column, Float32Array, Datum::float) - } - PrimitiveType::Double => { - prim_to_datum!(column, Float64Array, Datum::double) - } - PrimitiveType::String => { - prim_to_datum!(column, StringArray, Datum::string) - } - PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date), - PrimitiveType::Timestamp => { - prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros) - } - PrimitiveType::Timestamptz => { - prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros) - } - PrimitiveType::TimestampNs => { - prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos) - } - PrimitiveType::TimestamptzNs => { - prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos) - } - PrimitiveType::Time => { - let arr = column - .as_any() - .downcast_ref::() - .ok_or(Error::new( - ErrorKind::Unexpected, - "could not downcast ArrayRef to Time64MicrosecondArray", - ))?; - Ok(Box::new(arr.iter().map(|val| match val { - None => Ok(None), - Some(val) => Datum::time_micros(val).map(Some), - }))) - } - PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")), - PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")), - PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")), - PrimitiveType::Binary => Err(eq_col_unsupported("Binary")), - }, - None => Err(eq_col_unsupported( - "non-primitive (i.e. 
Struct, List, or Map)", - )), - } -} - #[cfg(test)] mod tests { use std::collections::HashMap; use std::fs::File; use std::sync::Arc; - use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 9ddd941fa4..25b138a5f9 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -27,7 +27,8 @@ use uuid::Uuid; use super::get_field_id; use crate::spec::{ ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType, - SchemaWithPartnerVisitor, Struct, StructType, visit_struct_with_partner, + SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner, + visit_type_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -604,6 +605,20 @@ pub fn arrow_struct_to_literal( ) } +/// Convert arrow primitive array to iceberg primitive value array. +/// This function will assume the schema of arrow struct array is the same as iceberg struct type. +pub fn arrow_primitive_to_literal( + primitive_array: &ArrayRef, + ty: &Type, +) -> Result>> { + visit_type_with_partner( + ty, + primitive_array, + &mut ArrowArrayToIcebergStructConverter, + &ArrowArrayAccessor, + ) +} + #[cfg(test)] mod test { use std::collections::HashMap; From 681ab7e70b6cf8ccf02fc1d3d2757331312aebd2 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Fri, 11 Jul 2025 19:54:22 +0100 Subject: [PATCH 4/5] feat: use visitor to allow struct fields to be used in eq deletes --- .../src/arrow/caching_delete_file_loader.rs | 281 ++++++++++++++---- crates/iceberg/src/scan/context.rs | 1 - 2 files changed, 224 insertions(+), 58 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index e0c83fb400..c6a8943d78 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -17,10 +17,10 @@ use std::collections::{HashMap, HashSet}; use std::ops::Not; +use std::sync::Arc; -use arrow_array::{Array, Int64Array, StringArray}; +use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray}; use futures::{StreamExt, TryStreamExt}; -use itertools::Itertools; use tokio::sync::oneshot::{Receiver, channel}; use super::delete_filter::DeleteFilter; @@ -31,7 +31,11 @@ use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Predicate, Reference}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; -use crate::spec::{DataContentType, Datum, SchemaRef}; +use crate::spec::{ + DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor, + PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type, + visit_schema_with_partner, +}; use crate::{Error, ErrorKind, Result}; #[derive(Clone, Debug)] @@ -321,6 +325,8 @@ impl CachingDeleteFileLoader { equality_ids: HashSet, ) -> Result { let mut result_predicate = AlwaysTrue; + let mut batch_schema_iceberg: Option = None; + let accessor = EqDelRecordBatchPartnerAccessor; while let Some(record_batch) = stream.next().await { let record_batch = record_batch?; @@ -329,47 +335,26 @@ impl CachingDeleteFileLoader { return Ok(AlwaysTrue); } - let batch_schema_arrow = record_batch.schema(); - let batch_schema_iceberg = 
arrow_schema_to_schema(batch_schema_arrow.as_ref())?; - - let mut datum_columns_with_names: Vec<_> = record_batch - .columns() - .iter() - .zip(batch_schema_iceberg.as_struct().fields()) - // only use columns that are in the set of equality_ids for this delete file - .filter(|(field, value)| equality_ids.contains(&value.id)) - .map(|(column, field)| { - let lit_vec = arrow_primitive_to_literal(column, &field.field_type)?; - - let primitive_type = field.field_type.as_primitive_type().ok_or(Error::new( - ErrorKind::Unexpected, - "field is not a primitive type", - ))?; - - let datum_iterator: Box>>> = - Box::new(lit_vec.into_iter().map(move |c| { - c.map(|literal| { - literal - .as_primitive_literal() - .map(|primitive_literal| { - Datum::new(primitive_type.clone(), primitive_literal) - }) - .ok_or(Error::new( - ErrorKind::Unexpected, - "failed to convert to primitive literal", - )) - }) - .transpose() - })); - - Ok::<_, Error>((datum_iterator, field.name.to_string())) - }) - .try_collect()?; + let schema = match &batch_schema_iceberg { + Some(schema) => schema, + None => { + let schema = arrow_schema_to_schema(record_batch.schema().as_ref())?; + batch_schema_iceberg = Some(schema); + batch_schema_iceberg.as_ref().unwrap() + } + }; + + let root_array: ArrayRef = Arc::new(StructArray::from(record_batch)); - // consume all the iterators in lockstep, creating per-row predicates that get combined - // into a single final predicate + let mut processor = EqDelColumnProcessor::new(&equality_ids); + visit_schema_with_partner(schema, &root_array, &mut processor, &accessor)?; - // (2025-06-12) can't use `is_empty` as it depends on unstable library feature `exact_size_is_empty` + let mut datum_columns_with_names = processor.finish()?; + if datum_columns_with_names.is_empty() { + continue; + } + + // Process the collected columns in lockstep #[allow(clippy::len_zero)] while datum_columns_with_names[0].0.len() > 0 { let mut row_predicate = AlwaysTrue; @@ -390,13 +375,169 @@ impl CachingDeleteFileLoader { } } +struct EqDelColumnProcessor<'a> { + equality_ids: &'a HashSet, + collected_columns: Vec<(ArrayRef, String, Type)>, +} + +impl<'a> EqDelColumnProcessor<'a> { + fn new(equality_ids: &'a HashSet) -> Self { + Self { + equality_ids, + collected_columns: Vec::with_capacity(equality_ids.len()), + } + } + + #[allow(clippy::type_complexity)] + fn finish( + self, + ) -> Result< + Vec<( + Box>>>, + String, + )>, + > { + self.collected_columns + .into_iter() + .map(|(array, field_name, field_type)| { + let primitive_type = field_type + .as_primitive_type() + .ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "field is not a primitive type") + })? 
+ .clone(); + + let lit_vec = arrow_primitive_to_literal(&array, &field_type)?; + let datum_iterator: Box>>> = + Box::new(lit_vec.into_iter().map(move |c| { + c.map(|literal| { + literal + .as_primitive_literal() + .map(|primitive_literal| { + Datum::new(primitive_type.clone(), primitive_literal) + }) + .ok_or(Error::new( + ErrorKind::Unexpected, + "failed to convert to primitive literal", + )) + }) + .transpose() + })); + + Ok((datum_iterator, field_name)) + }) + .collect::>>() + } +} + +impl SchemaWithPartnerVisitor for EqDelColumnProcessor<'_> { + type T = (); + + fn schema(&mut self, _schema: &Schema, _partner: &ArrayRef, _value: ()) -> Result<()> { + Ok(()) + } + + fn field(&mut self, field: &NestedFieldRef, partner: &ArrayRef, _value: ()) -> Result<()> { + if self.equality_ids.contains(&field.id) && field.field_type.as_primitive_type().is_some() { + self.collected_columns.push(( + partner.clone(), + field.name.clone(), + field.field_type.as_ref().clone(), + )); + } + Ok(()) + } + + fn r#struct( + &mut self, + _struct: &StructType, + _partner: &ArrayRef, + _results: Vec<()>, + ) -> Result<()> { + Ok(()) + } + + fn list(&mut self, _list: &ListType, _partner: &ArrayRef, _value: ()) -> Result<()> { + Ok(()) + } + + fn map( + &mut self, + _map: &MapType, + _partner: &ArrayRef, + _key_value: (), + _value: (), + ) -> Result<()> { + Ok(()) + } + + fn primitive(&mut self, _primitive: &PrimitiveType, _partner: &ArrayRef) -> Result<()> { + Ok(()) + } +} + +struct EqDelRecordBatchPartnerAccessor; + +impl PartnerAccessor for EqDelRecordBatchPartnerAccessor { + fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { + Ok(schema_partner) + } + + fn field_partner<'a>( + &self, + struct_partner: &'a ArrayRef, + field: &NestedField, + ) -> Result<&'a ArrayRef> { + let Some(struct_array) = struct_partner.as_any().downcast_ref::() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Expected struct array for field extraction", + )); + }; + + // Find the field by name within the struct + for (i, field_def) in struct_array.fields().iter().enumerate() { + if field_def.name() == &field.name { + return Ok(struct_array.column(i)); + } + } + + Err(Error::new( + ErrorKind::Unexpected, + format!("Field {} not found in parent struct", field.name), + )) + } + + fn list_element_partner<'a>(&self, _list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "List columns are unsupported in equality deletes", + )) + } + + fn map_key_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Map columns are unsupported in equality deletes", + )) + } + + fn map_value_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Map columns are unsupported in equality deletes", + )) + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; use std::fs::File; use std::sync::Arc; - use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; + use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray}; + use arrow_schema::{DataType, Field, Fields}; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; @@ -419,7 +560,7 @@ mod tests { .await .expect("could not get batch stream"); - let eq_ids = HashSet::from_iter(vec![2, 3, 4]); + let eq_ids = HashSet::from_iter(vec![2, 3, 4, 
6]); let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream( record_batch_stream, @@ -429,11 +570,19 @@ mod tests { .expect("error parsing batch stream"); println!("{}", parsed_eq_delete); - let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL))".to_string(); + let expected = "((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) AND ((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5))".to_string(); assert_eq!(parsed_eq_delete.to_string(), expected); } + /// Create a simple field with metadata. + fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field { + arrow_schema::Field::new(name, ty, nullable).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + value.to_string(), + )])) + } + fn setup_write_equality_delete_file_1(table_location: &str) -> String { let col_y_vals = vec![1, 2]; let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef; @@ -444,24 +593,42 @@ mod tests { let col_a_vals = vec![Some("HELP"), None]; let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef; + let col_s = Arc::new(StructArray::from(vec![ + ( + Arc::new(simple_field("sa", DataType::Int32, false, "6")), + Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef, + ), + ( + Arc::new(simple_field("sb", DataType::Utf8, true, "7")), + Arc::new(StringArray::from(vec![Some("x"), None])) as ArrayRef, + ), + ])); + let equality_delete_schema = { + let struct_field = DataType::Struct(Fields::from(vec![ + simple_field("sa", DataType::Int32, false, "6"), + simple_field("sb", DataType::Utf8, true, "7"), + ])); + let fields = vec![ - arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]), - ), - arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]), - ), - arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata( - HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]), - ), + Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())], + )), + Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())], + )), + Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(HashMap::from([ + (PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()), + ])), + simple_field("s", struct_field, false, "5"), ]; Arc::new(arrow_schema::Schema::new(fields)) }; - let equality_deletes_to_write = - RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a]) - .unwrap(); + let equality_deletes_to_write = RecordBatch::try_new(equality_delete_schema.clone(), vec![ + col_y, col_z, col_a, col_s, + ]) + .unwrap(); let path = format!("{}/equality-deletes-1.parquet", &table_location); diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 1e4ef41b2b..3f7c29dbf4 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -191,7 +191,6 @@ impl PlanContext { // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. 
        let mut filtered_mfcs = vec![];
-
        for manifest_file in manifest_files {
            let tx = if manifest_file.content == ManifestContentType::Deletes {
                delete_file_tx.clone()

From 0f0bbfa833dbf85fc5e8e0b91b44be1f10803da7 Mon Sep 17 00:00:00 2001
From: Scott Donnelly
Date: Mon, 1 Sep 2025 07:49:25 +0100
Subject: [PATCH 5/5] fix: use ArrowArrayAccessor::new() after rebase

---
 crates/iceberg/src/arrow/value.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs
index 25b138a5f9..cc3a561d57 100644
--- a/crates/iceberg/src/arrow/value.rs
+++ b/crates/iceberg/src/arrow/value.rs
@@ -615,7 +615,7 @@ pub fn arrow_primitive_to_literal(
         ty,
         primitive_array,
         &mut ArrowArrayToIcebergStructConverter,
-        &ArrowArrayAccessor,
+        &ArrowArrayAccessor::new(),
     )
 }
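
Note (illustrative, not part of the patches): the core of PATCH 1, and the PATCH 2 fix, is the mapping from one row of an equality delete file to a predicate over data rows. The sketch below re-creates that mapping for the second row of the test file (y = 2, z = NULL, a = NULL), using the same expression API the patches rely on (Reference, Datum, Predicate, rewrite_not). The `iceberg::` paths are assumed public re-exports of the crate-internal paths used in the patches, and this is a minimal sketch rather than the loader's actual code path, which folds every row of every batch into one predicate and calls rewrite_not() once at the end.

    use std::ops::Not;

    use iceberg::expr::{Predicate::AlwaysTrue, Reference};
    use iceberg::spec::Datum;

    fn row_to_predicate() -> iceberg::expr::Predicate {
        let mut row_predicate = AlwaysTrue;

        // Non-null cell: the delete row matches data rows where y = 2 ...
        row_predicate = row_predicate.and(Reference::new("y").equal_to(Datum::long(2i64)));

        // ... and, after the PATCH 2 fix, null cells match on IS NULL instead of
        // being skipped (skipping them made the delete row match, and therefore
        // delete, too many data rows).
        row_predicate = row_predicate.and(Reference::new("z").is_null());
        row_predicate = row_predicate.and(Reference::new("a").is_null());

        // A matching data row must be deleted, so the scan keeps only rows that
        // do NOT match; rewrite_not() pushes the negation down to the leaves.
        row_predicate.not().rewrite_not()
        // => ((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)
    }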
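
PATCH 4 replaces the column-by-column zip with a schema visitor (EqDelColumnProcessor plus EqDelRecordBatchPartnerAccessor, driven by visit_schema_with_partner), so equality ids may now point at primitive leaves nested inside structs — field id 6 (s.sa) in the updated test — while list and map columns are rejected by the partner accessor. The resulting predicate references the leaf field name (sa). A worked example, derived from the test data and expected string in PATCH 4, for the two delete rows with equality ids {2, 3, 4, 6}:

    row 1: y = 1, z = 100,  a = "HELP", s.sa = 4
    row 2: y = 2, z = NULL, a = NULL,   s.sa = 5

    NOT(y = 1 AND z = 100 AND a = "HELP" AND sa = 4)
      AND NOT(y = 2 AND z IS NULL AND a IS NULL AND sa = 5)

    which rewrite_not() turns into

    ((((y != 1) OR (z != 100)) OR (a != "HELP")) OR (sa != 4))
      AND ((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5))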