Skip to content

Commit b7c15f9

Browse files
committed
feat: initialise SQL Catalog
Signed-off-by: callum-ryan <[email protected]>
1 parent 4083f81 commit b7c15f9

File tree

4 files changed

+379
-0
lines changed

4 files changed

+379
-0
lines changed

crates/catalog/sql/Cargo.toml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-catalog-sql"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Rust Sql Catalog"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "sql", "catalog"]
30+
31+
[dependencies]
32+
async-trait = { workspace = true }
33+
iceberg = { workspace = true }
34+
serde_json = { workspace = true }
35+
sqlx = { version = "0.7.4", features = ["tls-rustls", "any"], default-features = false }
36+
typed-builder = { workspace = true }
37+
uuid = { workspace = true, features = ["v4"] }
38+
39+
[dev-dependencies]
40+
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
41+
itertools = { workspace = true }
42+
regex = "1.10.5"
43+
sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "migrate"], default-features = false }
44+
tempfile = { workspace = true }
45+
tokio = { workspace = true }

crates/catalog/sql/src/catalog.rs

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::borrow::Cow;
19+
use std::collections::HashMap;
20+
use std::time::Duration;
21+
22+
use async_trait::async_trait;
23+
use iceberg::io::FileIO;
24+
use iceberg::table::Table;
25+
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
26+
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
27+
use sqlx::AnyPool;
28+
use typed_builder::TypedBuilder;
29+
30+
use crate::error::from_sqlx_error;
31+
32+
// Name and column names of the table created by `SqlCatalog::new` with primary key
// (catalog_name, table_namespace, table_name).
static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
static CATALOG_NAME: &str = "catalog_name";
static TABLE_NAME: &str = "table_name";
static TABLE_NAMESPACE: &str = "table_namespace";
static METADATA_LOCATION_PROP: &str = "metadata_location";
static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
static RECORD_TYPE: &str = "iceberg_type";

// Name and column names of the namespace properties table created by `SqlCatalog::new` with
// primary key (catalog_name, namespace, property_key). It reuses the `CATALOG_NAME` column.
static NAMESPACE_PROPERTIES_TABLE_NAME: &str = "iceberg_namespace_properties";
static NAMESPACE_NAME: &str = "namespace";
static NAMESPACE_PROPERTY_KEY: &str = "property_key";
static NAMESPACE_PROPERTY_VALUE: &str = "property_value";

// Connection pool defaults, used when the matching `pool.*` property is absent from the
// catalog config. `IDLE_TIMEOUT` is expressed in seconds.
static MAX_CONNECTIONS: u32 = 10;
static IDLE_TIMEOUT: u64 = 10;
static TEST_BEFORE_ACQUIRE: bool = true;
48+
49+
/// Sql catalog config
///
/// Built through the generated `SqlCatalogConfig::builder()`; every field except `props`
/// must be supplied.
#[derive(Debug, TypedBuilder)]
pub struct SqlCatalogConfig {
    /// Database connection URI handed to the sqlx connection pool.
    uri: String,
    /// Catalog name stored on the resulting `SqlCatalog`.
    name: String,
    /// Warehouse location stored on the resulting `SqlCatalog`.
    warehouse_location: String,
    /// File IO stored on the resulting `SqlCatalog`.
    file_io: FileIO,
    /// Optional properties. `SqlCatalog::new` reads the keys `pool.max-connections`,
    /// `pool.idle-timeout` (seconds) and `pool.test-before-acquire` from this map.
    #[builder(default)]
    props: HashMap<String, String>,
}
59+
60+
/// Sql catalog implementation.
#[derive(Debug)]
pub struct SqlCatalog {
    /// Catalog name taken from the config (not read yet, hence the underscore).
    _name: String,
    /// Connection pool that all statements are executed against.
    connection: AnyPool,
    /// Warehouse location taken from the config (not read yet, hence the underscore).
    _warehouse_location: String,
    /// File IO taken from the config (not read yet, hence the underscore).
    _fileio: FileIO,
    /// Database backend detected when the catalog was created.
    backend: DatabaseType,
}
69+
70+
/// Database backend behind the connection pool, derived from the connection's
/// `backend_name()` in `SqlCatalog::new`.
///
/// `SqlCatalog::execute_statement` checks for `PostgreSQL` to decide whether `?`
/// placeholders must be rewritten to `$1`, `$2`, ...
#[derive(Debug, PartialEq)]
enum DatabaseType {
    /// Backend reported itself as "PostgreSQL".
    PostgreSQL,
    /// Backend reported itself as "MySQL".
    MySQL,
    /// Backend reported itself as "SQLite"; also used for any unrecognised backend name.
    SQLite,
}
76+
77+
impl SqlCatalog {
78+
/// Create new sql catalog instance
79+
pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
80+
install_default_drivers();
81+
let max_connections: u32 = config
82+
.props
83+
.get("pool.max-connections")
84+
.map(|v| v.parse().unwrap())
85+
.unwrap_or(MAX_CONNECTIONS);
86+
let idle_timeout: u64 = config
87+
.props
88+
.get("pool.idle-timeout")
89+
.map(|v| v.parse().unwrap())
90+
.unwrap_or(IDLE_TIMEOUT);
91+
let test_before_acquire: bool = config
92+
.props
93+
.get("pool.test-before-acquire")
94+
.map(|v| v.parse().unwrap())
95+
.unwrap_or(TEST_BEFORE_ACQUIRE);
96+
97+
let pool = AnyPoolOptions::new()
98+
.max_connections(max_connections)
99+
.idle_timeout(Duration::from_secs(idle_timeout))
100+
.test_before_acquire(test_before_acquire)
101+
.connect(&config.uri)
102+
.await
103+
.map_err(from_sqlx_error)?;
104+
105+
let conn = pool.acquire().await.map_err(from_sqlx_error)?;
106+
107+
let db_type = match conn.backend_name() {
108+
"PostgreSQL" => DatabaseType::PostgreSQL,
109+
"MySQL" => DatabaseType::MySQL,
110+
"SQLite" => DatabaseType::SQLite,
111+
_ => DatabaseType::SQLite,
112+
};
113+
114+
sqlx::query(&format!(
115+
"CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_VIEW_NAME} (
116+
{CATALOG_NAME} VARCHAR(255) NOT NULL,
117+
{TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
118+
{TABLE_NAME} VARCHAR(255) NOT NULL,
119+
{METADATA_LOCATION_PROP} VARCHAR(1000),
120+
{PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
121+
{RECORD_TYPE} VARCHAR(5),
122+
PRIMARY KEY ({CATALOG_NAME}, {TABLE_NAMESPACE}, {TABLE_NAME}))"
123+
))
124+
.execute(&pool)
125+
.await
126+
.map_err(from_sqlx_error)?;
127+
128+
sqlx::query(&format!(
129+
"CREATE TABLE IF NOT EXISTS {NAMESPACE_PROPERTIES_TABLE_NAME} (
130+
{CATALOG_NAME} VARCHAR(255) NOT NULL,
131+
{NAMESPACE_NAME} VARCHAR(255) NOT NULL,
132+
{NAMESPACE_PROPERTY_KEY} VARCHAR(255),
133+
{NAMESPACE_PROPERTY_VALUE} VARCHAR(1000),
134+
PRIMARY KEY ({CATALOG_NAME}, {NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))"
135+
))
136+
.execute(&pool)
137+
.await
138+
.map_err(from_sqlx_error)?;
139+
140+
Ok(SqlCatalog {
141+
_name: config.name.to_owned(),
142+
connection: pool,
143+
_warehouse_location: config.warehouse_location,
144+
_fileio: config.file_io,
145+
backend: db_type,
146+
})
147+
}
148+
149+
/// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
150+
pub async fn execute_statement(
151+
&self,
152+
query: &String,
153+
args: Vec<Option<&String>>,
154+
) -> Result<Vec<AnyRow>> {
155+
let query_with_placeholders: Cow<str> = if self.backend == DatabaseType::PostgreSQL {
156+
let mut query = query.clone();
157+
for i in 0..args.len() {
158+
query = query.replacen("?", &format!("${}", i + 1), 1);
159+
}
160+
Cow::Owned(query)
161+
} else {
162+
Cow::Borrowed(query)
163+
};
164+
165+
let mut sqlx_query = sqlx::query(&query_with_placeholders);
166+
for arg in args {
167+
sqlx_query = sqlx_query.bind(arg);
168+
}
169+
170+
sqlx_query
171+
.fetch_all(&self.connection)
172+
.await
173+
.map_err(from_sqlx_error)
174+
}
175+
}
176+
177+
// `Catalog` implementation for the SQL catalog.
//
// Every method below is still a placeholder: the bodies are `todo!()`, so calling any of
// them panics. Parameters carry a leading underscore because they are not used yet.
#[async_trait]
impl Catalog for SqlCatalog {
    // ---- Namespace operations (not yet implemented) ----

    async fn list_namespaces(
        &self,
        _parent: Option<&NamespaceIdent>,
    ) -> Result<Vec<NamespaceIdent>> {
        todo!()
    }

    async fn create_namespace(
        &self,
        _namespace: &NamespaceIdent,
        _properties: HashMap<String, String>,
    ) -> Result<Namespace> {
        todo!()
    }

    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
        todo!()
    }

    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
        todo!()
    }

    async fn update_namespace(
        &self,
        _namespace: &NamespaceIdent,
        _properties: HashMap<String, String>,
    ) -> Result<()> {
        todo!()
    }

    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
        todo!()
    }

    // ---- Table operations (not yet implemented) ----

    async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
        todo!()
    }

    async fn table_exists(&self, _identifier: &TableIdent) -> Result<bool> {
        todo!()
    }

    async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
        todo!()
    }

    async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> {
        todo!()
    }

    async fn create_table(
        &self,
        _namespace: &NamespaceIdent,
        _creation: TableCreation,
    ) -> Result<Table> {
        todo!()
    }

    async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
        todo!()
    }

    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
        todo!()
    }
}
246+
247+
#[cfg(test)]
mod tests {
    use iceberg::io::FileIOBuilder;
    use iceberg::Catalog;
    use sqlx::migrate::MigrateDatabase;
    use tempfile::TempDir;

    use crate::{SqlCatalog, SqlCatalogConfig};

    /// Build the URI of a SQLite database file located inside `dir`.
    ///
    /// The caller must keep `dir` alive for as long as the database is in use, because
    /// dropping a `TempDir` deletes the directory and everything in it.
    fn sqlite_uri_in(dir: &TempDir) -> String {
        let db_file = dir.path().join("catalog.db");
        format!("sqlite:{}", db_file.to_str().unwrap())
    }

    /// Create the SQLite database at `sqlite_uri` if needed and open a catalog on it.
    async fn new_sql_catalog(sqlite_uri: &str, warehouse_location: String) -> impl Catalog {
        // Creating the database opens it with "create if missing", so repeated calls against
        // the same URI keep the existing file and its tables.
        sqlx::Sqlite::create_database(sqlite_uri).await.unwrap();

        let config = SqlCatalogConfig::builder()
            .uri(sqlite_uri.to_string())
            .name("iceberg".to_string())
            .warehouse_location(warehouse_location)
            .file_io(FileIOBuilder::new_fs_io().build().unwrap())
            .build();

        SqlCatalog::new(config).await.unwrap()
    }

    #[tokio::test]
    async fn test_initialized() {
        // Keep the directory alive for the whole test so all catalogs share one database.
        let temp_dir = TempDir::new().unwrap();
        let warehouse_loc = temp_dir.path().to_str().unwrap().to_string();
        let sqlite_uri = sqlite_uri_in(&temp_dir);

        new_sql_catalog(&sqlite_uri, warehouse_loc.clone()).await;
        // catalog instantiation should not fail even if tables exist
        new_sql_catalog(&sqlite_uri, warehouse_loc.clone()).await;
        new_sql_catalog(&sqlite_uri, warehouse_loc).await;
    }
}

crates/catalog/sql/src/error.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind};
19+
20+
/// Format an sqlx error into iceberg error.
21+
pub fn from_sqlx_error(error: sqlx::Error) -> Error {
22+
Error::new(
23+
ErrorKind::Unexpected,
24+
"operation failed for hitting sqlx error".to_string(),
25+
)
26+
.with_source(error)
27+
}

crates/catalog/sql/src/lib.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg sql catalog implementation.
19+
20+
#![deny(missing_docs)]
21+
22+
mod catalog;
23+
mod error;
24+
pub use catalog::*;

0 commit comments

Comments
 (0)