Skip to content

Commit c20c598

Browse files
author
ZENOTME
committed
add apply in transaction to support stack action
1 parent 6f2ce71 commit c20c598

File tree

1 file changed

+101
-91
lines changed

1 file changed

+101
-91
lines changed

crates/iceberg/src/transaction.rs

Lines changed: 101 additions & 91 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,8 @@ use crate::io::OutputFile;
3030
use crate::spec::{
3131
DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
3232
ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
33-
SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
33+
SortDirection, SortField, SortOrder, Struct, StructType, Summary, TableMetadata, Transform,
34+
MAIN_BRANCH,
3435
};
3536
use crate::table::Table;
3637
use crate::TableUpdate::UpgradeFormatVersion;
@@ -40,7 +41,8 @@ const META_ROOT_PATH: &str = "metadata";
4041

4142
/// Table transaction.
4243
pub struct Transaction<'a> {
43-
table: &'a Table,
44+
base_table: &'a Table,
45+
current_metadata: TableMetadata,
4446
updates: Vec<TableUpdate>,
4547
requirements: Vec<TableRequirement>,
4648
}
@@ -49,38 +51,59 @@ impl<'a> Transaction<'a> {
4951
/// Creates a new transaction.
5052
pub fn new(table: &'a Table) -> Self {
5153
Self {
52-
table,
54+
base_table: table,
55+
current_metadata: table.metadata().clone(),
5356
updates: vec![],
5457
requirements: vec![],
5558
}
5659
}
5760

58-
fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
59-
for update in &updates {
60-
for up in &self.updates {
61-
if discriminant(up) == discriminant(update) {
62-
return Err(Error::new(
63-
ErrorKind::DataInvalid,
64-
format!(
65-
"Cannot apply update with same type at same time: {:?}",
66-
update
67-
),
68-
));
69-
}
70-
}
61+
fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> {
62+
let mut metadata_builder = self.current_metadata.clone().into_builder(None);
63+
for update in updates {
64+
metadata_builder = update.clone().apply(metadata_builder)?;
7165
}
72-
self.updates.extend(updates);
66+
67+
self.current_metadata = metadata_builder.build()?.metadata;
68+
7369
Ok(())
7470
}
7571

76-
fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
77-
self.requirements.extend(requirements);
72+
fn apply(
73+
&mut self,
74+
updates: Vec<TableUpdate>,
75+
requirements: Vec<TableRequirement>,
76+
) -> Result<()> {
77+
for requirment in &requirements {
78+
requirment.check(Some(&self.current_metadata))?;
79+
}
80+
81+
self.update_table_metadata(&updates)?;
82+
83+
self.updates.extend(updates);
84+
85+
// For the requirements, it does not make sense to add a requirement more than once
86+
// For example, you cannot assert that the current schema has two different IDs
87+
for new_requirment in requirements {
88+
if self
89+
.requirements
90+
.iter()
91+
.map(discriminant)
92+
.all(|d| d != discriminant(&new_requirment))
93+
{
94+
self.requirements.push(new_requirment);
95+
}
96+
}
97+
98+
// # TODO
99+
// Support auto commit later.
100+
78101
Ok(())
79102
}
80103

81104
/// Sets table to a new version.
82105
pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
83-
let current_version = self.table.metadata().format_version();
106+
let current_version = self.current_metadata.format_version();
84107
match current_version.cmp(&format_version) {
85108
Ordering::Greater => {
86109
return Err(Error::new(
@@ -92,7 +115,7 @@ impl<'a> Transaction<'a> {
92115
));
93116
}
94117
Ordering::Less => {
95-
self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
118+
self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?;
96119
}
97120
Ordering::Equal => {
98121
// Do nothing.
@@ -103,7 +126,7 @@ impl<'a> Transaction<'a> {
103126

104127
/// Update table's property.
105128
pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
106-
self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
129+
self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?;
107130
Ok(self)
108131
}
109132

@@ -119,8 +142,7 @@ impl<'a> Transaction<'a> {
119142
};
120143
let mut snapshot_id = generate_random_id();
121144
while self
122-
.table
123-
.metadata()
145+
.current_metadata
124146
.snapshots()
125147
.any(|s| s.snapshot_id() == snapshot_id)
126148
{
@@ -155,14 +177,17 @@ impl<'a> Transaction<'a> {
155177

156178
/// Remove properties in table.
157179
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
158-
self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
180+
self.apply(
181+
vec![TableUpdate::RemoveProperties { removals: keys }],
182+
vec![],
183+
)?;
159184
Ok(self)
160185
}
161186

162187
/// Commit transaction.
163188
pub async fn commit(self, catalog: &impl Catalog) -> Result<Table> {
164189
let table_commit = TableCommit::builder()
165-
.ident(self.table.identifier().clone())
190+
.ident(self.base_table.identifier().clone())
166191
.updates(self.updates)
167192
.requirements(self.requirements)
168193
.build();
@@ -231,14 +256,14 @@ impl SnapshotProduceOperation for FastAppendOperation {
231256
&self,
232257
snapshot_produce: &SnapshotProduceAction<'_>,
233258
) -> Result<Vec<ManifestFile>> {
234-
let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
259+
let Some(snapshot) = snapshot_produce.tx.current_metadata.current_snapshot() else {
235260
return Ok(vec![]);
236261
};
237262

238263
let manifest_list = snapshot
239264
.load_manifest_list(
240-
snapshot_produce.tx.table.file_io(),
241-
&snapshot_produce.tx.table.metadata_ref(),
265+
snapshot_produce.tx.base_table.file_io(),
266+
&snapshot_produce.tx.current_metadata,
242267
)
243268
.await?;
244269

@@ -355,7 +380,7 @@ impl<'a> SnapshotProduceAction<'a> {
355380
}
356381
Self::validate_partition_value(
357382
data_file.partition(),
358-
self.tx.table.metadata().default_partition_type(),
383+
self.tx.current_metadata.default_partition_type(),
359384
)?;
360385
}
361386
self.added_data_files.extend(data_files);
@@ -365,24 +390,25 @@ impl<'a> SnapshotProduceAction<'a> {
365390
fn new_manifest_output(&mut self) -> Result<OutputFile> {
366391
let new_manifest_path = format!(
367392
"{}/{}/{}-m{}.{}",
368-
self.tx.table.metadata().location(),
393+
self.tx.current_metadata.location(),
369394
META_ROOT_PATH,
370395
self.commit_uuid,
371396
self.manifest_counter.next().unwrap(),
372397
DataFileFormat::Avro
373398
);
374-
self.tx.table.file_io().new_output(new_manifest_path)
399+
self.tx.base_table.file_io().new_output(new_manifest_path)
375400
}
376401

377402
// Write manifest file for added data files and return the ManifestFile for ManifestList.
378403
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
379404
let added_data_files = std::mem::take(&mut self.added_data_files);
380405
let snapshot_id = self.snapshot_id;
406+
let format_version = self.tx.current_metadata.format_version();
381407
let manifest_entries = added_data_files.into_iter().map(|data_file| {
382408
let builder = ManifestEntry::builder()
383409
.status(crate::spec::ManifestStatus::Added)
384410
.data_file(data_file);
385-
if self.tx.table.metadata().format_version() == FormatVersion::V1 {
411+
if format_version == FormatVersion::V1 {
386412
builder.snapshot_id(snapshot_id).build()
387413
} else {
388414
// For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
@@ -395,15 +421,14 @@ impl<'a> SnapshotProduceAction<'a> {
395421
self.new_manifest_output()?,
396422
Some(self.snapshot_id),
397423
self.key_metadata.clone(),
398-
self.tx.table.metadata().current_schema().clone(),
424+
self.tx.current_metadata.current_schema().clone(),
399425
self.tx
400-
.table
401-
.metadata()
426+
.current_metadata
402427
.default_partition_spec()
403428
.as_ref()
404429
.clone(),
405430
);
406-
if self.tx.table.metadata().format_version() == FormatVersion::V1 {
431+
if self.tx.current_metadata.format_version() == FormatVersion::V1 {
407432
builder.build_v1()
408433
} else {
409434
builder.build_v2_data()
@@ -443,7 +468,7 @@ impl<'a> SnapshotProduceAction<'a> {
443468
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
444469
format!(
445470
"{}/{}/snap-{}-{}-{}.{}",
446-
self.tx.table.metadata().location(),
471+
self.tx.current_metadata.location(),
447472
META_ROOT_PATH,
448473
self.snapshot_id,
449474
attempt,
@@ -461,28 +486,28 @@ impl<'a> SnapshotProduceAction<'a> {
461486
let new_manifests = self
462487
.manifest_file(&snapshot_produce_operation, &process)
463488
.await?;
464-
let next_seq_num = self.tx.table.metadata().next_sequence_number();
489+
let next_seq_num = self.tx.current_metadata.next_sequence_number();
465490

466491
let summary = self.summary(&snapshot_produce_operation);
467492

468493
let manifest_list_path = self.generate_manifest_list_file_path(0);
469494

470-
let mut manifest_list_writer = match self.tx.table.metadata().format_version() {
495+
let mut manifest_list_writer = match self.tx.current_metadata.format_version() {
471496
FormatVersion::V1 => ManifestListWriter::v1(
472497
self.tx
473-
.table
498+
.base_table
474499
.file_io()
475500
.new_output(manifest_list_path.clone())?,
476501
self.snapshot_id,
477-
self.tx.table.metadata().current_snapshot_id(),
502+
self.tx.current_metadata.current_snapshot_id(),
478503
),
479504
FormatVersion::V2 => ManifestListWriter::v2(
480505
self.tx
481-
.table
506+
.base_table
482507
.file_io()
483508
.new_output(manifest_list_path.clone())?,
484509
self.snapshot_id,
485-
self.tx.table.metadata().current_snapshot_id(),
510+
self.tx.current_metadata.current_snapshot_id(),
486511
next_seq_num,
487512
),
488513
};
@@ -493,34 +518,36 @@ impl<'a> SnapshotProduceAction<'a> {
493518
let new_snapshot = Snapshot::builder()
494519
.with_manifest_list(manifest_list_path)
495520
.with_snapshot_id(self.snapshot_id)
496-
.with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
521+
.with_parent_snapshot_id(self.tx.current_metadata.current_snapshot_id())
497522
.with_sequence_number(next_seq_num)
498523
.with_summary(summary)
499-
.with_schema_id(self.tx.table.metadata().current_schema_id())
524+
.with_schema_id(self.tx.current_metadata.current_schema_id())
500525
.with_timestamp_ms(commit_ts)
501526
.build();
502527

503-
self.tx.append_updates(vec![
504-
TableUpdate::AddSnapshot {
505-
snapshot: new_snapshot,
506-
},
507-
TableUpdate::SetSnapshotRef {
508-
ref_name: MAIN_BRANCH.to_string(),
509-
reference: SnapshotReference::new(
510-
self.snapshot_id,
511-
SnapshotRetention::branch(None, None, None),
512-
),
513-
},
514-
])?;
515-
self.tx.append_requirements(vec![
516-
TableRequirement::UuidMatch {
517-
uuid: self.tx.table.metadata().uuid(),
518-
},
519-
TableRequirement::RefSnapshotIdMatch {
520-
r#ref: MAIN_BRANCH.to_string(),
521-
snapshot_id: self.tx.table.metadata().current_snapshot_id(),
522-
},
523-
])?;
528+
self.tx.apply(
529+
vec![
530+
TableUpdate::AddSnapshot {
531+
snapshot: new_snapshot.clone(),
532+
},
533+
TableUpdate::SetSnapshotRef {
534+
ref_name: MAIN_BRANCH.to_string(),
535+
reference: SnapshotReference::new(
536+
self.snapshot_id,
537+
SnapshotRetention::branch(None, None, None),
538+
),
539+
},
540+
],
541+
vec![
542+
TableRequirement::UuidMatch {
543+
uuid: self.tx.current_metadata.uuid(),
544+
},
545+
TableRequirement::RefSnapshotIdMatch {
546+
r#ref: MAIN_BRANCH.to_string(),
547+
snapshot_id: self.tx.current_metadata.current_snapshot_id(),
548+
},
549+
],
550+
)?;
524551
Ok(self.tx)
525552
}
526553
}
@@ -557,15 +584,14 @@ impl<'a> ReplaceSortOrderAction<'a> {
557584

558585
let requirements = vec![
559586
TableRequirement::CurrentSchemaIdMatch {
560-
current_schema_id: self.tx.table.metadata().current_schema().schema_id(),
587+
current_schema_id: self.tx.current_metadata.current_schema().schema_id(),
561588
},
562589
TableRequirement::DefaultSortOrderIdMatch {
563-
default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id,
590+
default_sort_order_id: self.tx.current_metadata.default_sort_order().order_id,
564591
},
565592
];
566593

567-
self.tx.append_requirements(requirements)?;
568-
self.tx.append_updates(updates)?;
594+
self.tx.apply(updates, requirements)?;
569595
Ok(self.tx)
570596
}
571597

@@ -577,8 +603,7 @@ impl<'a> ReplaceSortOrderAction<'a> {
577603
) -> Result<Self> {
578604
let field_id = self
579605
.tx
580-
.table
581-
.metadata()
606+
.current_metadata
582607
.current_schema()
583608
.field_id_by_name(name)
584609
.ok_or_else(|| {
@@ -806,14 +831,15 @@ mod tests {
806831
assert!(
807832
matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH)
808833
);
834+
// requirements are based on the original table metadata
809835
assert_eq!(
810836
vec![
811837
TableRequirement::UuidMatch {
812-
uuid: tx.table.metadata().uuid()
838+
uuid: table.metadata().uuid()
813839
},
814840
TableRequirement::RefSnapshotIdMatch {
815841
r#ref: MAIN_BRANCH.to_string(),
816-
snapshot_id: tx.table.metadata().current_snapshot_id
842+
snapshot_id: table.metadata().current_snapshot_id()
817843
}
818844
],
819845
tx.requirements
@@ -853,20 +879,4 @@ mod tests {
853879
);
854880
assert_eq!(data_file, *manifest.entries()[0].data_file());
855881
}
856-
857-
#[test]
858-
fn test_do_same_update_in_same_transaction() {
859-
let table = make_v2_table();
860-
let tx = Transaction::new(&table);
861-
let tx = tx
862-
.remove_properties(vec!["a".to_string(), "b".to_string()])
863-
.unwrap();
864-
865-
let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
866-
867-
assert!(
868-
tx.is_err(),
869-
"Should not allow to do same kinds update in same transaction"
870-
);
871-
}
872882
}

0 commit comments

Comments
 (0)