Skip to content

feat(catalog): implement catalog loader for glue #1603

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ hive_metastore = "0.1"
http = "1.2"
iceberg = { version = "0.6.0", path = "./crates/iceberg" }
iceberg-catalog-rest = { version = "0.6.0", path = "./crates/catalog/rest" }
iceberg-catalog-glue = { version = "0.6.0", path = "./crates/catalog/glue" }
iceberg-datafusion = { version = "0.6.0", path = "./crates/integrations/datafusion" }
indicatif = "0.17"
itertools = "0.13"
Expand Down
1 change: 0 additions & 1 deletion crates/catalog/glue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ iceberg = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
typed-builder = { workspace = true }

[dev-dependencies]
ctor = { workspace = true }
Expand Down
92 changes: 83 additions & 9 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ use iceberg::io::{
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent,
Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
TableCommit, TableCreation, TableIdent,
};
use typed_builder::TypedBuilder;

use crate::error::{from_aws_build_error, from_aws_sdk_error};
use crate::utils::{
Expand All @@ -40,15 +39,90 @@ use crate::{
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id,
};

#[derive(Debug, TypedBuilder)]
/// Glue catalog URI
pub const GLUE_CATALOG_PROP_URI: &str = "uri";
/// Glue catalog id
pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id";
/// Glue catalog warehouse location
pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";

/// Builder for [`GlueCatalog`].
#[derive(Debug)]
pub struct GlueCatalogBuilder(GlueCatalogConfig);

impl Default for GlueCatalogBuilder {
fn default() -> Self {
Self(GlueCatalogConfig {
name: None,
uri: None,
catalog_id: None,
warehouse: "".to_string(),
props: HashMap::new(),
})
}
}

impl CatalogBuilder for GlueCatalogBuilder {
type C = GlueCatalog;

fn load(
mut self,
name: impl Into<String>,
props: HashMap<String, String>,
) -> impl Future<Output = Result<Self::C>> + Send {
self.0.name = Some(name.into());

if props.contains_key(GLUE_CATALOG_PROP_URI) {
self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned()
}

if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) {
self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned()
}

if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) {
self.0.warehouse = props
.get(GLUE_CATALOG_PROP_WAREHOUSE)
.cloned()
.unwrap_or_default();
}

// Collect other remaining properties
self.0.props = props
.into_iter()
.filter(|(k, _)| {
k != GLUE_CATALOG_PROP_URI
&& k != GLUE_CATALOG_PROP_CATALOG_ID
&& k != GLUE_CATALOG_PROP_WAREHOUSE
})
.collect();

async move {
if self.0.name.is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog name is required",
));
}
if self.0.warehouse.is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog warehouse is required",
));
}

GlueCatalog::new(self.0).await
}
}
}

#[derive(Debug)]
/// Glue Catalog configuration
pub struct GlueCatalogConfig {
#[builder(default, setter(strip_option(fallback = uri_opt)))]
pub(crate) struct GlueCatalogConfig {
name: Option<String>,
uri: Option<String>,
#[builder(default, setter(strip_option(fallback = catalog_id_opt)))]
catalog_id: Option<String>,
warehouse: String,
#[builder(default)]
props: HashMap<String, String>,
}

Expand All @@ -71,7 +145,7 @@ impl Debug for GlueCatalog {

impl GlueCatalog {
/// Create a new glue catalog
pub async fn new(config: GlueCatalogConfig) -> Result<Self> {
async fn new(config: GlueCatalogConfig) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
let mut file_io_props = config.props.clone();
if !file_io_props.contains_key(S3_ACCESS_KEY_ID) {
Expand Down
24 changes: 24 additions & 0 deletions crates/catalog/glue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,30 @@
// under the License.

//! Iceberg Glue Catalog implementation.
//!
//! To build a glue catalog with configurations
//! # Example
//!
//! ```rust, no_run
//! use std::collections::HashMap;
//!
//! use iceberg::CatalogBuilder;
//! use iceberg_catalog_glue::{GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalogBuilder};
//!
//! #[tokio::main]
//! async fn main() {
//! let catalog = GlueCatalogBuilder::default()
//! .load(
//! "glue",
//! HashMap::from([(
//! GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need uri, name, catalog id?

Copy link
Contributor Author

@lliangyu-lin lliangyu-lin Aug 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, they are actually optional except for catalog name, which is set as glue in .load("glue", xxx)

//! "s3://warehouse".to_string(),
//! )]),
//! )
//! .await
//! .unwrap();
//! }
//! ```

#![deny(missing_docs)]

Expand Down
28 changes: 20 additions & 8 deletions crates/catalog/glue/tests/glue_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ use std::sync::RwLock;
use ctor::{ctor, dtor};
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCreation, TableIdent};
use iceberg::{
Catalog, CatalogBuilder, Namespace, NamespaceIdent, Result, TableCreation, TableIdent,
};
use iceberg_catalog_glue::{
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GlueCatalog, GlueCatalogConfig,
AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GLUE_CATALOG_PROP_URI,
GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder,
};
use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
Expand Down Expand Up @@ -112,13 +115,22 @@ async fn get_catalog() -> GlueCatalog {
retries += 1;
}

let config = GlueCatalogConfig::builder()
.uri(format!("http://{}", glue_socket_addr))
.warehouse("s3a://warehouse/hive".to_string())
.props(props.clone())
.build();
let mut glue_props = HashMap::from([
(
GLUE_CATALOG_PROP_URI.to_string(),
format!("http://{}", glue_socket_addr),
),
(
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
"s3a://warehouse/hive".to_string(),
),
]);
glue_props.extend(props.clone());

GlueCatalog::new(config).await.unwrap()
GlueCatalogBuilder::default()
.load("glue", glue_props)
.await
.unwrap()
}

async fn set_test_namespace(catalog: &GlueCatalog, namespace: &NamespaceIdent) -> Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions crates/catalog/loader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ repository = { workspace = true }

[dependencies]
iceberg = { workspace = true }
iceberg-catalog-rest = {workspace = true}
iceberg-catalog-rest = { workspace = true }
iceberg-catalog-glue = { workspace = true }
tokio = { workspace = true }
async-trait = {workspace = true}
async-trait = { workspace = true }
27 changes: 25 additions & 2 deletions crates/catalog/loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
use iceberg_catalog_glue::GlueCatalogBuilder;
use iceberg_catalog_rest::RestCatalogBuilder;

#[async_trait]
Expand All @@ -46,6 +47,7 @@ impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> {
match r#type {
"rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box<dyn BoxedCatalogBuilder>),
"glue" => Ok(Box::new(GlueCatalogBuilder::default()) as Box<dyn BoxedCatalogBuilder>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoud we make these enum or static strings stored under the iceberg/src/catalog/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think having a enum storing the strings make sense. We could try raising it in this PR or create a separate minor PR for it.

_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Unsupported catalog type: {}", r#type),
Expand All @@ -57,12 +59,12 @@ pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> {
mod tests {
use std::collections::HashMap;

use iceberg_catalog_rest::REST_CATALOG_PROP_URI;

use crate::load;

#[tokio::test]
async fn test_load_rest_catalog() {
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;

let catalog_loader = load("rest").unwrap();
let catalog = catalog_loader
.load(
Expand All @@ -79,4 +81,25 @@ mod tests {

assert!(catalog.is_ok());
}

#[tokio::test]
async fn test_load_glue_catalog() {
use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE;

let catalog_loader = load("glue").unwrap();
let catalog = catalog_loader
.load(
"glue".to_string(),
HashMap::from([
(
GLUE_CATALOG_PROP_WAREHOUSE.to_string(),
"s3://test".to_string(),
),
("key".to_string(), "value".to_string()),
]),
)
.await;

assert!(catalog.is_ok());
}
}
Loading