Skip to content

Commit 82d73d4

Browse files
author
ZENOTME
committed
refine code and test
1 parent 5bd4b64 commit 82d73d4

File tree

4 files changed

+81
-57
lines changed

4 files changed

+81
-57
lines changed

crates/iceberg/src/spec/manifest.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,7 @@ impl ManifestEntry {
12081208
&self.data_file
12091209
}
12101210

1211-
/// get file sequence number
1211+
/// File sequence number indicating when the file was added. Inherited when null and status is 1 (added).
12121212
#[inline]
12131213
pub fn file_sequence_number(&self) -> Option<i64> {
12141214
self.file_sequence_number

crates/iceberg/src/transaction.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ impl<'a> MergeAppendAction<'a> {
399399
/// Finished building the action and apply it to the transaction.
400400
pub async fn apply(self) -> Result<Transaction<'a>> {
401401
if self.merge_enabled {
402-
let process = MergeManifsetProcess {
402+
let process = MergeManifestProcess {
403403
target_size_bytes: self.target_size_bytes,
404404
min_count_to_merge: self.min_count_to_merge,
405405
};
@@ -468,7 +468,7 @@ trait SnapshotProduceOperation: Send + Sync {
468468
struct DefaultManifestProcess;
469469

470470
impl ManifestProcess for DefaultManifestProcess {
471-
async fn process_manifeset<'a>(
471+
async fn process_manifest<'a>(
472472
&self,
473473
_snapshot_producer: &mut SnapshotProduceAction<'a>,
474474
manifests: Vec<ManifestFile>,
@@ -477,12 +477,12 @@ impl ManifestProcess for DefaultManifestProcess {
477477
}
478478
}
479479

480-
struct MergeManifsetProcess {
480+
struct MergeManifestProcess {
481481
target_size_bytes: u32,
482482
min_count_to_merge: u32,
483483
}
484484

485-
impl MergeManifsetProcess {
485+
impl MergeManifestProcess {
486486
pub fn new(target_size_bytes: u32, min_count_to_merge: u32) -> Self {
487487
Self {
488488
target_size_bytes,
@@ -508,8 +508,8 @@ impl MergeManifsetProcess {
508508
manifest_bin: Vec<ManifestFile>,
509509
mut writer: ManifestWriter,
510510
) -> Result<ManifestFile> {
511-
for manifset_file in manifest_bin {
512-
let manifest_file = manifset_file.load_manifest(&file_io).await?;
511+
for manifest_file in manifest_bin {
512+
let manifest_file = manifest_file.load_manifest(&file_io).await?;
513513
for manifest_entry in manifest_file.entries() {
514514
if manifest_entry.status() == ManifestStatus::Deleted
515515
&& manifest_entry
@@ -595,7 +595,7 @@ impl MergeManifsetProcess {
595595
Ok(merged_bins.into_iter().flatten().collect())
596596
}
597597

598-
async fn merge_manifeset<'a>(
598+
async fn merge_manifest<'a>(
599599
&self,
600600
snapshot_produce: &mut SnapshotProduceAction<'a>,
601601
manifests: Vec<ManifestFile>,
@@ -620,18 +620,18 @@ impl MergeManifsetProcess {
620620
}
621621
}
622622

623-
impl ManifestProcess for MergeManifsetProcess {
624-
async fn process_manifeset<'a>(
623+
impl ManifestProcess for MergeManifestProcess {
624+
async fn process_manifest<'a>(
625625
&self,
626626
snapshot_produce: &mut SnapshotProduceAction<'a>,
627627
manifests: Vec<ManifestFile>,
628628
) -> Result<Vec<ManifestFile>> {
629-
self.merge_manifeset(snapshot_produce, manifests).await
629+
self.merge_manifest(snapshot_produce, manifests).await
630630
}
631631
}
632632

633633
trait ManifestProcess: Send + Sync {
634-
fn process_manifeset<'a>(
634+
fn process_manifest<'a>(
635635
&self,
636636
snapshot_produce: &mut SnapshotProduceAction<'a>,
637637
manifests: Vec<ManifestFile>,
@@ -790,7 +790,7 @@ impl<'a> SnapshotProduceAction<'a> {
790790
let mut manifest_files = vec![added_manifest];
791791
manifest_files.extend(existing_manifests);
792792
manifest_process
793-
.process_manifeset(self, manifest_files)
793+
.process_manifest(self, manifest_files)
794794
.await
795795
}
796796

crates/integration_tests/tests/merge_append_test.rs renamed to crates/integration_tests/tests/shared_tests/merge_append_test.rs

Lines changed: 67 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,22 @@ use iceberg::spec::{
2525
DataFile, ManifestEntry, ManifestStatus, NestedField, PrimitiveType, Schema, Type,
2626
};
2727
use iceberg::table::Table;
28-
use iceberg::transaction::{Transaction, MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT};
28+
use iceberg::transaction::{
29+
Transaction, MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES,
30+
};
2931
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
3032
use iceberg::writer::file_writer::location_generator::{
3133
DefaultFileNameGenerator, DefaultLocationGenerator,
3234
};
3335
use iceberg::writer::file_writer::ParquetWriterBuilder;
3436
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
35-
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
36-
use iceberg_integration_tests::set_test_fixture;
37+
use iceberg::{Catalog, TableCreation};
38+
use iceberg_catalog_rest::RestCatalog;
3739
use parquet::file::properties::WriterProperties;
3840

41+
use crate::get_shared_containers;
42+
use crate::shared_tests::random_ns;
43+
3944
async fn write_new_data_file(table: &Table) -> Vec<DataFile> {
4045
let schema: Arc<arrow_schema::Schema> = Arc::new(
4146
table
@@ -60,9 +65,9 @@ async fn write_new_data_file(table: &Table) -> Vec<DataFile> {
6065
);
6166
let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
6267
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
63-
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
64-
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
65-
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
68+
let col1 = StringArray::from(vec![Some("foo"); 100]);
69+
let col2 = Int32Array::from(vec![Some(1); 100]);
70+
let col3 = BooleanArray::from(vec![Some(true); 100]);
6671
let batch = RecordBatch::try_new(schema.clone(), vec![
6772
Arc::new(col1) as ArrayRef,
6873
Arc::new(col2) as ArrayRef,
@@ -75,21 +80,10 @@ async fn write_new_data_file(table: &Table) -> Vec<DataFile> {
7580

7681
#[tokio::test]
7782
async fn test_append_data_file() {
78-
let fixture = set_test_fixture("test_create_table").await;
83+
let fixture = get_shared_containers();
84+
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
85+
let ns = random_ns().await;
7986

80-
// Create table
81-
let ns = Namespace::with_properties(
82-
NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
83-
HashMap::from([
84-
("owner".to_string(), "ray".to_string()),
85-
("community".to_string(), "apache".to_string()),
86-
]),
87-
);
88-
fixture
89-
.rest_catalog
90-
.create_namespace(ns.name(), ns.properties().clone())
91-
.await
92-
.unwrap();
9387
let schema = Schema::builder()
9488
.with_schema_id(1)
9589
.with_identifier_field_ids(vec![2])
@@ -104,8 +98,7 @@ async fn test_append_data_file() {
10498
.name("t1".to_string())
10599
.schema(schema.clone())
106100
.build();
107-
let mut table = fixture
108-
.rest_catalog
101+
let mut table = rest_catalog
109102
.create_table(ns.name(), table_creation)
110103
.await
111104
.unwrap();
@@ -116,9 +109,10 @@ async fn test_append_data_file() {
116109
.set_properties(HashMap::from([
117110
(MANIFEST_MERGE_ENABLED.to_string(), "true".to_string()),
118111
(MANIFEST_MIN_MERGE_COUNT.to_string(), "4".to_string()),
112+
(MANIFEST_TARGET_SIZE_BYTES.to_string(), "7000".to_string()),
119113
]))
120114
.unwrap()
121-
.commit(&fixture.rest_catalog)
115+
.commit(&rest_catalog)
122116
.await
123117
.unwrap();
124118

@@ -130,7 +124,7 @@ async fn test_append_data_file() {
130124
let mut append_action = tx.fast_append(None, vec![]).unwrap();
131125
append_action.add_data_files(data_file.clone()).unwrap();
132126
let tx = append_action.apply().await.unwrap();
133-
table = tx.commit(&fixture.rest_catalog).await.unwrap()
127+
table = tx.commit(&rest_catalog).await.unwrap()
134128
}
135129
let manifest_list = table
136130
.metadata()
@@ -140,44 +134,73 @@ async fn test_append_data_file() {
140134
.await
141135
.unwrap();
142136
assert_eq!(manifest_list.entries().len(), 3);
143-
for entry in manifest_list.entries() {
137+
138+
// construct test data
139+
for (idx, entry) in manifest_list.entries().iter().enumerate() {
144140
let manifest = entry.load_manifest(table.file_io()).await.unwrap();
145141
assert!(manifest.entries().len() == 1);
146142

147-
original_manifest_entries.push(Arc::new(
148-
ManifestEntry::builder()
149-
.status(ManifestStatus::Existing)
150-
.snapshot_id(manifest.entries()[0].snapshot_id().unwrap())
151-
.sequence_number(manifest.entries()[0].sequence_number().unwrap())
152-
.file_sequence_number(manifest.entries()[0].file_sequence_number().unwrap())
153-
.data_file(manifest.entries()[0].data_file().clone())
154-
.build(),
155-
));
143+
// The first manifest will be packed with the first additional manifest, and
144+
// the count (2) is less than the min merge count (4), so these two will not be merged.
145+
// See `MergeManifestProcess::merge_group` for details.
146+
if idx == 0 {
147+
original_manifest_entries.push(Arc::new(
148+
ManifestEntry::builder()
149+
.status(ManifestStatus::Added)
150+
.snapshot_id(manifest.entries()[0].snapshot_id().unwrap())
151+
.sequence_number(manifest.entries()[0].sequence_number().unwrap())
152+
.file_sequence_number(manifest.entries()[0].file_sequence_number().unwrap())
153+
.data_file(manifest.entries()[0].data_file().clone())
154+
.build(),
155+
));
156+
} else {
157+
original_manifest_entries.push(Arc::new(
158+
ManifestEntry::builder()
159+
.status(ManifestStatus::Existing)
160+
.snapshot_id(manifest.entries()[0].snapshot_id().unwrap())
161+
.sequence_number(manifest.entries()[0].sequence_number().unwrap())
162+
.file_sequence_number(manifest.entries()[0].file_sequence_number().unwrap())
163+
.data_file(manifest.entries()[0].data_file().clone())
164+
.build(),
165+
));
166+
}
156167
}
157168

158-
// append data file with merge append, 4 data file will be merged to one manifest
169+
// append data file with merge append, 4 data files will be merged into two manifests
159170
let data_file = write_new_data_file(&table).await;
160171
let tx = Transaction::new(&table);
161172
let mut merge_append_action = tx.merge_append(None, vec![]).unwrap();
162173
merge_append_action
163174
.add_data_files(data_file.clone())
164175
.unwrap();
165176
let tx = merge_append_action.apply().await.unwrap();
166-
table = tx.commit(&fixture.rest_catalog).await.unwrap();
177+
table = tx.commit(&rest_catalog).await.unwrap();
178+
// Check manifest file
167179
let manifest_list = table
168180
.metadata()
169181
.current_snapshot()
170182
.unwrap()
171183
.load_manifest_list(table.file_io(), table.metadata())
172184
.await
173185
.unwrap();
174-
assert_eq!(manifest_list.entries().len(), 1);
175-
let manifest = manifest_list.entries()[0]
176-
.load_manifest(table.file_io())
177-
.await
178-
.unwrap();
179-
assert!(manifest.entries().len() == 4);
180-
for original_entry in original_manifest_entries.iter() {
181-
assert!(manifest.entries().contains(original_entry));
186+
assert_eq!(manifest_list.entries().len(), 3);
187+
{
188+
let manifest = manifest_list.entries()[1]
189+
.load_manifest(table.file_io())
190+
.await
191+
.unwrap();
192+
assert!(manifest.entries().len() == 1);
193+
original_manifest_entries.retain(|entry| !manifest.entries().contains(entry));
194+
assert!(original_manifest_entries.len() == 2);
195+
}
196+
{
197+
let manifest = manifest_list.entries()[2]
198+
.load_manifest(table.file_io())
199+
.await
200+
.unwrap();
201+
assert!(manifest.entries().len() == 2);
202+
for original_entry in original_manifest_entries.iter() {
203+
assert!(manifest.entries().contains(original_entry));
204+
}
182205
}
183206
}

crates/integration_tests/tests/shared_tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ mod append_data_file_test;
2727
mod append_partition_data_file_test;
2828
mod conflict_commit_test;
2929
mod datafusion;
30+
mod merge_append_test;
3031
mod read_evolved_schema;
3132
mod read_positional_deletes;
3233
mod scan_all_type;

0 commit comments

Comments
 (0)