Skip to content

feat(catalog): Implement catalog loader for hms #1612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 21, 2025
Merged
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ iceberg = { version = "0.6.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" }
iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" }
iceberg-catalog-s3tables = { version = "0.6.0", path = "./crates/catalog/s3tables" }
iceberg-catalog-hms = { version = "0.6.0", path = "./crates/catalog/hms" }
iceberg-datafusion = { version = "0.6.0", path = "./crates/integrations/datafusion" }
indicatif = "0.17"
itertools = "0.13"
Expand Down
1 change: 0 additions & 1 deletion crates/catalog/hms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pilota = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
typed-builder = { workspace = true }
volo-thrift = { workspace = true }

# Transitive dependencies below
Expand Down
105 changes: 98 additions & 7 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,106 @@ use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent,
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
TableCommit, TableCreation, TableIdent,
};
use typed_builder::TypedBuilder;
use volo_thrift::MaybeException;

use super::utils::*;
use crate::error::{from_io_error, from_thrift_error, from_thrift_exception};

/// Property key for the HMS catalog address.
pub const HMS_CATALOG_PROP_URI: &str = "uri";

/// Property key selecting the thrift transport used to talk to HMS.
///
/// Recognized values are [`THRIFT_TRANSPORT_FRAMED`] and
/// [`THRIFT_TRANSPORT_BUFFERED`] (matched case-insensitively by the builder).
pub const HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str = "thrift_transport";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] selecting the framed thrift transport.
pub const THRIFT_TRANSPORT_FRAMED: &str = "framed";
/// Value of [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`] selecting the buffered thrift transport.
pub const THRIFT_TRANSPORT_BUFFERED: &str = "buffered";

/// Property key for the HMS catalog warehouse location.
pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";

/// Builder for [`HmsCatalog`].
///
/// Wraps the catalog configuration, which is filled in from the properties
/// passed to `load`.
#[derive(Debug)]
pub struct HmsCatalogBuilder(HmsCatalogConfig);

impl Default for HmsCatalogBuilder {
    /// Returns a builder with an empty configuration: no name, empty address
    /// and warehouse, the default thrift transport and no extra properties.
    fn default() -> Self {
        let empty_config = HmsCatalogConfig {
            name: None,
            address: String::new(),
            thrift_transport: HmsThriftTransport::default(),
            warehouse: String::new(),
            props: HashMap::new(),
        };

        Self(empty_config)
    }
}

impl CatalogBuilder for HmsCatalogBuilder {
    type C = HmsCatalog;

    /// Builds an [`HmsCatalog`] called `name` from `props`.
    ///
    /// The keys [`HMS_CATALOG_PROP_URI`], [`HMS_CATALOG_PROP_THRIFT_TRANSPORT`]
    /// and [`HMS_CATALOG_PROP_WAREHOUSE`] are consumed to fill in the address,
    /// thrift transport and warehouse of the configuration. Every remaining
    /// entry is stored unchanged as the catalog's extra properties.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an [`ErrorKind::DataInvalid`] error when
    /// the address or the warehouse is empty after applying `props`; otherwise
    /// it resolves to whatever `HmsCatalog::new` returns.
    fn load(
        mut self,
        name: impl Into<String>,
        mut props: HashMap<String, String>,
    ) -> impl Future<Output = Result<Self::C>> + Send {
        self.0.name = Some(name.into());

        // Taking the well-known keys out of `props` moves their values into
        // the config (no clone, single lookup) and leaves exactly the
        // pass-through properties behind.
        if let Some(address) = props.remove(HMS_CATALOG_PROP_URI) {
            self.0.address = address;
        }

        if let Some(transport) = props.remove(HMS_CATALOG_PROP_THRIFT_TRANSPORT) {
            self.0.thrift_transport = match transport.to_lowercase().as_str() {
                THRIFT_TRANSPORT_FRAMED => HmsThriftTransport::Framed,
                THRIFT_TRANSPORT_BUFFERED => HmsThriftTransport::Buffered,
                // Unrecognized values fall back to the default transport.
                _ => HmsThriftTransport::default(),
            };
        }

        if let Some(warehouse) = props.remove(HMS_CATALOG_PROP_WAREHOUSE) {
            self.0.warehouse = warehouse;
        }

        self.0.props = props;

        // `name` was assigned above, so only address and warehouse can still
        // be missing at this point.
        let result = if self.0.address.is_empty() {
            Err(Error::new(
                ErrorKind::DataInvalid,
                "Catalog address is required",
            ))
        } else if self.0.warehouse.is_empty() {
            Err(Error::new(
                ErrorKind::DataInvalid,
                "Catalog warehouse is required",
            ))
        } else {
            HmsCatalog::new(self.0)
        };

        std::future::ready(result)
    }
}

/// Which variant of the thrift transport to communicate with HMS
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
#[derive(Debug, Default)]
Expand All @@ -50,12 +141,12 @@ pub enum HmsThriftTransport {
}

/// Hive metastore Catalog configuration.
#[derive(Debug, TypedBuilder)]
pub struct HmsCatalogConfig {
#[derive(Debug)]
pub(crate) struct HmsCatalogConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the TypedBuilder, it's not needed.

name: Option<String>,
address: String,
thrift_transport: HmsThriftTransport,
warehouse: String,
#[builder(default)]
props: HashMap<String, String>,
}

Expand All @@ -78,7 +169,7 @@ impl Debug for HmsCatalog {

impl HmsCatalog {
/// Create a new hms catalog.
pub fn new(config: HmsCatalogConfig) -> Result<Self> {
fn new(config: HmsCatalogConfig) -> Result<Self> {
let address = config
.address
.as_str()
Expand Down
29 changes: 29 additions & 0 deletions crates/catalog/hms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,35 @@
// under the License.

//! Iceberg Hive Metastore Catalog implementation.
//!
//! Use [`HmsCatalogBuilder`] to build a Hive Metastore catalog from configuration properties.
//!
//! # Example
//!
//! ```rust, no_run
//! use std::collections::HashMap;
//!
//! use iceberg::CatalogBuilder;
//! use iceberg_catalog_hms::{
//! HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE, HmsCatalogBuilder,
//! };
//!
//! #[tokio::main]
//! async fn main() {
//! let catalog = HmsCatalogBuilder::default()
//! .load(
//! "hms",
//! HashMap::from([
//! (HMS_CATALOG_PROP_URI.to_string(), "127.0.0.1:1".to_string()),
//! (
//! HMS_CATALOG_PROP_WAREHOUSE.to_string(),
//! "s3://warehouse".to_string(),
//! ),
//! ]),
//! )
//! .await
//! .unwrap();
//! }
//! ```

#![deny(missing_docs)]

Expand Down
31 changes: 21 additions & 10 deletions crates/catalog/hms/tests/hms_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use std::sync::RwLock;
use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_hms::{HmsCatalog, HmsCatalogConfig, HmsThriftTransport};
use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_hms::{
HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE,
HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED,
};
use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
use port_scanner::scan_port_addr;
Expand Down Expand Up @@ -79,6 +82,18 @@ async fn get_catalog() -> HmsCatalog {
}

let props = HashMap::from([
(
HMS_CATALOG_PROP_URI.to_string(),
hms_socket_addr.to_string(),
),
(
HMS_CATALOG_PROP_THRIFT_TRANSPORT.to_string(),
THRIFT_TRANSPORT_BUFFERED.to_string(),
),
(
HMS_CATALOG_PROP_WAREHOUSE.to_string(),
"s3a://warehouse/hive".to_string(),
),
(
S3_ENDPOINT.to_string(),
format!("http://{}", minio_socket_addr),
Expand Down Expand Up @@ -106,14 +121,10 @@ async fn get_catalog() -> HmsCatalog {
retries += 1;
}

let config = HmsCatalogConfig::builder()
.address(hms_socket_addr.to_string())
.thrift_transport(HmsThriftTransport::Buffered)
.warehouse("s3a://warehouse/hive".to_string())
.props(props)
.build();

HmsCatalog::new(config).unwrap()
HmsCatalogBuilder::default()
.load("hms", props)
.await
.unwrap()
}

async fn set_test_namespace(catalog: &HmsCatalog, namespace: &NamespaceIdent) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg-catalog-glue = { workspace = true }
iceberg-catalog-s3tables = { workspace = true }
iceberg-catalog-hms = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }
64 changes: 44 additions & 20 deletions crates/catalog/loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
use iceberg_catalog_glue::GlueCatalogBuilder;
use iceberg_catalog_hms::HmsCatalogBuilder;
use iceberg_catalog_rest::RestCatalogBuilder;
use iceberg_catalog_s3tables::S3TablesCatalogBuilder;

Expand All @@ -32,6 +33,7 @@ static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[
("rest", || Box::new(RestCatalogBuilder::default())),
("glue", || Box::new(GlueCatalogBuilder::default())),
("s3tables", || Box::new(S3TablesCatalogBuilder::default())),
("hms", || Box::new(HmsCatalogBuilder::default())),
];

/// Return the list of supported catalog types.
Expand Down Expand Up @@ -109,17 +111,22 @@ mod tests {
use crate::{CatalogLoader, load};

#[tokio::test]
async fn test_load_glue_catalog() {
use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;
async fn test_load_unsupported_catalog() {
let result = load("unsupported");
assert!(result.is_err());
}

let catalog_loader = load("glue").unwrap();
let catalog = catalog_loader
#[tokio::test]
async fn test_catalog_loader_pattern() {
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;

let catalog = CatalogLoader::from("rest")
.load(
"glue".to_string(),
"rest".to_string(),
HashMap::from([
(
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
"s3://test".to_string(),
REST_CATALOG_PROP_URI.to_string(),
"http://localhost:8080".to_string(),
),
("key".to_string(), "value".to_string()),
]),
Expand All @@ -130,7 +137,7 @@ mod tests {
}

#[tokio::test]
async fn test_load_rest_catalog() {
async fn test_catalog_loader_pattern_rest_catalog() {
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;

let catalog_loader = load("rest").unwrap();
Expand All @@ -151,22 +158,17 @@ mod tests {
}

#[tokio::test]
async fn test_load_unsupported_catalog() {
let result = load("unsupported");
assert!(result.is_err());
}

#[tokio::test]
async fn test_catalog_loader_pattern() {
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
async fn test_catalog_loader_pattern_glue_catalog() {
use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;

let catalog = CatalogLoader::from("rest")
let catalog_loader = load("glue").unwrap();
let catalog = catalog_loader
.load(
"rest".to_string(),
"glue".to_string(),
HashMap::from([
(
REST_CATALOG_PROP_URI.to_string(),
"http://localhost:8080".to_string(),
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
"s3://test".to_string(),
),
("key".to_string(), "value".to_string()),
]),
Expand Down Expand Up @@ -196,6 +198,28 @@ mod tests {
assert!(catalog.is_ok());
}

#[tokio::test]
async fn test_catalog_loader_pattern_hms_catalog() {
    use iceberg_catalog_hms::{HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE};

    // Address and warehouse are the required HMS properties; the extra
    // `key` entry checks that unrelated properties are accepted as well.
    let hms_props: HashMap<String, String> = [
        (HMS_CATALOG_PROP_URI, "127.0.0.1:1"),
        (HMS_CATALOG_PROP_WAREHOUSE, "s3://warehouse"),
        ("key", "value"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    let loader = load("hms").unwrap();
    let loaded = loader.load("hms".to_string(), hms_props).await;

    assert!(loaded.is_ok());
}

#[tokio::test]
async fn test_error_message_includes_supported_types() {
let err = match load("does-not-exist") {
Expand Down
Loading