-
Notifications
You must be signed in to change notification settings - Fork 340
feat(catalog): implement catalog loader for glue #1603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
f4cdab3
f11200e
f67ce6c
99da161
245eb16
ac69dd4
99e3017
7ec8aea
33be497
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,8 +26,8 @@ use iceberg::io::{ | |
| use iceberg::spec::{TableMetadata, TableMetadataBuilder}; | ||
| use iceberg::table::Table; | ||
| use iceberg::{ | ||
| Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, | ||
| TableCreation, TableIdent, | ||
| Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, | ||
| TableCommit, TableCreation, TableIdent, | ||
| }; | ||
| use typed_builder::TypedBuilder; | ||
|
|
||
|
|
@@ -40,14 +40,97 @@ use crate::{ | |
| AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, with_catalog_id, | ||
| }; | ||
|
|
||
| /// Glue catalog URI | ||
| pub const GLUE_CATALOG_PROP_URI: &str = "uri"; | ||
| /// Glue catalog id | ||
| pub const GLUE_CATALOG_PROP_CATALOG_ID: &str = "catalog_id"; | ||
| /// Glue catalog warehouse location | ||
| pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; | ||
|
|
||
| /// Builder for [`GlueCatalog`]. | ||
| #[derive(Debug)] | ||
| pub struct GlueCatalogBuilder(GlueCatalogConfig); | ||
|
|
||
| impl Default for GlueCatalogBuilder { | ||
| fn default() -> Self { | ||
| Self(GlueCatalogConfig { | ||
| name: None, | ||
| uri: None, | ||
| catalog_id: None, | ||
| warehouse: "".to_string(), | ||
| props: HashMap::new(), | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| impl CatalogBuilder for GlueCatalogBuilder { | ||
| type C = GlueCatalog; | ||
|
|
||
| fn load( | ||
| mut self, | ||
| name: impl Into<String>, | ||
| props: HashMap<String, String>, | ||
| ) -> impl Future<Output = Result<Self::C>> + Send { | ||
| self.0.name = Some(name.into()); | ||
|
|
||
| if props.contains_key(GLUE_CATALOG_PROP_URI) { | ||
| self.0.uri = props.get(GLUE_CATALOG_PROP_URI).cloned() | ||
| } | ||
|
|
||
| if props.contains_key(GLUE_CATALOG_PROP_CATALOG_ID) { | ||
| self.0.catalog_id = props.get(GLUE_CATALOG_PROP_CATALOG_ID).cloned() | ||
| } | ||
|
|
||
| if props.contains_key(GLUE_CATALOG_PROP_WAREHOUSE) { | ||
| self.0.warehouse = props | ||
| .get(GLUE_CATALOG_PROP_WAREHOUSE) | ||
| .cloned() | ||
| .unwrap_or_default(); | ||
| } | ||
|
|
||
| // Collect other remaining properties | ||
| self.0.props = props | ||
| .into_iter() | ||
| .filter(|(k, _)| { | ||
| k != GLUE_CATALOG_PROP_URI | ||
| && k != GLUE_CATALOG_PROP_CATALOG_ID | ||
| && k != GLUE_CATALOG_PROP_WAREHOUSE | ||
| }) | ||
| .collect(); | ||
|
|
||
| async move { | ||
| if self.0.name.is_none() { | ||
| return Err(Error::new( | ||
| ErrorKind::DataInvalid, | ||
| "Catalog name is required", | ||
| )); | ||
| } | ||
| if self.0.warehouse.is_empty() { | ||
| return Err(Error::new( | ||
| ErrorKind::DataInvalid, | ||
| "Catalog warehouse is required", | ||
| )); | ||
| } | ||
|
|
||
| GlueCatalog::new(self.0).await | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, TypedBuilder)] | ||
|
||
| /// Glue Catalog configuration | ||
| pub struct GlueCatalogConfig { | ||
|
||
| #[builder(default, setter(strip_option))] | ||
| name: Option<String>, | ||
|
|
||
| #[builder(default, setter(strip_option(fallback = uri_opt)))] | ||
| uri: Option<String>, | ||
|
|
||
| #[builder(default, setter(strip_option(fallback = catalog_id_opt)))] | ||
| catalog_id: Option<String>, | ||
|
|
||
| warehouse: String, | ||
|
|
||
| #[builder(default)] | ||
| props: HashMap<String, String>, | ||
| } | ||
|
|
@@ -71,7 +154,7 @@ impl Debug for GlueCatalog { | |
|
|
||
| impl GlueCatalog { | ||
| /// Create a new glue catalog | ||
| pub async fn new(config: GlueCatalogConfig) -> Result<Self> { | ||
| async fn new(config: GlueCatalogConfig) -> Result<Self> { | ||
| let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await; | ||
| let mut file_io_props = config.props.clone(); | ||
| if !file_io_props.contains_key(S3_ACCESS_KEY_ID) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,30 @@ | |
| // under the License. | ||
|
|
||
| //! Iceberg Glue Catalog implementation. | ||
| //! | ||
| //! To build a glue catalog with configurations | ||
| //! # Example | ||
| //! | ||
| //! ```rust, no_run | ||
| //! use std::collections::HashMap; | ||
| //! | ||
| //! use iceberg::CatalogBuilder; | ||
| //! use iceberg_catalog_glue::{GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalogBuilder}; | ||
| //! | ||
| //! #[tokio::main] | ||
| //! async fn main() { | ||
| //! let catalog = GlueCatalogBuilder::default() | ||
| //! .load( | ||
| //! "glue", | ||
| //! HashMap::from([( | ||
| //! GLUE_CATALOG_PROP_WAREHOUSE.to_string(), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't we need uri, name, catalog id? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, they are actually optional except for catalog name, which is set as |
||
| //! "s3://warehouse".to_string(), | ||
| //! )]), | ||
| //! ) | ||
| //! .await | ||
| //! .unwrap(); | ||
| //! } | ||
| //! ``` | ||
| #![deny(missing_docs)] | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ use std::sync::Arc; | |
|
|
||
| use async_trait::async_trait; | ||
| use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; | ||
| use iceberg_catalog_glue::GlueCatalogBuilder; | ||
| use iceberg_catalog_rest::RestCatalogBuilder; | ||
|
|
||
| #[async_trait] | ||
|
|
@@ -46,6 +47,7 @@ impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T { | |
| pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> { | ||
| match r#type { | ||
| "rest" => Ok(Box::new(RestCatalogBuilder::default()) as Box<dyn BoxedCatalogBuilder>), | ||
| "glue" => Ok(Box::new(GlueCatalogBuilder::default()) as Box<dyn BoxedCatalogBuilder>), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shoud we make these enum or static strings stored under the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I think having a enum storing the strings make sense. We could try raising it in this PR or create a separate minor PR for it. |
||
| _ => Err(Error::new( | ||
| ErrorKind::FeatureUnsupported, | ||
| format!("Unsupported catalog type: {}", r#type), | ||
|
|
@@ -57,12 +59,12 @@ pub fn load(r#type: &str) -> Result<Box<dyn BoxedCatalogBuilder>> { | |
| mod tests { | ||
| use std::collections::HashMap; | ||
|
|
||
| use iceberg_catalog_rest::REST_CATALOG_PROP_URI; | ||
|
|
||
| use crate::load; | ||
|
|
||
| #[tokio::test] | ||
| async fn test_load_rest_catalog() { | ||
| use iceberg_catalog_rest::REST_CATALOG_PROP_URI; | ||
|
|
||
| let catalog_loader = load("rest").unwrap(); | ||
| let catalog = catalog_loader | ||
| .load( | ||
|
|
@@ -79,4 +81,25 @@ mod tests { | |
|
|
||
| assert!(catalog.is_ok()); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_load_glue_catalog() { | ||
| use iceberg_catalog_glue::GLUE_CATALOG_PROP_WAREHOUSE; | ||
|
|
||
| let catalog_loader = load("glue").unwrap(); | ||
| let catalog = catalog_loader | ||
| .load( | ||
| "glue".to_string(), | ||
| HashMap::from([ | ||
| ( | ||
| GLUE_CATALOG_PROP_WAREHOUSE.to_string(), | ||
| "s3://test".to_string(), | ||
| ), | ||
| ("key".to_string(), "value".to_string()), | ||
| ]), | ||
| ) | ||
| .await; | ||
|
|
||
| assert!(catalog.is_ok()); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.