diff --git a/Cargo.lock b/Cargo.lock index 48a995242f..5b3ba0df36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3583,7 +3583,6 @@ dependencies = [ "serde_json", "tokio", "tracing", - "typed-builder 0.20.1", "volo", "volo-thrift", ] @@ -3595,6 +3594,7 @@ dependencies = [ "async-trait", "iceberg", "iceberg-catalog-glue", + "iceberg-catalog-hms", "iceberg-catalog-rest", "iceberg-catalog-s3tables", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 868284e751..385693edf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ iceberg = { version = "0.6.0", path = "./crates/iceberg" } iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" } iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" } iceberg-catalog-s3tables = { version = "0.6.0", path = "./crates/catalog/s3tables" } +iceberg-catalog-hms = { version = "0.6.0", path = "./crates/catalog/hms" } iceberg-datafusion = { version = "0.6.0", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index 707f3ed6a4..549dbb9c02 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -38,7 +38,6 @@ pilota = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -typed-builder = { workspace = true } volo-thrift = { workspace = true } # Transitive dependencies below diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 899811f969..c8f046cb7e 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -29,15 +29,106 @@ use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, 
Namespace, NamespaceIdent, Result, + TableCommit, TableCreation, TableIdent, }; -use typed_builder::TypedBuilder; use volo_thrift::MaybeException; use super::utils::*; use crate::error::{from_io_error, from_thrift_error, from_thrift_exception}; +/// HMS catalog address +pub const HMS_CATALOG_PROP_URI: &str = "uri"; + +/// HMS Catalog thrift transport +pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport"; +/// HMS Catalog framed thrift transport +pub const THRIFT_TRANSPORT_FRAMED: &str = "framed"; +/// HMS Catalog buffered thrift transport +pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered"; + +/// HMS Catalog warehouse location +pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; + +/// Builder for [`HmsCatalog`]. +#[derive(Debug)] +pub struct HmsCatalogBuilder(HmsCatalogConfig); + +impl Default for HmsCatalogBuilder { + fn default() -> Self { + Self(HmsCatalogConfig { + name: None, + address: "".to_string(), + thrift_transport: HmsThriftTransport::default(), + warehouse: "".to_string(), + props: HashMap::new(), + }) + } +} + +impl CatalogBuilder for HmsCatalogBuilder { + type C = HmsCatalog; + + fn load( + mut self, + name: impl Into, + props: HashMap, + ) -> impl Future> + Send { + self.0.name = Some(name.into()); + + if props.contains_key(HMS_CATALOG_PROP_URI) { + self.0.address = props.get(HMS_CATALOG_PROP_URI).cloned().unwrap_or_default(); + } + + if let Some(tt) = props.get(HMS_CATALOG_PROP_THRIFT_TRANSPORT) { + self.0.thrift_transport = match tt.to_lowercase().as_str() { + THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed, + THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered, + _ => HmsThriftTransport::default(), + }; + } + + if props.contains_key(HMS_CATALOG_PROP_WAREHOUSE) { + self.0.warehouse = props + .get(HMS_CATALOG_PROP_WAREHOUSE) + .cloned() + .unwrap_or_default(); + } + + self.0.props = props + .into_iter() + .filter(|(k, _)| { + k != HMS_CATALOG_PROP_URI + && k != HMS_CATALOG_PROP_THRIFT_TRANSPORT + && k != 
HMS_CATALOG_PROP_WAREHOUSE + }) + .collect(); + + let result = { + if self.0.name.is_none() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )) + } else if self.0.address.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog address is required", + )) + } else if self.0.warehouse.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog warehouse is required", + )) + } else { + HmsCatalog::new(self.0) + } + }; + + std::future::ready(result) + } +} + /// Which variant of the thrift transport to communicate with HMS /// See: #[derive(Debug, Default)] @@ -50,12 +141,12 @@ pub enum HmsThriftTransport { } /// Hive metastore Catalog configuration. -#[derive(Debug, TypedBuilder)] -pub struct HmsCatalogConfig { +#[derive(Debug)] +pub(crate) struct HmsCatalogConfig { + name: Option, address: String, thrift_transport: HmsThriftTransport, warehouse: String, - #[builder(default)] props: HashMap, } @@ -78,7 +169,7 @@ impl Debug for HmsCatalog { impl HmsCatalog { /// Create a new hms catalog. - pub fn new(config: HmsCatalogConfig) -> Result { + fn new(config: HmsCatalogConfig) -> Result { let address = config .address .as_str() diff --git a/crates/catalog/hms/src/lib.rs b/crates/catalog/hms/src/lib.rs index db0034d46b..ea87400f08 100644 --- a/crates/catalog/hms/src/lib.rs +++ b/crates/catalog/hms/src/lib.rs @@ -16,6 +16,35 @@ // under the License. //! Iceberg Hive Metastore Catalog implementation. +//! +//! To build a Hive Metastore catalog from configuration properties: +//! # Example +//! +//! ```rust, no_run +//! use std::collections::HashMap; +//! +//! use iceberg::CatalogBuilder; +//! use iceberg_catalog_hms::{ +//! HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE, HmsCatalogBuilder, +//! }; +//! +//! #[tokio::main] +//! async fn main() { +//! let catalog = HmsCatalogBuilder::default() +//! .load( +//! "hms", +//! HashMap::from([ +//! (HMS_CATALOG_PROP_URI.to_string(), "127.0.0.1:1".to_string()), +//! ( +//! 
HMS_CATALOG_PROP_WAREHOUSE.to_string(), +//! "s3://warehouse".to_string(), +//! ), +//! ]), +//! ) +//! .await +//! .unwrap(); +//! } +//! ``` #![deny(missing_docs)] diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 74f1e3c14e..2bf12779b1 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -24,8 +24,11 @@ use std::sync::RwLock; use ctor::{ctor, dtor}; use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; -use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; -use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport}; +use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; +use iceberg_catalog_hms::{ + HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE, + HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED, +}; use iceberg_test_utils::docker::DockerCompose; use iceberg_test_utils::{normalize_test_name, set_up}; use port_scanner::scan_port_addr; @@ -79,6 +82,18 @@ async fn get_catalog() -> HmsCatalog { } let props = HashMap::from([ + ( + HMS_CATALOG_PROP_URI.to_string(), + hms_socket_addr.to_string(), + ), + ( + HMS_CATALOG_PROP_THRIFT_TRANSPORT.to_string(), + THRIFT_TRANSPORT_BUFFERED.to_string(), + ), + ( + HMS_CATALOG_PROP_WAREHOUSE.to_string(), + "s3a://warehouse/hive".to_string(), + ), ( S3_ENDPOINT.to_string(), format!("http://{}", minio_socket_addr), @@ -106,14 +121,10 @@ async fn get_catalog() -> HmsCatalog { retries += 1; } - let config = HmsCatalogConfig::builder() - .address(hms_socket_addr.to_string()) - .thrift_transport(HmsThriftTransport::Buffered) - .warehouse("s3a://warehouse/hive".to_string()) - .props(props) - .build(); - - HmsCatalog::new(config).unwrap() + HmsCatalogBuilder::default() + .load("hms", props) + .await + 
.unwrap() } async fn set_test_namespace(catalog: &HmsCatalog, namespace: &NamespaceIdent) -> Result<()> { diff --git a/crates/catalog/loader/Cargo.toml b/crates/catalog/loader/Cargo.toml index 75cb5ff746..014f84a3a1 100644 --- a/crates/catalog/loader/Cargo.toml +++ b/crates/catalog/loader/Cargo.toml @@ -33,5 +33,6 @@ iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } iceberg-catalog-glue = { workspace = true } iceberg-catalog-s3tables = { workspace = true } +iceberg-catalog-hms = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index d39a750917..9c18ab4e5f 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use async_trait::async_trait; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_glue::GlueCatalogBuilder; +use iceberg_catalog_hms::HmsCatalogBuilder; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_catalog_s3tables::S3TablesCatalogBuilder; @@ -32,6 +33,7 @@ static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[ ("rest", || Box::new(RestCatalogBuilder::default())), ("glue", || Box::new(GlueCatalogBuilder::default())), ("s3tables", || Box::new(S3TablesCatalogBuilder::default())), + ("hms", || Box::new(HmsCatalogBuilder::default())), ]; /// Return the list of supported catalog types. 
@@ -109,17 +111,22 @@ mod tests { use crate::{CatalogLoader, load}; #[tokio::test] - async fn test_load_glue_catalog() { - use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE; + async fn test_load_unsupported_catalog() { + let result = load("unsupported"); + assert!(result.is_err()); + } - let catalog_loader = load("glue").unwrap(); - let catalog = catalog_loader + #[tokio::test] + async fn test_catalog_loader_pattern() { + use iceberg_catalog_rest::REST_CATALOG_PROP_URI; + + let catalog = CatalogLoader::from("rest") .load( - "glue".to_string(), + "rest".to_string(), HashMap::from([ ( - GLUE_CATALOG_PROP_WAREHOUSE.to_string(), - "s3://test".to_string(), + REST_CATALOG_PROP_URI.to_string(), + "http://localhost:8080".to_string(), ), ("key".to_string(), "value".to_string()), ]), @@ -130,7 +137,7 @@ mod tests { } #[tokio::test] - async fn test_load_rest_catalog() { + async fn test_catalog_loader_pattern_rest_catalog() { use iceberg_catalog_rest::REST_CATALOG_PROP_URI; let catalog_loader = load("rest").unwrap(); @@ -151,22 +158,17 @@ mod tests { } #[tokio::test] - async fn test_load_unsupported_catalog() { - let result = load("unsupported"); - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_catalog_loader_pattern() { - use iceberg_catalog_rest::REST_CATALOG_PROP_URI; + async fn test_catalog_loader_pattern_glue_catalog() { + use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE; - let catalog = CatalogLoader::from("rest") + let catalog_loader = load("glue").unwrap(); + let catalog = catalog_loader .load( - "rest".to_string(), + "glue".to_string(), HashMap::from([ ( - REST_CATALOG_PROP_URI.to_string(), - "http://localhost:8080".to_string(), + GLUE_CATALOG_PROP_WAREHOUSE.to_string(), + "s3://test".to_string(), ), ("key".to_string(), "value".to_string()), ]), @@ -196,6 +198,28 @@ mod tests { assert!(catalog.is_ok()); } + #[tokio::test] + async fn test_catalog_loader_pattern_hms_catalog() { + use iceberg_catalog_hms::{HMS_CATALOG_PROP_URI, 
HMS_CATALOG_PROP_WAREHOUSE}; + + let catalog_loader = load("hms").unwrap(); + let catalog = catalog_loader + .load( + "hms".to_string(), + HashMap::from([ + (HMS_CATALOG_PROP_URI.to_string(), "127.0.0.1:1".to_string()), + ( + HMS_CATALOG_PROP_WAREHOUSE.to_string(), + "s3://warehouse".to_string(), + ), + ("key".to_string(), "value".to_string()), + ]), + ) + .await; + + assert!(catalog.is_ok()); + } + #[tokio::test] async fn test_error_message_includes_supported_types() { let err = match load("does-not-exist") {