Skip to content

feat: add apply in transaction to support stack action #949

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ pub struct Table {
}

impl Table {
pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) {
self.metadata = metadata;
}

/// Returns a TableBuilder to build a table
pub fn builder() -> TableBuilder {
TableBuilder::new()
Expand Down
23 changes: 14 additions & 9 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<'a> FastAppendAction<'a> {
if !self
.snapshot_produce_action
.tx
.table
.current_table
.metadata()
.default_spec
.is_unpartitioned()
Expand All @@ -94,10 +94,10 @@ impl<'a> FastAppendAction<'a> {
));
}

let table_metadata = self.snapshot_produce_action.tx.table.metadata();
let table_metadata = self.snapshot_produce_action.tx.current_table.metadata();

let data_files = ParquetWriter::parquet_files_to_data_files(
self.snapshot_produce_action.tx.table.file_io(),
self.snapshot_produce_action.tx.current_table.file_io(),
file_path,
table_metadata,
)
Expand All @@ -122,7 +122,7 @@ impl<'a> FastAppendAction<'a> {
let mut manifest_stream = self
.snapshot_produce_action
.tx
.table
.current_table
.inspect()
.manifests()
.scan()
Expand Down Expand Up @@ -184,14 +184,19 @@ impl SnapshotProduceOperation for FastAppendOperation {
&self,
snapshot_produce: &SnapshotProduceAction<'_>,
) -> Result<Vec<ManifestFile>> {
let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
let Some(snapshot) = snapshot_produce
.tx
.current_table
.metadata()
.current_snapshot()
else {
return Ok(vec![]);
};

let manifest_list = snapshot
.load_manifest_list(
snapshot_produce.tx.table.file_io(),
&snapshot_produce.tx.table.metadata_ref(),
snapshot_produce.tx.current_table.file_io(),
&snapshot_produce.tx.current_table.metadata_ref(),
)
.await?;

Expand Down Expand Up @@ -253,11 +258,11 @@ mod tests {
assert_eq!(
vec![
TableRequirement::UuidMatch {
uuid: tx.table.metadata().uuid()
uuid: table.metadata().uuid()
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
snapshot_id: tx.table.metadata().current_snapshot_id
snapshot_id: table.metadata().current_snapshot_id
}
],
tx.requirements
Expand Down
101 changes: 65 additions & 36 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod sort_order;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::mem::discriminant;
use std::sync::Arc;

use uuid::Uuid;

Expand All @@ -37,7 +38,8 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat

/// Table transaction.
pub struct Transaction<'a> {
table: &'a Table,
base_table: &'a Table,
current_table: Table,
updates: Vec<TableUpdate>,
requirements: Vec<TableRequirement>,
}
Expand All @@ -46,38 +48,60 @@ impl<'a> Transaction<'a> {
/// Creates a new transaction.
pub fn new(table: &'a Table) -> Self {
Self {
table,
base_table: table,
current_table: table.clone(),
updates: vec![],
requirements: vec![],
}
}

fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
for update in &updates {
for up in &self.updates {
if discriminant(up) == discriminant(update) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot apply update with same type at same time: {:?}",
update
),
));
}
}
fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
for update in updates {
metadata_builder = update.clone().apply(metadata_builder)?;
}
self.updates.extend(updates);

self.current_table
.with_metadata(Arc::new(metadata_builder.build()?.metadata));

Ok(())
}

fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
self.requirements.extend(requirements);
fn apply(
&mut self,
updates: Vec<TableUpdate>,
requirements: Vec<TableRequirement>,
) -> Result<()> {
for requirement in &requirements {
requirement.check(Some(self.current_table.metadata()))?;
}

self.update_table_metadata(&updates)?;

self.updates.extend(updates);

// For the requirements, it does not make sense to add a requirement more than once
// For example, you cannot assert that the current schema has two different IDs
for new_requirement in requirements {
if self
.requirements
.iter()
.map(discriminant)
.all(|d| d != discriminant(&new_requirement))
{
self.requirements.push(new_requirement);
}
}

// # TODO
// Support auto commit later.

Ok(())
}

/// Sets table to a new version.
pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
let current_version = self.table.metadata().format_version();
let current_version = self.current_table.metadata().format_version();
match current_version.cmp(&format_version) {
Ordering::Greater => {
return Err(Error::new(
Expand All @@ -89,7 +113,7 @@ impl<'a> Transaction<'a> {
));
}
Ordering::Less => {
self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?;
}
Ordering::Equal => {
// Do nothing.
Expand All @@ -100,7 +124,7 @@ impl<'a> Transaction<'a> {

/// Update table's property.
pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?;
Ok(self)
}

Expand All @@ -116,7 +140,7 @@ impl<'a> Transaction<'a> {
};
let mut snapshot_id = generate_random_id();
while self
.table
.current_table
.metadata()
.snapshots()
.any(|s| s.snapshot_id() == snapshot_id)
Expand Down Expand Up @@ -152,14 +176,17 @@ impl<'a> Transaction<'a> {

/// Remove properties in table.
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
self.apply(
vec![TableUpdate::RemoveProperties { removals: keys }],
vec![],
)?;
Ok(self)
}

/// Commit transaction.
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
let table_commit = TableCommit::builder()
.ident(self.table.identifier().clone())
.ident(self.base_table.identifier().clone())
.updates(self.updates)
.requirements(self.requirements)
.build();
Expand Down Expand Up @@ -308,19 +335,21 @@ mod tests {
);
}

#[test]
fn test_do_same_update_in_same_transaction() {
let table = make_v2_table();
#[tokio::test]
async fn test_transaction_apply_upgrade() {
let table = make_v1_table();
let tx = Transaction::new(&table);
let tx = tx
.remove_properties(vec!["a".to_string(), "b".to_string()])
.unwrap();

let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);

assert!(
tx.is_err(),
"Should not allow to do same kinds update in same transaction"
// Upgrade v1 to v1, do nothing.
let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap();
// Upgrade v1 to v2, success.
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
assert_eq!(
vec![TableUpdate::UpgradeFormatVersion {
format_version: FormatVersion::V2
}],
tx.updates
);
// Upgrade v2 to v1, return error.
assert!(tx.upgrade_table_version(FormatVersion::V1).is_err());
}
}
Loading
Loading