Skip to content

Commit 82be6c6

Browse files
committed
feat: SQL Catalog - namespaces
Signed-off-by: callum-ryan <[email protected]>
1 parent 80c1399 commit 82be6c6

File tree

2 files changed

+193
-15
lines changed

2 files changed

+193
-15
lines changed

crates/catalog/sql/src/catalog.rs

Lines changed: 185 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ use std::time::Duration;
2222
use async_trait::async_trait;
2323
use iceberg::io::FileIO;
2424
use iceberg::table::Table;
25-
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
25+
use iceberg::{
26+
Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
27+
};
2628
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
27-
use sqlx::AnyPool;
29+
use sqlx::{AnyPool, Row};
2830
use typed_builder::TypedBuilder;
2931

30-
use crate::error::from_sqlx_error;
32+
use crate::error::{from_sqlx_error, no_such_namespace_err};
3133

3234
static CATALOG_TABLE_NAME: &str = "iceberg_tables";
3335
static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name";
@@ -61,7 +63,7 @@ pub struct SqlCatalogConfig {
6163
#[derive(Debug)]
6264
/// Sql catalog implementation.
6365
pub struct SqlCatalog {
64-
_name: String,
66+
name: String,
6567
connection: AnyPool,
6668
_warehouse_location: String,
6769
_fileio: FileIO,
@@ -132,7 +134,7 @@ impl SqlCatalog {
132134
.map_err(from_sqlx_error)?;
133135

134136
Ok(SqlCatalog {
135-
_name: config.name.to_owned(),
137+
name: config.name.to_owned(),
136138
connection: pool,
137139
_warehouse_location: config.warehouse_location,
138140
_fileio: config.file_io,
@@ -173,25 +175,194 @@ impl SqlCatalog {
173175
impl Catalog for SqlCatalog {
174176
async fn list_namespaces(
175177
&self,
176-
_parent: Option<&NamespaceIdent>,
178+
parent: Option<&NamespaceIdent>,
177179
) -> Result<Vec<NamespaceIdent>> {
178-
todo!()
180+
let table_namespaces_stmt = format!(
181+
"SELECT {CATALOG_FIELD_TABLE_NAMESPACE}
182+
FROM {CATALOG_TABLE_NAME}
183+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
184+
);
185+
let namespaces_stmt = format!(
186+
"SELECT {NAMESPACE_FIELD_NAME}
187+
FROM {NAMESPACE_TABLE_NAME}
188+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?"
189+
);
190+
191+
match parent {
192+
Some(parent) => match self.namespace_exists(parent).await? {
193+
true => {
194+
let parent_table_namespaces_stmt = format!(
195+
"{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')"
196+
);
197+
let parent_namespaces_stmt =
198+
format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')");
199+
200+
let namespace_rows = self
201+
.execute_statement(
202+
&format!(
203+
"{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}"
204+
),
205+
vec![
206+
Some(&self.name),
207+
Some(&parent.join(".")),
208+
Some(&self.name),
209+
Some(&parent.join(".")),
210+
],
211+
)
212+
.await?;
213+
214+
Ok(namespace_rows
215+
.iter()
216+
.filter_map(|r| {
217+
let nsp = r.try_get::<String, _>(0).ok();
218+
nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok())
219+
})
220+
.collect())
221+
}
222+
false => no_such_namespace_err(parent),
223+
},
224+
None => {
225+
let namespace_rows = self
226+
.execute_statement(
227+
&format!("{namespaces_stmt} UNION {table_namespaces_stmt}"),
228+
vec![Some(&self.name), Some(&self.name)],
229+
)
230+
.await?;
231+
232+
Ok(namespace_rows
233+
.iter()
234+
.filter_map(|r| {
235+
let nsp = r.try_get::<String, _>(0).ok();
236+
nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok())
237+
})
238+
.collect())
239+
}
240+
}
179241
}
180242

181243
async fn create_namespace(
182244
&self,
183-
_namespace: &NamespaceIdent,
184-
_properties: HashMap<String, String>,
245+
namespace: &NamespaceIdent,
246+
properties: HashMap<String, String>,
185247
) -> Result<Namespace> {
186-
todo!()
248+
let exists = self.namespace_exists(namespace).await?;
249+
if exists {
250+
Err(Error::new(
251+
iceberg::ErrorKind::Unexpected,
252+
format!("Namespace {:?} already exists", namespace),
253+
))
254+
} else {
255+
let namespace_str = namespace.join(".");
256+
let insert = format!(
257+
"INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE})
258+
VALUES (?, ?, ?, ?)");
259+
if !properties.is_empty() {
260+
let mut query_args = vec![];
261+
let mut properties_insert = insert.clone();
262+
for (index, (key, value)) in properties.iter().enumerate() {
263+
query_args.extend(
264+
[
265+
Some(&self.name),
266+
Some(&namespace_str),
267+
Some(key),
268+
Some(value),
269+
]
270+
.iter(),
271+
);
272+
if index > 0 {
273+
properties_insert = format!("{properties_insert}, (?, ?, ?, ?)");
274+
}
275+
}
276+
277+
self.execute_statement(&properties_insert, query_args)
278+
.await?;
279+
280+
Ok(Namespace::with_properties(namespace.clone(), properties))
281+
} else {
282+
// set a default property of exists = true
283+
self.execute_statement(&insert, vec![
284+
Some(&self.name),
285+
Some(&namespace_str),
286+
Some(&"exists".to_string()),
287+
Some(&"true".to_string()),
288+
])
289+
.await?;
290+
Ok(Namespace::with_properties(namespace.clone(), properties))
291+
}
292+
}
187293
}
188294

189-
async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
190-
todo!()
295+
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
296+
let exists = self.namespace_exists(namespace).await?;
297+
if exists {
298+
let namespace_props = self
299+
.execute_statement(
300+
&format!(
301+
"SELECT
302+
{NAMESPACE_FIELD_NAME},
303+
{NAMESPACE_FIELD_PROPERTY_KEY},
304+
{NAMESPACE_FIELD_PROPERTY_VALUE}
305+
FROM {NAMESPACE_TABLE_NAME}
306+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
307+
AND {NAMESPACE_FIELD_NAME} = ?"
308+
),
309+
vec![Some(&self.name), Some(&namespace.join("."))],
310+
)
311+
.await?;
312+
313+
let properties: HashMap<String, String> = namespace_props
314+
.iter()
315+
.filter_map(|r| {
316+
let key = r.try_get(NAMESPACE_FIELD_PROPERTY_KEY).ok();
317+
let value = r.try_get(NAMESPACE_FIELD_PROPERTY_VALUE).ok();
318+
match (key, value) {
319+
(Some(k), Some(v)) => Some((k, v)),
320+
_ => None,
321+
}
322+
})
323+
.collect();
324+
325+
Ok(Namespace::with_properties(namespace.clone(), properties))
326+
} else {
327+
no_such_namespace_err(namespace)
328+
}
191329
}
192330

193-
async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
194-
todo!()
331+
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
332+
let namespace_str = namespace.join(".");
333+
334+
let table_namespaces = self
335+
.execute_statement(
336+
&format!(
337+
"SELECT 1 FROM {CATALOG_TABLE_NAME}
338+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
339+
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
340+
LIMIT 1"
341+
),
342+
vec![Some(&self.name), Some(&namespace_str)],
343+
)
344+
.await?;
345+
346+
if !table_namespaces.is_empty() {
347+
Ok(true)
348+
} else {
349+
let namespaces = self
350+
.execute_statement(
351+
&format!(
352+
"SELECT 1 FROM {NAMESPACE_TABLE_NAME}
353+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
354+
AND {NAMESPACE_FIELD_NAME} = ?
355+
LIMIT 1"
356+
),
357+
vec![Some(&self.name), Some(&namespace_str)],
358+
)
359+
.await?;
360+
if !namespaces.is_empty() {
361+
Ok(true)
362+
} else {
363+
Ok(false)
364+
}
365+
}
195366
}
196367

197368
async fn update_namespace(

crates/catalog/sql/src/error.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use iceberg::{Error, ErrorKind};
18+
use iceberg::{Error, ErrorKind, NamespaceIdent, Result};
1919

2020
/// Format an sqlx error into iceberg error.
2121
pub fn from_sqlx_error(error: sqlx::Error) -> Error {
@@ -25,3 +25,10 @@ pub fn from_sqlx_error(error: sqlx::Error) -> Error {
2525
)
2626
.with_source(error)
2727
}
28+
29+
pub fn no_such_namespace_err<T>(namespace: &NamespaceIdent) -> Result<T> {
30+
Err(Error::new(
31+
ErrorKind::Unexpected,
32+
format!("No such namespace: {:?}", namespace),
33+
))
34+
}

0 commit comments

Comments
 (0)