From e56760c1a7faecdbfc5260deafe7ec17c72fab51 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 4 Mar 2025 15:20:04 +0800 Subject: [PATCH 1/2] allow specify snapshot id for fast append --- crates/iceberg/src/transaction.rs | 12 +++++++++--- .../integration_tests/tests/append_data_file_test.rs | 4 ++-- .../tests/append_partition_data_file_test.rs | 6 +++--- .../integration_tests/tests/conflict_commit_test.rs | 4 ++-- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 5dac1ce5a5..099cf9b9e0 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -108,7 +108,8 @@ impl<'a> Transaction<'a> { Ok(self) } - fn generate_unique_snapshot_id(&self) -> i64 { + /// Generate a unique snapshot id. + pub fn generate_unique_snapshot_id(&self) -> i64 { let generate_random_id = || -> i64 { let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); let snapshot_id = (lhs ^ rhs) as i64; @@ -133,10 +134,15 @@ impl<'a> Transaction<'a> { /// Creates a fast append action. pub fn fast_append( self, + snapshot_id: Option, commit_uuid: Option, key_metadata: Vec, ) -> Result> { - let snapshot_id = self.generate_unique_snapshot_id(); + let snapshot_id = if let Some(snapshot_id) = snapshot_id { + snapshot_id + } else { + self.generate_unique_snapshot_id() + }; FastAppendAction::new( self, snapshot_id, @@ -819,7 +825,7 @@ mod tests { async fn test_fast_append_action() { let table = make_v2_minimal_table(); let tx = Transaction::new(&table); - let mut action = tx.fast_append(None, vec![]).unwrap(); + let mut action = tx.fast_append(None, None, vec![]).unwrap(); // check add data file with incompatible partition value let data_file = DataFileBuilder::default() diff --git a/crates/integration_tests/tests/append_data_file_test.rs b/crates/integration_tests/tests/append_data_file_test.rs index 60d4f04c68..13b831b99a 100644 --- a/crates/integration_tests/tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/append_data_file_test.rs @@ -134,7 +134,7 @@ async fn test_append_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&fixture.rest_catalog).await.unwrap(); @@ -154,7 +154,7 @@ async fn test_append_data_file() { // commit result again let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx = append_action.apply().await.unwrap(); let table = tx.commit(&fixture.rest_catalog).await.unwrap(); diff --git a/crates/integration_tests/tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/append_partition_data_file_test.rs index 103021532f..04f474ea8b 100644 --- a/crates/integration_tests/tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/append_partition_data_file_test.rs @@ -143,7 +143,7 @@ async fn test_append_partition_data_file() { // commit result let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); append_action .add_data_files(data_file_valid.clone()) .unwrap(); @@ -202,7 +202,7 @@ async fn test_schema_incompatible_partition_type( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) .is_ok() @@ -241,7 +241,7 @@ async fn test_schema_incompatible_partition_fields( let data_file_invalid = data_file_writer_invalid.close().await.unwrap(); let tx = Transaction::new(&table); - let mut append_action = tx.fast_append(None, vec![]).unwrap(); + let mut append_action = tx.fast_append(None, None, vec![]).unwrap(); if append_action .add_data_files(data_file_invalid.clone()) .is_ok() diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs index 52575d1ce7..002f6e5c98 100644 --- a/crates/integration_tests/tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/conflict_commit_test.rs @@ -112,11 +112,11 @@ async fn test_append_data_file_conflict() { // start two transaction and commit one of them let tx1 = Transaction::new(&table); - let mut append_action = tx1.fast_append(None, vec![]).unwrap(); + let mut append_action = tx1.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx1 = append_action.apply().await.unwrap(); let tx2 = Transaction::new(&table); - let mut append_action = tx2.fast_append(None, vec![]).unwrap(); + let mut append_action = tx2.fast_append(None, None, vec![]).unwrap(); append_action.add_data_files(data_file.clone()).unwrap(); let tx2 = append_action.apply().await.unwrap(); let table = tx2 From 67ba562f498b2c3017b6ec686486f06e0f1785d9 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 4 Mar 2025 15:32:01 +0800 Subject: [PATCH 2/2] fix --- crates/iceberg/src/transaction.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 099cf9b9e0..e4b9b432c5 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -139,6 +139,17 @@ impl<'a> Transaction<'a> { key_metadata: Vec, ) -> Result> { let snapshot_id = if let Some(snapshot_id) = snapshot_id { + if self + .table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Snapshot id {} already exists", snapshot_id), + )); + } snapshot_id } else { self.generate_unique_snapshot_id()