Skip to content

Commit 80c1399

Browse files
authored
feat: initialise SQL Catalog (#524)
* feat: initialise SQL Catalog Signed-off-by: callum-ryan <[email protected]> * fix: remove rls-rustls Signed-off-by: callum-ryan <[email protected]> * feat: change to SqlBindStyle and rename consts Signed-off-by: callum-ryan <[email protected]> --------- Signed-off-by: callum-ryan <[email protected]>
1 parent 2daa2c9 commit 80c1399

File tree

4 files changed

+373
-0
lines changed

4 files changed

+373
-0
lines changed

crates/catalog/sql/Cargo.toml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-catalog-sql"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Rust Sql Catalog"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "sql", "catalog"]
30+
31+
[dependencies]
32+
async-trait = { workspace = true }
33+
iceberg = { workspace = true }
34+
sqlx = { version = "0.7.4", features = ["any"], default-features = false }
35+
typed-builder = { workspace = true }
36+
37+
[dev-dependencies]
38+
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
39+
itertools = { workspace = true }
40+
regex = "1.10.5"
41+
sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "migrate"], default-features = false }
42+
tempfile = { workspace = true }
43+
tokio = { workspace = true }

crates/catalog/sql/src/catalog.rs

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::borrow::Cow;
19+
use std::collections::HashMap;
20+
use std::time::Duration;
21+
22+
use async_trait::async_trait;
23+
use iceberg::io::FileIO;
24+
use iceberg::table::Table;
25+
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
26+
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
27+
use sqlx::AnyPool;
28+
use typed_builder::TypedBuilder;
29+
30+
use crate::error::from_sqlx_error;
31+
32+
// Names of the table that stores one row per catalogued table, and of its columns.
const CATALOG_TABLE_NAME: &str = "iceberg_tables";
const CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
const CATALOG_FIELD_TABLE_NAME: &str = "table_name";
const CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
const CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
const CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
const CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";

// Names of the table that stores namespace properties, and of its columns.
const NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
const NAMESPACE_FIELD_NAME: &str = "namespace";
const NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key";
const NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value";

/// Default size of the SQL connection pool when `pool.max-connections` is not provided.
const MAX_CONNECTIONS: u32 = 10;
/// Default maximum idle time per connection, in seconds, before it is closed
/// (used when `pool.idle-timeout` is not provided).
const IDLE_TIMEOUT: u64 = 10;
/// Default for health-checking each connection before it is handed out
/// (used when `pool.test-before-acquire` is not provided).
const TEST_BEFORE_ACQUIRE: bool = true;
48+
49+
/// Sql catalog config
50+
#[derive(Debug, TypedBuilder)]
51+
pub struct SqlCatalogConfig {
52+
uri: String,
53+
name: String,
54+
warehouse_location: String,
55+
file_io: FileIO,
56+
sql_bind_style: SqlBindStyle,
57+
#[builder(default)]
58+
props: HashMap<String, String>,
59+
}
60+
61+
#[derive(Debug)]
62+
/// Sql catalog implementation.
63+
pub struct SqlCatalog {
64+
_name: String,
65+
connection: AnyPool,
66+
_warehouse_location: String,
67+
_fileio: FileIO,
68+
sql_bind_style: SqlBindStyle,
69+
}
70+
71+
/// Set the SQL parameter bind style to either `$1..$N` (Postgres style) or `?` (SQLite/MySQL/MariaDB)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqlBindStyle {
    /// DollarNumeric uses parameters of the form `$1..$N`, which is the Postgres style
    DollarNumeric,
    /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB)
    QMark,
}
79+
80+
impl SqlCatalog {
81+
/// Create new sql catalog instance
82+
pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
83+
install_default_drivers();
84+
let max_connections: u32 = config
85+
.props
86+
.get("pool.max-connections")
87+
.map(|v| v.parse().unwrap())
88+
.unwrap_or(MAX_CONNECTIONS);
89+
let idle_timeout: u64 = config
90+
.props
91+
.get("pool.idle-timeout")
92+
.map(|v| v.parse().unwrap())
93+
.unwrap_or(IDLE_TIMEOUT);
94+
let test_before_acquire: bool = config
95+
.props
96+
.get("pool.test-before-acquire")
97+
.map(|v| v.parse().unwrap())
98+
.unwrap_or(TEST_BEFORE_ACQUIRE);
99+
100+
let pool = AnyPoolOptions::new()
101+
.max_connections(max_connections)
102+
.idle_timeout(Duration::from_secs(idle_timeout))
103+
.test_before_acquire(test_before_acquire)
104+
.connect(&config.uri)
105+
.await
106+
.map_err(from_sqlx_error)?;
107+
108+
sqlx::query(&format!(
109+
"CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} (
110+
{CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
111+
{CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL,
112+
{CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL,
113+
{CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000),
114+
{CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000),
115+
{CATALOG_FIELD_RECORD_TYPE} VARCHAR(5),
116+
PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))"
117+
))
118+
.execute(&pool)
119+
.await
120+
.map_err(from_sqlx_error)?;
121+
122+
sqlx::query(&format!(
123+
"CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} (
124+
{CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL,
125+
{NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL,
126+
{NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255),
127+
{NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000),
128+
PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))"
129+
))
130+
.execute(&pool)
131+
.await
132+
.map_err(from_sqlx_error)?;
133+
134+
Ok(SqlCatalog {
135+
_name: config.name.to_owned(),
136+
connection: pool,
137+
_warehouse_location: config.warehouse_location,
138+
_fileio: config.file_io,
139+
sql_bind_style: config.sql_bind_style,
140+
})
141+
}
142+
143+
/// SQLX Any does not implement PostgresSQL bindings, so we have to do this.
144+
pub async fn execute_statement(
145+
&self,
146+
query: &String,
147+
args: Vec<Option<&String>>,
148+
) -> Result<Vec<AnyRow>> {
149+
let query_with_placeholders: Cow<str> =
150+
if self.sql_bind_style == SqlBindStyle::DollarNumeric {
151+
let mut query = query.clone();
152+
for i in 0..args.len() {
153+
query = query.replacen("?", &format!("${}", i + 1), 1);
154+
}
155+
Cow::Owned(query)
156+
} else {
157+
Cow::Borrowed(query)
158+
};
159+
160+
let mut sqlx_query = sqlx::query(&query_with_placeholders);
161+
for arg in args {
162+
sqlx_query = sqlx_query.bind(arg);
163+
}
164+
165+
sqlx_query
166+
.fetch_all(&self.connection)
167+
.await
168+
.map_err(from_sqlx_error)
169+
}
170+
}
171+
172+
#[async_trait]
173+
impl Catalog for SqlCatalog {
174+
async fn list_namespaces(
175+
&self,
176+
_parent: Option<&NamespaceIdent>,
177+
) -> Result<Vec<NamespaceIdent>> {
178+
todo!()
179+
}
180+
181+
async fn create_namespace(
182+
&self,
183+
_namespace: &NamespaceIdent,
184+
_properties: HashMap<String, String>,
185+
) -> Result<Namespace> {
186+
todo!()
187+
}
188+
189+
async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
190+
todo!()
191+
}
192+
193+
async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
194+
todo!()
195+
}
196+
197+
async fn update_namespace(
198+
&self,
199+
_namespace: &NamespaceIdent,
200+
_properties: HashMap<String, String>,
201+
) -> Result<()> {
202+
todo!()
203+
}
204+
205+
async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
206+
todo!()
207+
}
208+
209+
async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
210+
todo!()
211+
}
212+
213+
async fn table_exists(&self, _identifier: &TableIdent) -> Result<bool> {
214+
todo!()
215+
}
216+
217+
async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
218+
todo!()
219+
}
220+
221+
async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> {
222+
todo!()
223+
}
224+
225+
async fn create_table(
226+
&self,
227+
_namespace: &NamespaceIdent,
228+
_creation: TableCreation,
229+
) -> Result<Table> {
230+
todo!()
231+
}
232+
233+
async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
234+
todo!()
235+
}
236+
237+
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
238+
todo!()
239+
}
240+
}
241+
242+
#[cfg(test)]
mod tests {
    use iceberg::io::FileIOBuilder;
    use iceberg::Catalog;
    use sqlx::migrate::MigrateDatabase;
    use tempfile::TempDir;

    use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};

    /// Returns the path of `dir` as an owned string.
    fn dir_path(dir: &TempDir) -> String {
        dir.path().to_str().unwrap().to_string()
    }

    /// Builds a SQLite connection URI for a database file located inside `dir`.
    fn sqlite_uri(dir: &TempDir) -> String {
        format!(
            "sqlite:{}",
            dir.path().join("catalog.db").to_str().unwrap()
        )
    }

    /// Instantiates a catalog against the already created database at `sqlite_uri`.
    async fn new_sql_catalog(sqlite_uri: String, warehouse_location: String) -> impl Catalog {
        let config = SqlCatalogConfig::builder()
            .uri(sqlite_uri)
            .name("iceberg".to_string())
            .warehouse_location(warehouse_location)
            .file_io(FileIOBuilder::new_fs_io().build().unwrap())
            .sql_bind_style(SqlBindStyle::QMark)
            .build();

        SqlCatalog::new(config).await.unwrap()
    }

    #[tokio::test]
    async fn test_initialized() {
        // Both directories are kept alive until the end of the test so that
        // the database file exists while in use and is removed afterwards.
        let warehouse_dir = TempDir::new().unwrap();
        let database_dir = TempDir::new().unwrap();
        let warehouse_loc = dir_path(&warehouse_dir);
        let uri = sqlite_uri(&database_dir);
        sqlx::Sqlite::create_database(&uri).await.unwrap();

        new_sql_catalog(uri.clone(), warehouse_loc.clone()).await;
        // catalog instantiation should not fail even if tables exist: the
        // same database is reused, so the tables created above are present
        new_sql_catalog(uri.clone(), warehouse_loc.clone()).await;
        new_sql_catalog(uri.clone(), warehouse_loc.clone()).await;
    }
}

crates/catalog/sql/src/error.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind};
19+
20+
/// Format an sqlx error into iceberg error.
21+
pub fn from_sqlx_error(error: sqlx::Error) -> Error {
22+
Error::new(
23+
ErrorKind::Unexpected,
24+
"operation failed for hitting sqlx error".to_string(),
25+
)
26+
.with_source(error)
27+
}

crates/catalog/sql/src/lib.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg sql catalog implementation.
19+
20+
#![deny(missing_docs)]
21+
22+
mod catalog;
23+
mod error;
24+
pub use catalog::*;

0 commit comments

Comments
 (0)