Skip to content

Commit ac2fc39

Browse files
committed
feat(iceberg): introduce remove snapshot action (#21)
* feat(iceberg): basic remove snapshot * feat(iceberg): introduce new properties for remove snapshots * feat(iceberg): support remove schemas * refactor(iceberg): refactor file org * address comments * refactor(iceberg): refactor and ut * fix(iceberg): fix integration-test
1 parent 52e09d7 commit ac2fc39

File tree

8 files changed

+354
-1
lines changed

8 files changed

+354
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/spec/snapshot.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ impl Snapshot {
175175
}
176176

177177
/// Get parent snapshot.
178-
#[cfg(test)]
179178
pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option<SnapshotRef> {
180179
match self.parent_snapshot_id {
181180
Some(id) => table_metadata.snapshot_by_id(id).cloned(),
@@ -401,6 +400,33 @@ impl SnapshotRetention {
401400
}
402401
}
403402

403+
/// An iterator over the ancestors of a snapshot.
404+
pub struct AncestorIterator<'a> {
405+
current: Option<SnapshotRef>,
406+
table_metadata: &'a TableMetadata,
407+
}
408+
409+
impl<'a> Iterator for AncestorIterator<'a> {
410+
type Item = SnapshotRef;
411+
412+
fn next(&mut self) -> Option<Self::Item> {
413+
let current = self.current.take()?;
414+
415+
let next = current.parent_snapshot(self.table_metadata);
416+
self.current = next;
417+
418+
Some(current)
419+
}
420+
}
421+
422+
/// Returns an iterator over the ancestors of a snapshot.
423+
pub fn ancestors_of(snapshot: SnapshotRef, table_metadata: &TableMetadata) -> AncestorIterator<'_> {
424+
AncestorIterator {
425+
current: Some(snapshot),
426+
table_metadata,
427+
}
428+
}
429+
404430
#[cfg(test)]
405431
mod tests {
406432
use std::collections::HashMap;

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,19 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previo
7878
/// Default value for max number of previous versions to keep.
7979
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
8080

81+
/// Table property key holding the maximum snapshot age, in milliseconds.
pub const MAX_SNAPSHOT_AGE_MS: &str = "history.expire.max-snapshot-age-ms";
/// Default maximum snapshot age in milliseconds (5 days).
pub const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 5 * 24 * 60 * 60 * 1000;
/// Table property key holding the minimum number of snapshots to keep.
pub const MIN_SNAPSHOTS_TO_KEEP: &str = "history.expire.min-snapshots-to-keep";
/// Default minimum number of snapshots to keep.
pub const MIN_SNAPSHOTS_TO_KEEP_DEFAULT: i32 = 1;
/// Table property key holding the maximum reference age, in milliseconds.
pub const MAX_REF_AGE_MS: &str = "history.expire.max-ref-age-ms";
/// Default maximum reference age in milliseconds (`i64::MAX`).
pub const MAX_REF_AGE_MS_DEFAULT: i64 = i64::MAX;
93+
8194
/// Reserved Iceberg table properties list.
8295
///
8396
/// Reserved table properties are only used to control behaviors when creating or updating a

crates/iceberg/src/transaction/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module contains transaction api.
1919
2020
mod append;
21+
pub mod remove_snapshots;
2122
mod snapshot;
2223
mod sort_order;
2324

@@ -26,6 +27,7 @@ use std::collections::HashMap;
2627
use std::mem::discriminant;
2728
use std::sync::Arc;
2829

30+
use remove_snapshots::RemoveSnapshotAction;
2931
use uuid::Uuid;
3032

3133
use crate::error::Result;
@@ -190,6 +192,11 @@ impl<'a> Transaction<'a> {
190192
}
191193
}
192194

195+
/// Creates remove snapshot action.
196+
pub fn expire_snapshot(self) -> RemoveSnapshotAction<'a> {
197+
RemoveSnapshotAction::new(self)
198+
}
199+
193200
/// Remove properties in table.
194201
pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
195202
self.apply(
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
{
2+
"format-version": 2,
3+
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
4+
"location": "s3://bucket/test/location",
5+
"last-sequence-number": 34,
6+
"last-updated-ms": 1675100955770,
7+
"last-column-id": 3,
8+
"current-schema-id": 1,
9+
"schemas": [
10+
{
11+
"type": "struct",
12+
"schema-id": 0,
13+
"fields": [
14+
{
15+
"id": 1,
16+
"name": "x",
17+
"required": true,
18+
"type": "long"
19+
}
20+
]
21+
},
22+
{
23+
"type": "struct",
24+
"schema-id": 1,
25+
"identifier-field-ids": [
26+
1,
27+
2
28+
],
29+
"fields": [
30+
{
31+
"id": 1,
32+
"name": "x",
33+
"required": true,
34+
"type": "long"
35+
},
36+
{
37+
"id": 2,
38+
"name": "y",
39+
"required": true,
40+
"type": "long",
41+
"doc": "comment"
42+
},
43+
{
44+
"id": 3,
45+
"name": "z",
46+
"required": true,
47+
"type": "long"
48+
}
49+
]
50+
}
51+
],
52+
"default-spec-id": 0,
53+
"partition-specs": [
54+
{
55+
"spec-id": 0,
56+
"fields": [
57+
{
58+
"name": "x",
59+
"transform": "identity",
60+
"source-id": 1,
61+
"field-id": 1000
62+
}
63+
]
64+
}
65+
],
66+
"last-partition-id": 1000,
67+
"default-sort-order-id": 3,
68+
"sort-orders": [
69+
{
70+
"order-id": 3,
71+
"fields": [
72+
{
73+
"transform": "identity",
74+
"source-id": 2,
75+
"direction": "asc",
76+
"null-order": "nulls-first"
77+
},
78+
{
79+
"transform": "bucket[4]",
80+
"source-id": 3,
81+
"direction": "desc",
82+
"null-order": "nulls-last"
83+
}
84+
]
85+
}
86+
],
87+
"properties": {},
88+
"current-snapshot-id": 3067729675574597004,
89+
"snapshots": [
90+
{
91+
"snapshot-id": 3051729675574597004,
92+
"timestamp-ms": 1515100955770,
93+
"sequence-number": 0,
94+
"summary": {
95+
"operation": "append"
96+
},
97+
"manifest-list": "s3://a/b/1.avro"
98+
},
99+
{
100+
"snapshot-id": 3055729675574597004,
101+
"parent-snapshot-id": 3051729675574597004,
102+
"timestamp-ms": 1555100955770,
103+
"sequence-number": 1,
104+
"summary": {
105+
"operation": "append"
106+
},
107+
"manifest-list": "s3://a/b/2.avro",
108+
"schema-id": 1
109+
},
110+
{
111+
"snapshot-id": 3059729675574597004,
112+
"parent-snapshot-id": 3055729675574597004,
113+
"timestamp-ms": 1595100955770,
114+
"sequence-number": 1,
115+
"summary": {
116+
"operation": "append"
117+
},
118+
"manifest-list": "s3://a/b/3.avro",
119+
"schema-id": 1
120+
},
121+
{
122+
"snapshot-id": 3063729675574597004,
123+
"parent-snapshot-id": 3059729675574597004,
124+
"timestamp-ms": 1635100955770,
125+
"sequence-number": 1,
126+
"summary": {
127+
"operation": "append"
128+
},
129+
"manifest-list": "s3://a/b/4.avro",
130+
"schema-id": 1
131+
},
132+
{
133+
"snapshot-id": 3067729675574597004,
134+
"parent-snapshot-id": 3063729675574597004,
135+
"timestamp-ms": 1675100955770,
136+
"sequence-number": 1,
137+
"summary": {
138+
"operation": "append"
139+
},
140+
"manifest-list": "s3://a/b/5.avro",
141+
"schema-id": 1
142+
}
143+
],
144+
"snapshot-log": [
145+
{
146+
"snapshot-id": 3051729675574597004,
147+
"timestamp-ms": 1515100955770
148+
},
149+
{
150+
"snapshot-id": 3055729675574597004,
151+
"timestamp-ms": 1555100955770
152+
},
153+
{
154+
"snapshot-id": 3059729675574597004,
155+
"timestamp-ms": 1595100955770
156+
},
157+
{
158+
"snapshot-id": 3063729675574597004,
159+
"timestamp-ms": 1635100955770
160+
},
161+
{
162+
"snapshot-id": 3067729675574597004,
163+
"timestamp-ms": 1675100955770
164+
}
165+
],
166+
"metadata-log": []
167+
}

crates/integration_tests/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ rust-version = { workspace = true }
2727
[dependencies]
2828
arrow-array = { workspace = true }
2929
arrow-schema = { workspace = true }
30+
chrono = { workspace = true }
3031
ctor = { workspace = true }
3132
datafusion = { workspace = true }
3233
futures = { workspace = true }

crates/integration_tests/tests/shared_tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ mod conflict_commit_test;
2929
mod datafusion;
3030
mod read_evolved_schema;
3131
mod read_positional_deletes;
32+
mod remove_snapshots_test;
3233
mod scan_all_type;
3334

3435
pub async fn random_ns() -> Namespace {

0 commit comments

Comments
 (0)