
Commit d499fdf

feat: add equality delete parsing
1 parent: 6ccea70 · commit: d499fdf

1 file changed (+221, −16)


crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 221 additions & 16 deletions
@@ -15,22 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::ops::Not;
 use std::sync::Arc;
 
+use arrow_array::{
+    Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
+};
 use futures::channel::oneshot;
 use futures::future::join_all;
 use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
 use tokio::sync::oneshot::{channel, Receiver};
 
 use super::delete_filter::{DeleteFilter, EqDelFuture};
 use crate::arrow::record_batch_transformer::RecordBatchTransformer;
-use crate::arrow::ArrowReader;
+use crate::arrow::{arrow_schema_to_schema, ArrowReader};
 use crate::delete_vector::DeleteVector;
-use crate::expr::Predicate;
+use crate::expr::Predicate::AlwaysTrue;
+use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
-use crate::spec::{DataContentType, Schema, SchemaRef};
+use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, Schema, SchemaRef};
 use crate::{Error, ErrorKind, Result};
 
 #[allow(unused)]
@@ -75,6 +82,7 @@ enum DeleteFileContext {
     FreshEqDel {
         batch_stream: ArrowRecordBatchStream,
         sender: oneshot::Sender<Predicate>,
+        equality_ids: HashSet<i32>,
     },
 }
 
@@ -258,6 +266,7 @@ impl CachingDeleteFileLoader {
                 )
                 .await?,
                 sender,
+                equality_ids: HashSet::from_iter(task.equality_ids.clone()),
             })
         }
 
@@ -281,9 +290,11 @@ impl CachingDeleteFileLoader {
             DeleteFileContext::FreshEqDel {
                 sender,
                 batch_stream,
+                equality_ids,
             } => {
                 let predicate =
-                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
+                        .await?;
 
                 sender
                     .send(predicate)
@@ -361,14 +372,126 @@ impl CachingDeleteFileLoader {
     ///
     /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;
 
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
-        ))
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let batch_schema_arrow = record_batch.schema();
+            let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+            let mut datum_columns_with_names: Vec<_> = record_batch
+                .columns()
+                .iter()
+                .zip(batch_schema_iceberg.as_struct().fields())
+                // only use columns that are in the set of equality_ids for this delete file
+                .filter(|(field, value)| equality_ids.contains(&value.id))
+                .map(|(column, field)| {
+                    let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
+                    col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                })
+                .try_collect()?;
+
+            // consume all the iterators in lockstep, creating per-row predicates that get combined
+            // into a single final predicate
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        if let Some(datum) = item? {
+                            row_predicate = row_predicate
+                                .and(Reference::new(field_name.clone()).equal_to(datum.clone()));
+                        }
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
+    }
+}
+
+macro_rules! prim_to_datum {
+    ($column:ident, $arr:ty, $dat:path) => {{
+        let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("could not downcast ArrayRef to {}", stringify!($arr)),
+        ))?;
+        Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
+    }};
+}
+
+fn eq_col_unsupported(ty: &str) -> Error {
+    Error::new(
+        ErrorKind::FeatureUnsupported,
+        format!(
+            "Equality deletes where a predicate acts upon a {} column are not yet supported",
+            ty
+        ),
+    )
+}
+
+fn arrow_array_to_datum_iterator<'a>(
+    column: &'a ArrayRef,
+    field: &NestedFieldRef,
+) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
+    match field.field_type.as_primitive_type() {
+        Some(primitive_type) => match primitive_type {
+            PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
+            PrimitiveType::Boolean => {
+                prim_to_datum!(column, BooleanArray, Datum::bool)
+            }
+            PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
+            PrimitiveType::Float => {
+                prim_to_datum!(column, Float32Array, Datum::float)
+            }
+            PrimitiveType::Double => {
+                prim_to_datum!(column, Float64Array, Datum::double)
+            }
+            PrimitiveType::String => {
+                prim_to_datum!(column, StringArray, Datum::string)
+            }
+            PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
+            PrimitiveType::Timestamp => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
+            }
+            PrimitiveType::Timestamptz => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
+            }
+            PrimitiveType::TimestampNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
+            }
+            PrimitiveType::TimestamptzNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
+            }
+            PrimitiveType::Time => {
+                let arr = column
+                    .as_any()
+                    .downcast_ref::<Time64MicrosecondArray>()
+                    .ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "could not downcast ArrayRef to Time64MicrosecondArray",
+                    ))?;
+                Ok(Box::new(arr.iter().map(|val| match val {
+                    None => Ok(None),
+                    Some(val) => Datum::time_micros(val).map(Some),
+                })))
+            }
+            PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
+            PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
+            PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
+            PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
+        },
+        None => Err(eq_col_unsupported(
+            "non-primitive (i.e. Struct, List, or Map)",
+        )),
     }
 }
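Side note, not part of the diff: prim_to_datum! only hides a repeated downcast-and-wrap pattern. Written out by hand for the Long case as a standalone sketch (the helper name long_column_to_datums is made up here; it uses the public arrow_array types and the iceberg crate's Error, Result, and Datum), prim_to_datum!(column, Int64Array, Datum::long) expands to roughly:

use arrow_array::{Array, ArrayRef, Int64Array};
use iceberg::spec::Datum;
use iceberg::{Error, ErrorKind, Result};

// Hypothetical helper mirroring the macro body above for Long columns.
fn long_column_to_datums<'a>(
    column: &'a ArrayRef,
) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
    // Downcast the dynamically typed Arrow array to the concrete Int64Array.
    let arr = column.as_any().downcast_ref::<Int64Array>().ok_or(Error::new(
        ErrorKind::Unexpected,
        format!("could not downcast ArrayRef to {}", stringify!(Int64Array)),
    ))?;
    // Each non-null i64 becomes Ok(Some(Datum::long(v))); each null slot becomes Ok(None).
    Ok(Box::new(arr.iter().map(|val| Ok(val.map(Datum::long)))))
}

The same shape repeats for every supported primitive type, which is why the macro exists; only the Time arm differs, because Datum::time_micros is fallible.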
374497
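To make the predicate construction concrete, here is a sketch (commentary, not part of the commit) of how the two rows written by setup_write_equality_delete_file_1 further down — (y=1, z=100, a="HELP") and (y=2, z=NULL, a=NULL) — become the predicate asserted in the new test. Each row turns into a conjunction of column = value terms (NULL cells contribute no term), each conjunction is negated, and the negations are ANDed together before rewrite_not() pushes the NOTs inward. The sketch uses only the crate's public expr and spec APIs:

use std::ops::Not;

use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

fn equality_delete_rows_as_predicate() -> Predicate {
    // Row 1: y = 1, z = 100, a = "HELP"
    let row1 = Reference::new("y")
        .equal_to(Datum::long(1))
        .and(Reference::new("z").equal_to(Datum::long(100)))
        .and(Reference::new("a").equal_to(Datum::string("HELP")));

    // Row 2: y = 2; z and a are NULL, so only y contributes a term.
    let row2 = Reference::new("y").equal_to(Datum::long(2));

    // AND the negated per-row conjunctions, then push the NOTs inward.
    // Per the expected string in the new test, this displays as:
    // (((y != 1) OR (z != 100)) OR (a != "HELP")) AND (y != 2)
    Predicate::AlwaysTrue
        .and(row1.not())
        .and(row2.not())
        .rewrite_not()
}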

@@ -395,7 +518,7 @@ mod tests {
395518
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
396519

397520
#[tokio::test]
398-
async fn test_delete_file_manager_load_deletes() {
521+
async fn test_delete_file_loader_load_deletes() {
399522
let tmp_dir = TempDir::new().unwrap();
400523
let table_location = tmp_dir.path();
401524
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
@@ -405,19 +528,47 @@ mod tests {
 
         // Note that with the delete file parsing not yet in place, all we can test here is that
         // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
 
-        let file_scan_tasks = setup(table_location);
+        let file_scan_tasks = setup_load_deletes_test_tasks(table_location);
 
-        let result = delete_file_manager
+        let result = delete_file_loader
             .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
             .await
             .unwrap();
 
         assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
     }
 
-    fn setup(table_location: &Path) -> Vec<FileScanTask> {
+    #[tokio::test]
+    async fn test_delete_file_loader_parse_equality_deletes() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
+
+        let record_batch_stream =
+            CachingDeleteFileLoader::parquet_to_batch_stream(&eq_delete_file_path, file_io.clone())
+                .await
+                .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2, 3, 4]);
+
+        let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await
+        .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }
+
+    fn setup_load_deletes_test_tasks(table_location: &Path) -> Vec<FileScanTask> {
         let data_file_schema = Arc::new(Schema::builder().build().unwrap());
         let positional_delete_schema = create_pos_del_schema();
 
@@ -527,4 +678,58 @@ mod tests {
         ];
         Arc::new(arrow_schema::Schema::new(fields))
     }
+
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let equality_delete_schema = {
+            let fields = vec![
+                arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+                ),
+                arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+                ),
+                arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write =
+            RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
+                .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
+    }
 }
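A detail of the test setup worth calling out (commentary, not part of the commit): the delete file's columns are matched against equality_ids by Iceberg field ID, which arrow_schema_to_schema recovers from the PARQUET_FIELD_ID_META_KEY metadata ("2" → y, "3" → z, "4" → a), not by column name. A hypothetical variation of the test body that passes only a subset of those IDs would drop the other columns before any predicates are built:

// Hypothetical variation (not in the commit): keep only field ID 2 ("y").
// Columns z and a are filtered out before the per-row predicates are built,
// so the parsed predicate should mention only y.
let eq_ids_only_y = HashSet::from_iter(vec![2]);

let parsed = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
    record_batch_stream,
    eq_ids_only_y,
)
.await
.expect("error parsing batch stream");
println!("{}", parsed);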
