
Commit 5af8f81

feat: add equality delete parsing
1 parent acd7ab8 commit 5af8f81
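
For context on what the new parser produces: an equality delete file lists rows to delete by value. Each delete row becomes a negated conjunction of equality terms over the columns named by the file's equality IDs (null cells contribute no term), and the per-row predicates are ANDed together so that only rows matching none of the delete rows survive. Below is a minimal sketch of that semantics using only the `Reference`/`Datum`/`Predicate` API that appears in the diff; the `iceberg` crate paths and the sample values are illustrative, not part of the commit.

    use std::ops::Not;

    use iceberg::expr::{Predicate, Reference};
    use iceberg::spec::Datum;

    fn main() {
        // Delete row 1: {y: 1, z: 100} -> NOT (y = 1 AND z = 100)
        let row1 = Reference::new("y")
            .equal_to(Datum::long(1))
            .and(Reference::new("z").equal_to(Datum::long(100)));

        // Delete row 2: {y: 2, z: NULL} -> the null cell adds no term -> NOT (y = 2)
        let row2 = Reference::new("y").equal_to(Datum::long(2));

        // A row survives the deletes only if it matches none of the delete rows.
        let keep = Predicate::AlwaysTrue.and(row1.not()).and(row2.not());

        // rewrite_not() pushes the negations inward into `!=` / OR form.
        println!("{}", keep.rewrite_not());
    }

With delete rows {y: 1, z: 100} and {y: 2, z: NULL}, this is the same `!=`/OR shape asserted by the new test at the bottom of the diff.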

File tree

1 file changed: +217 -29 lines changed


crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 217 additions & 29 deletions
@@ -15,18 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::ops::Not;

+use arrow_array::{
+    Array, ArrayRef, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+    StringArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
+};
 use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
 use tokio::sync::oneshot::{Receiver, channel};

 use super::delete_filter::DeleteFilter;
+use crate::arrow::arrow_schema_to_schema;
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
 use crate::delete_vector::DeleteVector;
-use crate::expr::Predicate;
+use crate::expr::Predicate::AlwaysTrue;
+use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
-use crate::spec::{DataContentType, SchemaRef};
+use crate::spec::{DataContentType, Datum, NestedFieldRef, PrimitiveType, SchemaRef};
 use crate::{Error, ErrorKind, Result};

 #[derive(Clone, Debug)]
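
Two of the new imports are worth calling out: `std::ops::Not` brings the `.not()` method used to negate each per-row predicate, and `itertools::Itertools` supplies `try_collect`, which collects an iterator of `Result` items into a `Result` of a collection, short-circuiting on the first error. A standalone sketch of the `try_collect` pattern (the values here are illustrative, not from the commit):

    use itertools::Itertools;

    fn main() {
        let items: Vec<Result<i32, String>> = vec![Ok(1), Ok(2), Ok(3)];

        // try_collect stops at the first Err and returns it; otherwise it yields the Vec.
        let collected: Result<Vec<i32>, String> = items.into_iter().try_collect();

        assert_eq!(collected, Ok(vec![1, 2, 3]));
    }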
@@ -42,6 +50,7 @@ enum DeleteFileContext {
     PosDels(ArrowRecordBatchStream),
     FreshEqDel {
         batch_stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
         sender: tokio::sync::oneshot::Sender<Predicate>,
     },
 }
@@ -223,6 +232,7 @@ impl CachingDeleteFileLoader {
                 )
                 .await?,
                 sender,
+                equality_ids: HashSet::from_iter(task.equality_ids.clone()),
             })
         }

@@ -246,9 +256,11 @@ impl CachingDeleteFileLoader {
             DeleteFileContext::FreshEqDel {
                 sender,
                 batch_stream,
+                equality_ids,
             } => {
                 let predicate =
-                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
+                        .await?;

                 sender
                     .send(predicate)
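
In this hunk the freshly built `equality_ids` set is passed into the parser, and the resulting `Predicate` is handed back through the `tokio::sync::oneshot` sender stored in `FreshEqDel`. A minimal sketch of that one-shot hand-off in isolation, with the payload simplified to a `String` stand-in for the parsed predicate (not the crate's actual code):

    use tokio::sync::oneshot::channel;

    #[tokio::main]
    async fn main() {
        // One side sends exactly one parsed result; the other side awaits it once.
        let (sender, receiver) = channel::<String>();

        tokio::spawn(async move {
            let parsed = "y != 2".to_string(); // stand-in for the parsed Predicate
            let _ = sender.send(parsed);
        });

        let predicate = receiver.await.expect("sender was dropped before sending");
        println!("received: {}", predicate);
    }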
@@ -277,48 +289,224 @@ impl CachingDeleteFileLoader {
         ))
     }

-    /// Parses record batch streams from individual equality delete files
-    ///
-    /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;

-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
-        ))
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let batch_schema_arrow = record_batch.schema();
+            let batch_schema_iceberg = arrow_schema_to_schema(batch_schema_arrow.as_ref())?;
+
+            let mut datum_columns_with_names: Vec<_> = record_batch
+                .columns()
+                .iter()
+                .zip(batch_schema_iceberg.as_struct().fields())
+                // only use columns that are in the set of equality_ids for this delete file
+                .filter(|(field, value)| equality_ids.contains(&value.id))
+                .map(|(column, field)| {
+                    let col_as_datum_vec = arrow_array_to_datum_iterator(column, field);
+                    col_as_datum_vec.map(|c| (c, field.name.to_string()))
+                })
+                .try_collect()?;
+
+            // consume all the iterators in lockstep, creating per-row predicates that get combined
+            // into a single final predicate
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        if let Some(datum) = item? {
+                            row_predicate = row_predicate
+                                .and(Reference::new(field_name.clone()).equal_to(datum.clone()));
+                        }
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
+    }
+}
+
+macro_rules! prim_to_datum {
+    ($column:ident, $arr:ty, $dat:path) => {{
+        let arr = $column.as_any().downcast_ref::<$arr>().ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("could not downcast ArrayRef to {}", stringify!($arr)),
+        ))?;
+        Ok(Box::new(arr.iter().map(|val| Ok(val.map($dat)))))
+    }};
+}
+
+fn eq_col_unsupported(ty: &str) -> Error {
+    Error::new(
+        ErrorKind::FeatureUnsupported,
+        format!(
+            "Equality deletes where a predicate acts upon a {} column are not yet supported",
+            ty
+        ),
+    )
+}
+
+fn arrow_array_to_datum_iterator<'a>(
+    column: &'a ArrayRef,
+    field: &NestedFieldRef,
+) -> Result<Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>> + 'a>> {
+    match field.field_type.as_primitive_type() {
+        Some(primitive_type) => match primitive_type {
+            PrimitiveType::Int => prim_to_datum!(column, Int32Array, Datum::int),
+            PrimitiveType::Boolean => {
+                prim_to_datum!(column, BooleanArray, Datum::bool)
+            }
+            PrimitiveType::Long => prim_to_datum!(column, Int64Array, Datum::long),
+            PrimitiveType::Float => {
+                prim_to_datum!(column, Float32Array, Datum::float)
+            }
+            PrimitiveType::Double => {
+                prim_to_datum!(column, Float64Array, Datum::double)
+            }
+            PrimitiveType::String => {
+                prim_to_datum!(column, StringArray, Datum::string)
+            }
+            PrimitiveType::Date => prim_to_datum!(column, Date32Array, Datum::date),
+            PrimitiveType::Timestamp => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamp_micros)
+            }
+            PrimitiveType::Timestamptz => {
+                prim_to_datum!(column, TimestampMicrosecondArray, Datum::timestamptz_micros)
+            }
+            PrimitiveType::TimestampNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamp_nanos)
+            }
+            PrimitiveType::TimestamptzNs => {
+                prim_to_datum!(column, TimestampNanosecondArray, Datum::timestamptz_nanos)
+            }
+            PrimitiveType::Time => {
+                let arr = column
+                    .as_any()
+                    .downcast_ref::<Time64MicrosecondArray>()
+                    .ok_or(Error::new(
+                        ErrorKind::Unexpected,
+                        "could not downcast ArrayRef to Time64MicrosecondArray",
+                    ))?;
+                Ok(Box::new(arr.iter().map(|val| match val {
+                    None => Ok(None),
+                    Some(val) => Datum::time_micros(val).map(Some),
+                })))
+            }
+            PrimitiveType::Decimal { .. } => Err(eq_col_unsupported("Decimal")),
+            PrimitiveType::Uuid => Err(eq_col_unsupported("Uuid")),
+            PrimitiveType::Fixed(_) => Err(eq_col_unsupported("Fixed")),
+            PrimitiveType::Binary => Err(eq_col_unsupported("Binary")),
+        },
+        None => Err(eq_col_unsupported(
+            "non-primitive (i.e. Struct, List, or Map)",
+        )),
     }
 }

 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::fs::File;
+    use std::sync::Arc;
+
+    use arrow_array::{Int64Array, RecordBatch, StringArray};
+    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;

     use super::*;
-    use crate::arrow::delete_file_loader::tests::setup;

     #[tokio::test]
-    async fn test_delete_file_manager_load_deletes() {
+    async fn test_delete_file_loader_parse_equality_deletes() {
         let tmp_dir = TempDir::new().unwrap();
-        let table_location = tmp_dir.path();
-        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
-            .unwrap()
-            .build()
-            .unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();

-        // Note that with the delete file parsing not yet in place, all we can test here is that
-        // the call to the loader fails with the expected FeatureUnsupportedError.
-        let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);

-        let file_scan_tasks = setup(table_location);
-
-        let result = delete_file_manager
-            .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&eq_delete_file_path)
             .await
-            .unwrap();
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2, 3, 4]);
+
+        let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await
+        .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "(((y != 1) OR (z != 100)) OR (a != \"HELP\")) AND (y != 2)".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }

-        assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let equality_delete_schema = {
+            let fields = vec![
+                arrow_schema::Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+                ),
+                arrow_schema::Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())]),
+                ),
+                arrow_schema::Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write =
+            RecordBatch::try_new(equality_delete_schema.clone(), vec![col_y, col_z, col_a])
+                .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
     }
 }
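
The `prim_to_datum!` macro and `arrow_array_to_datum_iterator` above reduce to a common pattern: downcast the type-erased `ArrayRef` to its concrete Arrow array type, then map each (possibly null) element into an `Option<Datum>`. A standalone sketch of that conversion for an `Int64Array`; names outside the diff are illustrative:

    use std::sync::Arc;

    use arrow_array::{Array, ArrayRef, Int64Array};
    use iceberg::spec::Datum;

    fn main() {
        // A nullable Arrow column, as it would arrive from the Parquet reader.
        let column: ArrayRef = Arc::new(Int64Array::from(vec![Some(100), None]));

        // Downcast the type-erased ArrayRef to its concrete array type...
        let arr = column
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("column was not an Int64Array");

        // ...then map each element into Option<Datum>, keeping nulls as None.
        let datums: Vec<Option<Datum>> = arr.iter().map(|val| val.map(Datum::long)).collect();

        assert_eq!(datums.len(), 2);
        assert!(datums[1].is_none());
    }

The `Time` arm in the diff differs only in that `Datum::time_micros` is fallible, so nulls and conversion errors are handled explicitly rather than with a plain `map`.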
