Skip to content

Commit 0444802

Browse files
committed
allow specify snapshot id for fast append (#25)(#14)
Signed-off-by: xxchan <[email protected]>
1 parent fb82cee commit 0444802

File tree

5 files changed

+25
-9
lines changed

5 files changed

+25
-9
lines changed

crates/iceberg/src/transaction/mod.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,26 @@ impl<'a> Transaction<'a> {
129129
/// Creates a fast append action.
130130
pub fn fast_append(
131131
self,
132+
snapshot_id: Option<i64>,
132133
commit_uuid: Option<Uuid>,
133134
key_metadata: Vec<u8>,
134135
) -> Result<FastAppendAction<'a>> {
135-
let snapshot_id = self.generate_unique_snapshot_id();
136+
let snapshot_id = if let Some(snapshot_id) = snapshot_id {
137+
if self
138+
.table
139+
.metadata()
140+
.snapshots()
141+
.any(|s| s.snapshot_id() == snapshot_id)
142+
{
143+
return Err(Error::new(
144+
ErrorKind::DataInvalid,
145+
format!("Snapshot id {} already exists", snapshot_id),
146+
));
147+
}
148+
snapshot_id
149+
} else {
150+
self.generate_unique_snapshot_id()
151+
};
136152
FastAppendAction::new(
137153
self,
138154
snapshot_id,

crates/integration_tests/tests/shared_tests/append_data_file_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ async fn test_append_data_file() {
112112

113113
// commit result
114114
let tx = Transaction::new(&table);
115-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
115+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
116116
append_action.add_data_files(data_file.clone()).unwrap();
117117
let tx = append_action.apply().await.unwrap();
118118
let table = tx.commit(&rest_catalog).await.unwrap();
@@ -132,7 +132,7 @@ async fn test_append_data_file() {
132132

133133
// commit result again
134134
let tx = Transaction::new(&table);
135-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
135+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
136136
append_action.add_data_files(data_file.clone()).unwrap();
137137
let tx = append_action.apply().await.unwrap();
138138
let table = tx.commit(&rest_catalog).await.unwrap();

crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ async fn test_append_partition_data_file() {
119119

120120
// commit result
121121
let tx = Transaction::new(&table);
122-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
122+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
123123
append_action
124124
.add_data_files(data_file_valid.clone())
125125
.unwrap();
@@ -178,7 +178,7 @@ async fn test_schema_incompatible_partition_type(
178178
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
179179

180180
let tx = Transaction::new(&table);
181-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
181+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
182182
if append_action
183183
.add_data_files(data_file_invalid.clone())
184184
.is_ok()
@@ -217,7 +217,7 @@ async fn test_schema_incompatible_partition_fields(
217217
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
218218

219219
let tx = Transaction::new(&table);
220-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
220+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
221221
if append_action
222222
.add_data_files(data_file_invalid.clone())
223223
.is_ok()

crates/integration_tests/tests/shared_tests/conflict_commit_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ async fn test_append_data_file_conflict() {
9090

9191
// start two transaction and commit one of them
9292
let tx1 = Transaction::new(&table);
93-
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
93+
let mut append_action = tx1.fast_append(None, None, vec![]).unwrap();
9494
append_action.add_data_files(data_file.clone()).unwrap();
9595
let tx1 = append_action.apply().await.unwrap();
9696

9797
let tx2 = Transaction::new(&table);
98-
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
98+
let mut append_action = tx2.fast_append(None, None, vec![]).unwrap();
9999
append_action.add_data_files(data_file.clone()).unwrap();
100100
let tx2 = append_action.apply().await.unwrap();
101101
let table = tx2

crates/integration_tests/tests/shared_tests/scan_all_type.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ async fn test_scan_all_type() {
309309

310310
// commit result
311311
let tx = Transaction::new(&table);
312-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
312+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
313313
append_action.add_data_files(data_file.clone()).unwrap();
314314
let tx = append_action.apply().await.unwrap();
315315
let table = tx.commit(&rest_catalog).await.unwrap();

0 commit comments

Comments
 (0)