Skip to content

Commit f24c02a

Browse files
ZENOTME
authored and committed
feat: add apply in transaction to support stack action (#22)
* add apply in transaction to support stack action
* fix test
* store current table instead of current metadata

---------

Co-authored-by: ZENOTME <[email protected]>
1 parent 0444802 commit f24c02a

File tree

5 files changed

+145
-92
lines changed

5 files changed

+145
-92
lines changed

crates/iceberg/src/table.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ pub struct Table {
162162
}
163163

164164
impl Table {
165+
pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) {
166+
self.metadata = metadata;
167+
}
168+
165169
/// Returns a TableBuilder to build a table
166170
pub fn builder() -> TableBuilder {
167171
TableBuilder::new()

crates/iceberg/src/transaction/append.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ impl<'a> FastAppendAction<'a> {
7878
if !self
7979
.snapshot_produce_action
8080
.tx
81-
.table
81+
.current_table
8282
.metadata()
8383
.default_spec
8484
.is_unpartitioned()
@@ -89,10 +89,10 @@ impl<'a> FastAppendAction<'a> {
8989
));
9090
}
9191

92-
let table_metadata = self.snapshot_produce_action.tx.table.metadata();
92+
let table_metadata = self.snapshot_produce_action.tx.current_table.metadata();
9393

9494
let data_files = ParquetWriter::parquet_files_to_data_files(
95-
self.snapshot_produce_action.tx.table.file_io(),
95+
self.snapshot_produce_action.tx.current_table.file_io(),
9696
file_path,
9797
table_metadata,
9898
)
@@ -117,7 +117,7 @@ impl<'a> FastAppendAction<'a> {
117117
let mut manifest_stream = self
118118
.snapshot_produce_action
119119
.tx
120-
.table
120+
.current_table
121121
.inspect()
122122
.manifests()
123123
.scan()
@@ -179,14 +179,19 @@ impl SnapshotProduceOperation for FastAppendOperation {
179179
&self,
180180
snapshot_produce: &SnapshotProduceAction<'_>,
181181
) -> Result<Vec<ManifestFile>> {
182-
let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
182+
let Some(snapshot) = snapshot_produce
183+
.tx
184+
.current_table
185+
.metadata()
186+
.current_snapshot()
187+
else {
183188
return Ok(vec![]);
184189
};
185190

186191
let manifest_list = snapshot
187192
.load_manifest_list(
188-
snapshot_produce.tx.table.file_io(),
189-
&snapshot_produce.tx.table.metadata_ref(),
193+
snapshot_produce.tx.current_table.file_io(),
194+
snapshot_produce.tx.current_table.metadata(),
190195
)
191196
.await?;
192197

@@ -213,7 +218,7 @@ mod tests {
213218
async fn test_fast_append_action() {
214219
let table = make_v2_minimal_table();
215220
let tx = Transaction::new(&table);
216-
let mut action = tx.fast_append(None, vec![]).unwrap();
221+
let mut action = tx.fast_append(None, None, vec![]).unwrap();
217222

218223
// check add data file with incompatible partition value
219224
let data_file = DataFileBuilder::default()
@@ -246,11 +251,11 @@ mod tests {
246251
assert_eq!(
247252
vec![
248253
TableRequirement::UuidMatch {
249-
uuid: tx.table.metadata().uuid()
254+
uuid: table.metadata().uuid()
250255
},
251256
TableRequirement::RefSnapshotIdMatch {
252257
r#ref: MAIN_BRANCH.to_string(),
253-
snapshot_id: tx.table.metadata().current_snapshot_id
258+
snapshot_id: table.metadata().current_snapshot_id()
254259
}
255260
],
256261
tx.requirements
@@ -304,7 +309,7 @@ mod tests {
304309
format!("{}/3.parquet", &fixture.table_location),
305310
];
306311

307-
let fast_append_action = tx.fast_append(None, vec![]).unwrap();
312+
let fast_append_action = tx.fast_append(None, None, vec![]).unwrap();
308313

309314
// Attempt to add the existing Parquet files with fast append.
310315
let new_tx = fast_append_action

crates/iceberg/src/transaction/mod.rs

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod sort_order;
2424
use std::cmp::Ordering;
2525
use std::collections::HashMap;
2626
use std::mem::discriminant;
27+
use std::sync::Arc;
2728

2829
use uuid::Uuid;
2930

@@ -37,7 +38,8 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat
3738

3839
/// Table transaction.
3940
pub struct Transaction<'a> {
40-
table: &'a Table,
41+
base_table: &'a Table,
42+
current_table: Table,
4143
updates: Vec<TableUpdate>,
4244
requirements: Vec<TableRequirement>,
4345
}
@@ -46,38 +48,60 @@ impl<'a> Transaction<'a> {
4648
/// Creates a new transaction.
4749
pub fn new(table: &'a Table) -> Self {
4850
Self {
49-
table,
51+
base_table: table,
52+
current_table: table.clone(),
5053
updates: vec![],
5154
requirements: vec![],
5255
}
5356
}
5457

55-
fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
56-
for update in &updates {
57-
for up in &self.updates {
58-
if discriminant(up) == discriminant(update) {
59-
return Err(Error::new(
60-
ErrorKind::DataInvalid,
61-
format!(
62-
"Cannot apply update with same type at same time: {:?}",
63-
update
64-
),
65-
));
66-
}
67-
}
58+
fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
59+
let mut metadata_builder = self.current_table.metadata().clone().into_builder(None);
60+
for update in updates {
61+
metadata_builder = update.clone().apply(metadata_builder)?;
6862
}
69-
self.updates.extend(updates);
63+
64+
self.current_table
65+
.with_metadata(Arc::new(metadata_builder.build()?.metadata));
66+
7067
Ok(())
7168
}
7269

73-
fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
74-
self.requirements.extend(requirements);
70+
fn apply(
71+
&mut self,
72+
updates: Vec<TableUpdate>,
73+
requirements: Vec<TableRequirement>,
74+
) -> Result<()> {
75+
for requirement in &requirements {
76+
requirement.check(Some(self.current_table.metadata()))?;
77+
}
78+
79+
self.update_table_metadata(&updates)?;
80+
81+
self.updates.extend(updates);
82+
83+
// For the requirements, it does not make sense to add a requirement more than once
84+
// For example, you cannot assert that the current schema has two different IDs
85+
for new_requirement in requirements {
86+
if self
87+
.requirements
88+
.iter()
89+
.map(discriminant)
90+
.all(|d| d != discriminant(&new_requirement))
91+
{
92+
self.requirements.push(new_requirement);
93+
}
94+
}
95+
96+
// # TODO
97+
// Support auto commit later.
98+
7599
Ok(())
76100
}
77101

78102
/// Sets table to a new version.
79103
pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
80-
let current_version = self.table.metadata().format_version();
104+
let current_version = self.current_table.metadata().format_version();
81105
match current_version.cmp(&format_version) {
82106
Ordering::Greater => {
83107
return Err(Error::new(
@@ -89,7 +113,7 @@ impl<'a> Transaction<'a> {
89113
));
90114
}
91115
Ordering::Less => {
92-
self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
116+
self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?;
93117
}
94118
Ordering::Equal => {
95119
// Do nothing.
@@ -100,7 +124,7 @@ impl<'a> Transaction<'a> {
100124

101125
/// Update table's property.
102126
pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
103-
self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
127+
self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?;
104128
Ok(self)
105129
}
106130

@@ -116,7 +140,7 @@ impl<'a> Transaction<'a> {
116140
};
117141
let mut snapshot_id = generate_random_id();
118142
while self
119-
.table
143+
.current_table
120144
.metadata()
121145
.snapshots()
122146
.any(|s| s.snapshot_id() == snapshot_id)
@@ -135,7 +159,7 @@ impl<'a> Transaction<'a> {
135159
) -> Result<FastAppendAction<'a>> {
136160
let snapshot_id = if let Some(snapshot_id) = snapshot_id {
137161
if self
138-
.table
162+
.current_table
139163
.metadata()
140164
.snapshots()
141165
.any(|s| s.snapshot_id() == snapshot_id)
@@ -168,14 +192,17 @@ impl<'a> Transaction<'a> {
168192

169193
/// Remove properties in table.
170194
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
171-
self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
195+
self.apply(
196+
vec![TableUpdate::RemoveProperties { removals: keys }],
197+
vec![],
198+
)?;
172199
Ok(self)
173200
}
174201

175202
/// Commit transaction.
176203
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
177204
let table_commit = TableCommit::builder()
178-
.ident(self.table.identifier().clone())
205+
.ident(self.base_table.identifier().clone())
179206
.updates(self.updates)
180207
.requirements(self.requirements)
181208
.build();
@@ -197,7 +224,7 @@ mod tests {
197224
use crate::{TableIdent, TableUpdate};
198225

199226
fn make_v1_table() -> Table {
200-
let file = File::open(format!(
227+
let file: File = File::open(format!(
201228
"{}/testdata/table_metadata/{}",
202229
env!("CARGO_MANIFEST_DIR"),
203230
"TableMetadataV1Valid.json"
@@ -324,19 +351,21 @@ mod tests {
324351
);
325352
}
326353

327-
#[test]
328-
fn test_do_same_update_in_same_transaction() {
329-
let table = make_v2_table();
354+
#[tokio::test]
355+
async fn test_transaction_apply_upgrade() {
356+
let table = make_v1_table();
330357
let tx = Transaction::new(&table);
331-
let tx = tx
332-
.remove_properties(vec!["a".to_string(), "b".to_string()])
333-
.unwrap();
334-
335-
let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
336-
337-
assert!(
338-
tx.is_err(),
339-
"Should not allow to do same kinds update in same transaction"
358+
// Upgrade v1 to v1, do nothing.
359+
let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap();
360+
// Upgrade v1 to v2, success.
361+
let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
362+
assert_eq!(
363+
vec![TableUpdate::UpgradeFormatVersion {
364+
format_version: FormatVersion::V2
365+
}],
366+
tx.updates
340367
);
368+
// Upgrade v2 to v1, return error.
369+
assert!(tx.upgrade_table_version(FormatVersion::V1).is_err());
341370
}
342371
}

0 commit comments

Comments
 (0)