From 19fe1989833bd8a2a04a607ab8b90dc196bd61b6 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 5 Mar 2025 15:46:08 +0800 Subject: [PATCH 1/5] add apply in transaction to support stack action --- crates/iceberg/src/transaction/mod.rs | 92 +++++++++++++++++++-------- 1 file changed, 67 insertions(+), 25 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index d3c7bc3f9..bfab3c05d 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -37,7 +37,8 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat /// Table transaction. pub struct Transaction<'a> { - table: &'a Table, + base_table: &'a Table, + current_metadata: TableMetadata, updates: Vec, requirements: Vec, } @@ -46,38 +47,59 @@ impl<'a> Transaction<'a> { /// Creates a new transaction. pub fn new(table: &'a Table) -> Self { Self { - table, + base_table: table, + current_metadata: table.metadata().clone(), updates: vec![], requirements: vec![], } } - fn append_updates(&mut self, updates: Vec) -> Result<()> { - for update in &updates { - for up in &self.updates { - if discriminant(up) == discriminant(update) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot apply update with same type at same time: {:?}", - update - ), - )); - } - } + fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { + let mut metadata_builder = self.current_metadata.clone().into_builder(None); + for update in updates { + metadata_builder = update.clone().apply(metadata_builder)?; } - self.updates.extend(updates); + + self.current_metadata = metadata_builder.build()?.metadata; + Ok(()) } - fn append_requirements(&mut self, requirements: Vec) -> Result<()> { - self.requirements.extend(requirements); + fn apply( + &mut self, + updates: Vec, + requirements: Vec, + ) -> Result<()> { + for requirement in &requirements { + requirement.check(Some(&self.current_metadata))?; + } + + self.update_table_metadata(&updates)?; + + self.updates.extend(updates); + + // For the requirements, it does not make sense to add a requirement more than once + // For example, you cannot assert that the current schema has two different IDs + for new_requirement in requirements { + if self + .requirements + .iter() + .map(discriminant) + .all(|d| d != discriminant(&new_requirement)) + { + self.requirements.push(new_requirement); + } + } + + // # TODO + // Support auto commit later. + Ok(()) } /// Sets table to a new version. pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.table.metadata().format_version(); + let current_version = self.current_metadata.format_version(); match current_version.cmp(&format_version) { Ordering::Greater => { return Err(Error::new( @@ -89,7 +111,7 @@ impl<'a> Transaction<'a> { )); } Ordering::Less => { - self.append_updates(vec![UpgradeFormatVersion { format_version }])?; + self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?; } Ordering::Equal => { // Do nothing. @@ -100,7 +122,7 @@ impl<'a> Transaction<'a> { /// Update table's property. pub fn set_properties(mut self, props: HashMap) -> Result { - self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?; + self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?; Ok(self) } @@ -116,8 +138,7 @@ impl<'a> Transaction<'a> { }; let mut snapshot_id = generate_random_id(); while self - .table - .metadata() + .current_metadata .snapshots() .any(|s| s.snapshot_id() == snapshot_id) { @@ -152,14 +173,17 @@ impl<'a> Transaction<'a> { /// Remove properties in table. pub fn remove_properties(mut self, keys: Vec) -> Result { - self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?; + self.apply( + vec![TableUpdate::RemoveProperties { removals: keys }], + vec![], + )?; Ok(self) } /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { let table_commit = TableCommit::builder() - .ident(self.table.identifier().clone()) + .ident(self.base_table.identifier().clone()) .updates(self.updates) .requirements(self.requirements) .build(); @@ -323,4 +347,22 @@ mod tests { "Should not allow to do same kinds update in same transaction" ); } + + #[tokio::test] + async fn test_transaction_apply_upgrade() { + let table = make_v1_table(); + let tx = Transaction::new(&table); + // Upgrade v1 to v1, do nothing. + let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); + // Upgrade v1 to v2, success. + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + assert_eq!( + vec![TableUpdate::UpgradeFormatVersion { + format_version: FormatVersion::V2 + }], + tx.updates + ); + // Upgrade v2 to v1, return error. + assert!(tx.upgrade_table_version(FormatVersion::V1).is_err()); + } } From 59e37d80295f2385d59a24eac6c82ba8eddb4c91 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Mon, 10 Mar 2025 03:21:04 +0800 Subject: [PATCH 2/5] fix test --- crates/iceberg/src/transaction/mod.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index bfab3c05d..a4e97faad 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -332,22 +332,6 @@ mod tests { ); } - #[test] - fn test_do_same_update_in_same_transaction() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .remove_properties(vec!["a".to_string(), "b".to_string()]) - .unwrap(); - - let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]); - - assert!( - tx.is_err(), - "Should not allow to do same kinds update in same transaction" - ); - } - #[tokio::test] async fn test_transaction_apply_upgrade() { let table = make_v1_table(); From 5c6b6ad3fa64584f24f79b0f308016fe11c1b71f Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 12 Mar 2025 15:22:31 +0800 Subject: [PATCH 3/5] store current table instead of current metadata --- crates/iceberg/src/table.rs | 4 ++++ crates/iceberg/src/transaction/mod.rs | 16 +++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index ebee670f4..d910b5c8f 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -162,6 +162,10 @@ pub struct Table { } impl Table { + pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) { + self.metadata = metadata; + } + /// Returns a TableBuilder to build a table pub fn builder() -> TableBuilder { TableBuilder::new() diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index a4e97faad..6b46ff32a 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -38,7 +38,7 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat /// Table transaction. pub struct Transaction<'a> { base_table: &'a Table, - current_metadata: TableMetadata, + current_table: Table, updates: Vec, requirements: Vec, } @@ -48,19 +48,20 @@ impl<'a> Transaction<'a> { pub fn new(table: &'a Table) -> Self { Self { base_table: table, - current_metadata: table.metadata().clone(), + current_table: table.clone(), updates: vec![], requirements: vec![], } } fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { - let mut metadata_builder = self.current_metadata.clone().into_builder(None); + let mut metadata_builder = self.current_table.metadata().clone().into_builder(None); for update in updates { metadata_builder = update.clone().apply(metadata_builder)?; } - self.current_metadata = metadata_builder.build()?.metadata; + self.current_table + .with_metadata(Arc::new(metadata_builder.build()?.metadata)); Ok(()) } @@ -71,7 +72,7 @@ impl<'a> Transaction<'a> { requirements: Vec, ) -> Result<()> { for requirement in &requirements { - requirement.check(Some(&self.current_metadata))?; + requirement.check(Some(self.current_table.metadata()))?; } self.update_table_metadata(&updates)?; @@ -99,7 +100,7 @@ impl<'a> Transaction<'a> { /// Sets table to a new version. pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.current_metadata.format_version(); + let current_version = self.current_table.metadata().format_version(); match current_version.cmp(&format_version) { Ordering::Greater => { return Err(Error::new( @@ -138,7 +139,8 @@ impl<'a> Transaction<'a> { }; let mut snapshot_id = generate_random_id(); while self - .current_metadata + .current_table + .metadata() .snapshots() .any(|s| s.snapshot_id() == snapshot_id) { From e264f948a083fc080026285658a6a8cce6b4e781 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 18 Mar 2025 18:36:52 +0800 Subject: [PATCH 4/5] fix rebase --- crates/iceberg/src/transaction/append.rs | 23 +++--- crates/iceberg/src/transaction/mod.rs | 1 + crates/iceberg/src/transaction/snapshot.rs | 85 +++++++++++--------- crates/iceberg/src/transaction/sort_order.rs | 20 +++-- 4 files changed, 77 insertions(+), 52 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index a4e7695ce..160bc1263 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -78,7 +78,7 @@ impl<'a> FastAppendAction<'a> { if !self .snapshot_produce_action .tx - .table + .current_table .metadata() .default_spec .is_unpartitioned() @@ -89,10 +89,10 @@ impl<'a> FastAppendAction<'a> { )); } - let table_metadata = self.snapshot_produce_action.tx.table.metadata(); + let table_metadata = self.snapshot_produce_action.tx.current_table.metadata(); let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.table.file_io(), + self.snapshot_produce_action.tx.current_table.file_io(), file_path, table_metadata, ) @@ -117,7 +117,7 @@ impl<'a> FastAppendAction<'a> { let mut manifest_stream = self .snapshot_produce_action .tx - .table + .current_table .inspect() .manifests() .scan() @@ -179,14 +179,19 @@ impl SnapshotProduceOperation for FastAppendOperation { &self, snapshot_produce: &SnapshotProduceAction<'_>, ) -> Result> { - let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + let Some(snapshot) = snapshot_produce + .tx + .current_table + .metadata() + .current_snapshot() + else { return Ok(vec![]); }; let manifest_list = snapshot .load_manifest_list( - snapshot_produce.tx.table.file_io(), - &snapshot_produce.tx.table.metadata_ref(), + snapshot_produce.tx.current_table.file_io(), + &snapshot_produce.tx.current_table.metadata_ref(), ) .await?; @@ -248,11 +253,11 @@ mod tests { assert_eq!( vec![ TableRequirement::UuidMatch { - uuid: tx.table.metadata().uuid() + uuid: table.metadata().uuid() }, TableRequirement::RefSnapshotIdMatch { r#ref: MAIN_BRANCH.to_string(), - snapshot_id: tx.table.metadata().current_snapshot_id + snapshot_id: table.metadata().current_snapshot_id } ], tx.requirements diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 6b46ff32a..6ae25775b 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -24,6 +24,7 @@ mod sort_order; use std::cmp::Ordering; use std::collections::HashMap; use std::mem::discriminant; +use std::sync::Arc; use uuid::Uuid; diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index e1a2fa0d3..71ddd9d5a 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -137,7 +137,9 @@ impl<'a> SnapshotProduceAction<'a> { )); } // Check if the data file partition spec id matches the table default partition spec id. - if self.tx.table.metadata().default_partition_spec_id() != data_file.partition_spec_id { + if self.tx.current_table.metadata().default_partition_spec_id() + != data_file.partition_spec_id + { return Err(Error::new( ErrorKind::DataInvalid, "Data file partition spec id does not match table default partition spec id", @@ -145,7 +147,7 @@ impl<'a> SnapshotProduceAction<'a> { } Self::validate_partition_value( data_file.partition(), - self.tx.table.metadata().default_partition_type(), + self.tx.current_table.metadata().default_partition_type(), )?; } self.added_data_files.extend(data_files); @@ -155,24 +157,28 @@ impl<'a> SnapshotProduceAction<'a> { fn new_manifest_output(&mut self) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", - self.tx.table.metadata().location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.commit_uuid, self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - self.tx.table.file_io().new_output(new_manifest_path) + self.tx + .current_table + .file_io() + .new_output(new_manifest_path) } // Write manifest file for added data files and return the ManifestFile for ManifestList. async fn write_added_manifest(&mut self) -> Result { let added_data_files = std::mem::take(&mut self.added_data_files); let snapshot_id = self.snapshot_id; + let format_version = self.tx.current_table.metadata().format_version(); let manifest_entries = added_data_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if format_version == FormatVersion::V1 { builder.snapshot_id(snapshot_id).build() } else { // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when @@ -185,15 +191,15 @@ impl<'a> SnapshotProduceAction<'a> { self.new_manifest_output()?, Some(self.snapshot_id), self.key_metadata.clone(), - self.tx.table.metadata().current_schema().clone(), + self.tx.current_table.metadata().current_schema().clone(), self.tx - .table + .current_table .metadata() .default_partition_spec() .as_ref() .clone(), ); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { builder.build_v1() } else { builder.build_v2_data() @@ -233,7 +239,7 @@ impl<'a> SnapshotProduceAction<'a> { fn generate_manifest_list_file_path(&self, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - self.tx.table.metadata().location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -251,28 +257,28 @@ impl<'a> SnapshotProduceAction<'a> { let new_manifests = self .manifest_file(&snapshot_produce_operation, &process) .await?; - let next_seq_num = self.tx.table.metadata().next_sequence_number(); + let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); let summary = self.summary(&snapshot_produce_operation); let manifest_list_path = self.generate_manifest_list_file_path(0); - let mut manifest_list_writer = match self.tx.table.metadata().format_version() { + let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( self.tx - .table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( self.tx - .table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), next_seq_num, ), }; @@ -283,34 +289,37 @@ impl<'a> SnapshotProduceAction<'a> { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id()) + .with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(self.tx.table.metadata().current_schema_id()) + .with_schema_id(self.tx.current_table.metadata().current_schema_id()) .with_timestamp_ms(commit_ts) .build(); - self.tx.append_updates(vec![ - TableUpdate::AddSnapshot { - snapshot: new_snapshot, - }, - TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: SnapshotReference::new( - self.snapshot_id, - SnapshotRetention::branch(None, None, None), - ), - }, - ])?; - self.tx.append_requirements(vec![ - TableRequirement::UuidMatch { - uuid: self.tx.table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.table.metadata().current_snapshot_id(), - }, - ])?; + self.tx.apply( + vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot, + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ], + vec![ + TableRequirement::UuidMatch { + uuid: self.tx.current_table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: self.tx.current_table.metadata().current_snapshot_id(), + }, + ], + )?; + Ok(self.tx) } } diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs index 4f21eef07..51012dca1 100644 --- a/crates/iceberg/src/transaction/sort_order.rs +++ b/crates/iceberg/src/transaction/sort_order.rs @@ -52,15 +52,25 @@ impl<'a> ReplaceSortOrderAction<'a> { let requirements = vec![ TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self.tx.table.metadata().current_schema().schema_id(), + current_schema_id: self + .tx + .current_table + .metadata() + .current_schema() + .schema_id(), }, TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, + default_sort_order_id: self + .tx + .current_table + .metadata() + .default_sort_order() + .order_id, }, ]; - self.tx.append_requirements(requirements)?; - self.tx.append_updates(updates)?; + self.tx.apply(updates, requirements)?; + Ok(self.tx) } @@ -72,7 +82,7 @@ impl<'a> ReplaceSortOrderAction<'a> { ) -> Result { let field_id = self .tx - .table + .current_table .metadata() .current_schema() .field_id_by_name(name) From 27457eaf3088ccfe11ca8d2882ca3c06854185d0 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Tue, 22 Apr 2025 22:15:19 +0800 Subject: [PATCH 5/5] fix after merge --- crates/iceberg/src/transaction/snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 7eedda7e4..ee9721c16 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -233,7 +233,7 @@ impl<'a> SnapshotProduceAction<'a> { snapshot_produce_operation: &OP, ) -> Result { let mut summary_collector = SnapshotSummaryCollector::default(); - let table_metadata = self.tx.table.metadata_ref(); + let table_metadata = self.tx.current_table.metadata_ref(); let partition_summary_limit = if let Some(limit) = table_metadata .properties()