diff --git a/Cargo.lock b/Cargo.lock index a0b74a8786..17319c3c2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3613,6 +3613,7 @@ version = "0.6.0" dependencies = [ "async-trait", "iceberg", + "iceberg-catalog-glue", "iceberg-catalog-rest", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index a585be7d77..4f03a2021b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ hive_metastore = "0.1" http = "1.2" iceberg = { version = "0.6.0", path = "./crates/iceberg" } iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" } +iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" } iceberg-datafusion = { version = "0.6.0", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml index 613160e468..b6126021f5 100644 --- a/crates/catalog/glue/Cargo.toml +++ b/crates/catalog/glue/Cargo.toml @@ -37,7 +37,6 @@ iceberg = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -typed-builder = { workspace = true } [dev-dependencies] ctor = { workspace = true } diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index fb4bd36b8d..c7584b596c 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -26,10 +26,9 @@ use iceberg::io::{ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, + TableCommit, TableCreation, TableIdent, }; -use typed_builder::TypedBuilder; use crate::error::{from_aws_build_error, from_aws_sdk_error}; use crate::utils::{ @@ -40,15 +39,90 @@ use crate::{ AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id, }; -#[derive(Debug, TypedBuilder)] +/// Glue catalog URI +pub const GLUE_CATALOG_PROP_URI: &str = "uri"; +/// Glue catalog id +pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id"; +/// Glue catalog warehouse location +pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; + +/// Builder for [`GlueCatalog`]. +#[derive(Debug)] +pub struct GlueCatalogBuilder(GlueCatalogConfig); + +impl Default for GlueCatalogBuilder { + fn default() -> Self { + Self(GlueCatalogConfig { + name: None, + uri: None, + catalog_id: None, + warehouse: "".to_string(), + props: HashMap::new(), + }) + } +} + +impl CatalogBuilder for GlueCatalogBuilder { + type C = GlueCatalog; + + fn load( + mut self, + name: impl Into, + props: HashMap, + ) -> impl Future> + Send { + self.0.name = Some(name.into()); + + if props.contains_key(GLUE_CATALOG_PROP_URI) { + self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned() + } + + if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) { + self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned() + } + + if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) { + self.0.warehouse = props + .get(GLUE_CATALOG_PROP_WAREHOUSE) + .cloned() + .unwrap_or_default(); + } + + // Collect other remaining properties + self.0.props = props + .into_iter() + .filter(|(k, _)| { + k != GLUE_CATALOG_PROP_URI + && k != GLUE_CATALOG_PROP_CATALOG_ID + && k != GLUE_CATALOG_PROP_WAREHOUSE + }) + .collect(); + + async move { + if self.0.name.is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )); + } + if self.0.warehouse.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Catalog warehouse is required", + )); + } + + GlueCatalog::new(self.0).await + } + } +} + +#[derive(Debug)] /// Glue Catalog configuration -pub struct GlueCatalogConfig { - #[builder(default, setter(strip_option(fallback = uri_opt)))] +pub(crate) struct GlueCatalogConfig { + name: Option, uri: Option, - #[builder(default, setter(strip_option(fallback = catalog_id_opt)))] catalog_id: Option, warehouse: String, - #[builder(default)] props: HashMap, } @@ -71,7 +145,7 @@ impl Debug for GlueCatalog { impl GlueCatalog { /// Create a new glue catalog - pub async fn new(config: GlueCatalogConfig) -> Result { + async fn new(config: GlueCatalogConfig) -> Result { let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await; let mut file_io_props = config.props.clone(); if !file_io_props.contains_key(S3_ACCESS_KEY_ID) { diff --git a/crates/catalog/glue/src/lib.rs b/crates/catalog/glue/src/lib.rs index 2376573358..1b9efe3770 100644 --- a/crates/catalog/glue/src/lib.rs +++ b/crates/catalog/glue/src/lib.rs @@ -16,6 +16,30 @@ // under the License. //! Iceberg Glue Catalog implementation. +//! +//! To build a glue catalog with configurations +//! # Example +//! +//! ```rust, no_run +//! use std::collections::HashMap; +//! +//! use iceberg::CatalogBuilder; +//! use iceberg_catalog_glue::{GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalogBuilder}; +//! +//! #[tokio::main] +//! async fn main() { +//! let catalog = GlueCatalogBuilder::default() +//! .load( +//! "glue", +//! HashMap::from([( +//! GLUE_CATALOG_PROP_WAREHOUSE.to_string(), +//! "s3://warehouse".to_string(), +//! )]), +//! ) +//! .await +//! .unwrap(); +//! } +//! ``` #![deny(missing_docs)] diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index 2f7b1052ca..14d894ae12 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -24,9 +24,12 @@ use std::sync::RwLock; use ctor::{ctor, dtor}; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; -use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation, TableIdent}; +use iceberg::{ + Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result, TableCreation, TableIdent, +}; use iceberg_catalog_glue::{ - AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GlueCatalog, GlueCatalogConfig, + AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GLUE_CATALOG_PROP_URI, + GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder, }; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; @@ -112,13 +115,22 @@ async fn get_catalog() -> GlueCatalog { retries += 1; } - let config = GlueCatalogConfig::builder() - .uri(format!("http://{}", glue_socket_addr)) - .warehouse("s3a://warehouse/hive".to_string()) - .props(props.clone()) - .build(); + let mut glue_props = HashMap::from([ + ( + GLUE_CATALOG_PROP_URI.to_string(), + format!("http://{}", glue_socket_addr), + ), + ( + GLUE_CATALOG_PROP_WAREHOUSE.to_string(), + "s3a://warehouse/hive".to_string(), + ), + ]); + glue_props.extend(props.clone()); - GlueCatalog::new(config).await.unwrap() + GlueCatalogBuilder::default() + .load("glue", glue_props) + .await + .unwrap() } async fn set_test_namespace(catalog: &GlueCatalog, namespace: &NamespaceIdent) -> Result<()> { diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml index d29edad051..136847d971 100644 --- a/crates/catalog/loader/Cargo.toml +++ b/crates/catalog/loader/Cargo.toml @@ -30,6 +30,7 @@ repository = { workspace = true } [dependencies] iceberg = { workspace = true } -iceberg-catalog-rest = {workspace = true} +iceberg-catalog-rest = { workspace = true } +iceberg-catalog-glue = { workspace = true } tokio = { workspace = true } -async-trait = {workspace = true} +async-trait = { workspace = true } diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index e5fce46822..c1e88b0cea 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use async_trait::async_trait; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; +use iceberg_catalog_glue::GlueCatalogBuilder; use iceberg_catalog_rest::RestCatalogBuilder; #[async_trait] @@ -46,6 +47,7 @@ impl BoxedCatalogBuilder for T { pub fn load(r#type: &str) -> Result> { match r#type { "rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box), + "glue" => Ok(Box::new(GlueCatalogBuilder::default()) as Box), _ => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Unsupported catalog type: {}", r#type), @@ -57,12 +59,12 @@ pub fn load(r#type: &str) -> Result> { mod tests { use std::collections::HashMap; - use iceberg_catalog_rest::REST_CATALOG_PROP_URI; - use crate::load; #[tokio::test] async fn test_load_rest_catalog() { + use iceberg_catalog_rest::REST_CATALOG_PROP_URI; + let catalog_loader = load("rest").unwrap(); let catalog = catalog_loader .load( @@ -79,4 +81,25 @@ mod tests { assert!(catalog.is_ok()); } + + #[tokio::test] + async fn test_load_glue_catalog() { + use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE; + + let catalog_loader = load("glue").unwrap(); + let catalog = catalog_loader + .load( + "glue".to_string(), + HashMap::from([ + ( + GLUE_CATALOG_PROP_WAREHOUSE.to_string(), + "s3://test".to_string(), + ), + ("key".to_string(), "value".to_string()), + ]), + ) + .await; + + assert!(catalog.is_ok()); + } }