Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.AuthenticationType;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
Expand Down Expand Up @@ -58,4 +59,12 @@ public Catalog createCatalog(
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
return hadoopCatalog;
}

@Override
public GenericTableCatalog createGenericCatalog(
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager) {
// TODO implement
throw new UnsupportedOperationException(
"Generic table federation to this catalog is not supported.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.AuthenticationType;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
Expand Down Expand Up @@ -71,4 +72,12 @@ public Catalog createCatalog(
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
return hiveCatalog;
}

@Override
public GenericTableCatalog createGenericCatalog(
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager) {
// TODO implement
throw new UnsupportedOperationException(
"Generic table federation to this catalog is not supported.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,15 @@ public interface ExternalCatalogFactory {
*/
Catalog createCatalog(
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager);

/**
* Creates a generic table catalog for the given connection configuration.
*
* @param connectionConfig the connection configuration
* @param userSecretsManager the user secrets manager for handling credentials
* @return the initialized catalog
* @throws IllegalStateException if the connection configuration is invalid
*/
GenericTableCatalog createGenericCatalog(
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog.generic;
package org.apache.polaris.core.catalog;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public abstract class ConnectionConfigInfoDpo implements IcebergCatalogPropertie
public ConnectionConfigInfoDpo(
@JsonProperty(value = "connectionTypeCode", required = true) int connectionTypeCode,
@JsonProperty(value = "uri", required = true) @Nonnull String uri,
@JsonProperty(value = "authenticationParameters", required = true) @Nonnull
@JsonProperty(value = "authenticationParameters", required = true) @Nullable
AuthenticationParametersDpo authenticationParameters,
@JsonProperty(value = "serviceIdentity", required = false) @Nullable
ServiceIdentityInfoDpo serviceIdentity) {
Expand All @@ -86,7 +86,7 @@ public ConnectionConfigInfoDpo(
protected ConnectionConfigInfoDpo(
int connectionTypeCode,
@Nonnull String uri,
@Nonnull AuthenticationParametersDpo authenticationParameters,
@Nullable AuthenticationParametersDpo authenticationParameters,
@Nullable ServiceIdentityInfoDpo serviceIdentity,
boolean validateUri) {
this.connectionTypeCode = connectionTypeCode;
Expand Down Expand Up @@ -203,7 +203,10 @@ public static ConnectionConfigInfoDpo fromConnectionConfigInfoModelWithSecrets(
hiveConfigModel.getAuthenticationParameters(), secretReferences);
config =
new HiveConnectionConfigInfoDpo(
hiveConfigModel.getUri(), authenticationParameters, hiveConfigModel.getWarehouse());
hiveConfigModel.getUri(),
authenticationParameters,
hiveConfigModel.getWarehouse(),
null /*Service Identity Info*/);
break;
default:
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
import org.apache.polaris.core.secrets.UserSecretsManager;

/**
Expand All @@ -44,8 +45,10 @@ public HiveConnectionConfigInfoDpo(
@JsonProperty(value = "uri", required = true) @Nonnull String uri,
@JsonProperty(value = "authenticationParameters", required = false) @Nullable
AuthenticationParametersDpo authenticationParameters,
@JsonProperty(value = "warehouse", required = false) @Nullable String warehouse) {
super(ConnectionType.HIVE.getCode(), uri, authenticationParameters);
@JsonProperty(value = "warehouse", required = false) @Nullable String warehouse,
@JsonProperty(value = "serviceIdentity", required = false) @Nullable
ServiceIdentityInfoDpo serviceIdentity) {
super(ConnectionType.HIVE.getCode(), uri, authenticationParameters, serviceIdentity);
this.warehouse = warehouse;
}

Expand Down Expand Up @@ -77,6 +80,13 @@ public String toString() {
return properties;
}

@Override
public ConnectionConfigInfoDpo withServiceIdentity(
@Nonnull ServiceIdentityInfoDpo serviceIdentityInfo) {
return new HiveConnectionConfigInfoDpo(
getUri(), getAuthenticationParameters(), warehouse, serviceIdentityInfo);
}

@Override
public ConnectionConfigInfo asConnectionConfigInfoModel() {
return HiveConnectionConfigInfo.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.polaris.core.entity.PolarisEntitySubType.ICEBERG_TABLE;

import jakarta.enterprise.inject.Instance;
import jakarta.ws.rs.core.SecurityContext;
import java.util.Arrays;
import java.util.List;
Expand All @@ -34,6 +35,7 @@
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
import org.apache.polaris.core.auth.PolarisAuthorizer;
import org.apache.polaris.core.auth.PolarisPrincipal;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.PolarisEntitySubType;
Expand All @@ -43,6 +45,7 @@
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.service.types.PolicyIdentifier;

/**
Expand All @@ -58,6 +61,8 @@ public abstract class CatalogHandler {
protected final ResolutionManifestFactory resolutionManifestFactory;
protected final String catalogName;
protected final PolarisAuthorizer authorizer;
protected final UserSecretsManager userSecretsManager;
protected final Instance<ExternalCatalogFactory> externalCatalogFactories;

protected final CallContext callContext;
protected final PolarisPrincipal polarisPrincipal;
Expand All @@ -68,7 +73,9 @@ public CatalogHandler(
ResolutionManifestFactory resolutionManifestFactory,
SecurityContext securityContext,
String catalogName,
PolarisAuthorizer authorizer) {
PolarisAuthorizer authorizer,
UserSecretsManager userSecretsManager,
Instance<ExternalCatalogFactory> externalCatalogFactories) {
this.callContext = callContext;
this.resolutionManifestFactory = resolutionManifestFactory;
this.catalogName = catalogName;
Expand All @@ -82,6 +89,12 @@ public CatalogHandler(
this.securityContext = securityContext;
this.polarisPrincipal = (PolarisPrincipal) securityContext.getUserPrincipal();
this.authorizer = authorizer;
this.userSecretsManager = userSecretsManager;
this.externalCatalogFactories = externalCatalogFactories;
}

protected UserSecretsManager getUserSecretsManager() {
return userSecretsManager;
}

/** Initialize the catalog once authorized. Called after all `authorize...` methods. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
package org.apache.polaris.service.catalog.generic;

import jakarta.enterprise.context.RequestScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.auth.PolarisAuthorizer;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.service.catalog.CatalogPrefixParser;
import org.apache.polaris.service.catalog.api.PolarisCatalogGenericTableApiService;
import org.apache.polaris.service.catalog.common.CatalogAdapter;
Expand All @@ -52,6 +56,8 @@ public class GenericTableCatalogAdapter
private final PolarisAuthorizer polarisAuthorizer;
private final ReservedProperties reservedProperties;
private final CatalogPrefixParser prefixParser;
private final UserSecretsManager userSecretsManager;
private final Instance<ExternalCatalogFactory> externalCatalogFactories;

@Inject
public GenericTableCatalogAdapter(
Expand All @@ -61,14 +67,18 @@ public GenericTableCatalogAdapter(
PolarisMetaStoreManager metaStoreManager,
PolarisAuthorizer polarisAuthorizer,
CatalogPrefixParser prefixParser,
ReservedProperties reservedProperties) {
ReservedProperties reservedProperties,
UserSecretsManager userSecretsManager,
@Any Instance<ExternalCatalogFactory> externalCatalogFactories) {
this.realmContext = realmContext;
this.callContext = callContext;
this.resolutionManifestFactory = resolutionManifestFactory;
this.metaStoreManager = metaStoreManager;
this.polarisAuthorizer = polarisAuthorizer;
this.prefixParser = prefixParser;
this.reservedProperties = reservedProperties;
this.userSecretsManager = userSecretsManager;
this.externalCatalogFactories = externalCatalogFactories;
}

private GenericTableCatalogHandler newHandlerWrapper(
Expand All @@ -83,7 +93,9 @@ private GenericTableCatalogHandler newHandlerWrapper(
metaStoreManager,
securityContext,
prefixParser.prefixToCatalogName(realmContext, prefix),
polarisAuthorizer);
polarisAuthorizer,
userSecretsManager,
externalCatalogFactories);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,36 @@
*/
package org.apache.polaris.service.catalog.generic;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.inject.Instance;
import jakarta.ws.rs.core.SecurityContext;
import java.util.LinkedHashSet;
import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
import org.apache.polaris.core.auth.PolarisAuthorizer;
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
import org.apache.polaris.core.connection.ConnectionType;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.table.GenericTableEntity;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.service.catalog.common.CatalogHandler;
import org.apache.polaris.service.types.GenericTable;
import org.apache.polaris.service.types.ListGenericTablesResponse;
import org.apache.polaris.service.types.LoadGenericTableResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GenericTableCatalogHandler extends CatalogHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericTableCatalogHandler.class);

private PolarisMetaStoreManager metaStoreManager;

Expand All @@ -47,16 +59,58 @@ public GenericTableCatalogHandler(
PolarisMetaStoreManager metaStoreManager,
SecurityContext securityContext,
String catalogName,
PolarisAuthorizer authorizer) {
super(callContext, resolutionManifestFactory, securityContext, catalogName, authorizer);
PolarisAuthorizer authorizer,
UserSecretsManager userSecretsManager,
Instance<ExternalCatalogFactory> externalCatalogFactories) {
super(
callContext,
resolutionManifestFactory,
securityContext,
catalogName,
authorizer,
userSecretsManager,
externalCatalogFactories);
this.metaStoreManager = metaStoreManager;
}

@Override
protected void initializeCatalog() {
this.genericTableCatalog =
new PolarisGenericTableCatalog(metaStoreManager, callContext, this.resolutionManifest);
this.genericTableCatalog.initialize(catalogName, Map.of());
CatalogEntity resolvedCatalogEntity =
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
ConnectionConfigInfoDpo connectionConfigInfoDpo =
resolvedCatalogEntity.getConnectionConfigInfoDpo();
if (connectionConfigInfoDpo != null) {
LOGGER
.atInfo()
.addKeyValue("remoteUrl", connectionConfigInfoDpo.getUri())
.log("Initializing federated catalog");
FeatureConfiguration.enforceFeatureEnabledOrThrow(
callContext.getRealmConfig(), FeatureConfiguration.ENABLE_CATALOG_FEDERATION);

GenericTableCatalog federatedCatalog;
ConnectionType connectionType =
ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode());

// Use the unified factory pattern for all external catalog types
Instance<ExternalCatalogFactory> externalCatalogFactory =
externalCatalogFactories.select(
Identifier.Literal.of(connectionType.getFactoryIdentifier()));
if (externalCatalogFactory.isResolvable()) {
federatedCatalog =
externalCatalogFactory
.get()
.createGenericCatalog(connectionConfigInfoDpo, getUserSecretsManager());
} else {
throw new UnsupportedOperationException(
"External catalog factory for type '" + connectionType + "' is unavailable.");
}
this.genericTableCatalog = federatedCatalog;
} else {
LOGGER.atInfo().log("Initializing non-federated catalog");
this.genericTableCatalog =
new PolarisGenericTableCatalog(metaStoreManager, callContext, this.resolutionManifest);
this.genericTableCatalog.initialize(catalogName, Map.of());
}
}

public ListGenericTablesResponse listGenericTables(Namespace parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.polaris.core.catalog.GenericTableCatalog;
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.CatalogEntity;
Expand Down
Loading