Skip to content

Commit b5b944b

Browse files
jonathanc-nZENOTME
authored and
ZENOTME
committed
feat: Make duplicate check optional for adding parquet files (apache#1034)
## Which issue does this PR close? - Closes apache#1031. ## What changes are included in this PR? Added option for checking duplicates when adding parquet files.
1 parent 7091b88 commit b5b944b

File tree

1 file changed

+52
-42
lines changed

1 file changed

+52
-42
lines changed

crates/iceberg/src/transaction.rs

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ impl<'a> Transaction<'a> {
178178
/// FastAppendAction is a transaction action for fast append data files to the table.
179179
pub struct FastAppendAction<'a> {
180180
snapshot_produce_action: SnapshotProduceAction<'a>,
181+
check_duplicate: bool,
181182
}
182183

183184
impl<'a> FastAppendAction<'a> {
@@ -197,9 +198,16 @@ impl<'a> FastAppendAction<'a> {
197198
commit_uuid,
198199
snapshot_properties,
199200
)?,
201+
check_duplicate: true,
200202
})
201203
}
202204

205+
/// Set whether to check duplicate files
206+
pub fn with_check_duplicate(mut self, v: bool) -> Self {
207+
self.check_duplicate = v;
208+
self
209+
}
210+
203211
/// Add data files to the snapshot.
204212
pub fn add_data_files(
205213
&mut self,
@@ -243,51 +251,53 @@ impl<'a> FastAppendAction<'a> {
243251
/// Finished building the action and apply it to the transaction.
244252
pub async fn apply(self) -> Result<Transaction<'a>> {
245253
// Checks duplicate files
246-
let new_files: HashSet<&str> = self
247-
.snapshot_produce_action
248-
.added_data_files
249-
.iter()
250-
.map(|df| df.file_path.as_str())
251-
.collect();
252-
253-
let mut manifest_stream = self
254-
.snapshot_produce_action
255-
.tx
256-
.table
257-
.inspect()
258-
.manifests()
259-
.scan()
260-
.await?;
261-
let mut referenced_files = Vec::new();
262-
263-
while let Some(batch) = manifest_stream.try_next().await? {
264-
let file_path_array = batch
265-
.column(1)
266-
.as_any()
267-
.downcast_ref::<StringArray>()
268-
.ok_or_else(|| {
269-
Error::new(
270-
ErrorKind::DataInvalid,
271-
"Failed to downcast file_path column to StringArray",
272-
)
273-
})?;
274-
275-
for i in 0..batch.num_rows() {
276-
let file_path = file_path_array.value(i);
277-
if new_files.contains(file_path) {
278-
referenced_files.push(file_path.to_string());
254+
if self.check_duplicate {
255+
let new_files: HashSet<&str> = self
256+
.snapshot_produce_action
257+
.added_data_files
258+
.iter()
259+
.map(|df| df.file_path.as_str())
260+
.collect();
261+
262+
let mut manifest_stream = self
263+
.snapshot_produce_action
264+
.tx
265+
.table
266+
.inspect()
267+
.manifests()
268+
.scan()
269+
.await?;
270+
let mut referenced_files = Vec::new();
271+
272+
while let Some(batch) = manifest_stream.try_next().await? {
273+
let file_path_array = batch
274+
.column(1)
275+
.as_any()
276+
.downcast_ref::<StringArray>()
277+
.ok_or_else(|| {
278+
Error::new(
279+
ErrorKind::DataInvalid,
280+
"Failed to downcast file_path column to StringArray",
281+
)
282+
})?;
283+
284+
for i in 0..batch.num_rows() {
285+
let file_path = file_path_array.value(i);
286+
if new_files.contains(file_path) {
287+
referenced_files.push(file_path.to_string());
288+
}
279289
}
280290
}
281-
}
282291

283-
if !referenced_files.is_empty() {
284-
return Err(Error::new(
285-
ErrorKind::DataInvalid,
286-
format!(
287-
"Cannot add files that are already referenced by table, files: {}",
288-
referenced_files.join(", ")
289-
),
290-
));
292+
if !referenced_files.is_empty() {
293+
return Err(Error::new(
294+
ErrorKind::DataInvalid,
295+
format!(
296+
"Cannot add files that are already referenced by table, files: {}",
297+
referenced_files.join(", ")
298+
),
299+
));
300+
}
291301
}
292302

293303
self.snapshot_produce_action

0 commit comments

Comments
 (0)