Skip to content

Commit f0150d5

Browse files
authored
feat: Make duplicate check optional for adding parquet files (#1034)
## Which issue does this PR close? - Closes #1031. ## What changes are included in this PR? Added option for checking duplicates when adding parquet files.
1 parent 44f4396 commit f0150d5

File tree

1 file changed

+52
-42
lines changed

1 file changed

+52
-42
lines changed

crates/iceberg/src/transaction.rs

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ impl<'a> Transaction<'a> {
177177
/// FastAppendAction is a transaction action for fast append data files to the table.
178178
pub struct FastAppendAction<'a> {
179179
snapshot_produce_action: SnapshotProduceAction<'a>,
180+
check_duplicate: bool,
180181
}
181182

182183
impl<'a> FastAppendAction<'a> {
@@ -196,9 +197,16 @@ impl<'a> FastAppendAction<'a> {
196197
commit_uuid,
197198
snapshot_properties,
198199
)?,
200+
check_duplicate: true,
199201
})
200202
}
201203

204+
/// Set whether to check duplicate files
205+
pub fn with_check_duplicate(mut self, v: bool) -> Self {
206+
self.check_duplicate = v;
207+
self
208+
}
209+
202210
/// Add data files to the snapshot.
203211
pub fn add_data_files(
204212
&mut self,
@@ -242,51 +250,53 @@ impl<'a> FastAppendAction<'a> {
242250
/// Finished building the action and apply it to the transaction.
243251
pub async fn apply(self) -> Result<Transaction<'a>> {
244252
// Checks duplicate files
245-
let new_files: HashSet<&str> = self
246-
.snapshot_produce_action
247-
.added_data_files
248-
.iter()
249-
.map(|df| df.file_path.as_str())
250-
.collect();
251-
252-
let mut manifest_stream = self
253-
.snapshot_produce_action
254-
.tx
255-
.table
256-
.inspect()
257-
.manifests()
258-
.scan()
259-
.await?;
260-
let mut referenced_files = Vec::new();
261-
262-
while let Some(batch) = manifest_stream.try_next().await? {
263-
let file_path_array = batch
264-
.column(1)
265-
.as_any()
266-
.downcast_ref::<StringArray>()
267-
.ok_or_else(|| {
268-
Error::new(
269-
ErrorKind::DataInvalid,
270-
"Failed to downcast file_path column to StringArray",
271-
)
272-
})?;
273-
274-
for i in 0..batch.num_rows() {
275-
let file_path = file_path_array.value(i);
276-
if new_files.contains(file_path) {
277-
referenced_files.push(file_path.to_string());
253+
if self.check_duplicate {
254+
let new_files: HashSet<&str> = self
255+
.snapshot_produce_action
256+
.added_data_files
257+
.iter()
258+
.map(|df| df.file_path.as_str())
259+
.collect();
260+
261+
let mut manifest_stream = self
262+
.snapshot_produce_action
263+
.tx
264+
.table
265+
.inspect()
266+
.manifests()
267+
.scan()
268+
.await?;
269+
let mut referenced_files = Vec::new();
270+
271+
while let Some(batch) = manifest_stream.try_next().await? {
272+
let file_path_array = batch
273+
.column(1)
274+
.as_any()
275+
.downcast_ref::<StringArray>()
276+
.ok_or_else(|| {
277+
Error::new(
278+
ErrorKind::DataInvalid,
279+
"Failed to downcast file_path column to StringArray",
280+
)
281+
})?;
282+
283+
for i in 0..batch.num_rows() {
284+
let file_path = file_path_array.value(i);
285+
if new_files.contains(file_path) {
286+
referenced_files.push(file_path.to_string());
287+
}
278288
}
279289
}
280-
}
281290

282-
if !referenced_files.is_empty() {
283-
return Err(Error::new(
284-
ErrorKind::DataInvalid,
285-
format!(
286-
"Cannot add files that are already referenced by table, files: {}",
287-
referenced_files.join(", ")
288-
),
289-
));
291+
if !referenced_files.is_empty() {
292+
return Err(Error::new(
293+
ErrorKind::DataInvalid,
294+
format!(
295+
"Cannot add files that are already referenced by table, files: {}",
296+
referenced_files.join(", ")
297+
),
298+
));
299+
}
290300
}
291301

292302
self.snapshot_produce_action

0 commit comments

Comments
 (0)