Skip to content

Commit bc067f0

Browse files
authored
allow specify snapshot id for fast append (#14)
* allow specify snapshot id for fast append
1 parent 683fb89 commit bc067f0

File tree

4 files changed

+27
-10
lines changed

4 files changed

+27
-10
lines changed

crates/iceberg/src/transaction.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ impl<'a> Transaction<'a> {
108108
Ok(self)
109109
}
110110

111-
fn generate_unique_snapshot_id(&self) -> i64 {
111+
/// Generate a unique snapshot id.
112+
pub fn generate_unique_snapshot_id(&self) -> i64 {
112113
let generate_random_id = || -> i64 {
113114
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
114115
let snapshot_id = (lhs ^ rhs) as i64;
@@ -133,10 +134,26 @@ impl<'a> Transaction<'a> {
133134
/// Creates a fast append action.
134135
pub fn fast_append(
135136
self,
137+
snapshot_id: Option<i64>,
136138
commit_uuid: Option<Uuid>,
137139
key_metadata: Vec<u8>,
138140
) -> Result<FastAppendAction<'a>> {
139-
let snapshot_id = self.generate_unique_snapshot_id();
141+
let snapshot_id = if let Some(snapshot_id) = snapshot_id {
142+
if self
143+
.table
144+
.metadata()
145+
.snapshots()
146+
.any(|s| s.snapshot_id() == snapshot_id)
147+
{
148+
return Err(Error::new(
149+
ErrorKind::DataInvalid,
150+
format!("Snapshot id {} already exists", snapshot_id),
151+
));
152+
}
153+
snapshot_id
154+
} else {
155+
self.generate_unique_snapshot_id()
156+
};
140157
FastAppendAction::new(
141158
self,
142159
snapshot_id,
@@ -819,7 +836,7 @@ mod tests {
819836
async fn test_fast_append_action() {
820837
let table = make_v2_minimal_table();
821838
let tx = Transaction::new(&table);
822-
let mut action = tx.fast_append(None, vec![]).unwrap();
839+
let mut action = tx.fast_append(None, None, vec![]).unwrap();
823840

824841
// check add data file with incompatible partition value
825842
let data_file = DataFileBuilder::default()

crates/integration_tests/tests/append_data_file_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ async fn test_append_data_file() {
134134

135135
// commit result
136136
let tx = Transaction::new(&table);
137-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
137+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
138138
append_action.add_data_files(data_file.clone()).unwrap();
139139
let tx = append_action.apply().await.unwrap();
140140
let table = tx.commit(&fixture.rest_catalog).await.unwrap();
@@ -154,7 +154,7 @@ async fn test_append_data_file() {
154154

155155
// commit result again
156156
let tx = Transaction::new(&table);
157-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
157+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
158158
append_action.add_data_files(data_file.clone()).unwrap();
159159
let tx = append_action.apply().await.unwrap();
160160
let table = tx.commit(&fixture.rest_catalog).await.unwrap();

crates/integration_tests/tests/append_partition_data_file_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ async fn test_append_partition_data_file() {
143143

144144
// commit result
145145
let tx = Transaction::new(&table);
146-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
146+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
147147
append_action
148148
.add_data_files(data_file_valid.clone())
149149
.unwrap();
@@ -202,7 +202,7 @@ async fn test_schema_incompatible_partition_type(
202202
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
203203

204204
let tx = Transaction::new(&table);
205-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
205+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
206206
if append_action
207207
.add_data_files(data_file_invalid.clone())
208208
.is_ok()
@@ -241,7 +241,7 @@ async fn test_schema_incompatible_partition_fields(
241241
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
242242

243243
let tx = Transaction::new(&table);
244-
let mut append_action = tx.fast_append(None, vec![]).unwrap();
244+
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
245245
if append_action
246246
.add_data_files(data_file_invalid.clone())
247247
.is_ok()

crates/integration_tests/tests/conflict_commit_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,11 @@ async fn test_append_data_file_conflict() {
112112

113113
// start two transaction and commit one of them
114114
let tx1 = Transaction::new(&table);
115-
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
115+
let mut append_action = tx1.fast_append(None, None, vec![]).unwrap();
116116
append_action.add_data_files(data_file.clone()).unwrap();
117117
let tx1 = append_action.apply().await.unwrap();
118118
let tx2 = Transaction::new(&table);
119-
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
119+
let mut append_action = tx2.fast_append(None, None, vec![]).unwrap();
120120
append_action.add_data_files(data_file.clone()).unwrap();
121121
let tx2 = append_action.apply().await.unwrap();
122122
let table = tx2

0 commit comments

Comments
 (0)