@@ -22,6 +22,7 @@ use std::collections::{HashMap, HashSet};
22
22
use std:: future:: Future ;
23
23
use std:: mem:: discriminant;
24
24
use std:: ops:: RangeFrom ;
25
+ use std:: sync:: Arc ;
25
26
26
27
use arrow_array:: StringArray ;
27
28
use futures:: TryStreamExt ;
@@ -32,8 +33,7 @@ use crate::io::OutputFile;
32
33
use crate :: spec:: {
33
34
DataFile , DataFileFormat , FormatVersion , ManifestEntry , ManifestFile , ManifestListWriter ,
34
35
ManifestWriterBuilder , NullOrder , Operation , Snapshot , SnapshotReference , SnapshotRetention ,
35
- SortDirection , SortField , SortOrder , Struct , StructType , Summary , TableMetadata , Transform ,
36
- MAIN_BRANCH ,
36
+ SortDirection , SortField , SortOrder , Struct , StructType , Summary , Transform , MAIN_BRANCH ,
37
37
} ;
38
38
use crate :: table:: Table ;
39
39
use crate :: writer:: file_writer:: ParquetWriter ;
@@ -45,7 +45,7 @@ const META_ROOT_PATH: &str = "metadata";
45
45
/// Table transaction.
46
46
pub struct Transaction < ' a > {
47
47
base_table : & ' a Table ,
48
- current_metadata : TableMetadata ,
48
+ current_table : Table ,
49
49
updates : Vec < TableUpdate > ,
50
50
requirements : Vec < TableRequirement > ,
51
51
}
@@ -55,19 +55,20 @@ impl<'a> Transaction<'a> {
55
55
pub fn new ( table : & ' a Table ) -> Self {
56
56
Self {
57
57
base_table : table,
58
- current_metadata : table. metadata ( ) . clone ( ) ,
58
+ current_table : table. clone ( ) ,
59
59
updates : vec ! [ ] ,
60
60
requirements : vec ! [ ] ,
61
61
}
62
62
}
63
63
64
64
fn update_table_metadata ( & mut self , updates : & [ TableUpdate ] ) -> Result < ( ) > {
65
- let mut metadata_builder = self . current_metadata . clone ( ) . into_builder ( None ) ;
65
+ let mut metadata_builder = self . current_table . metadata ( ) . clone ( ) . into_builder ( None ) ;
66
66
for update in updates {
67
67
metadata_builder = update. clone ( ) . apply ( metadata_builder) ?;
68
68
}
69
69
70
- self . current_metadata = metadata_builder. build ( ) ?. metadata ;
70
+ self . current_table
71
+ . with_metadata ( Arc :: new ( metadata_builder. build ( ) ?. metadata ) ) ;
71
72
72
73
Ok ( ( ) )
73
74
}
@@ -78,7 +79,7 @@ impl<'a> Transaction<'a> {
78
79
requirements : Vec < TableRequirement > ,
79
80
) -> Result < ( ) > {
80
81
for requirement in & requirements {
81
- requirement. check ( Some ( & self . current_metadata ) ) ?;
82
+ requirement. check ( Some ( self . current_table . metadata ( ) ) ) ?;
82
83
}
83
84
84
85
self . update_table_metadata ( & updates) ?;
@@ -106,7 +107,7 @@ impl<'a> Transaction<'a> {
106
107
107
108
/// Sets table to a new version.
108
109
pub fn upgrade_table_version ( mut self , format_version : FormatVersion ) -> Result < Self > {
109
- let current_version = self . current_metadata . format_version ( ) ;
110
+ let current_version = self . current_table . metadata ( ) . format_version ( ) ;
110
111
match current_version. cmp ( & format_version) {
111
112
Ordering :: Greater => {
112
113
return Err ( Error :: new (
@@ -145,7 +146,8 @@ impl<'a> Transaction<'a> {
145
146
} ;
146
147
let mut snapshot_id = generate_random_id ( ) ;
147
148
while self
148
- . current_metadata
149
+ . current_table
150
+ . metadata ( )
149
151
. snapshots ( )
150
152
. any ( |s| s. snapshot_id ( ) == snapshot_id)
151
153
{
@@ -247,7 +249,8 @@ impl<'a> FastAppendAction<'a> {
247
249
if !self
248
250
. snapshot_produce_action
249
251
. tx
250
- . current_metadata
252
+ . current_table
253
+ . metadata ( )
251
254
. default_spec
252
255
. is_unpartitioned ( )
253
256
{
@@ -258,9 +261,9 @@ impl<'a> FastAppendAction<'a> {
258
261
}
259
262
260
263
let data_files = ParquetWriter :: parquet_files_to_data_files (
261
- self . snapshot_produce_action . tx . base_table . file_io ( ) ,
264
+ self . snapshot_produce_action . tx . current_table . file_io ( ) ,
262
265
file_path,
263
- & self . snapshot_produce_action . tx . current_metadata ,
266
+ self . snapshot_produce_action . tx . current_table . metadata ( ) ,
264
267
)
265
268
. await ?;
266
269
@@ -283,7 +286,7 @@ impl<'a> FastAppendAction<'a> {
283
286
let mut manifest_stream = self
284
287
. snapshot_produce_action
285
288
. tx
286
- . base_table
289
+ . current_table
287
290
. inspect ( )
288
291
. manifests ( )
289
292
. scan ( )
@@ -345,14 +348,19 @@ impl SnapshotProduceOperation for FastAppendOperation {
345
348
& self ,
346
349
snapshot_produce : & SnapshotProduceAction < ' _ > ,
347
350
) -> Result < Vec < ManifestFile > > {
348
- let Some ( snapshot) = snapshot_produce. tx . current_metadata . current_snapshot ( ) else {
351
+ let Some ( snapshot) = snapshot_produce
352
+ . tx
353
+ . current_table
354
+ . metadata ( )
355
+ . current_snapshot ( )
356
+ else {
349
357
return Ok ( vec ! [ ] ) ;
350
358
} ;
351
359
352
360
let manifest_list = snapshot
353
361
. load_manifest_list (
354
- snapshot_produce. tx . base_table . file_io ( ) ,
355
- & snapshot_produce. tx . current_metadata ,
362
+ snapshot_produce. tx . current_table . file_io ( ) ,
363
+ snapshot_produce. tx . current_table . metadata ( ) ,
356
364
)
357
365
. await ?;
358
366
@@ -470,7 +478,7 @@ impl<'a> SnapshotProduceAction<'a> {
470
478
}
471
479
Self :: validate_partition_value (
472
480
data_file. partition ( ) ,
473
- self . tx . current_metadata . default_partition_type ( ) ,
481
+ self . tx . current_table . metadata ( ) . default_partition_type ( ) ,
474
482
) ?;
475
483
}
476
484
self . added_data_files . extend ( data_files) ;
@@ -480,20 +488,23 @@ impl<'a> SnapshotProduceAction<'a> {
480
488
fn new_manifest_output ( & mut self ) -> Result < OutputFile > {
481
489
let new_manifest_path = format ! (
482
490
"{}/{}/{}-m{}.{}" ,
483
- self . tx. current_metadata . location( ) ,
491
+ self . tx. current_table . metadata ( ) . location( ) ,
484
492
META_ROOT_PATH ,
485
493
self . commit_uuid,
486
494
self . manifest_counter. next( ) . unwrap( ) ,
487
495
DataFileFormat :: Avro
488
496
) ;
489
- self . tx . base_table . file_io ( ) . new_output ( new_manifest_path)
497
+ self . tx
498
+ . current_table
499
+ . file_io ( )
500
+ . new_output ( new_manifest_path)
490
501
}
491
502
492
503
// Write manifest file for added data files and return the ManifestFile for ManifestList.
493
504
async fn write_added_manifest ( & mut self ) -> Result < ManifestFile > {
494
505
let added_data_files = std:: mem:: take ( & mut self . added_data_files ) ;
495
506
let snapshot_id = self . snapshot_id ;
496
- let format_version = self . tx . current_metadata . format_version ( ) ;
507
+ let format_version = self . tx . current_table . metadata ( ) . format_version ( ) ;
497
508
let manifest_entries = added_data_files. into_iter ( ) . map ( |data_file| {
498
509
let builder = ManifestEntry :: builder ( )
499
510
. status ( crate :: spec:: ManifestStatus :: Added )
@@ -511,14 +522,15 @@ impl<'a> SnapshotProduceAction<'a> {
511
522
self . new_manifest_output ( ) ?,
512
523
Some ( self . snapshot_id ) ,
513
524
self . key_metadata . clone ( ) ,
514
- self . tx . current_metadata . current_schema ( ) . clone ( ) ,
525
+ self . tx . current_table . metadata ( ) . current_schema ( ) . clone ( ) ,
515
526
self . tx
516
- . current_metadata
527
+ . current_table
528
+ . metadata ( )
517
529
. default_partition_spec ( )
518
530
. as_ref ( )
519
531
. clone ( ) ,
520
532
) ;
521
- if self . tx . current_metadata . format_version ( ) == FormatVersion :: V1 {
533
+ if self . tx . current_table . metadata ( ) . format_version ( ) == FormatVersion :: V1 {
522
534
builder. build_v1 ( )
523
535
} else {
524
536
builder. build_v2_data ( )
@@ -558,7 +570,7 @@ impl<'a> SnapshotProduceAction<'a> {
558
570
fn generate_manifest_list_file_path ( & self , attempt : i64 ) -> String {
559
571
format ! (
560
572
"{}/{}/snap-{}-{}-{}.{}" ,
561
- self . tx. current_metadata . location( ) ,
573
+ self . tx. current_table . metadata ( ) . location( ) ,
562
574
META_ROOT_PATH ,
563
575
self . snapshot_id,
564
576
attempt,
@@ -576,28 +588,28 @@ impl<'a> SnapshotProduceAction<'a> {
576
588
let new_manifests = self
577
589
. manifest_file ( & snapshot_produce_operation, & process)
578
590
. await ?;
579
- let next_seq_num = self . tx . current_metadata . next_sequence_number ( ) ;
591
+ let next_seq_num = self . tx . current_table . metadata ( ) . next_sequence_number ( ) ;
580
592
581
593
let summary = self . summary ( & snapshot_produce_operation) ;
582
594
583
595
let manifest_list_path = self . generate_manifest_list_file_path ( 0 ) ;
584
596
585
- let mut manifest_list_writer = match self . tx . current_metadata . format_version ( ) {
597
+ let mut manifest_list_writer = match self . tx . current_table . metadata ( ) . format_version ( ) {
586
598
FormatVersion :: V1 => ManifestListWriter :: v1 (
587
599
self . tx
588
- . base_table
600
+ . current_table
589
601
. file_io ( )
590
602
. new_output ( manifest_list_path. clone ( ) ) ?,
591
603
self . snapshot_id ,
592
- self . tx . current_metadata . current_snapshot_id ( ) ,
604
+ self . tx . current_table . metadata ( ) . current_snapshot_id ( ) ,
593
605
) ,
594
606
FormatVersion :: V2 => ManifestListWriter :: v2 (
595
607
self . tx
596
- . base_table
608
+ . current_table
597
609
. file_io ( )
598
610
. new_output ( manifest_list_path. clone ( ) ) ?,
599
611
self . snapshot_id ,
600
- self . tx . current_metadata . current_snapshot_id ( ) ,
612
+ self . tx . current_table . metadata ( ) . current_snapshot_id ( ) ,
601
613
next_seq_num,
602
614
) ,
603
615
} ;
@@ -608,10 +620,10 @@ impl<'a> SnapshotProduceAction<'a> {
608
620
let new_snapshot = Snapshot :: builder ( )
609
621
. with_manifest_list ( manifest_list_path)
610
622
. with_snapshot_id ( self . snapshot_id )
611
- . with_parent_snapshot_id ( self . tx . current_metadata . current_snapshot_id ( ) )
623
+ . with_parent_snapshot_id ( self . tx . current_table . metadata ( ) . current_snapshot_id ( ) )
612
624
. with_sequence_number ( next_seq_num)
613
625
. with_summary ( summary)
614
- . with_schema_id ( self . tx . current_metadata . current_schema_id ( ) )
626
+ . with_schema_id ( self . tx . current_table . metadata ( ) . current_schema_id ( ) )
615
627
. with_timestamp_ms ( commit_ts)
616
628
. build ( ) ;
617
629
@@ -630,11 +642,11 @@ impl<'a> SnapshotProduceAction<'a> {
630
642
] ,
631
643
vec ! [
632
644
TableRequirement :: UuidMatch {
633
- uuid: self . tx. current_metadata . uuid( ) ,
645
+ uuid: self . tx. current_table . metadata ( ) . uuid( ) ,
634
646
} ,
635
647
TableRequirement :: RefSnapshotIdMatch {
636
648
r#ref: MAIN_BRANCH . to_string( ) ,
637
- snapshot_id: self . tx. current_metadata . current_snapshot_id( ) ,
649
+ snapshot_id: self . tx. current_table . metadata ( ) . current_snapshot_id( ) ,
638
650
} ,
639
651
] ,
640
652
) ?;
@@ -674,10 +686,20 @@ impl<'a> ReplaceSortOrderAction<'a> {
674
686
675
687
let requirements = vec ! [
676
688
TableRequirement :: CurrentSchemaIdMatch {
677
- current_schema_id: self . tx. current_metadata. current_schema( ) . schema_id( ) ,
689
+ current_schema_id: self
690
+ . tx
691
+ . current_table
692
+ . metadata( )
693
+ . current_schema( )
694
+ . schema_id( ) ,
678
695
} ,
679
696
TableRequirement :: DefaultSortOrderIdMatch {
680
- default_sort_order_id: self . tx. current_metadata. default_sort_order( ) . order_id,
697
+ default_sort_order_id: self
698
+ . tx
699
+ . current_table
700
+ . metadata( )
701
+ . default_sort_order( )
702
+ . order_id,
681
703
} ,
682
704
] ;
683
705
@@ -693,7 +715,8 @@ impl<'a> ReplaceSortOrderAction<'a> {
693
715
) -> Result < Self > {
694
716
let field_id = self
695
717
. tx
696
- . current_metadata
718
+ . current_table
719
+ . metadata ( )
697
720
. current_schema ( )
698
721
. field_id_by_name ( name)
699
722
. ok_or_else ( || {
0 commit comments