Skip to content

feat: support append delete file #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 52 additions & 18 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ use uuid::Uuid;
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
DataContentType, DataFile, DataFileFormat, FormatVersion, ManifestContentType, ManifestEntry,
ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType,
Summary, Transform, MAIN_BRANCH,
};
use crate::table::Table;
use crate::writer::file_writer::ParquetWriter;
Expand Down Expand Up @@ -365,6 +366,7 @@ struct SnapshotProduceAction<'a> {
commit_uuid: Uuid,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
added_delete_files: Vec<DataFile>,
// A counter used to generate unique manifest file names.
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
Expand All @@ -385,6 +387,7 @@ impl<'a> SnapshotProduceAction<'a> {
commit_uuid,
snapshot_properties,
added_data_files: vec![],
added_delete_files: vec![],
manifest_counter: (0..),
key_metadata,
})
Expand Down Expand Up @@ -429,19 +432,17 @@ impl<'a> SnapshotProduceAction<'a> {
data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
let data_files: Vec<DataFile> = data_files.into_iter().collect();
for data_file in &data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
"Only data content type is allowed for fast append",
));
}
for data_file in data_files {
Self::validate_partition_value(
data_file.partition(),
self.tx.table.metadata().default_partition_type(),
)?;
if data_file.content_type() == DataContentType::Data {
self.added_data_files.push(data_file);
} else {
self.added_delete_files.push(data_file);
}
}
self.added_data_files.extend(data_files);
Ok(self)
}

Expand All @@ -458,9 +459,32 @@ impl<'a> SnapshotProduceAction<'a> {
}

// Write manifest file for added data files and return the ManifestFile for ManifestList.
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
async fn write_added_manifest(
&mut self,
added_data_files: Vec<DataFile>,
) -> Result<ManifestFile> {
let snapshot_id = self.snapshot_id;
let content_type = {
let mut data_num = 0;
let mut delete_num = 0;
for f in &added_data_files {
match f.content_type() {
DataContentType::Data => data_num += 1,
DataContentType::PositionDeletes => delete_num += 1,
DataContentType::EqualityDeletes => delete_num += 1,
}
}
if data_num == added_data_files.len() {
ManifestContentType::Data
} else if delete_num == added_data_files.len() {
ManifestContentType::Deletes
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
"added DataFile for a ManifestFile should be same type (Data or Delete)",
));
}
};
let manifest_entries = added_data_files.into_iter().map(|data_file| {
let builder = ManifestEntry::builder()
.status(crate::spec::ManifestStatus::Added)
Expand Down Expand Up @@ -489,7 +513,10 @@ impl<'a> SnapshotProduceAction<'a> {
if self.tx.table.metadata().format_version() == FormatVersion::V1 {
builder.build_v1()
} else {
builder.build_v2_data()
match content_type {
ManifestContentType::Data => builder.build_v2_data(),
ManifestContentType::Deletes => builder.build_v2_deletes(),
}
}
};
for entry in manifest_entries {
Expand All @@ -503,12 +530,19 @@ impl<'a> SnapshotProduceAction<'a> {
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
let added_manifest = self.write_added_manifest().await?;
let mut manifest_files = vec![];
let data_files = std::mem::take(&mut self.added_data_files);
let delete_files = std::mem::take(&mut self.added_delete_files);
if !data_files.is_empty() {
let added_manifest = self.write_added_manifest(data_files).await?;
manifest_files.push(added_manifest);
}
if !delete_files.is_empty() {
let added_delete_manifest = self.write_added_manifest(delete_files).await?;
manifest_files.push(added_delete_manifest);
}
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
// # TODO
// Support process delete entries.

let mut manifest_files = vec![added_manifest];
manifest_files.extend(existing_manifests);
let manifest_files = manifest_process.process_manifeset(manifest_files);
Ok(manifest_files)
Expand Down