Skip to content

Commit c9ffbce

Browse files
authored
feat(iceberg): Add memory file IO support (#481)
* feat(iceberg): Add memory file IO support Signed-off-by: Xuanwo <[email protected]> * Fix typo Signed-off-by: Xuanwo <[email protected]> * Add comments for memory file io Signed-off-by: Xuanwo <[email protected]> --------- Signed-off-by: Xuanwo <[email protected]>
1 parent e4d8001 commit c9ffbce

File tree

12 files changed

+188
-95
lines changed

12 files changed

+188
-95
lines changed

crates/iceberg/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ license = { workspace = true }
2929
keywords = ["iceberg"]
3030

3131
[features]
32-
default = ["storage-fs", "storage-s3", "tokio"]
33-
storage-all = ["storage-fs", "storage-s3"]
32+
default = ["storage-memory", "storage-fs", "storage-s3", "tokio"]
33+
storage-all = ["storage-memory", "storage-fs", "storage-s3"]
3434

35+
storage-memory = ["opendal/services-memory"]
3536
storage-fs = ["opendal/services-fs"]
3637
storage-s3 = ["opendal/services-s3"]
3738

crates/iceberg/src/expr/predicate.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -478,10 +478,10 @@ impl Predicate {
478478
/// # Example
479479
///
480480
/// ```rust
481-
/// use std::ops::Bound::Unbounded;
482481
/// use iceberg::expr::BoundPredicate::Unary;
483482
/// use iceberg::expr::Reference;
484483
/// use iceberg::spec::Datum;
484+
/// use std::ops::Bound::Unbounded;
485485
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
486486
///
487487
/// let expr2 = Reference::new("b").less_than(Datum::long(20));
@@ -505,10 +505,10 @@ impl Predicate {
505505
/// # Example
506506
///
507507
/// ```rust
508-
/// use std::ops::Bound::Unbounded;
509508
/// use iceberg::expr::BoundPredicate::Unary;
510509
/// use iceberg::expr::Reference;
511510
/// use iceberg::spec::Datum;
511+
/// use std::ops::Bound::Unbounded;
512512
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
513513
///
514514
/// let expr2 = Reference::new("b").less_than(Datum::long(20));
@@ -534,12 +534,14 @@ impl Predicate {
534534
/// # Example
535535
///
536536
/// ```rust
537-
/// use std::ops::Bound::Unbounded;
538537
/// use iceberg::expr::BoundPredicate::Unary;
539538
/// use iceberg::expr::{LogicalExpression, Predicate, Reference};
540539
/// use iceberg::spec::Datum;
540+
/// use std::ops::Bound::Unbounded;
541541
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
542-
/// let expr2 = Reference::new("b").less_than(Datum::long(5)).and(Reference::new("c").less_than(Datum::long(10)));
542+
/// let expr2 = Reference::new("b")
543+
/// .less_than(Datum::long(5))
544+
/// .and(Reference::new("c").less_than(Datum::long(10)));
543545
///
544546
/// let result = expr1.negate();
545547
/// assert_eq!(&format!("{result}"), "a >= 10");
@@ -632,16 +634,16 @@ impl Not for Predicate {
632634
/// # Example
633635
///
634636
///```rust
635-
///use std::ops::Bound::Unbounded;
636-
///use iceberg::expr::BoundPredicate::Unary;
637-
///use iceberg::expr::Reference;
638-
///use iceberg::spec::Datum;
639-
///let expr1 = Reference::new("a").less_than(Datum::long(10));
640-
///
641-
///let expr = !expr1;
642-
///
643-
///assert_eq!(&format!("{expr}"), "NOT (a < 10)");
644-
///```
637+
/// use iceberg::expr::BoundPredicate::Unary;
638+
/// use iceberg::expr::Reference;
639+
/// use iceberg::spec::Datum;
640+
/// use std::ops::Bound::Unbounded;
641+
/// let expr1 = Reference::new("a").less_than(Datum::long(10));
642+
///
643+
/// let expr = !expr1;
644+
///
645+
/// assert_eq!(&format!("{expr}"), "NOT (a < 10)");
646+
/// ```
645647
fn not(self) -> Self::Output {
646648
Predicate::Not(LogicalExpression::new([Box::new(self)]))
647649
}

crates/iceberg/src/expr/term.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ impl Reference {
5656
/// # Example
5757
///
5858
/// ```rust
59-
///
6059
/// use iceberg::expr::Reference;
6160
/// use iceberg::spec::Datum;
6261
/// let expr = Reference::new("a").less_than(Datum::long(10));
@@ -76,7 +75,6 @@ impl Reference {
7675
/// # Example
7776
///
7877
/// ```rust
79-
///
8078
/// use iceberg::expr::Reference;
8179
/// use iceberg::spec::Datum;
8280
/// let expr = Reference::new("a").less_than_or_equal_to(Datum::long(10));
@@ -96,7 +94,6 @@ impl Reference {
9694
/// # Example
9795
///
9896
/// ```rust
99-
///
10097
/// use iceberg::expr::Reference;
10198
/// use iceberg::spec::Datum;
10299
/// let expr = Reference::new("a").greater_than(Datum::long(10));
@@ -116,7 +113,6 @@ impl Reference {
116113
/// # Example
117114
///
118115
/// ```rust
119-
///
120116
/// use iceberg::expr::Reference;
121117
/// use iceberg::spec::Datum;
122118
/// let expr = Reference::new("a").greater_than_or_equal_to(Datum::long(10));
@@ -136,7 +132,6 @@ impl Reference {
136132
/// # Example
137133
///
138134
/// ```rust
139-
///
140135
/// use iceberg::expr::Reference;
141136
/// use iceberg::spec::Datum;
142137
/// let expr = Reference::new("a").equal_to(Datum::long(10));
@@ -152,7 +147,6 @@ impl Reference {
152147
/// # Example
153148
///
154149
/// ```rust
155-
///
156150
/// use iceberg::expr::Reference;
157151
/// use iceberg::spec::Datum;
158152
/// let expr = Reference::new("a").not_equal_to(Datum::long(10));
@@ -168,7 +162,6 @@ impl Reference {
168162
/// # Example
169163
///
170164
/// ```rust
171-
///
172165
/// use iceberg::expr::Reference;
173166
/// use iceberg::spec::Datum;
174167
/// let expr = Reference::new("a").starts_with(Datum::string("foo"));
@@ -188,7 +181,6 @@ impl Reference {
188181
/// # Example
189182
///
190183
/// ```rust
191-
///
192184
/// use iceberg::expr::Reference;
193185
/// use iceberg::spec::Datum;
194186
///
@@ -209,7 +201,6 @@ impl Reference {
209201
/// # Example
210202
///
211203
/// ```rust
212-
///
213204
/// use iceberg::expr::Reference;
214205
/// use iceberg::spec::Datum;
215206
/// let expr = Reference::new("a").is_nan();
@@ -225,7 +216,6 @@ impl Reference {
225216
/// # Example
226217
///
227218
/// ```rust
228-
///
229219
/// use iceberg::expr::Reference;
230220
/// use iceberg::spec::Datum;
231221
/// let expr = Reference::new("a").is_not_nan();
@@ -241,7 +231,6 @@ impl Reference {
241231
/// # Example
242232
///
243233
/// ```rust
244-
///
245234
/// use iceberg::expr::Reference;
246235
/// use iceberg::spec::Datum;
247236
/// let expr = Reference::new("a").is_null();
@@ -257,7 +246,6 @@ impl Reference {
257246
/// # Example
258247
///
259248
/// ```rust
260-
///
261249
/// use iceberg::expr::Reference;
262250
/// use iceberg::spec::Datum;
263251
/// let expr = Reference::new("a").is_not_null();
@@ -273,7 +261,6 @@ impl Reference {
273261
/// # Example
274262
///
275263
/// ```rust
276-
///
277264
/// use fnv::FnvHashSet;
278265
/// use iceberg::expr::Reference;
279266
/// use iceberg::spec::Datum;
@@ -295,7 +282,6 @@ impl Reference {
295282
/// # Example
296283
///
297284
/// ```rust
298-
///
299285
/// use fnv::FnvHashSet;
300286
/// use iceberg::expr::Reference;
301287
/// use iceberg::spec::Datum;

crates/iceberg/src/io/mod.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,41 @@
2020
//! # How to build `FileIO`
2121
//!
2222
//! We provided a `FileIOBuilder` to build `FileIO` from scratch. For example:
23+
//!
2324
//! ```rust
2425
//! use iceberg::io::{FileIOBuilder, S3_REGION};
26+
//! use iceberg::Result;
2527
//!
28+
//! # fn test() -> Result<()> {
29+
//! // Build a memory file io.
30+
//! let file_io = FileIOBuilder::new("memory").build()?;
31+
//! // Build an fs file io.
32+
//! let file_io = FileIOBuilder::new("fs").build()?;
33+
//! // Build an s3 file io.
2634
//! let file_io = FileIOBuilder::new("s3")
2735
//! .with_prop(S3_REGION, "us-east-1")
28-
//! .build()
29-
//! .unwrap();
36+
//! .build()?;
37+
//! # Ok(())
38+
//! # }
3039
//! ```
3140
//!
3241
//! Or you can pass a path to ask `FileIO` to infer schema for you:
42+
//!
3343
//! ```rust
3444
//! use iceberg::io::{FileIO, S3_REGION};
35-
//! let file_io = FileIO::from_path("s3://bucket/a")
36-
//! .unwrap()
45+
//! use iceberg::Result;
46+
//!
47+
//! # fn test() -> Result<()> {
48+
//! // Build a memory file io.
49+
//! let file_io = FileIO::from_path("memory:///")?.build()?;
50+
//! // Build an fs file io.
51+
//! let file_io = FileIO::from_path("fs:///tmp")?.build()?;
52+
//! // Build an s3 file io.
53+
//! let file_io = FileIO::from_path("s3://bucket/a")?
3754
//! .with_prop(S3_REGION, "us-east-1")
38-
//! .build()
39-
//! .unwrap();
55+
//! .build()?;
56+
//! # Ok(())
57+
//! # }
4058
//! ```
4159
//!
4260
//! # How to use `FileIO`
@@ -52,6 +70,10 @@ mod file_io;
5270
pub use file_io::*;
5371

5472
mod storage;
73+
#[cfg(feature = "storage-memory")]
74+
mod storage_memory;
75+
#[cfg(feature = "storage-memory")]
76+
use storage_memory::*;
5577
#[cfg(feature = "storage-s3")]
5678
mod storage_s3;
5779
#[cfg(feature = "storage-s3")]

crates/iceberg/src/io/storage.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
use super::FileIOBuilder;
1919
#[cfg(feature = "storage-fs")]
2020
use super::FsConfig;
21+
#[cfg(feature = "storage-memory")]
22+
use super::MemoryConfig;
2123
#[cfg(feature = "storage-s3")]
2224
use super::S3Config;
2325
use crate::{Error, ErrorKind};
@@ -26,6 +28,8 @@ use opendal::{Operator, Scheme};
2628
/// The storage carries all supported storage services in iceberg
2729
#[derive(Debug)]
2830
pub(crate) enum Storage {
31+
#[cfg(feature = "storage-memory")]
32+
Memory { config: MemoryConfig },
2933
#[cfg(feature = "storage-fs")]
3034
LocalFs { config: FsConfig },
3135
#[cfg(feature = "storage-s3")]
@@ -44,6 +48,10 @@ impl Storage {
4448
let scheme = Self::parse_scheme(&scheme_str)?;
4549

4650
match scheme {
51+
#[cfg(feature = "storage-memory")]
52+
Scheme::Memory => Ok(Self::Memory {
53+
config: MemoryConfig::new(props),
54+
}),
4755
#[cfg(feature = "storage-fs")]
4856
Scheme::Fs => Ok(Self::LocalFs {
4957
config: FsConfig::new(props),
@@ -72,13 +80,22 @@ impl Storage {
7280
///
7381
/// * An [`opendal::Operator`] instance used to operate on file.
7482
/// * Relative path to the root uri of [`opendal::Operator`].
75-
///
7683
pub(crate) fn create_operator<'a>(
7784
&self,
7885
path: &'a impl AsRef<str>,
7986
) -> crate::Result<(Operator, &'a str)> {
8087
let path = path.as_ref();
8188
match self {
89+
#[cfg(feature = "storage-memory")]
90+
Storage::Memory { config } => {
91+
let op = config.build(path)?;
92+
93+
if let Some(stripped) = path.strip_prefix("memory:/") {
94+
Ok((op, stripped))
95+
} else {
96+
Ok((op, &path[1..]))
97+
}
98+
}
8299
#[cfg(feature = "storage-fs")]
83100
Storage::LocalFs { config } => {
84101
let op = config.build(path)?;
@@ -116,6 +133,7 @@ impl Storage {
116133
/// Parse scheme.
117134
fn parse_scheme(scheme: &str) -> crate::Result<Scheme> {
118135
match scheme {
136+
"memory" => Ok(Scheme::Memory),
119137
"file" | "" => Ok(Scheme::Fs),
120138
"s3" | "s3a" => Ok(Scheme::S3),
121139
s => Ok(s.parse::<Scheme>()?),
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::Result;
19+
use opendal::{Operator, Scheme};
20+
use std::collections::HashMap;
21+
use std::fmt::{Debug, Formatter};
22+
23+
#[derive(Default, Clone)]
24+
pub(crate) struct MemoryConfig {}
25+
26+
impl Debug for MemoryConfig {
27+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
28+
f.debug_struct("MemoryConfig").finish()
29+
}
30+
}
31+
32+
impl MemoryConfig {
33+
/// Decode from iceberg props.
34+
pub fn new(_: HashMap<String, String>) -> Self {
35+
Self::default()
36+
}
37+
38+
/// Build new opendal operator from given path.
39+
pub fn build(&self, _: &str) -> Result<Operator> {
40+
let m = HashMap::new();
41+
Ok(Operator::via_map(Scheme::Memory, m)?)
42+
}
43+
}

crates/iceberg/src/spec/datatypes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/*!
1919
* Data Types
20-
*/
20+
*/
2121
use crate::ensure_data_valid;
2222
use crate::error::Result;
2323
use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH};

crates/iceberg/src/spec/partition.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/*!
1919
* Partitioning
20-
*/
20+
*/
2121
use serde::{Deserialize, Serialize};
2222
use std::sync::Arc;
2323
use typed_builder::TypedBuilder;

crates/iceberg/src/spec/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/*!
1919
* Snapshots
20-
*/
20+
*/
2121
use crate::error::Result;
2222
use chrono::{DateTime, TimeZone, Utc};
2323
use serde::{Deserialize, Serialize};

crates/iceberg/src/spec/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/*!
1919
* Sorting
20-
*/
20+
*/
2121
use crate::error::Result;
2222
use crate::spec::Schema;
2323
use crate::{Error, ErrorKind};

0 commit comments

Comments
 (0)