Skip to content

Commit 3de1771

Browse files
committed
feat: add the rest of table ops
Signed-off-by: callum-ryan <[email protected]>
1 parent 32b54fe commit 3de1771

File tree

1 file changed

+249
-9
lines changed

1 file changed

+249
-9
lines changed

crates/catalog/sql/src/catalog.rs

Lines changed: 249 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -518,7 +518,11 @@ impl Catalog for SqlCatalog {
518518
{CATALOG_FIELD_TABLE_NAMESPACE}
519519
FROM {CATALOG_TABLE_NAME}
520520
WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
521-
AND {CATALOG_FIELD_CATALOG_NAME} = ?"
521+
AND {CATALOG_FIELD_CATALOG_NAME} = ?
522+
AND (
523+
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
524+
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
525+
)",
522526
),
523527
vec![Some(&namespace.join(".")), Some(&self.name)],
524528
)
@@ -553,7 +557,11 @@ impl Catalog for SqlCatalog {
553557
FROM {CATALOG_TABLE_NAME}
554558
WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ?
555559
AND {CATALOG_FIELD_CATALOG_NAME} = ?
556-
AND {CATALOG_FIELD_TABLE_NAME} = ?"
560+
AND {CATALOG_FIELD_TABLE_NAME} = ?
561+
AND (
562+
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
563+
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
564+
)"
557565
),
558566
vec![Some(&namespace), Some(&self.name), Some(table_name)],
559567
)
@@ -566,8 +574,32 @@ impl Catalog for SqlCatalog {
566574
}
567575
}
568576

569-
async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
570-
todo!()
577+
async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
578+
if !self.table_exists(identifier).await? {
579+
return no_such_table_err(identifier);
580+
}
581+
582+
self.execute(
583+
&format!(
584+
"DELETE FROM {CATALOG_TABLE_NAME}
585+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
586+
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
587+
AND {CATALOG_FIELD_TABLE_NAME} = ?
588+
AND (
589+
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
590+
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
591+
)"
592+
),
593+
vec![
594+
Some(&self.name),
595+
Some(identifier.name()),
596+
Some(&identifier.namespace().join(".")),
597+
],
598+
None,
599+
)
600+
.await?;
601+
602+
Ok(())
571603
}
572604

573605
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
@@ -674,8 +706,10 @@ impl Catalog for SqlCatalog {
674706
.await?;
675707

676708
self.execute(&format!(
677-
"INSERT INTO {CATALOG_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP})
678-
VALUES (?, ?, ?, ?)"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location)], None).await?;
709+
"INSERT INTO {CATALOG_TABLE_NAME}
710+
({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
711+
VALUES (?, ?, ?, ?, ?)
712+
"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
679713

680714
Ok(Table::builder()
681715
.file_io(self.fileio.clone())
@@ -685,8 +719,47 @@ impl Catalog for SqlCatalog {
685719
.build()?)
686720
}
687721

688-
async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
689-
todo!()
722+
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
723+
if src == dest {
724+
return Ok(());
725+
}
726+
727+
if !self.table_exists(src).await? {
728+
return no_such_table_err(src);
729+
}
730+
731+
if !self.namespace_exists(dest.namespace()).await? {
732+
return no_such_namespace_err(dest.namespace());
733+
}
734+
735+
if self.table_exists(dest).await? {
736+
return table_already_exists_err(dest);
737+
}
738+
739+
self.execute(
740+
&format!(
741+
"UPDATE {CATALOG_TABLE_NAME}
742+
SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ?
743+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
744+
AND {CATALOG_FIELD_TABLE_NAME} = ?
745+
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
746+
AND (
747+
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
748+
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
749+
)"
750+
),
751+
vec![
752+
Some(dest.name()),
753+
Some(&dest.namespace().join(".")),
754+
Some(&self.name),
755+
Some(src.name()),
756+
Some(&src.namespace().join(".")),
757+
],
758+
None,
759+
)
760+
.await?;
761+
762+
Ok(())
690763
}
691764

692765
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
@@ -711,6 +784,8 @@ mod tests {
711784
use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY;
712785
use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};
713786

787+
const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
788+
714789
fn temp_path() -> String {
715790
let temp_dir = TempDir::new().unwrap();
716791
temp_dir.path().to_str().unwrap().to_string()
@@ -778,7 +853,11 @@ mod tests {
778853
.unwrap();
779854
}
780855

781-
const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
856+
async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
857+
for table_ident in table_idents {
858+
create_table(catalog, table_ident).await;
859+
}
860+
}
782861

783862
fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
784863
assert_eq!(table.identifier(), expected_table_ident);
@@ -1470,4 +1549,165 @@ mod tests {
14701549
)
14711550
);
14721551
}
1552+
1553+
#[tokio::test]
1554+
async fn test_rename_table_in_same_namespace() {
1555+
let warehouse_loc = temp_path();
1556+
let catalog = new_sql_catalog(warehouse_loc).await;
1557+
let namespace_ident = NamespaceIdent::new("n1".into());
1558+
create_namespace(&catalog, &namespace_ident).await;
1559+
let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1560+
let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1561+
create_table(&catalog, &src_table_ident).await;
1562+
1563+
catalog
1564+
.rename_table(&src_table_ident, &dst_table_ident)
1565+
.await
1566+
.unwrap();
1567+
1568+
assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1569+
dst_table_ident
1570+
],);
1571+
}
1572+
1573+
#[tokio::test]
1574+
async fn test_rename_table_across_namespaces() {
1575+
let warehouse_loc = temp_path();
1576+
let catalog = new_sql_catalog(warehouse_loc).await;
1577+
let src_namespace_ident = NamespaceIdent::new("a".into());
1578+
let dst_namespace_ident = NamespaceIdent::new("b".into());
1579+
create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1580+
let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1581+
let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1582+
create_table(&catalog, &src_table_ident).await;
1583+
1584+
catalog
1585+
.rename_table(&src_table_ident, &dst_table_ident)
1586+
.await
1587+
.unwrap();
1588+
1589+
assert_eq!(
1590+
catalog.list_tables(&src_namespace_ident).await.unwrap(),
1591+
vec![],
1592+
);
1593+
1594+
assert_eq!(
1595+
catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1596+
vec![dst_table_ident],
1597+
);
1598+
}
1599+
1600+
#[tokio::test]
1601+
async fn test_rename_table_src_table_is_same_as_dst_table() {
1602+
let warehouse_loc = temp_path();
1603+
let catalog = new_sql_catalog(warehouse_loc).await;
1604+
let namespace_ident = NamespaceIdent::new("n1".into());
1605+
create_namespace(&catalog, &namespace_ident).await;
1606+
let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1607+
create_table(&catalog, &table_ident).await;
1608+
1609+
catalog
1610+
.rename_table(&table_ident, &table_ident)
1611+
.await
1612+
.unwrap();
1613+
1614+
assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1615+
table_ident
1616+
],);
1617+
}
1618+
1619+
#[tokio::test]
1620+
async fn test_rename_table_across_nested_namespaces() {
1621+
let warehouse_loc = temp_path();
1622+
let catalog = new_sql_catalog(warehouse_loc).await;
1623+
let namespace_ident_a = NamespaceIdent::new("a".into());
1624+
let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1625+
let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1626+
create_namespaces(&catalog, &vec![
1627+
&namespace_ident_a,
1628+
&namespace_ident_a_b,
1629+
&namespace_ident_a_b_c,
1630+
])
1631+
.await;
1632+
1633+
let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1634+
create_tables(&catalog, vec![&src_table_ident]).await;
1635+
1636+
let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1637+
catalog
1638+
.rename_table(&src_table_ident, &dst_table_ident)
1639+
.await
1640+
.unwrap();
1641+
1642+
assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1643+
1644+
assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1645+
}
1646+
1647+
#[tokio::test]
1648+
async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1649+
let warehouse_loc = temp_path();
1650+
let catalog = new_sql_catalog(warehouse_loc).await;
1651+
let src_namespace_ident = NamespaceIdent::new("n1".into());
1652+
let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1653+
create_namespace(&catalog, &src_namespace_ident).await;
1654+
create_table(&catalog, &src_table_ident).await;
1655+
1656+
let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1657+
let dst_table_ident =
1658+
TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1659+
assert_eq!(
1660+
catalog
1661+
.rename_table(&src_table_ident, &dst_table_ident)
1662+
.await
1663+
.unwrap_err()
1664+
.to_string(),
1665+
format!(
1666+
"Unexpected => No such namespace: {:?}",
1667+
non_existent_dst_namespace_ident
1668+
),
1669+
);
1670+
}
1671+
1672+
#[tokio::test]
1673+
async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1674+
let warehouse_loc = temp_path();
1675+
let catalog = new_sql_catalog(warehouse_loc).await;
1676+
let namespace_ident = NamespaceIdent::new("n1".into());
1677+
create_namespace(&catalog, &namespace_ident).await;
1678+
let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1679+
let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1680+
1681+
assert_eq!(
1682+
catalog
1683+
.rename_table(&src_table_ident, &dst_table_ident)
1684+
.await
1685+
.unwrap_err()
1686+
.to_string(),
1687+
format!("Unexpected => No such table: {:?}", src_table_ident),
1688+
);
1689+
}
1690+
1691+
#[tokio::test]
1692+
async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1693+
let warehouse_loc = temp_path();
1694+
let catalog = new_sql_catalog(warehouse_loc).await;
1695+
let namespace_ident = NamespaceIdent::new("n1".into());
1696+
create_namespace(&catalog, &namespace_ident).await;
1697+
let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1698+
let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1699+
create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1700+
1701+
assert_eq!(
1702+
catalog
1703+
.rename_table(&src_table_ident, &dst_table_ident)
1704+
.await
1705+
.unwrap_err()
1706+
.to_string(),
1707+
format!(
1708+
"Unexpected => Cannot create table {:? }. Table already exists.",
1709+
&dst_table_ident
1710+
),
1711+
);
1712+
}
14731713
}

0 commit comments

Comments
 (0)