Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ impl<'a> Transaction<'a> {
Ok(self)
}

fn generate_unique_snapshot_id(&self) -> i64 {
/// Generate a unique snapshot id.
pub fn generate_unique_snapshot_id(&self) -> i64 {
let generate_random_id = || -> i64 {
let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
let snapshot_id = (lhs ^ rhs) as i64;
Expand All @@ -136,10 +137,26 @@ impl<'a> Transaction<'a> {
/// Creates a fast append action.
pub fn fast_append(
self,
snapshot_id: Option<i64>,
commit_uuid: Option<Uuid>,
key_metadata: Vec<u8>,
) -> Result<FastAppendAction<'a>> {
let snapshot_id = self.generate_unique_snapshot_id();
let snapshot_id = if let Some(snapshot_id) = snapshot_id {
if self
.table
.metadata()
.snapshots()
.any(|s| s.snapshot_id() == snapshot_id)
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Snapshot id {} already exists", snapshot_id),
));
}
snapshot_id
} else {
self.generate_unique_snapshot_id()
};
FastAppendAction::new(
self,
snapshot_id,
Expand Down Expand Up @@ -894,7 +911,7 @@ mod tests {
async fn test_fast_append_action() {
let table = make_v2_minimal_table();
let tx = Transaction::new(&table);
let mut action = tx.fast_append(None, vec![]).unwrap();
let mut action = tx.fast_append(None, None, vec![]).unwrap();

// check add data file with incompatible partition value
let data_file = DataFileBuilder::default()
Expand Down Expand Up @@ -1001,7 +1018,7 @@ mod tests {
format!("{}/3.parquet", &fixture.table_location),
];

let fast_append_action = tx.fast_append(None, vec![]).unwrap();
let fast_append_action = tx.fast_append(None, None, vec![]).unwrap();

// Attempt to add the existing Parquet files with fast append.
let new_tx = fast_append_action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async fn test_append_data_file() {

// commit result
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();
Expand All @@ -132,7 +132,7 @@ async fn test_append_data_file() {

// commit result again
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async fn test_append_partition_data_file() {

// commit result
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
append_action
.add_data_files(data_file_valid.clone())
.unwrap();
Expand Down Expand Up @@ -178,7 +178,7 @@ async fn test_schema_incompatible_partition_type(
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();

let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
if append_action
.add_data_files(data_file_invalid.clone())
.is_ok()
Expand Down Expand Up @@ -217,7 +217,7 @@ async fn test_schema_incompatible_partition_fields(
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();

let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
if append_action
.add_data_files(data_file_invalid.clone())
.is_ok()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ async fn test_append_data_file_conflict() {

// start two transaction and commit one of them
let tx1 = Transaction::new(&table);
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
let mut append_action = tx1.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx1 = append_action.apply().await.unwrap();

let tx2 = Transaction::new(&table);
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
let mut append_action = tx2.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx2 = append_action.apply().await.unwrap();
let table = tx2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ async fn test_scan_all_type() {

// commit result
let tx = Transaction::new(&table);
let mut append_action = tx.fast_append(None, vec![]).unwrap();
let mut append_action = tx.fast_append(None, None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx = append_action.apply().await.unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();
Expand Down
Loading