diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml index daa9587f63..0508378e7f 100644 --- a/crates/catalog/glue/Cargo.toml +++ b/crates/catalog/glue/Cargo.toml @@ -35,9 +35,11 @@ aws-config = { workspace = true } aws-sdk-glue = { workspace = true } iceberg = { workspace = true } log = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } typed-builder = { workspace = true } +uuid = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } port_scanner = { workspace = true } -tokio = { workspace = true } diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index d152a541fb..f40212950f 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -16,18 +16,23 @@ // under the License. use async_trait::async_trait; +use aws_sdk_glue::types::TableInput; +use iceberg::io::FileIO; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; use std::{collections::HashMap, fmt::Debug}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use typed_builder::TypedBuilder; -use crate::error::from_aws_sdk_error; +use crate::error::{from_aws_build_error, from_aws_sdk_error}; use crate::utils::{ - convert_to_database, convert_to_namespace, create_sdk_config, validate_namespace, + convert_to_database, convert_to_glue_table, convert_to_namespace, create_metadata_location, + create_sdk_config, get_default_table_location, get_metadata_location, validate_namespace, }; use crate::with_catalog_id; @@ -38,6 +43,7 @@ pub struct GlueCatalogConfig { uri: Option, #[builder(default, setter(strip_option))] catalog_id: Option, + warehouse: String, #[builder(default)] props: HashMap, } @@ -48,6 +54,7 @@ struct GlueClient(aws_sdk_glue::Client); pub struct GlueCatalog { config: GlueCatalogConfig, client: GlueClient, + file_io: FileIO, } impl Debug for GlueCatalog { @@ -60,15 +67,24 @@ impl Debug for GlueCatalog { impl GlueCatalog { /// Create a new glue catalog - pub async fn new(config: GlueCatalogConfig) -> Self { + pub async fn new(config: GlueCatalogConfig) -> Result { let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await; let client = aws_sdk_glue::Client::new(&sdk_config); - GlueCatalog { + let file_io = FileIO::from_path(&config.warehouse)? + .with_props(&config.props) + .build()?; + + Ok(GlueCatalog { config, client: GlueClient(client), - } + file_io, + }) + } + /// Get the catalogs `FileIO` + pub fn file_io(&self) -> FileIO { + self.file_io.clone() } } @@ -77,7 +93,7 @@ impl Catalog for GlueCatalog { /// List namespaces from glue catalog. /// /// Glue doesn't support nested namespaces. - /// We will return an empty list if parent is some + /// We will return an empty list if parent is some. async fn list_namespaces( &self, parent: Option<&NamespaceIdent>, @@ -277,6 +293,7 @@ impl Catalog for GlueCatalog { /// querying the database. async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { let db_name = validate_namespace(namespace)?; + let mut table_list: Vec = Vec::new(); let mut next_token: Option = None; @@ -310,31 +327,282 @@ impl Catalog for GlueCatalog { Ok(table_list) } + /// Creates a new table within a specified namespace using the provided + /// table creation settings. + /// + /// # Returns + /// A `Result` wrapping a `Table` object representing the newly created + /// table. + /// + /// # Errors + /// This function may return an error in several cases, including invalid + /// namespace identifiers, failure to determine a default storage location, + /// issues generating or writing table metadata, and errors communicating + /// with the Glue Catalog. async fn create_table( &self, - _namespace: &NamespaceIdent, - _creation: TableCreation, + namespace: &NamespaceIdent, + creation: TableCreation, ) -> Result { - todo!() + let db_name = validate_namespace(namespace)?; + let table_name = creation.name.clone(); + + let location = match &creation.location { + Some(location) => location.clone(), + None => { + let ns = self.get_namespace(namespace).await?; + get_default_table_location(&ns, &db_name, &table_name, &self.config.warehouse) + } + }; + + let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?; + let metadata_location = create_metadata_location(&location, 0)?; + + let mut file = self + .file_io + .new_output(&metadata_location)? + .writer() + .await?; + file.write_all(&serde_json::to_vec(&metadata)?).await?; + file.shutdown().await?; + + let glue_table = convert_to_glue_table( + &table_name, + metadata_location.clone(), + &metadata, + metadata.properties(), + None, + )?; + + let builder = self + .client + .0 + .create_table() + .database_name(&db_name) + .table_input(glue_table); + let builder = with_catalog_id!(builder, self.config); + + builder.send().await.map_err(from_aws_sdk_error)?; + + let table = Table::builder() + .file_io(self.file_io()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) + .build(); + + Ok(table) } - async fn load_table(&self, _table: &TableIdent) -> Result
{ - todo!() + /// Loads a table from the Glue Catalog and constructs a `Table` object + /// based on its metadata. + /// + /// # Returns + /// A `Result` wrapping a `Table` object that represents the loaded table. + /// + /// # Errors + /// This function may return an error in several scenarios, including: + /// - Failure to validate the namespace. + /// - Failure to retrieve the table from the Glue Catalog. + /// - Absence of metadata location information in the table's properties. + /// - Issues reading or deserializing the table's metadata file. + async fn load_table(&self, table: &TableIdent) -> Result
{ + let db_name = validate_namespace(table.namespace())?; + let table_name = table.name(); + + let builder = self + .client + .0 + .get_table() + .database_name(&db_name) + .name(table_name); + let builder = with_catalog_id!(builder, self.config); + + let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?; + + match glue_table_output.table() { + None => Err(Error::new( + ErrorKind::Unexpected, + format!( + "Table object for database: {} and table: {} does not exist", + db_name, table_name + ), + )), + Some(table) => { + let metadata_location = get_metadata_location(&table.parameters)?; + + let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?; + let mut metadata_str = String::new(); + reader.read_to_string(&mut metadata_str).await?; + let metadata = serde_json::from_str::(&metadata_str)?; + + let table = Table::builder() + .file_io(self.file_io()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(TableIdent::new( + NamespaceIdent::new(db_name), + table_name.to_owned(), + )) + .build(); + + Ok(table) + } + } } - async fn drop_table(&self, _table: &TableIdent) -> Result<()> { - todo!() + /// Asynchronously drops a table from the database. + /// + /// # Errors + /// Returns an error if: + /// - The namespace provided in `table` cannot be validated + /// or does not exist. + /// - The underlying database client encounters an error while + /// attempting to drop the table. This includes scenarios where + /// the table does not exist. + /// - Any network or communication error occurs with the database backend. + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + let db_name = validate_namespace(table.namespace())?; + let table_name = table.name(); + + let builder = self + .client + .0 + .delete_table() + .database_name(&db_name) + .name(table_name); + let builder = with_catalog_id!(builder, self.config); + + builder.send().await.map_err(from_aws_sdk_error)?; + + Ok(()) } - async fn table_exists(&self, _table: &TableIdent) -> Result { - todo!() + /// Asynchronously checks the existence of a specified table + /// in the database. + /// + /// # Returns + /// - `Ok(true)` if the table exists in the database. + /// - `Ok(false)` if the table does not exist in the database. + /// - `Err(...)` if an error occurs during the process + async fn table_exists(&self, table: &TableIdent) -> Result { + let db_name = validate_namespace(table.namespace())?; + let table_name = table.name(); + + let builder = self + .client + .0 + .get_table() + .database_name(&db_name) + .name(table_name); + let builder = with_catalog_id!(builder, self.config); + + let resp = builder.send().await; + + match resp { + Ok(_) => Ok(true), + Err(err) => { + if err + .as_service_error() + .map(|e| e.is_entity_not_found_exception()) + == Some(true) + { + return Ok(false); + } + Err(from_aws_sdk_error(err)) + } + } } - async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { - todo!() + /// Asynchronously renames a table within the database + /// or moves it between namespaces (databases). + /// + /// # Returns + /// - `Ok(())` on successful rename or move of the table. + /// - `Err(...)` if an error occurs during the process. + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + let src_db_name = validate_namespace(src.namespace())?; + let dest_db_name = validate_namespace(dest.namespace())?; + + let src_table_name = src.name(); + let dest_table_name = dest.name(); + + let builder = self + .client + .0 + .get_table() + .database_name(&src_db_name) + .name(src_table_name); + let builder = with_catalog_id!(builder, self.config); + + let glue_table_output = builder.send().await.map_err(from_aws_sdk_error)?; + + match glue_table_output.table() { + None => Err(Error::new( + ErrorKind::Unexpected, + format!( + "'Table' object for database: {} and table: {} does not exist", + src_db_name, src_table_name + ), + )), + Some(table) => { + let rename_table_input = TableInput::builder() + .name(dest_table_name) + .set_parameters(table.parameters.clone()) + .set_storage_descriptor(table.storage_descriptor.clone()) + .set_table_type(table.table_type.clone()) + .set_description(table.description.clone()) + .build() + .map_err(from_aws_build_error)?; + + let builder = self + .client + .0 + .create_table() + .database_name(&dest_db_name) + .table_input(rename_table_input); + let builder = with_catalog_id!(builder, self.config); + + builder.send().await.map_err(from_aws_sdk_error)?; + + let drop_src_table_result = self.drop_table(src).await; + + match drop_src_table_result { + Ok(_) => Ok(()), + Err(_) => { + let err_msg_src_table = format!( + "Failed to drop old table {}.{}.", + src_db_name, src_table_name + ); + + let drop_dest_table_result = self.drop_table(dest).await; + + match drop_dest_table_result { + Ok(_) => Err(Error::new( + ErrorKind::Unexpected, + format!( + "{} Rolled back table creation for {}.{}.", + err_msg_src_table, dest_db_name, dest_table_name + ), + )), + Err(_) => Err(Error::new( + ErrorKind::Unexpected, + format!( + "{} Failed to roll back table creation for {}.{}. Please clean up manually.", + err_msg_src_table, dest_db_name, dest_table_name + ), + )), + } + } + } + } + } } async fn update_table(&self, _commit: TableCommit) -> Result
{ - todo!() + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating a table is not supported yet", + )) } } diff --git a/crates/catalog/glue/src/lib.rs b/crates/catalog/glue/src/lib.rs index b274cf7215..2376573358 100644 --- a/crates/catalog/glue/src/lib.rs +++ b/crates/catalog/glue/src/lib.rs @@ -21,6 +21,7 @@ mod catalog; mod error; +mod schema; mod utils; pub use catalog::*; pub use utils::{ diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs new file mode 100644 index 0000000000..a126f2f29b --- /dev/null +++ b/crates/catalog/glue/src/schema.rs @@ -0,0 +1,485 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Property `iceberg.field.id` for `Column` +pub(crate) const ICEBERG_FIELD_ID: &str = "iceberg.field.id"; +/// Property `iceberg.field.optional` for `Column` +pub(crate) const ICEBERG_FIELD_OPTIONAL: &str = "iceberg.field.optional"; +/// Property `iceberg.field.current` for `Column` +pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current"; + +use std::collections::HashMap; + +use iceberg::{ + spec::{visit_schema, PrimitiveType, SchemaVisitor, TableMetadata}, + Error, ErrorKind, Result, +}; + +use aws_sdk_glue::types::Column; + +use crate::error::from_aws_build_error; + +type GlueSchema = Vec; + +#[derive(Debug, Default)] +pub(crate) struct GlueSchemaBuilder { + schema: GlueSchema, + is_current: bool, + depth: usize, +} + +impl GlueSchemaBuilder { + /// Creates a new `GlueSchemaBuilder` from iceberg `Schema` + pub fn from_iceberg(metadata: &TableMetadata) -> Result { + let current_schema = metadata.current_schema(); + + let mut builder = Self { + schema: Vec::new(), + is_current: true, + depth: 0, + }; + + visit_schema(current_schema, &mut builder)?; + + builder.is_current = false; + + for schema in metadata.schemas_iter() { + if schema.schema_id() == current_schema.schema_id() { + continue; + } + + visit_schema(schema, &mut builder)?; + } + + Ok(builder) + } + + /// Returns the newly converted `GlueSchema` + pub fn build(self) -> GlueSchema { + self.schema + } + + /// Check if is in `StructType` while traversing schema + fn is_inside_struct(&self) -> bool { + self.depth > 0 + } +} + +impl SchemaVisitor for GlueSchemaBuilder { + type T = String; + + fn schema( + &mut self, + _schema: &iceberg::spec::Schema, + value: Self::T, + ) -> iceberg::Result { + Ok(value) + } + + fn before_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> { + self.depth += 1; + Ok(()) + } + + fn r#struct( + &mut self, + r#_struct: &iceberg::spec::StructType, + results: Vec, + ) -> iceberg::Result { + Ok(format!("struct<{}>", results.join(", "))) + } + + fn after_struct_field(&mut self, _field: &iceberg::spec::NestedFieldRef) -> Result<()> { + self.depth -= 1; + Ok(()) + } + + fn field( + &mut self, + field: &iceberg::spec::NestedFieldRef, + value: String, + ) -> iceberg::Result { + if self.is_inside_struct() { + return Ok(format!("{}:{}", field.name, &value)); + } + + let parameters = HashMap::from([ + (ICEBERG_FIELD_ID.to_string(), format!("{}", field.id)), + ( + ICEBERG_FIELD_OPTIONAL.to_string(), + format!("{}", field.required).to_lowercase(), + ), + ( + ICEBERG_FIELD_CURRENT.to_string(), + format!("{}", self.is_current).to_lowercase(), + ), + ]); + + let mut builder = Column::builder() + .name(field.name.clone()) + .r#type(&value) + .set_parameters(Some(parameters)); + + if let Some(comment) = field.doc.as_ref() { + builder = builder.comment(comment); + } + + let column = builder.build().map_err(from_aws_build_error)?; + + self.schema.push(column); + + Ok(value) + } + + fn list(&mut self, _list: &iceberg::spec::ListType, value: String) -> iceberg::Result { + Ok(format!("array<{}>", value)) + } + + fn map( + &mut self, + _map: &iceberg::spec::MapType, + key_value: String, + value: String, + ) -> iceberg::Result { + Ok(format!("map<{},{}>", key_value, value)) + } + + fn primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result { + let glue_type = match p { + PrimitiveType::Boolean => "boolean".to_string(), + PrimitiveType::Int => "int".to_string(), + PrimitiveType::Long => "bigint".to_string(), + PrimitiveType::Float => "float".to_string(), + PrimitiveType::Double => "double".to_string(), + PrimitiveType::Date => "date".to_string(), + PrimitiveType::Timestamp => "timestamp".to_string(), + PrimitiveType::Time | PrimitiveType::String | PrimitiveType::Uuid => { + "string".to_string() + } + PrimitiveType::Binary | PrimitiveType::Fixed(_) => "binary".to_string(), + PrimitiveType::Decimal { precision, scale } => { + format!("decimal({},{})", precision, scale) + } + _ => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Conversion from 'Timestamptz' is not supported", + )) + } + }; + + Ok(glue_type) + } +} + +#[cfg(test)] +mod tests { + use iceberg::{ + spec::{Schema, TableMetadataBuilder}, + TableCreation, + }; + + use super::*; + + fn create_metadata(schema: Schema) -> Result { + let table_creation = TableCreation::builder() + .name("my_table".to_string()) + .location("my_location".to_string()) + .schema(schema) + .build(); + let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + + Ok(metadata) + } + + fn create_column( + name: impl Into, + r#type: impl Into, + id: impl Into, + ) -> Result { + let parameters = HashMap::from([ + (ICEBERG_FIELD_ID.to_string(), id.into()), + (ICEBERG_FIELD_OPTIONAL.to_string(), "true".to_string()), + (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()), + ]); + + Column::builder() + .name(name) + .r#type(r#type) + .set_comment(None) + .set_parameters(Some(parameters)) + .build() + .map_err(from_aws_build_error) + } + + #[test] + fn test_schema_with_simple_fields() -> Result<()> { + let record = r#"{ + "type": "struct", + "schema-id": 1, + "fields": [ + { + "id": 1, + "name": "c1", + "required": true, + "type": "boolean" + }, + { + "id": 2, + "name": "c2", + "required": true, + "type": "int" + }, + { + "id": 3, + "name": "c3", + "required": true, + "type": "long" + }, + { + "id": 4, + "name": "c4", + "required": true, + "type": "float" + }, + { + "id": 5, + "name": "c5", + "required": true, + "type": "double" + }, + { + "id": 6, + "name": "c6", + "required": true, + "type": "decimal(2,2)" + }, + { + "id": 7, + "name": "c7", + "required": true, + "type": "date" + }, + { + "id": 8, + "name": "c8", + "required": true, + "type": "time" + }, + { + "id": 9, + "name": "c9", + "required": true, + "type": "timestamp" + }, + { + "id": 10, + "name": "c10", + "required": true, + "type": "string" + }, + { + "id": 11, + "name": "c11", + "required": true, + "type": "uuid" + }, + { + "id": 12, + "name": "c12", + "required": true, + "type": "fixed[4]" + }, + { + "id": 13, + "name": "c13", + "required": true, + "type": "binary" + } + ] + }"#; + + let schema = serde_json::from_str::(record)?; + let metadata = create_metadata(schema)?; + + let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build(); + + let expected = vec![ + create_column("c1", "boolean", "1")?, + create_column("c2", "int", "2")?, + create_column("c3", "bigint", "3")?, + create_column("c4", "float", "4")?, + create_column("c5", "double", "5")?, + create_column("c6", "decimal(2,2)", "6")?, + create_column("c7", "date", "7")?, + create_column("c8", "string", "8")?, + create_column("c9", "timestamp", "9")?, + create_column("c10", "string", "10")?, + create_column("c11", "string", "11")?, + create_column("c12", "binary", "12")?, + create_column("c13", "binary", "13")?, + ]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn test_schema_with_structs() -> Result<()> { + let record = r#"{ + "type": "struct", + "schema-id": 1, + "fields": [ + { + "id": 1, + "name": "person", + "required": true, + "type": { + "type": "struct", + "fields": [ + { + "id": 2, + "name": "name", + "required": true, + "type": "string" + }, + { + "id": 3, + "name": "age", + "required": false, + "type": "int" + } + ] + } + } + ] + }"#; + + let schema = serde_json::from_str::(record)?; + let metadata = create_metadata(schema)?; + + let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build(); + + let expected = vec![create_column( + "person", + "struct", + "1", + )?]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn test_schema_with_struct_inside_list() -> Result<()> { + let record = r#" + { + "schema-id": 1, + "type": "struct", + "fields": [ + { + "id": 1, + "name": "location", + "required": true, + "type": { + "type": "list", + "element-id": 2, + "element-required": true, + "element": { + "type": "struct", + "fields": [ + { + "id": 3, + "name": "latitude", + "required": false, + "type": "float" + }, + { + "id": 4, + "name": "longitude", + "required": false, + "type": "float" + } + ] + } + } + } + ] + } + "#; + + let schema = serde_json::from_str::(record)?; + let metadata = create_metadata(schema)?; + + let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build(); + + let expected = vec![create_column( + "location", + "array>", + "1", + )?]; + + assert_eq!(result, expected); + + Ok(()) + } + + #[test] + fn test_schema_with_nested_maps() -> Result<()> { + let record = r#" + { + "schema-id": 1, + "type": "struct", + "fields": [ + { + "id": 1, + "name": "quux", + "required": true, + "type": { + "type": "map", + "key-id": 2, + "key": "string", + "value-id": 3, + "value-required": true, + "value": { + "type": "map", + "key-id": 4, + "key": "string", + "value-id": 5, + "value-required": true, + "value": "int" + } + } + } + ] + } + "#; + + let schema = serde_json::from_str::(record)?; + let metadata = create_metadata(schema)?; + + let result = GlueSchemaBuilder::from_iceberg(&metadata)?.build(); + + let expected = vec![create_column("quux", "map>", "1")?]; + + assert_eq!(result, expected); + + Ok(()) + } +} diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs index fa9ebb8031..dcf6bef461 100644 --- a/crates/catalog/glue/src/utils.rs +++ b/crates/catalog/glue/src/utils.rs @@ -20,14 +20,14 @@ use std::collections::HashMap; use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_sdk_glue::{ config::Credentials, - types::{Database, DatabaseInput}, + types::{Database, DatabaseInput, StorageDescriptor, TableInput}, }; +use iceberg::spec::TableMetadata; use iceberg::{Error, ErrorKind, Namespace, NamespaceIdent, Result}; +use uuid::Uuid; -use crate::error::from_aws_build_error; +use crate::{error::from_aws_build_error, schema::GlueSchemaBuilder}; -const _GLUE_SKIP_ARCHIVE: &str = "glue.skip-archive"; -const _GLUE_SKIP_ARCHIVE_DEFAULT: bool = true; /// Property aws profile name pub const AWS_PROFILE_NAME: &str = "profile_name"; /// Property aws region @@ -42,6 +42,16 @@ pub const AWS_SESSION_TOKEN: &str = "aws_session_token"; const DESCRIPTION: &str = "description"; /// Parameter namespace location uri const LOCATION: &str = "location_uri"; +/// Property `metadata_location` for `TableInput` +const METADATA_LOCATION: &str = "metadata_location"; +/// Property `previous_metadata_location` for `TableInput` +const PREV_METADATA_LOCATION: &str = "previous_metadata_location"; +/// Property external table for `TableInput` +const EXTERNAL_TABLE: &str = "EXTERNAL_TABLE"; +/// Parameter key `table_type` for `TableInput` +const TABLE_TYPE: &str = "table_type"; +/// Parameter value `table_type` for `TableInput` +const ICEBERG: &str = "ICEBERG"; /// Creates an aws sdk configuration based on /// provided properties and an optional endpoint URL. @@ -125,6 +135,50 @@ pub(crate) fn convert_to_namespace(database: &Database) -> Namespace { Namespace::with_properties(NamespaceIdent::new(db_name), properties) } +/// Converts Iceberg table metadata into an +/// AWS Glue `TableInput` representation. +/// +/// This function facilitates the integration of Iceberg tables with AWS Glue +/// by converting Iceberg table metadata into a Glue-compatible `TableInput` +/// structure. +pub(crate) fn convert_to_glue_table( + table_name: impl Into, + metadata_location: String, + metadata: &TableMetadata, + properties: &HashMap, + prev_metadata_location: Option, +) -> Result { + let glue_schema = GlueSchemaBuilder::from_iceberg(metadata)?.build(); + + let storage_descriptor = StorageDescriptor::builder() + .set_columns(Some(glue_schema)) + .location(&metadata_location) + .build(); + + let mut parameters = HashMap::from([ + (TABLE_TYPE.to_string(), ICEBERG.to_string()), + (METADATA_LOCATION.to_string(), metadata_location), + ]); + + if let Some(prev) = prev_metadata_location { + parameters.insert(PREV_METADATA_LOCATION.to_string(), prev); + } + + let mut table_input_builder = TableInput::builder() + .name(table_name) + .set_parameters(Some(parameters)) + .storage_descriptor(storage_descriptor) + .table_type(EXTERNAL_TABLE); + + if let Some(description) = properties.get(DESCRIPTION) { + table_input_builder = table_input_builder.description(description); + } + + let table_input = table_input_builder.build().map_err(from_aws_build_error)?; + + Ok(table_input) +} + /// Checks if provided `NamespaceIdent` is valid pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result { let name = namespace.as_ref(); @@ -151,6 +205,73 @@ pub(crate) fn validate_namespace(namespace: &NamespaceIdent) -> Result { Ok(name) } +/// Get default table location from `Namespace` properties +pub(crate) fn get_default_table_location( + namespace: &Namespace, + db_name: impl AsRef, + table_name: impl AsRef, + warehouse: impl AsRef, +) -> String { + let properties = namespace.properties(); + + match properties.get(LOCATION) { + Some(location) => format!("{}/{}", location, table_name.as_ref()), + None => { + let warehouse_location = warehouse.as_ref().trim_end_matches('/'); + + format!( + "{}/{}.db/{}", + warehouse_location, + db_name.as_ref(), + table_name.as_ref() + ) + } + } +} + +/// Create metadata location from `location` and `version` +pub(crate) fn create_metadata_location(location: impl AsRef, version: i32) -> Result { + if version < 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Table metadata version: '{}' must be a non-negative integer", + version + ), + )); + }; + + let version = format!("{:0>5}", version); + let id = Uuid::new_v4(); + let metadata_location = format!( + "{}/metadata/{}-{}.metadata.json", + location.as_ref(), + version, + id + ); + + Ok(metadata_location) +} + +/// Get metadata location from `GlueTable` parameters +pub(crate) fn get_metadata_location( + parameters: &Option>, +) -> Result { + match parameters { + Some(properties) => match properties.get(METADATA_LOCATION) { + Some(location) => Ok(location.to_string()), + None => Err(Error::new( + ErrorKind::DataInvalid, + format!("No '{}' set on table", METADATA_LOCATION), + )), + }, + None => Err(Error::new( + ErrorKind::DataInvalid, + "No 'parameters' set on table. Location of metadata is undefined", + )), + } +} + #[macro_export] /// Extends aws sdk builder with `catalog_id` if present macro_rules! with_catalog_id { @@ -165,11 +286,145 @@ macro_rules! with_catalog_id { #[cfg(test)] mod tests { - use aws_sdk_glue::config::ProvideCredentials; - use iceberg::{Namespace, Result}; + use aws_sdk_glue::{config::ProvideCredentials, types::Column}; + use iceberg::{ + spec::{NestedField, PrimitiveType, Schema, TableMetadataBuilder, Type}, + Namespace, Result, TableCreation, + }; + + use crate::schema::{ICEBERG_FIELD_CURRENT, ICEBERG_FIELD_ID, ICEBERG_FIELD_OPTIONAL}; use super::*; + fn create_metadata(schema: Schema) -> Result { + let table_creation = TableCreation::builder() + .name("my_table".to_string()) + .location("my_location".to_string()) + .schema(schema) + .build(); + let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + + Ok(metadata) + } + + #[test] + fn test_get_metadata_location() -> Result<()> { + let params_valid = Some(HashMap::from([( + METADATA_LOCATION.to_string(), + "my_location".to_string(), + )])); + let params_missing_key = Some(HashMap::from([( + "not_here".to_string(), + "my_location".to_string(), + )])); + + let result_valid = get_metadata_location(¶ms_valid)?; + let result_missing_key = get_metadata_location(¶ms_missing_key); + let result_no_params = get_metadata_location(&None); + + assert_eq!(result_valid, "my_location"); + assert!(result_missing_key.is_err()); + assert!(result_no_params.is_err()); + + Ok(()) + } + + #[test] + fn test_convert_to_glue_table() -> Result<()> { + let table_name = "my_table".to_string(); + let location = "s3a://warehouse/hive".to_string(); + let metadata_location = create_metadata_location(location.clone(), 0)?; + let properties = HashMap::new(); + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build()?; + + let metadata = create_metadata(schema)?; + + let parameters = HashMap::from([ + (ICEBERG_FIELD_ID.to_string(), "1".to_string()), + (ICEBERG_FIELD_OPTIONAL.to_string(), "true".to_string()), + (ICEBERG_FIELD_CURRENT.to_string(), "true".to_string()), + ]); + + let column = Column::builder() + .name("foo") + .r#type("int") + .set_parameters(Some(parameters)) + .set_comment(None) + .build() + .map_err(from_aws_build_error)?; + + let storage_descriptor = StorageDescriptor::builder() + .set_columns(Some(vec![column])) + .location(&metadata_location) + .build(); + + let result = + convert_to_glue_table(&table_name, metadata_location, &metadata, &properties, None)?; + + assert_eq!(result.name(), &table_name); + assert_eq!(result.description(), None); + assert_eq!(result.storage_descriptor, Some(storage_descriptor)); + + Ok(()) + } + + #[test] + fn test_create_metadata_location() -> Result<()> { + let location = "my_base_location"; + let valid_version = 0; + let invalid_version = -1; + + let valid_result = create_metadata_location(location, valid_version)?; + let invalid_result = create_metadata_location(location, invalid_version); + + assert!(valid_result.starts_with("my_base_location/metadata/00000-")); + assert!(valid_result.ends_with(".metadata.json")); + assert!(invalid_result.is_err()); + + Ok(()) + } + + #[test] + fn test_get_default_table_location() -> Result<()> { + let properties = HashMap::from([(LOCATION.to_string(), "db_location".to_string())]); + + let namespace = + Namespace::with_properties(NamespaceIdent::new("default".into()), properties); + let db_name = validate_namespace(namespace.name())?; + let table_name = "my_table"; + + let expected = "db_location/my_table"; + let result = + get_default_table_location(&namespace, db_name, table_name, "warehouse_location"); + + assert_eq!(expected, result); + + Ok(()) + } + + #[test] + fn test_get_default_table_location_warehouse() -> Result<()> { + let namespace = Namespace::new(NamespaceIdent::new("default".into())); + let db_name = validate_namespace(namespace.name())?; + let table_name = "my_table"; + + let expected = "warehouse_location/default.db/my_table"; + let result = + get_default_table_location(&namespace, db_name, table_name, "warehouse_location"); + + assert_eq!(expected, result); + + Ok(()) + } + #[test] fn test_convert_to_namespace() -> Result<()> { let db = Database::builder() diff --git a/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml b/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml index c24d2d79a5..a0be22e30c 100644 --- a/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml +++ b/crates/catalog/glue/testdata/glue_catalog/docker-compose.yaml @@ -18,6 +18,28 @@ version: '3.8' services: + minio: + image: minio/minio:RELEASE.2024-03-07T00-43-48Z + expose: + - 9000 + - 9001 + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + command: [ "server", "/data", "--console-address", ":9001" ] + + mc: + depends_on: + - minio + image: minio/mc:RELEASE.2024-03-07T00-31-49Z + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " + moto: image: motoserver/moto:5.0.3 expose: diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index c44ffcd2bb..eb0cd96b98 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -19,7 +19,9 @@ use std::collections::HashMap; -use iceberg::{Catalog, Namespace, NamespaceIdent, Result}; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation, TableIdent}; use iceberg_catalog_glue::{ GlueCatalog, GlueCatalogConfig, AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, }; @@ -29,6 +31,7 @@ use port_scanner::scan_port_addr; use tokio::time::sleep; const GLUE_CATALOG_PORT: u16 = 5000; +const MINIO_PORT: u16 = 9000; #[derive(Debug)] struct TestFixture { @@ -47,6 +50,7 @@ async fn set_test_fixture(func: &str) -> TestFixture { docker_compose.run(); let glue_catalog_ip = docker_compose.get_container_ip("moto"); + let minio_ip = docker_compose.get_container_ip("minio"); let read_port = format!("{}:{}", glue_catalog_ip, GLUE_CATALOG_PORT); loop { @@ -65,14 +69,22 @@ async fn set_test_fixture(func: &str) -> TestFixture { "my_secret_key".to_string(), ), (AWS_REGION_NAME.to_string(), "us-east-1".to_string()), + ( + S3_ENDPOINT.to_string(), + format!("http://{}:{}", minio_ip, MINIO_PORT), + ), + (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()), + (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()), + (S3_REGION.to_string(), "us-east-1".to_string()), ]); let config = GlueCatalogConfig::builder() .uri(format!("http://{}:{}", glue_catalog_ip, GLUE_CATALOG_PORT)) - .props(props) + .warehouse("s3a://warehouse/hive".to_string()) + .props(props.clone()) .build(); - let glue_catalog = GlueCatalog::new(config).await; + let glue_catalog = GlueCatalog::new(config).await.unwrap(); TestFixture { _docker_compose: docker_compose, @@ -91,6 +103,176 @@ async fn set_test_namespace(fixture: &TestFixture, namespace: &NamespaceIdent) - Ok(()) } +fn set_table_creation(location: impl ToString, name: impl ToString) -> Result { + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + let creation = TableCreation::builder() + .location(location.to_string()) + .name(name.to_string()) + .properties(HashMap::new()) + .schema(schema) + .build(); + + Ok(creation) +} + +#[tokio::test] +async fn test_rename_table() -> Result<()> { + let fixture = set_test_fixture("test_rename_table").await; + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("my_database".into())); + + fixture + .glue_catalog + .create_namespace(namespace.name(), HashMap::new()) + .await?; + + let table = fixture + .glue_catalog + .create_table(namespace.name(), creation) + .await?; + + let dest = TableIdent::new(namespace.name().clone(), "my_table_rename".to_string()); + + fixture + .glue_catalog + .rename_table(table.identifier(), &dest) + .await?; + + let table = fixture.glue_catalog.load_table(&dest).await?; + assert_eq!(table.identifier(), &dest); + + let src = TableIdent::new(namespace.name().clone(), "my_table".to_string()); + + let src_table_exists = fixture.glue_catalog.table_exists(&src).await?; + assert!(!src_table_exists); + + Ok(()) +} + +#[tokio::test] +async fn test_table_exists() -> Result<()> { + let fixture = set_test_fixture("test_table_exists").await; + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("my_database".into())); + + fixture + .glue_catalog + .create_namespace(namespace.name(), HashMap::new()) + .await?; + + let ident = TableIdent::new(namespace.name().clone(), "my_table".to_string()); + + let exists = fixture.glue_catalog.table_exists(&ident).await?; + assert!(!exists); + + let table = fixture + .glue_catalog + .create_table(namespace.name(), creation) + .await?; + + let exists = fixture + .glue_catalog + .table_exists(table.identifier()) + .await?; + + assert!(exists); + + Ok(()) +} + +#[tokio::test] +async fn test_drop_table() -> Result<()> { + let fixture = set_test_fixture("test_drop_table").await; + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("my_database".into())); + + fixture + .glue_catalog + .create_namespace(namespace.name(), HashMap::new()) + .await?; + + let table = fixture + .glue_catalog + .create_table(namespace.name(), creation) + .await?; + + fixture.glue_catalog.drop_table(table.identifier()).await?; + + let result = fixture + .glue_catalog + .table_exists(table.identifier()) + .await?; + + assert!(!result); + + Ok(()) +} + +#[tokio::test] +async fn test_load_table() -> Result<()> { + let fixture = set_test_fixture("test_load_table").await; + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + let namespace = Namespace::new(NamespaceIdent::new("my_database".into())); + + fixture + .glue_catalog + .create_namespace(namespace.name(), HashMap::new()) + .await?; + + let expected = fixture + .glue_catalog + .create_table(namespace.name(), creation) + .await?; + + let result = fixture + .glue_catalog + .load_table(&TableIdent::new( + namespace.name().clone(), + "my_table".to_string(), + )) + .await?; + + assert_eq!(result.identifier(), expected.identifier()); + assert_eq!(result.metadata_location(), expected.metadata_location()); + assert_eq!(result.metadata(), expected.metadata()); + + Ok(()) +} + +#[tokio::test] +async fn test_create_table() -> Result<()> { + let fixture = set_test_fixture("test_create_table").await; + let namespace = NamespaceIdent::new("my_database".to_string()); + set_test_namespace(&fixture, &namespace).await?; + let creation = set_table_creation("s3a://warehouse/hive", "my_table")?; + + let result = fixture + .glue_catalog + .create_table(&namespace, creation) + .await?; + + assert_eq!(result.identifier().name(), "my_table"); + assert!(result + .metadata_location() + .is_some_and(|location| location.starts_with("s3a://warehouse/hive/metadata/00000-"))); + assert!( + fixture + .glue_catalog + .file_io() + .is_exist("s3a://warehouse/hive/metadata/") + .await? + ); + + Ok(()) +} + #[tokio::test] async fn test_list_tables() -> Result<()> { let fixture = set_test_fixture("test_list_tables").await;