Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ impl<'a> Transaction<'a> {
Ok(self)
}

fn generate_unique_snapshot_id(&self) -> i64 {
/// Generate a unique snapshot id.
pub fn generate_unique_snapshot_id(&self) -> i64 {
let generate_random_id = || -> i64 {
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
let snapshot_id = (lhs ^ rhs) as i64;
Expand All @@ -133,10 +134,26 @@ impl<'a> Transaction<'a> {
/// Creates a fast append action.
pub fn fast_append(
self,
snapshot_id: Option<i64>,
commit_uuid: Option<Uuid>,
key_metadata: Vec<u8>,
) -> Result<FastAppendAction<'a>> {
let snapshot_id = self.generate_unique_snapshot_id();
let snapshot_id = if let Some(snapshot_id) = snapshot_id {
if self
.table
.metadata()
.snapshots()
.any(|s| s.snapshot_id() == snapshot_id)
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Snapshot id {} already exists", snapshot_id),
));
}
snapshot_id
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ensure the snapshot id is unique in snapshots. Otherwise we should return the error and let client to regenerate it.

fn generate_unique_snapshot_id(&self) -> i64 {
       ...
        let mut snapshot_id = generate_random_id();
        while self
            .table
            .metadata()
            .snapshots()
            .any(|s| s.snapshot_id() == snapshot_id)
        {
            snapshot_id = generate_random_id();
        }
        snapshot_id
    }

} else {
self.generate_unique_snapshot_id()
};
FastAppendAction::new(
self,
snapshot_id,
Expand Down Expand Up @@ -819,7 +836,7 @@ mod tests {
async fn test_fast_append_action() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let mut action = tx.fast_append(None, vec![]).unwrap();
let mut action = tx.fast_append(None, None, vec![]).unwrap();

// check add data file with incompatible partition value
let data_file = DataFileBuilder::default()
Expand Down
4 changes: 2 additions & 2 deletions crates/integration_tests/tests/append_data_file_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async fn test_append_data_file() {

// commit result
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&fixture.rest_catalog).await.unwrap();
Expand All @@ -154,7 +154,7 @@ async fn test_append_data_file() {

// commit result again
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&fixture.rest_catalog).await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async fn test_append_partition_data_file() {

// commit result
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
append_action
.add_data_files(data_file_valid.clone())
.unwrap();
Expand Down Expand Up @@ -202,7 +202,7 @@ async fn test_schema_incompatible_partition_type(
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();

let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
if append_action
.add_data_files(data_file_invalid.clone())
.is_ok()
Expand Down Expand Up @@ -241,7 +241,7 @@ async fn test_schema_incompatible_partition_fields(
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();

let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
if append_action
.add_data_files(data_file_invalid.clone())
.is_ok()
Expand Down
4 changes: 2 additions & 2 deletions crates/integration_tests/tests/conflict_commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ async fn test_append_data_file_conflict() {

// start two transaction and commit one of them
let tx1 = Transaction::new(&table);
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
let mut append_action = tx1.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx1 = append_action.apply().await.unwrap();
let tx2 = Transaction::new(&table);
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
let mut append_action = tx2.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx2 = append_action.apply().await.unwrap();
let table = tx2
Expand Down