diff --git a/extensions/federation/hive/README.md b/extensions/federation/hive/README.md new file mode 100644 index 0000000000..4536a87951 --- /dev/null +++ b/extensions/federation/hive/README.md @@ -0,0 +1,31 @@ + +### Using the `HiveFederatedCatalogFactory` + +This `HiveFederatedCatalogFactory` module is an independent compilation unit and will be built into the Polaris binary only when the following flag is set in the gradle.properties file: +``` +NonRESTCatalogs=HIVE, +``` + +The other option is to pass it as an argument to the gradle JVM as follows: +``` +./gradlew build -DNonRESTCatalogs=HIVE +``` + +Without this flag, the Hive factory won't be compiled into Polaris and therefore Polaris will not load the class at runtime, throwing an unsupported exception for federated catalog calls. \ No newline at end of file diff --git a/extensions/federation/hive/build.gradle.kts b/extensions/federation/hive/build.gradle.kts new file mode 100644 index 0000000000..37b2b687da --- /dev/null +++ b/extensions/federation/hive/build.gradle.kts @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("polaris-client") + alias(libs.plugins.jandex) +} + +dependencies { + // Polaris dependencies + implementation(project(":polaris-core")) + + implementation(platform(libs.iceberg.bom)) + implementation("org.apache.iceberg:iceberg-api") + implementation("org.apache.iceberg:iceberg-core") + implementation("org.apache.iceberg:iceberg-common") + // Use iceberg-hive-metastore but exclude conflicting hive dependencies + implementation("org.apache.iceberg:iceberg-hive-metastore") { exclude(group = "org.apache.hive") } + // Add our own Hive 4.1.0 dependencies + implementation(libs.hive.metastore) { + exclude("org.slf4j", "slf4j-reload4j") + exclude("org.slf4j", "slf4j-log4j12") + exclude("ch.qos.reload4j", "reload4j") + exclude("log4j", "log4j") + exclude("org.apache.zookeeper", "zookeeper") + } + + // Hadoop dependencies + implementation(libs.hadoop.common) { + exclude("org.slf4j", "slf4j-reload4j") + exclude("org.slf4j", "slf4j-log4j12") + exclude("ch.qos.reload4j", "reload4j") + exclude("log4j", "log4j") + exclude("org.apache.zookeeper", "zookeeper") + exclude("org.apache.hadoop.thirdparty", "hadoop-shaded-protobuf_3_25") + exclude("com.github.pjfanning", "jersey-json") + exclude("com.sun.jersey", "jersey-core") + exclude("com.sun.jersey", "jersey-server") + exclude("com.sun.jersey", "jersey-servlet") + exclude("io.dropwizard.metrics", "metrics-core") + } + + // CDI dependencies for runtime discovery + implementation(libs.jakarta.enterprise.cdi.api) + implementation(libs.smallrye.common.annotation) + + // Logging + implementation(libs.slf4j.api) +} diff --git a/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java new file mode 100644 index 0000000000..3e607e12ec --- /dev/null +++ b/extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.extensions.federation.hive; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.polaris.core.catalog.ExternalCatalogFactory; +import org.apache.polaris.core.connection.AuthenticationParametersDpo; +import org.apache.polaris.core.connection.AuthenticationType; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; +import org.apache.polaris.core.connection.hive.HiveConnectionConfigInfoDpo; +import org.apache.polaris.core.secrets.UserSecretsManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Factory class for creating a Hive catalog handle based on connection configuration. */ +@ApplicationScoped +@Identifier(ConnectionType.HIVE_FACTORY_IDENTIFIER) +public class HiveFederatedCatalogFactory implements ExternalCatalogFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(HiveFederatedCatalogFactory.class); + + @Override + public Catalog createCatalog( + ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) { + // Currently, Polaris supports Hive federation only via IMPLICIT authentication. + // Hence, prior to initializing the configuration, ensure that the catalog uses + // IMPLICIT authentication. + AuthenticationParametersDpo authenticationParametersDpo = + connectionConfigInfoDpo.getAuthenticationParameters(); + if (authenticationParametersDpo.getAuthenticationTypeCode() + != AuthenticationType.IMPLICIT.getCode()) { + throw new IllegalStateException("Hive federation only supports IMPLICIT authentication."); + } + String warehouse = ((HiveConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); + // Unlike Hadoop, HiveCatalog does not require us to create a Configuration object, the iceberg + // rest library find the default configuration by reading hive-site.xml in the classpath + // (including HADOOP_CONF_DIR classpath). + + // TODO: In the future, we could support multiple HiveCatalog instances based on polaris/catalog + // properties. + // A brief set of setps involved (and the options): + // 1. Create a configuration without default properties. + // `Configuration conf = new Configuration(boolean loadDefaults=false);` + // 2a. Specify the hive-site.xml file path in the configuration. + // `conf.addResource(new Path(hiveSiteXmlPath));` + // 2b. Specify individual properties in the configuration. + // `conf.set(property, value);` + // Polaris could support federating to multiple LDAP based Hive metastores. Multiple + // Kerberos instances are not suitable because Kerberos ties a single identity to the server. + HiveCatalog hiveCatalog = new HiveCatalog(); + hiveCatalog.initialize( + warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager)); + return hiveCatalog; + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1697eafc52..58c2972c27 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -20,6 +20,7 @@ [versions] checkstyle = "10.25.0" hadoop = "3.4.1" +hive = "3.1.3" iceberg = "1.9.2" # Ensure to update the iceberg version in regtests to keep regtests up-to-date quarkus = "3.25.0" immutables = "2.11.3" @@ -56,6 +57,7 @@ dnsjava = { module = "dnsjava:dnsjava", version = "3.6.3" } hadoop-client-api = { module = "org.apache.hadoop:hadoop-client-api", version.ref = "hadoop" } hadoop-client-runtime = { module = "org.apache.hadoop:hadoop-client-runtime", version.ref = "hadoop" } hadoop-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop" } +hive-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive" } hadoop-hdfs-client = { module = "org.apache.hadoop:hadoop-hdfs-client", version.ref = "hadoop" } hawkular-agent-prometheus-scraper = { module = "org.hawkular.agent:prometheus-scraper", version = "0.23.0.Final" } immutables-builder = { module = "org.immutables:builder", version.ref = "immutables" } diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 1b74232b59..27fdae3556 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -43,6 +43,7 @@ polaris-version=tools/version polaris-misc-types=tools/misc-types polaris-persistence-varint=nosql/persistence/varint polaris-extensions-federation-hadoop=extensions/federation/hadoop +polaris-extensions-federation-hive=extensions/federation/hive polaris-config-docs-annotations=tools/config-docs/annotations polaris-config-docs-generator=tools/config-docs/generator diff --git a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java index fe4d183303..e5959755b6 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java @@ -32,8 +32,10 @@ import java.util.Map; import org.apache.polaris.core.admin.model.ConnectionConfigInfo; import org.apache.polaris.core.admin.model.HadoopConnectionConfigInfo; +import org.apache.polaris.core.admin.model.HiveConnectionConfigInfo; import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo; import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.hive.HiveConnectionConfigInfoDpo; import org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider; import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo; import org.apache.polaris.core.secrets.UserSecretReference; @@ -51,6 +53,7 @@ @JsonSubTypes({ @JsonSubTypes.Type(value = IcebergRestConnectionConfigInfoDpo.class, name = "1"), @JsonSubTypes.Type(value = HadoopConnectionConfigInfoDpo.class, name = "2"), + @JsonSubTypes.Type(value = HiveConnectionConfigInfoDpo.class, name = "3"), }) public abstract class ConnectionConfigInfoDpo implements IcebergCatalogPropertiesProvider { private static final Logger logger = LoggerFactory.getLogger(ConnectionConfigInfoDpo.class); @@ -125,6 +128,11 @@ public static ConnectionConfigInfoDpo deserialize(final @Nonnull String jsonStr) protected void validateUri(String uri) { try { URI uriObj = URI.create(uri); + if (connectionTypeCode == ConnectionType.HIVE.getCode() + && uriObj.getScheme().equals("thrift")) { + // Hive metastore runs a thrift server. + return; + } URL url = uriObj.toURL(); } catch (IllegalArgumentException | MalformedURLException e) { throw new IllegalArgumentException("Invalid remote URI: " + uri, e); @@ -166,6 +174,16 @@ public static ConnectionConfigInfoDpo fromConnectionConfigInfoModelWithSecrets( authenticationParameters, hadoopConfigModel.getWarehouse()); break; + case HIVE: + HiveConnectionConfigInfo hiveConfigModel = + (HiveConnectionConfigInfo) connectionConfigurationModel; + authenticationParameters = + AuthenticationParametersDpo.fromAuthenticationParametersModelWithSecrets( + hiveConfigModel.getAuthenticationParameters(), secretReferences); + config = + new HiveConnectionConfigInfoDpo( + hiveConfigModel.getUri(), authenticationParameters, hiveConfigModel.getWarehouse()); + break; default: throw new IllegalStateException( "Unsupported connection type: " + connectionConfigurationModel.getConnectionType()); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java index 7c5092c431..2df6b74282 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionType.java @@ -33,10 +33,12 @@ public enum ConnectionType { NULL_TYPE(0), ICEBERG_REST(1), HADOOP(2), + HIVE(3), ; public static final String ICEBERG_REST_FACTORY_IDENTIFIER = "iceberg_rest"; public static final String HADOOP_FACTORY_IDENTIFIER = "hadoop"; + public static final String HIVE_FACTORY_IDENTIFIER = "hive"; private static final ConnectionType[] REVERSE_MAPPING_ARRAY; @@ -93,6 +95,8 @@ public String getFactoryIdentifier() { return ICEBERG_REST_FACTORY_IDENTIFIER; case HADOOP: return HADOOP_FACTORY_IDENTIFIER; + case HIVE: + return HIVE_FACTORY_IDENTIFIER; default: throw new UnsupportedOperationException( "No factory identifier for connection type: " + this); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java b/polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java new file mode 100644 index 0000000000..aaf5753776 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.connection.hive; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.MoreObjects; +import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.polaris.core.admin.model.ConnectionConfigInfo; +import org.apache.polaris.core.admin.model.HiveConnectionConfigInfo; +import org.apache.polaris.core.connection.AuthenticationParametersDpo; +import org.apache.polaris.core.connection.ConnectionConfigInfoDpo; +import org.apache.polaris.core.connection.ConnectionType; +import org.apache.polaris.core.secrets.UserSecretsManager; + +/** + * The internal persistence-object counterpart to {@link + * org.apache.polaris.core.admin.model.HiveConnectionConfigInfo} defined in the API model. + */ +public class HiveConnectionConfigInfoDpo extends ConnectionConfigInfoDpo { + + private final String warehouse; + + public HiveConnectionConfigInfoDpo( + @JsonProperty(value = "uri", required = true) @Nonnull String uri, + @JsonProperty(value = "authenticationParameters", required = false) @Nullable + AuthenticationParametersDpo authenticationParameters, + @JsonProperty(value = "warehouse", required = false) @Nullable String warehouse) { + super(ConnectionType.HIVE.getCode(), uri, authenticationParameters); + this.warehouse = warehouse; + } + + public String getWarehouse() { + return warehouse; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("connectionTypeCode", getConnectionTypeCode()) + .add("uri", getUri()) + .add("warehouse", getWarehouse()) + .add("authenticationParameters", getAuthenticationParameters().toString()) + .toString(); + } + + @Override + public @Nonnull Map asIcebergCatalogProperties( + UserSecretsManager secretsManager) { + HashMap properties = new HashMap<>(); + properties.put(CatalogProperties.URI, getUri()); + if (getWarehouse() != null) { + properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse()); + } + if (getAuthenticationParameters() != null) { + properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager)); + } + return properties; + } + + @Override + public ConnectionConfigInfo asConnectionConfigInfoModel() { + return HiveConnectionConfigInfo.builder() + .setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.HIVE) + .setUri(getUri()) + .setWarehouse(getWarehouse()) + .setAuthenticationParameters( + getAuthenticationParameters().asAuthenticationParametersModel()) + .build(); + } +} diff --git a/runtime/server/build.gradle.kts b/runtime/server/build.gradle.kts index c645e0bc0a..e8a86197b2 100644 --- a/runtime/server/build.gradle.kts +++ b/runtime/server/build.gradle.kts @@ -50,6 +50,10 @@ dependencies { runtimeOnly("io.quarkus:quarkus-jdbc-postgresql") runtimeOnly(project(":polaris-extensions-federation-hadoop")) + if ((project.findProperty("NonRESTCatalogs") as String?)?.contains("HIVE") == true) { + runtimeOnly(project(":polaris-extensions-federation-hive")) + } + // enforce the Quarkus _platform_ here, to get a consistent and validated set of dependencies implementation(enforcedPlatform(libs.quarkus.bom)) implementation("io.quarkus:quarkus-container-image-docker") diff --git a/spec/polaris-management-service.yml b/spec/polaris-management-service.yml index 00c2a88093..9ec981bc40 100644 --- a/spec/polaris-management-service.yml +++ b/spec/polaris-management-service.yml @@ -869,6 +869,7 @@ components: enum: - ICEBERG_REST - HADOOP + - HIVE description: The type of remote catalog service represented by this connection uri: type: string @@ -884,6 +885,7 @@ components: mapping: ICEBERG_REST: "#/components/schemas/IcebergRestConnectionConfigInfo" HADOOP: "#/components/schemas/HadoopConnectionConfigInfo" + HIVE: "#/components/schemas/HiveConnectionConfigInfo" IcebergRestConnectionConfigInfo: type: object @@ -906,6 +908,16 @@ components: warehouse: type: string description: The file path to where this catalog should store tables + + HiveConnectionConfigInfo: + type: object + description: Configuration necessary for connecting to a Hive Catalog + allOf: + - $ref: '#/components/schemas/ConnectionConfigInfo' + properties: + warehouse: + type: string + description: The warehouse location for the hive catalog. AuthenticationParameters: type: object