Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,7 +43,9 @@ public class HadoopFederatedCatalogFactory implements ExternalCatalogFactory {

@Override
public Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) {
ConnectionConfigInfoDpo connectionConfigInfoDpo,
UserSecretsManager userSecretsManager,
PolarisCredentialManager polarisCredentialManager) {
// Currently, Polaris supports Hadoop federation only via IMPLICIT authentication.
// Hence, prior to initializing the configuration, ensure that the catalog uses
// IMPLICIT authentication.
Expand All @@ -56,7 +59,9 @@ public Catalog createCatalog(
String warehouse = ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse();
HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, warehouse);
hadoopCatalog.initialize(
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
warehouse,
connectionConfigInfoDpo.asIcebergCatalogProperties(
userSecretsManager, polarisCredentialManager));
return hadoopCatalog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.connection.hive.HiveConnectionConfigInfoDpo;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +42,9 @@ public class HiveFederatedCatalogFactory implements ExternalCatalogFactory {

@Override
public Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfigInfoDpo, UserSecretsManager userSecretsManager) {
ConnectionConfigInfoDpo connectionConfigInfoDpo,
UserSecretsManager userSecretsManager,
PolarisCredentialManager polarisCredentialManager) {
// Currently, Polaris supports Hive federation only via IMPLICIT authentication.
// Hence, prior to initializing the configuration, ensure that the catalog uses
// IMPLICIT authentication.
Expand Down Expand Up @@ -69,7 +72,9 @@ public Catalog createCatalog(
// Kerberos instances are not suitable because Kerberos ties a single identity to the server.
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.initialize(
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
warehouse,
connectionConfigInfoDpo.asIcebergCatalogProperties(
userSecretsManager, polarisCredentialManager));
return hiveCatalog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iceberg.catalog.Catalog;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand All @@ -35,11 +36,14 @@ public interface ExternalCatalogFactory {
*
* @param connectionConfig the connection configuration
* @param userSecretsManager the user secrets manager for handling credentials
* @param polarisCredentialManager the Polaris credential manager for handling credentials
* @return the initialized catalog
* @throws IllegalStateException if the connection configuration is invalid
*/
Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager);
ConnectionConfigInfoDpo connectionConfig,
UserSecretsManager userSecretsManager,
PolarisCredentialManager polarisCredentialManager);

/**
* Creates a generic table catalog for the given connection configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ public static void enforceFeatureEnabledOrThrow(
.defaultValue(
List.of(
AuthenticationParameters.AuthenticationTypeEnum.OAUTH.name(),
AuthenticationParameters.AuthenticationTypeEnum.BEARER.name()))
AuthenticationParameters.AuthenticationTypeEnum.BEARER.name(),
AuthenticationParameters.AuthenticationTypeEnum.SIGV4.name()))
.buildFeatureConfiguration();

public static final FeatureConfiguration<Integer> ICEBERG_COMMIT_MAX_RETRIES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.SecretReference;
import org.apache.polaris.core.secrets.UserSecretsManager;

Expand All @@ -50,7 +51,7 @@ public BearerAuthenticationParametersDpo(

@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager) {
UserSecretsManager secretsManager, PolarisCredentialManager credentialManager) {
String bearerToken = secretsManager.readSecret(getBearerTokenReference());
return Map.of(OAuth2Properties.TOKEN, bearerToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.polaris.core.connection.iceberg.IcebergCatalogPropertiesProvider;
import org.apache.polaris.core.connection.iceberg.IcebergRestConnectionConfigInfoDpo;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.identity.registry.ServiceIdentityRegistry;
import org.apache.polaris.core.secrets.SecretReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -229,5 +230,6 @@ public abstract ConnectionConfigInfoDpo withServiceIdentity(
* fields are one-to-one direct mappings, but some fields, such as secretReferences, might only be
* applicable/present in the persistence object, but not the API model object.
*/
public abstract ConnectionConfigInfo asConnectionConfigInfoModel();
public abstract ConnectionConfigInfo asConnectionConfigInfoModel(
ServiceIdentityRegistry serviceIdentityRegistry);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.polaris.core.connection;

import com.google.common.base.MoreObjects;
import jakarta.annotation.Nonnull;
import java.util.Map;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.ImplicitAuthenticationParameters;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand All @@ -35,12 +37,13 @@ public ImplicitAuthenticationParametersDpo() {
}

@Override
public Map<String, String> asIcebergCatalogProperties(UserSecretsManager secretsManager) {
public @Nonnull Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager, PolarisCredentialManager credentialManager) {
return Map.of();
}

@Override
public AuthenticationParameters asAuthenticationParametersModel() {
public @Nonnull AuthenticationParameters asAuthenticationParametersModel() {
return ImplicitAuthenticationParameters.builder()
.setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.IMPLICIT)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.OAuthClientCredentialsParameters;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.SecretReference;
import org.apache.polaris.core.secrets.UserSecretsManager;

Expand Down Expand Up @@ -104,7 +105,7 @@ public OAuthClientCredentialsParametersDpo(

@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager) {
UserSecretsManager secretsManager, PolarisCredentialManager credentialManager) {
HashMap<String, String> properties = new HashMap<>();
if (getTokenUri() != null) {
properties.put(OAuth2Properties.OAUTH2_SERVER_URI, getTokenUri());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import com.google.common.collect.ImmutableMap;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.EnumMap;
import java.util.Map;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.rest.auth.AuthProperties;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.SigV4AuthenticationParameters;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.credentials.connection.ConnectionCredentialProperty;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand Down Expand Up @@ -93,15 +96,19 @@ public SigV4AuthenticationParametersDpo(

@Nonnull
@Override
public Map<String, String> asIcebergCatalogProperties(UserSecretsManager secretsManager) {
public Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager, PolarisCredentialManager credentialManager) {
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.put(AuthProperties.AUTH_TYPE, AuthProperties.AUTH_TYPE_SIGV4);
builder.put(AwsProperties.REST_SIGNER_REGION, getSigningRegion());
if (getSigningName() != null) {
builder.put(AwsProperties.REST_SIGNING_NAME, getSigningName());
}

// TODO: Add a credential manager to assume the role and get the aws session credentials
EnumMap<ConnectionCredentialProperty, String> connectionCredentialProperties =
credentialManager.getConnectionCredentials(null, this);
connectionCredentialProperties.forEach(
(key, value) -> builder.put(key.getPropertyName(), value));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.identity.registry.ServiceIdentityRegistry;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand Down Expand Up @@ -70,13 +72,15 @@ public String toString() {

@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager) {
UserSecretsManager secretsManager, PolarisCredentialManager credentialManager) {
HashMap<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, getUri());
if (getWarehouse() != null) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse());
}
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
properties.putAll(
getAuthenticationParameters()
.asIcebergCatalogProperties(secretsManager, credentialManager));
return properties;
}

Expand All @@ -88,7 +92,8 @@ public ConnectionConfigInfoDpo withServiceIdentity(
}

@Override
public ConnectionConfigInfo asConnectionConfigInfoModel() {
public ConnectionConfigInfo asConnectionConfigInfoModel(
ServiceIdentityRegistry serviceIdentityRegistry) {
return HadoopConnectionConfigInfo.builder()
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.HADOOP)
.setUri(getUri())
Expand All @@ -97,7 +102,9 @@ public ConnectionConfigInfo asConnectionConfigInfoModel() {
getAuthenticationParameters().asAuthenticationParametersModel())
.setServiceIdentity(
Optional.ofNullable(getServiceIdentity())
.map(ServiceIdentityInfoDpo::asServiceIdentityInfoModel)
.map(
serviceIdentityInfoDpo ->
serviceIdentityInfoDpo.asServiceIdentityInfoModel(serviceIdentityRegistry))
.orElse(null))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import jakarta.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.CatalogProperties;
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
import org.apache.polaris.core.admin.model.HiveConnectionConfigInfo;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.identity.registry.ServiceIdentityRegistry;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand Down Expand Up @@ -68,14 +71,16 @@ public String toString() {

@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager) {
UserSecretsManager secretsManager, PolarisCredentialManager polarisCredentialManager) {
HashMap<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, getUri());
if (getWarehouse() != null) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouse());
}
if (getAuthenticationParameters() != null) {
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
properties.putAll(
getAuthenticationParameters()
.asIcebergCatalogProperties(secretsManager, polarisCredentialManager));
}
return properties;
}
Expand All @@ -88,13 +93,20 @@ public ConnectionConfigInfoDpo withServiceIdentity(
}

@Override
public ConnectionConfigInfo asConnectionConfigInfoModel() {
public ConnectionConfigInfo asConnectionConfigInfoModel(
ServiceIdentityRegistry serviceIdentityRegistry) {
return HiveConnectionConfigInfo.builder()
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.HIVE)
.setUri(getUri())
.setWarehouse(getWarehouse())
.setAuthenticationParameters(
getAuthenticationParameters().asAuthenticationParametersModel())
.setServiceIdentity(
Optional.ofNullable(getServiceIdentity())
.map(
serviceIdentityInfoDpo ->
serviceIdentityInfoDpo.asServiceIdentityInfoModel(serviceIdentityRegistry))
.orElse(null))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import jakarta.annotation.Nonnull;
import java.util.Map;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand All @@ -30,5 +31,6 @@
*/
public interface IcebergCatalogPropertiesProvider {
@Nonnull
Map<String, String> asIcebergCatalogProperties(UserSecretsManager secretsManager);
Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager, PolarisCredentialManager credentialManager);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.credentials.PolarisCredentialManager;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.identity.registry.ServiceIdentityRegistry;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand Down Expand Up @@ -62,13 +64,19 @@ public String getRemoteCatalogName() {

@Override
public @Nonnull Map<String, String> asIcebergCatalogProperties(
UserSecretsManager secretsManager) {
UserSecretsManager secretsManager, PolarisCredentialManager credentialManager) {
HashMap<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.URI, getUri());
if (getRemoteCatalogName() != null) {
properties.put(CatalogProperties.WAREHOUSE_LOCATION, getRemoteCatalogName());
}
properties.putAll(getAuthenticationParameters().asIcebergCatalogProperties(secretsManager));
properties.putAll(
getAuthenticationParameters()
.asIcebergCatalogProperties(
secretsManager,
(serviceIdentity, authenticationParameters) ->
credentialManager.getConnectionCredentials(
getServiceIdentity(), authenticationParameters)));
return properties;
}

Expand All @@ -80,7 +88,8 @@ public ConnectionConfigInfoDpo withServiceIdentity(
}

@Override
public ConnectionConfigInfo asConnectionConfigInfoModel() {
public ConnectionConfigInfo asConnectionConfigInfoModel(
ServiceIdentityRegistry serviceIdentityRegistry) {
return IcebergRestConnectionConfigInfo.builder()
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
.setUri(getUri())
Expand All @@ -89,7 +98,9 @@ public ConnectionConfigInfo asConnectionConfigInfoModel() {
getAuthenticationParameters().asAuthenticationParametersModel())
.setServiceIdentity(
Optional.ofNullable(getServiceIdentity())
.map(ServiceIdentityInfoDpo::asServiceIdentityInfoModel)
.map(
serviceIdentityInfoDpo ->
serviceIdentityInfoDpo.asServiceIdentityInfoModel(serviceIdentityRegistry))
.orElse(null))
.build();
}
Expand Down
Loading