Skip to content

Commit a229d89

Browse files
authored
feat(io): add OSS storage implementation (#1153)
## What changes are included in this PR? - [x] Support the Aliyun OSS backend via OpenDAL - [x] Update the documentation to explain why the `storage-oss` feature is not included in `storage-all`. - [x] Fix a typo <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> The support for Aliyun OSS is based on OpenDAL, a production-verified project; therefore, no additional testing has been added. A new example is included. Signed-off-by: divinerapier <[email protected]>
1 parent b31ebcc commit a229d89

File tree

9 files changed

+221
-2
lines changed

9 files changed

+221
-2
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/examples/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ version = { workspace = true }
2828
iceberg = { workspace = true }
2929
iceberg-catalog-rest = { workspace = true }
3030
tokio = { workspace = true, features = ["full"] }
31+
futures = { workspace = true }
3132

3233
[[example]]
3334
name = "rest-catalog-namespace"
@@ -36,3 +37,13 @@ path = "src/rest_catalog_namespace.rs"
3637
[[example]]
3738
name = "rest-catalog-table"
3839
path = "src/rest_catalog_table.rs"
40+
41+
[[example]]
42+
name = "oss-backend"
43+
path = "src/oss_backend.rs"
44+
required-features = ["storage-oss"]
45+
46+
47+
[features]
48+
default = []
49+
storage-oss = ["iceberg/storage-oss"]

crates/examples/src/oss_backend.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
use futures::stream::StreamExt;
21+
use iceberg::{Catalog, NamespaceIdent, TableIdent};
22+
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
23+
24+
// Configure these values according to your environment
25+
26+
static REST_URI: &str = "http://127.0.0.1:8181";
27+
static NAMESPACE: &str = "default";
28+
static TABLE_NAME: &str = "t1";
29+
static OSS_ENDPOINT: &str = "https://oss-cn-hangzhou.aliyuncs.com/bucket";
30+
static OSS_ACCESS_KEY_ID: &str = "LTAI5t999999999999999";
31+
static OSS_ACCESS_KEY_SECRET: &str = "99999999999999999999999999999999";
32+
33+
/// This is a simple example that demonstrates how to use [`RestCatalog`] to read data from OSS.
34+
///
35+
/// The demo reads data from an existing table in OSS storage.
36+
///
37+
/// A running instance of the iceberg-rest catalog on port 8181 is required. You can find how to run
38+
/// the iceberg-rest catalog with `docker compose` in the official
39+
/// [quickstart documentation](https://iceberg.apache.org/spark-quickstart/).
40+
///
41+
/// The example also requires valid OSS credentials and endpoint to be configured.
42+
#[tokio::main]
43+
async fn main() {
44+
// Create the REST iceberg catalog.
45+
let config = RestCatalogConfig::builder()
46+
.uri(REST_URI.to_string())
47+
.props(HashMap::from([
48+
(
49+
iceberg::io::OSS_ENDPOINT.to_string(),
50+
OSS_ENDPOINT.to_string(),
51+
),
52+
(
53+
iceberg::io::OSS_ACCESS_KEY_ID.to_string(),
54+
OSS_ACCESS_KEY_ID.to_string(),
55+
),
56+
(
57+
iceberg::io::OSS_ACCESS_KEY_SECRET.to_string(),
58+
OSS_ACCESS_KEY_SECRET.to_string(),
59+
),
60+
]))
61+
.build();
62+
let catalog = RestCatalog::new(config);
63+
64+
// Create the table identifier.
65+
let namespace_ident = NamespaceIdent::from_vec(vec![NAMESPACE.to_string()]).unwrap();
66+
let table_ident = TableIdent::new(namespace_ident.clone(), TABLE_NAME.to_string());
67+
68+
// Check if the table exists.
69+
if !catalog.table_exists(&table_ident).await.unwrap() {
70+
println!("Table {TABLE_NAME} must exists.");
71+
return;
72+
}
73+
74+
let table = catalog.load_table(&table_ident).await.unwrap();
75+
println!("Table {TABLE_NAME} loaded!");
76+
77+
let scan = table.scan().select_all().build().unwrap();
78+
let reader = scan.to_arrow().await.unwrap();
79+
let buf = reader.collect::<Vec<_>>().await;
80+
81+
println!("Table {TABLE_NAME} has {} batches.", buf.len());
82+
83+
assert!(!buf.is_empty());
84+
assert!(buf.iter().all(|x| x.is_ok()));
85+
}

crates/iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ storage-fs = ["opendal/services-fs"]
3636
storage-gcs = ["opendal/services-gcs"]
3737
storage-memory = ["opendal/services-memory"]
3838
storage-s3 = ["opendal/services-s3"]
39+
storage-oss = ["opendal/services-oss"]
3940

4041
async-std = ["dep:async-std"]
4142
tokio = ["tokio/rt-multi-thread"]

crates/iceberg/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,26 @@ async fn main() -> Result<()> {
5757
Ok(())
5858
}
5959
```
60+
61+
## IO Support
62+
63+
Iceberg Rust provides various storage backends through feature flags. Here are the currently supported storage backends:
64+
65+
| Storage Backend | Feature Flag | Status | Description |
66+
|----------------|--------------|--------|-------------|
67+
| Memory | `storage-memory` | ✅ Stable | In-memory storage for testing and development |
68+
| Local Filesystem | `storage-fs` | ✅ Stable | Local filesystem storage |
69+
| Amazon S3 | `storage-s3` | ✅ Stable | Amazon S3 storage |
70+
| Google Cloud Storage | `storage-gcs` | ✅ Stable | Google Cloud Storage |
71+
| Alibaba Cloud OSS | `storage-oss` | 🧪 Experimental | Alibaba Cloud Object Storage Service |
72+
73+
You can enable all stable storage backends at once using the `storage-all` feature flag.
74+
75+
> Note that `storage-oss` is currently experimental and not included in `storage-all`.
76+
77+
Example usage in `Cargo.toml`:
78+
79+
```toml
80+
[dependencies]
81+
iceberg = { version = "x.y.z", features = ["storage-s3", "storage-fs"] }
82+
```

crates/iceberg/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ define_from_err!(
268268
define_from_err!(
269269
std::array::TryFromSliceError,
270270
ErrorKind::DataInvalid,
271-
"failed to convert byte slive to array"
271+
"failed to convert byte slice to array"
272272
);
273273

274274
define_from_err!(

crates/iceberg/src/io/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ mod storage_gcs;
8989
#[cfg(feature = "storage-gcs")]
9090
pub use storage_gcs::*;
9191

92+
#[cfg(feature = "storage-oss")]
93+
mod storage_oss;
94+
#[cfg(feature = "storage-oss")]
95+
pub use storage_oss::*;
96+
9297
pub(crate) fn is_truthy(value: &str) -> bool {
9398
["true", "t", "1", "on"].contains(&value.to_lowercase().as_str())
9499
}

crates/iceberg/src/io/storage.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use std::sync::Arc;
2020
use opendal::layers::RetryLayer;
2121
#[cfg(feature = "storage-gcs")]
2222
use opendal::services::GcsConfig;
23+
#[cfg(feature = "storage-oss")]
24+
use opendal::services::OssConfig;
2325
#[cfg(feature = "storage-s3")]
2426
use opendal::services::S3Config;
2527
use opendal::{Operator, Scheme};
@@ -41,6 +43,8 @@ pub(crate) enum Storage {
4143
scheme_str: String,
4244
config: Arc<S3Config>,
4345
},
46+
#[cfg(feature = "storage-oss")]
47+
Oss { config: Arc<OssConfig> },
4448
#[cfg(feature = "storage-gcs")]
4549
Gcs { config: Arc<GcsConfig> },
4650
}
@@ -65,6 +69,10 @@ impl Storage {
6569
Scheme::Gcs => Ok(Self::Gcs {
6670
config: super::gcs_config_parse(props)?.into(),
6771
}),
72+
#[cfg(feature = "storage-oss")]
73+
Scheme::Oss => Ok(Self::Oss {
74+
config: super::oss_config_parse(props)?.into(),
75+
}),
6876
// Update doc on [`FileIO`] when adding new schemes.
6977
_ => Err(Error::new(
7078
ErrorKind::FeatureUnsupported,
@@ -125,6 +133,7 @@ impl Storage {
125133
))
126134
}
127135
}
136+
128137
#[cfg(feature = "storage-gcs")]
129138
Storage::Gcs { config } => {
130139
let operator = super::gcs_config_build(config, path)?;
@@ -138,10 +147,26 @@ impl Storage {
138147
))
139148
}
140149
}
150+
#[cfg(feature = "storage-oss")]
151+
Storage::Oss { config } => {
152+
let op = super::oss_config_build(config, path)?;
153+
154+
// Check prefix of oss path.
155+
let prefix = format!("oss://{}/", op.info().name());
156+
if path.starts_with(&prefix) {
157+
Ok((op, &path[prefix.len()..]))
158+
} else {
159+
Err(Error::new(
160+
ErrorKind::DataInvalid,
161+
format!("Invalid oss url: {}, should start with {}", path, prefix),
162+
))
163+
}
164+
}
141165
#[cfg(all(
142166
not(feature = "storage-s3"),
143167
not(feature = "storage-fs"),
144-
not(feature = "storage-gcs")
168+
not(feature = "storage-gcs"),
169+
not(feature = "storage-oss")
145170
))]
146171
_ => Err(Error::new(
147172
ErrorKind::FeatureUnsupported,
@@ -163,6 +188,7 @@ impl Storage {
163188
"file" | "" => Ok(Scheme::Fs),
164189
"s3" | "s3a" => Ok(Scheme::S3),
165190
"gs" | "gcs" => Ok(Scheme::Gcs),
191+
"oss" => Ok(Scheme::Oss),
166192
s => Ok(s.parse::<Scheme>()?),
167193
}
168194
}

crates/iceberg/src/io/storage_oss.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
use opendal::services::OssConfig;
21+
use opendal::{Configurator, Operator};
22+
use url::Url;
23+
24+
use crate::{Error, ErrorKind, Result};
25+
26+
// Required configuration arguments for creating an Aliyun OSS Operator with
// OpenDAL. These keys are read from the iceberg property map by
// `oss_config_parse` below:
// - `oss.endpoint`: The OSS service endpoint URL
// - `oss.access-key-id`: The access key ID for authentication
// - `oss.access-key-secret`: The access key secret for authentication

/// Aliyun OSS endpoint URL property key.
pub const OSS_ENDPOINT: &str = "oss.endpoint";
/// Aliyun OSS access key id property key.
pub const OSS_ACCESS_KEY_ID: &str = "oss.access-key-id";
/// Aliyun OSS access key secret property key.
pub const OSS_ACCESS_KEY_SECRET: &str = "oss.access-key-secret";
36+
37+
/// Parse iceberg props to oss config.
38+
pub(crate) fn oss_config_parse(mut m: HashMap<String, String>) -> Result<OssConfig> {
39+
let mut cfg: OssConfig = OssConfig::default();
40+
if let Some(endpoint) = m.remove(OSS_ENDPOINT) {
41+
cfg.endpoint = Some(endpoint);
42+
};
43+
if let Some(access_key_id) = m.remove(OSS_ACCESS_KEY_ID) {
44+
cfg.access_key_id = Some(access_key_id);
45+
};
46+
if let Some(access_key_secret) = m.remove(OSS_ACCESS_KEY_SECRET) {
47+
cfg.access_key_secret = Some(access_key_secret);
48+
};
49+
50+
Ok(cfg)
51+
}
52+
53+
/// Build new opendal operator from give path.
54+
pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) -> Result<Operator> {
55+
let url = Url::parse(path)?;
56+
let bucket = url.host_str().ok_or_else(|| {
57+
Error::new(
58+
ErrorKind::DataInvalid,
59+
format!("Invalid oss url: {}, missing bucket", path),
60+
)
61+
})?;
62+
63+
let builder = cfg.clone().into_builder().bucket(bucket);
64+
65+
Ok(Operator::new(builder)?.finish())
66+
}

0 commit comments

Comments
 (0)