Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod remove_snapshots;
mod rewrite_files;
mod snapshot;
mod sort_order;
mod utils;

use std::cmp::Ordering;
use std::collections::HashMap;
Expand Down
73 changes: 62 additions & 11 deletions crates/iceberg/src/transaction/remove_snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,27 @@
//! Transaction action for removing snapshot.

use std::collections::{HashMap, HashSet};
use std::ops::Deref;

use itertools::Itertools;

use super::utils::ReachableFileCleanupStrategy;
use crate::error::Result;
use crate::io::FileIO;
use crate::spec::{
SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT,
MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP,
MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
SnapshotReference, SnapshotRetention, TableMetadataRef, MAIN_BRANCH, MAX_REF_AGE_MS,
MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT,
MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
};
use crate::table::Table;
use crate::transaction::Transaction;
use crate::utils::ancestors_of;
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
use crate::{Catalog, Error, ErrorKind, TableRequirement, TableUpdate};

/// RemoveSnapshotAction is a transaction action for removing snapshot.
pub struct RemoveSnapshotAction<'a> {
tx: Transaction<'a>,
clear_expire_files: bool,
clear_expired_files: bool,
ids_to_remove: HashSet<i64>,
default_expired_older_than: i64,
default_min_num_snapshots: i32,
Expand Down Expand Up @@ -69,7 +73,7 @@ impl<'a> RemoveSnapshotAction<'a> {

Self {
tx,
clear_expire_files: false,
clear_expired_files: false,
ids_to_remove: HashSet::new(),
default_expired_older_than: now - default_max_snapshot_age_ms,
default_min_num_snapshots,
Expand All @@ -80,8 +84,8 @@ impl<'a> RemoveSnapshotAction<'a> {
}

/// Sets whether files belonging only to expired snapshots are cleaned up when the result is committed.
pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self {
self.clear_expire_files = clear_expire_files;
pub fn clear_expired_files(mut self, clear_expired_files: bool) -> Self {
self.clear_expired_files = clear_expired_files;
self
}

Expand Down Expand Up @@ -110,9 +114,12 @@ impl<'a> RemoveSnapshotAction<'a> {
}

/// Finishes building the action and applies it to the transaction.
pub async fn apply(mut self) -> Result<Transaction<'a>> {
pub async fn apply(mut self) -> Result<RemoveSnapshotApplyResult<'a>> {
if self.tx.current_table.metadata().refs.is_empty() {
return Ok(self.tx);
return Ok(RemoveSnapshotApplyResult {
tx: self.tx,
clear_expired_files: self.clear_expired_files,
});
}

let table_meta = self.tx.current_table.metadata().clone();
Expand Down Expand Up @@ -255,7 +262,10 @@ impl<'a> RemoveSnapshotAction<'a> {
},
])?;

Ok(self.tx)
Ok(RemoveSnapshotApplyResult {
tx: self.tx,
clear_expired_files: self.clear_expired_files,
})
}

fn compute_retained_refs(
Expand Down Expand Up @@ -403,6 +413,47 @@ impl<'a> RemoveSnapshotAction<'a> {
}
}

/// Outcome of applying a `RemoveSnapshotAction`.
///
/// Bundles the transaction that carries the snapshot-removal changes with the
/// flag that tells `commit` whether expired files should be cleaned up afterwards.
pub struct RemoveSnapshotApplyResult<'a> {
    /// Transaction handed back by `RemoveSnapshotAction::apply`.
    tx: Transaction<'a>,
    /// When `true`, `commit` runs the file cleanup after the transaction commit succeeds.
    clear_expired_files: bool,
}

impl RemoveSnapshotApplyResult<'_> {
    /// Commits the wrapped transaction to `catalog` and, if requested, cleans up
    /// expired files afterwards.
    ///
    /// The metadata of the transaction's base table and current table, together
    /// with a clone of the current table's `FileIO`, are captured before the
    /// transaction is consumed by the commit.
    ///
    /// # Errors
    ///
    /// Returns the commit error if committing fails; in that case no cleanup is
    /// attempted. Also returns any error raised by the cleanup step.
    /// NOTE(review): a cleanup error is reported even though the commit itself has
    /// already succeeded by then — confirm callers expect this.
    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
        // Everything the cleanup needs must be taken out before `self.tx` is moved.
        let cleanup_requested = self.clear_expired_files;
        let io = self.tx.current_table.file_io().clone();
        let metadata_before = self.tx.base_table.metadata_ref();
        let metadata_after = self.tx.current_table.metadata_ref();

        let committed = self.tx.commit(catalog).await?;

        if !cleanup_requested {
            return Ok(committed);
        }

        Self::clean_expired_files(io, &metadata_before, &metadata_after).await?;
        Ok(committed)
    }

    /// Runs a `ReachableFileCleanupStrategy` built on `file_io` over the metadata
    /// captured before and after the expiration.
    async fn clean_expired_files(
        file_io: FileIO,
        before_expiration: &TableMetadataRef,
        after_expiration: &TableMetadataRef,
    ) -> Result<()> {
        ReachableFileCleanupStrategy::new(file_io)
            .clean_files(before_expiration, after_expiration)
            .await
    }
}

/// Exposes the wrapped transaction by shared reference, so the read-only
/// `Transaction` API can be used directly on the apply result.
impl<'a> Deref for RemoveSnapshotApplyResult<'a> {
    type Target = Transaction<'a>;

    /// Returns a reference to the inner transaction.
    fn deref(&self) -> &Transaction<'a> {
        let Self { tx, .. } = self;
        tx
    }
}

#[cfg(test)]
mod tests {
use std::fs::File;
Expand Down
5 changes: 4 additions & 1 deletion crates/iceberg/src/transaction/rewrite_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,16 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
})
.collect();

if found_deleted_files.is_empty() {
if found_deleted_files.is_empty()
&& (manifest_file.has_added_files() || manifest_file.has_existing_files())
{
existing_files.push(manifest_file.clone());
} else {
// Rewrite the manifest file without the deleted data files
if manifest
.entries()
.iter()
.filter(|entry| entry.status() != ManifestStatus::Deleted)
.any(|entry| !found_deleted_files.contains(entry.data_file().file_path()))
{
let mut manifest_writer = snapshot_produce.new_manifest_writer(
Expand Down
149 changes: 149 additions & 0 deletions crates/iceberg/src/transaction/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashSet;
use std::sync::Arc;

use crate::error::Result;
use crate::io::FileIO;
use crate::spec::{ManifestFile, Snapshot, TableMetadataRef};

/// Cleans up files left behind by expired snapshots.
///
/// The strategy loads manifest lists and manifests through its `FileIO` and
/// uses the same `FileIO` to delete the files it selects.
pub struct ReachableFileCleanupStrategy {
    /// Handle used for every manifest read and file deletion.
    file_io: FileIO,
}

impl ReachableFileCleanupStrategy {
pub fn new(file_io: FileIO) -> Self {
Self { file_io }
}
}

impl ReachableFileCleanupStrategy {
    /// Deletes the files that were referenced only by snapshots removed during expiration.
    ///
    /// A snapshot is treated as expired when it appears in `before_expiration` but
    /// `after_expiration.snapshot_by_id` no longer finds it. For those snapshots:
    ///
    /// 1. every manifest listed in their manifest lists becomes a deletion candidate;
    /// 2. candidates still listed by any snapshot of `after_expiration` are dropped;
    /// 3. file paths recorded in the remaining manifests are deleted unless a
    ///    still-referenced manifest also records them;
    /// 4. the remaining manifests are deleted, then the expired manifest lists.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while loading a manifest list or manifest, or
    /// while deleting a file. Deletions performed before the failure are not undone.
    pub async fn clean_files(
        &self,
        before_expiration: &TableMetadataRef,
        after_expiration: &TableMetadataRef,
    ) -> Result<()> {
        let mut manifest_lists_to_delete: HashSet<&str> = HashSet::default();
        let mut expired_snapshots = Vec::default();
        for snapshot in before_expiration.snapshots() {
            if after_expiration
                .snapshot_by_id(snapshot.snapshot_id())
                .is_none()
            {
                expired_snapshots.push(snapshot);
                manifest_lists_to_delete.insert(snapshot.manifest_list());
            }
        }

        // Every manifest reachable from an expired snapshot is a candidate until a
        // surviving snapshot is shown to reference it as well.
        let mut deletion_candidates = HashSet::default();
        for snapshot in expired_snapshots {
            let manifest_list = snapshot
                .load_manifest_list(&self.file_io, before_expiration)
                .await?;
            deletion_candidates.extend(manifest_list.entries().iter().cloned());
        }

        if !deletion_candidates.is_empty() {
            let (manifests_to_delete, referenced_manifests) = self
                .prune_referenced_manifests(
                    after_expiration.snapshots(),
                    after_expiration,
                    deletion_candidates,
                )
                .await?;

            if !manifests_to_delete.is_empty() {
                let files_to_delete = self
                    .find_files_to_delete(&manifests_to_delete, &referenced_manifests)
                    .await?;

                // Referenced files go first, then the manifests that pointed at them.
                for file in files_to_delete {
                    self.file_io.delete(file).await?;
                }

                for manifest_file in manifests_to_delete {
                    self.file_io.delete(manifest_file.manifest_path).await?;
                }
            }
        }

        // Manifest lists of expired snapshots are removed last.
        for manifest_list_path in manifest_lists_to_delete {
            self.file_io.delete(manifest_list_path).await?;
        }

        Ok(())
    }

    /// Removes from `deletion_candidates` every manifest that is still listed by
    /// one of `snapshots`.
    ///
    /// Returns `(manifests_to_delete, referenced_manifests)`: the candidates that
    /// no visited snapshot lists, and the manifests that were seen while scanning.
    ///
    /// Scanning stops as soon as no candidate is left, so no further manifest
    /// lists are loaded. In that case `referenced_manifests` holds only what was
    /// seen up to that point; `clean_files` does not use it when there is nothing
    /// left to delete.
    ///
    /// # Errors
    ///
    /// Returns the error from loading a snapshot's manifest list.
    async fn prune_referenced_manifests(
        &self,
        snapshots: impl Iterator<Item = &Arc<Snapshot>>,
        table_meta_data_ref: &TableMetadataRef,
        mut deletion_candidates: HashSet<ManifestFile>,
    ) -> Result<(HashSet<ManifestFile>, HashSet<ManifestFile>)> {
        let mut referenced_manifests = HashSet::default();
        for snapshot in snapshots {
            let manifest_list = snapshot
                .load_manifest_list(&self.file_io, table_meta_data_ref)
                .await?;

            for manifest_file in manifest_list.entries() {
                deletion_candidates.remove(manifest_file);
                referenced_manifests.insert(manifest_file.clone());

                if deletion_candidates.is_empty() {
                    // A plain `break` would only leave this inner loop and the
                    // outer loop would keep loading manifest lists for nothing.
                    return Ok((deletion_candidates, referenced_manifests));
                }
            }
        }

        Ok((deletion_candidates, referenced_manifests))
    }

    /// Collects the file paths recorded in `manifest_files` that are not recorded
    /// in any of `referenced_manifests`.
    ///
    /// # Errors
    ///
    /// Returns the error from loading any manifest instead of panicking, so a
    /// failed read surfaces to the caller of `clean_files`.
    async fn find_files_to_delete(
        &self,
        manifest_files: &HashSet<ManifestFile>,
        referenced_manifests: &HashSet<ManifestFile>,
    ) -> Result<HashSet<String>> {
        let mut files_to_delete = HashSet::default();
        for manifest_file in manifest_files {
            let manifest = manifest_file.load_manifest(&self.file_io).await?;
            for entry in manifest.entries() {
                files_to_delete.insert(entry.data_file().file_path().to_owned());
            }
        }

        // Nothing collected: skip loading the referenced manifests entirely.
        if files_to_delete.is_empty() {
            return Ok(files_to_delete);
        }

        for manifest_file in referenced_manifests {
            let manifest = manifest_file.load_manifest(&self.file_io).await?;
            for entry in manifest.entries() {
                files_to_delete.remove(entry.data_file().file_path());
            }
        }

        Ok(files_to_delete)
    }
}
Loading
Loading