diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 007a3745f0..f24ad55ec9 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -111,7 +111,8 @@ impl<'a> Transaction<'a> { Ok(self) } - fn generate_unique_snapshot_id(&self) -> i64 { + /// Generate a unique snapshot id. + pub fn generate_unique_snapshot_id(&self) -> i64 { let generate_random_id = || -> i64 { let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); let snapshot_id = (lhs ^ rhs) as i64; @@ -136,10 +137,26 @@ impl<'a> Transaction<'a> { /// Creates a fast append action. pub fn fast_append( self, + snapshot_id: Option, commit_uuid: Option, key_metadata: Vec, ) -> Result> { - let snapshot_id = self.generate_unique_snapshot_id(); + let snapshot_id = if let Some(snapshot_id) = snapshot_id { + if self + .table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Snapshot id {} already exists", snapshot_id), + )); + } + snapshot_id + } else { + self.generate_unique_snapshot_id() + }; FastAppendAction::new( self, snapshot_id, @@ -894,7 +911,7 @@ mod tests { async fn test_fast_append_action() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); + let mut action = tx.fast_append(None, None, vec![]).unwrap(); // check add data file with incompatible partition value let data_file = DataFileBuilder::default() @@ -1001,7 +1018,7 @@ mod tests { format!("{}/3.parquet", &fixture.table_location), ]; - let fast_append_action = tx.fast_append(None, vec![]).unwrap(); + let fast_append_action = tx.fast_append(None, None, vec![]).unwrap(); // Attempt to add the existing Parquet files with fast append. let new_tx = fast_append_action diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index a24b886349..c8d1a98599 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -112,7 +112,7 @@ async fn test_append_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); @@ -132,7 +132,7 @@ async fn test_append_data_file() { // commit result again let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap(); diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 42cc596f54..e164a7ad16 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -119,7 +119,7 @@ async fn test_append_partition_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); append_action .add_data_files(data_file_valid.clone()) .unwrap(); @@ -178,7 +178,7 @@ async fn test_schema_incompatible_partition_type( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) .is_ok() @@ -217,7 +217,7 @@ async fn test_schema_incompatible_partition_fields( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) .is_ok() diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 0b4d9785dc..9f46866201 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -90,12 +90,12 @@ async fn test_append_data_file_conflict() { // start two transaction and commit one of them let tx1 = Transaction::new(&table); - let mut append_action = tx1.fast_append(None, vec![]).unwrap(); + let mut append_action = tx1.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx1 = append_action.apply().await.unwrap(); let tx2 = Transaction::new(&table); - let mut append_action = tx2.fast_append(None, vec![]).unwrap(); + let mut append_action = tx2.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx2 = append_action.apply().await.unwrap(); let table = tx2 diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 673a78ac03..3f518cbe1e 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -309,7 +309,7 @@ async fn test_scan_all_type() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&rest_catalog).await.unwrap();