Skip to content

Commit 7ea25cb

Browse files
committed
feat: always process delete files in plan phase. Opt-out flag only applies to reader
1 parent 7b755fd commit 7ea25cb

File tree

3 files changed: 47 additions and 72 deletions.

crates/iceberg/src/scan/context.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub(crate) struct ManifestFileContext {
4545
object_cache: Arc<ObjectCache>,
4646
snapshot_schema: SchemaRef,
4747
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
48-
delete_file_index: Option<DeleteFileIndex>,
48+
delete_file_index: DeleteFileIndex,
4949
}
5050

5151
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +58,7 @@ pub(crate) struct ManifestEntryContext {
5858
pub bound_predicates: Option<Arc<BoundPredicates>>,
5959
pub partition_spec_id: i32,
6060
pub snapshot_schema: SchemaRef,
61-
pub delete_file_index: Option<DeleteFileIndex>,
61+
pub delete_file_index: DeleteFileIndex,
6262
}
6363

6464
impl ManifestFileContext {
@@ -105,16 +105,13 @@ impl ManifestEntryContext {
105105
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
106106
/// created from it
107107
pub(crate) async fn into_file_scan_task(self) -> Result<FileScanTask> {
108-
let deletes = if let Some(delete_file_index) = self.delete_file_index {
109-
delete_file_index
110-
.get_deletes_for_data_file(
111-
self.manifest_entry.data_file(),
112-
self.manifest_entry.sequence_number(),
113-
)
114-
.await?
115-
} else {
116-
vec![]
117-
};
108+
let deletes = self
109+
.delete_file_index
110+
.get_deletes_for_data_file(
111+
self.manifest_entry.data_file(),
112+
self.manifest_entry.sequence_number(),
113+
)
114+
.await?;
118115

119116
Ok(FileScanTask {
120117
start: 0,
@@ -188,24 +185,19 @@ impl PlanContext {
188185
&self,
189186
manifest_list: Arc<ManifestList>,
190187
tx_data: Sender<ManifestEntryContext>,
191-
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
188+
delete_file_idx: DeleteFileIndex,
189+
delete_file_tx: Sender<ManifestEntryContext>,
192190
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
193191
let manifest_files = manifest_list.entries().iter();
194192

195193
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
196194
let mut filtered_mfcs = vec![];
197195

198196
for manifest_file in manifest_files {
199-
let (delete_file_idx, tx) = if manifest_file.content == ManifestContentType::Deletes {
200-
let Some((delete_file_idx, tx)) = delete_file_idx_and_tx.as_ref() else {
201-
continue;
202-
};
203-
(Some(delete_file_idx.clone()), tx.clone())
197+
let tx = if manifest_file.content == ManifestContentType::Deletes {
198+
delete_file_tx.clone()
204199
} else {
205-
(
206-
delete_file_idx_and_tx.as_ref().map(|x| x.0.clone()),
207-
tx_data.clone(),
208-
)
200+
tx_data.clone()
209201
};
210202

211203
let partition_bound_predicate = if self.predicate.is_some() {
@@ -233,7 +225,7 @@ impl PlanContext {
233225
manifest_file,
234226
partition_bound_predicate,
235227
tx,
236-
delete_file_idx,
228+
delete_file_idx.clone(),
237229
);
238230

239231
filtered_mfcs.push(Ok(mfc));
@@ -247,7 +239,7 @@ impl PlanContext {
247239
manifest_file: &ManifestFile,
248240
partition_filter: Option<Arc<BoundPredicate>>,
249241
sender: Sender<ManifestEntryContext>,
250-
delete_file_index: Option<DeleteFileIndex>,
242+
delete_file_index: DeleteFileIndex,
251243
) -> ManifestFileContext {
252244
let bound_predicates =
253245
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =

crates/iceberg/src/scan/mod.rs

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -368,12 +368,7 @@ impl TableScan {
368368
// used to stream the results back to the caller
369369
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);
370370

371-
let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
372-
if self.delete_file_processing_enabled {
373-
Some(DeleteFileIndex::new())
374-
} else {
375-
None
376-
};
371+
let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new();
377372

378373
let manifest_list = plan_context.get_manifest_list().await?;
379374

@@ -383,9 +378,8 @@ impl TableScan {
383378
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
384379
manifest_list,
385380
manifest_entry_data_ctx_tx,
386-
delete_file_idx_and_tx.as_ref().map(|(delete_file_idx, _)| {
387-
(delete_file_idx.clone(), manifest_entry_delete_ctx_tx)
388-
}),
381+
delete_file_idx.clone(),
382+
manifest_entry_delete_ctx_tx.clone(),
389383
)?;
390384

391385
let mut channel_for_manifest_error = file_scan_task_tx.clone();
@@ -405,33 +399,29 @@ impl TableScan {
405399

406400
let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone();
407401

408-
if let Some((_, delete_file_tx)) = delete_file_idx_and_tx {
409-
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
410-
411-
// Process the delete file [`ManifestEntry`] stream in parallel
412-
spawn(async move {
413-
let result = manifest_entry_delete_ctx_rx
414-
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
415-
.try_for_each_concurrent(
416-
concurrency_limit_manifest_entries,
417-
|(manifest_entry_context, tx)| async move {
418-
spawn(async move {
419-
Self::process_delete_manifest_entry(manifest_entry_context, tx)
420-
.await
421-
})
422-
.await
423-
},
424-
)
425-
.await;
402+
let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone();
426403

427-
if let Err(error) = result {
428-
let _ = channel_for_delete_manifest_entry_error
429-
.send(Err(error))
430-
.await;
431-
}
432-
})
433-
.await;
434-
}
404+
// Process the delete file [`ManifestEntry`] stream in parallel
405+
spawn(async move {
406+
let result = manifest_entry_delete_ctx_rx
407+
.map(|me_ctx| Ok((me_ctx, delete_file_tx.clone())))
408+
.try_for_each_concurrent(
409+
concurrency_limit_manifest_entries,
410+
|(manifest_entry_context, tx)| async move {
411+
spawn(async move {
412+
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
413+
})
414+
.await
415+
},
416+
)
417+
.await;
418+
419+
if let Err(error) = result {
420+
let _ = channel_for_delete_manifest_entry_error
421+
.send(Err(error))
422+
.await;
423+
}
424+
});
435425

436426
// Process the data file [`ManifestEntry`] stream in parallel
437427
spawn(async move {
@@ -461,7 +451,8 @@ impl TableScan {
461451
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
462452
.with_data_file_concurrency_limit(self.concurrency_limit_data_files)
463453
.with_row_group_filtering_enabled(self.row_group_filtering_enabled)
464-
.with_row_selection_enabled(self.row_selection_enabled);
454+
.with_row_selection_enabled(self.row_selection_enabled)
455+
.with_delete_file_support_enabled(self.delete_file_processing_enabled);
465456

466457
if let Some(batch_size) = self.batch_size {
467458
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);

crates/integration_tests/tests/shared_tests/read_positional_deletes.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ async fn test_read_table_with_positional_deletes() {
3939

4040
let scan = table
4141
.scan()
42-
.with_delete_file_processing_enabled(true)
42+
.with_delete_file_processing_enabled(false)
4343
.build()
4444
.unwrap();
4545
println!("{:?}", scan);
@@ -54,20 +54,14 @@ async fn test_read_table_with_positional_deletes() {
5454
println!("{:?}", plan);
5555

5656
// Scan plan phase should include delete files in file plan
57-
// when with_delete_file_processing_enabled == true
57+
// even when with_delete_file_processing_enabled == false
5858
assert_eq!(plan[0].deletes.len(), 2);
5959

60-
// 😱 If we don't support positional deletes, we should fail when we try to read a table that
61-
// has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py
6260
let result = scan.to_arrow().await.unwrap().try_collect::<Vec<_>>().await;
6361

62+
// `delete_file_processing_enabled == false` is propagated to the reader. Since
63+
// we have some delete files, this causes it to return an Err.
6464
assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported));
65-
66-
// When we get support for it:
67-
// let batch_stream = scan.to_arrow().await.unwrap();
68-
// let batches: Vec<_> = batch_stream.try_collect().await.is_err();
69-
// let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum();
70-
// assert_eq!(num_rows, 10);
7165
}
7266

7367
#[tokio::test]
@@ -99,8 +93,6 @@ async fn test_read_table_with_positional_deletes_with_delete_support_enabled() {
9993
.unwrap();
10094
println!("{:?}", plan);
10195

102-
// Scan plan phase should include delete files in file plan
103-
// when with_delete_file_processing_enabled == true
10496
assert_eq!(plan[0].deletes.len(), 2);
10597

10698
// we should see two rows deleted, returning 10 rows instead of 12

0 commit comments

Comments
 (0)