Skip to content

Commit 83d09cc

Browse files
authored
Modularize generic table federation (#2379)
In #2369 Iceberg table federation was refactored around the new `IcebergRESTExternalCatalogFactory` type based on discussion in the community sync. This has unblocked the ability to federate to more non-Iceberg catalogs, such as in #2355. This PR refactors generic table federation to go through the same mechanism. After this, we can go through and implement generic table federation for the existing `IcebergRESTExternalCatalogFactory` implementations.
1 parent f647034 commit 83d09cc

File tree

16 files changed

+187
-30
lines changed

16 files changed

+187
-30
lines changed

extensions/federation/hadoop/src/main/java/org/apache/polaris/extensions/federation/hadoop/HadoopFederatedCatalogFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iceberg.catalog.Catalog;
2525
import org.apache.iceberg.hadoop.HadoopCatalog;
2626
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
27+
import org.apache.polaris.core.catalog.GenericTableCatalog;
2728
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
2829
import org.apache.polaris.core.connection.AuthenticationType;
2930
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
@@ -58,4 +59,12 @@ public Catalog createCatalog(
5859
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
5960
return hadoopCatalog;
6061
}
62+
63+
@Override
64+
public GenericTableCatalog createGenericCatalog(
65+
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager) {
66+
// TODO implement
67+
throw new UnsupportedOperationException(
68+
"Generic table federation to this catalog is not supported.");
69+
}
6170
}

extensions/federation/hive/src/main/java/org/apache/polaris/extensions/federation/hive/HiveFederatedCatalogFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iceberg.catalog.Catalog;
2424
import org.apache.iceberg.hive.HiveCatalog;
2525
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
26+
import org.apache.polaris.core.catalog.GenericTableCatalog;
2627
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
2728
import org.apache.polaris.core.connection.AuthenticationType;
2829
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
@@ -71,4 +72,12 @@ public Catalog createCatalog(
7172
warehouse, connectionConfigInfoDpo.asIcebergCatalogProperties(userSecretsManager));
7273
return hiveCatalog;
7374
}
75+
76+
@Override
77+
public GenericTableCatalog createGenericCatalog(
78+
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager) {
79+
// TODO implement
80+
throw new UnsupportedOperationException(
81+
"Generic table federation to this catalog is not supported.");
82+
}
7483
}

polaris-core/src/main/java/org/apache/polaris/core/catalog/ExternalCatalogFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,15 @@ public interface ExternalCatalogFactory {
4040
*/
4141
Catalog createCatalog(
4242
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager);
43+
44+
/**
45+
* Creates a generic table catalog for the given connection configuration.
46+
*
47+
* @param connectionConfig the connection configuration
48+
* @param userSecretsManager the user secrets manager for handling credentials
49+
* @return the initialized catalog
50+
* @throws IllegalStateException if the connection configuration is invalid
51+
*/
52+
GenericTableCatalog createGenericCatalog(
53+
ConnectionConfigInfoDpo connectionConfig, UserSecretsManager userSecretsManager);
4354
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.polaris.service.catalog.generic;
19+
package org.apache.polaris.core.catalog;
2020

2121
import java.util.List;
2222
import java.util.Map;

polaris-core/src/main/java/org/apache/polaris/core/connection/ConnectionConfigInfoDpo.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public abstract class ConnectionConfigInfoDpo implements IcebergCatalogPropertie
7676
public ConnectionConfigInfoDpo(
7777
@JsonProperty(value = "connectionTypeCode", required = true) int connectionTypeCode,
7878
@JsonProperty(value = "uri", required = true) @Nonnull String uri,
79-
@JsonProperty(value = "authenticationParameters", required = true) @Nonnull
79+
@JsonProperty(value = "authenticationParameters", required = true) @Nullable
8080
AuthenticationParametersDpo authenticationParameters,
8181
@JsonProperty(value = "serviceIdentity", required = false) @Nullable
8282
ServiceIdentityInfoDpo serviceIdentity) {
@@ -86,7 +86,7 @@ public ConnectionConfigInfoDpo(
8686
protected ConnectionConfigInfoDpo(
8787
int connectionTypeCode,
8888
@Nonnull String uri,
89-
@Nonnull AuthenticationParametersDpo authenticationParameters,
89+
@Nullable AuthenticationParametersDpo authenticationParameters,
9090
@Nullable ServiceIdentityInfoDpo serviceIdentity,
9191
boolean validateUri) {
9292
this.connectionTypeCode = connectionTypeCode;
@@ -203,7 +203,10 @@ public static ConnectionConfigInfoDpo fromConnectionConfigInfoModelWithSecrets(
203203
hiveConfigModel.getAuthenticationParameters(), secretReferences);
204204
config =
205205
new HiveConnectionConfigInfoDpo(
206-
hiveConfigModel.getUri(), authenticationParameters, hiveConfigModel.getWarehouse());
206+
hiveConfigModel.getUri(),
207+
authenticationParameters,
208+
hiveConfigModel.getWarehouse(),
209+
null /*Service Identity Info*/);
207210
break;
208211
default:
209212
throw new IllegalStateException(

polaris-core/src/main/java/org/apache/polaris/core/connection/hive/HiveConnectionConfigInfoDpo.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.polaris.core.connection.AuthenticationParametersDpo;
3131
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
3232
import org.apache.polaris.core.connection.ConnectionType;
33+
import org.apache.polaris.core.identity.dpo.ServiceIdentityInfoDpo;
3334
import org.apache.polaris.core.secrets.UserSecretsManager;
3435

3536
/**
@@ -44,8 +45,10 @@ public HiveConnectionConfigInfoDpo(
4445
@JsonProperty(value = "uri", required = true) @Nonnull String uri,
4546
@JsonProperty(value = "authenticationParameters", required = false) @Nullable
4647
AuthenticationParametersDpo authenticationParameters,
47-
@JsonProperty(value = "warehouse", required = false) @Nullable String warehouse) {
48-
super(ConnectionType.HIVE.getCode(), uri, authenticationParameters);
48+
@JsonProperty(value = "warehouse", required = false) @Nullable String warehouse,
49+
@JsonProperty(value = "serviceIdentity", required = false) @Nullable
50+
ServiceIdentityInfoDpo serviceIdentity) {
51+
super(ConnectionType.HIVE.getCode(), uri, authenticationParameters, serviceIdentity);
4952
this.warehouse = warehouse;
5053
}
5154

@@ -77,6 +80,13 @@ public String toString() {
7780
return properties;
7881
}
7982

83+
@Override
84+
public ConnectionConfigInfoDpo withServiceIdentity(
85+
@Nonnull ServiceIdentityInfoDpo serviceIdentityInfo) {
86+
return new HiveConnectionConfigInfoDpo(
87+
getUri(), getAuthenticationParameters(), warehouse, serviceIdentityInfo);
88+
}
89+
8090
@Override
8191
public ConnectionConfigInfo asConnectionConfigInfoModel() {
8292
return HiveConnectionConfigInfo.builder()

runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogHandler.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import static org.apache.polaris.core.entity.PolarisEntitySubType.ICEBERG_TABLE;
2222

23+
import jakarta.enterprise.inject.Instance;
2324
import jakarta.ws.rs.core.SecurityContext;
2425
import java.util.Arrays;
2526
import java.util.List;
@@ -34,6 +35,7 @@
3435
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
3536
import org.apache.polaris.core.auth.PolarisAuthorizer;
3637
import org.apache.polaris.core.auth.PolarisPrincipal;
38+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
3739
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
3840
import org.apache.polaris.core.context.CallContext;
3941
import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -43,6 +45,7 @@
4345
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
4446
import org.apache.polaris.core.persistence.resolver.ResolverPath;
4547
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
48+
import org.apache.polaris.core.secrets.UserSecretsManager;
4649
import org.apache.polaris.service.types.PolicyIdentifier;
4750

4851
/**
@@ -58,6 +61,8 @@ public abstract class CatalogHandler {
5861
protected final ResolutionManifestFactory resolutionManifestFactory;
5962
protected final String catalogName;
6063
protected final PolarisAuthorizer authorizer;
64+
protected final UserSecretsManager userSecretsManager;
65+
protected final Instance<ExternalCatalogFactory> externalCatalogFactories;
6166

6267
protected final PolarisDiagnostics diagnostics;
6368
protected final CallContext callContext;
@@ -69,7 +74,9 @@ public CatalogHandler(
6974
ResolutionManifestFactory resolutionManifestFactory,
7075
SecurityContext securityContext,
7176
String catalogName,
72-
PolarisAuthorizer authorizer) {
77+
PolarisAuthorizer authorizer,
78+
UserSecretsManager userSecretsManager,
79+
Instance<ExternalCatalogFactory> externalCatalogFactories) {
7380
this.callContext = callContext;
7481
this.diagnostics = callContext.getPolarisCallContext().getDiagServices();
7582
this.resolutionManifestFactory = resolutionManifestFactory;
@@ -83,6 +90,12 @@ public CatalogHandler(
8390
this.securityContext = securityContext;
8491
this.polarisPrincipal = (PolarisPrincipal) securityContext.getUserPrincipal();
8592
this.authorizer = authorizer;
93+
this.userSecretsManager = userSecretsManager;
94+
this.externalCatalogFactories = externalCatalogFactories;
95+
}
96+
97+
protected UserSecretsManager getUserSecretsManager() {
98+
return userSecretsManager;
8699
}
87100

88101
/** Initialize the catalog once authorized. Called after all `authorize...` methods. */

runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogAdapter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919
package org.apache.polaris.service.catalog.generic;
2020

2121
import jakarta.enterprise.context.RequestScoped;
22+
import jakarta.enterprise.inject.Any;
23+
import jakarta.enterprise.inject.Instance;
2224
import jakarta.inject.Inject;
2325
import jakarta.ws.rs.core.Response;
2426
import jakarta.ws.rs.core.SecurityContext;
2527
import org.apache.iceberg.catalog.TableIdentifier;
2628
import org.apache.polaris.core.auth.PolarisAuthorizer;
29+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
2730
import org.apache.polaris.core.config.FeatureConfiguration;
2831
import org.apache.polaris.core.context.CallContext;
2932
import org.apache.polaris.core.context.RealmContext;
3033
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
3134
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
35+
import org.apache.polaris.core.secrets.UserSecretsManager;
3236
import org.apache.polaris.service.catalog.CatalogPrefixParser;
3337
import org.apache.polaris.service.catalog.api.PolarisCatalogGenericTableApiService;
3438
import org.apache.polaris.service.catalog.common.CatalogAdapter;
@@ -52,6 +56,8 @@ public class GenericTableCatalogAdapter
5256
private final PolarisAuthorizer polarisAuthorizer;
5357
private final ReservedProperties reservedProperties;
5458
private final CatalogPrefixParser prefixParser;
59+
private final UserSecretsManager userSecretsManager;
60+
private final Instance<ExternalCatalogFactory> externalCatalogFactories;
5561

5662
@Inject
5763
public GenericTableCatalogAdapter(
@@ -61,14 +67,18 @@ public GenericTableCatalogAdapter(
6167
PolarisMetaStoreManager metaStoreManager,
6268
PolarisAuthorizer polarisAuthorizer,
6369
CatalogPrefixParser prefixParser,
64-
ReservedProperties reservedProperties) {
70+
ReservedProperties reservedProperties,
71+
UserSecretsManager userSecretsManager,
72+
@Any Instance<ExternalCatalogFactory> externalCatalogFactories) {
6573
this.realmContext = realmContext;
6674
this.callContext = callContext;
6775
this.resolutionManifestFactory = resolutionManifestFactory;
6876
this.metaStoreManager = metaStoreManager;
6977
this.polarisAuthorizer = polarisAuthorizer;
7078
this.prefixParser = prefixParser;
7179
this.reservedProperties = reservedProperties;
80+
this.userSecretsManager = userSecretsManager;
81+
this.externalCatalogFactories = externalCatalogFactories;
7282
}
7383

7484
private GenericTableCatalogHandler newHandlerWrapper(
@@ -83,7 +93,9 @@ private GenericTableCatalogHandler newHandlerWrapper(
8393
metaStoreManager,
8494
securityContext,
8595
prefixParser.prefixToCatalogName(realmContext, prefix),
86-
polarisAuthorizer);
96+
polarisAuthorizer,
97+
userSecretsManager,
98+
externalCatalogFactories);
8799
}
88100

89101
@Override

runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,36 @@
1818
*/
1919
package org.apache.polaris.service.catalog.generic;
2020

21+
import io.smallrye.common.annotation.Identifier;
22+
import jakarta.enterprise.inject.Instance;
2123
import jakarta.ws.rs.core.SecurityContext;
2224
import java.util.LinkedHashSet;
2325
import java.util.Map;
2426
import org.apache.iceberg.catalog.Namespace;
2527
import org.apache.iceberg.catalog.TableIdentifier;
2628
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
2729
import org.apache.polaris.core.auth.PolarisAuthorizer;
30+
import org.apache.polaris.core.catalog.ExternalCatalogFactory;
31+
import org.apache.polaris.core.catalog.GenericTableCatalog;
32+
import org.apache.polaris.core.config.FeatureConfiguration;
33+
import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
34+
import org.apache.polaris.core.connection.ConnectionType;
2835
import org.apache.polaris.core.context.CallContext;
36+
import org.apache.polaris.core.entity.CatalogEntity;
2937
import org.apache.polaris.core.entity.PolarisEntitySubType;
3038
import org.apache.polaris.core.entity.table.GenericTableEntity;
3139
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
3240
import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
41+
import org.apache.polaris.core.secrets.UserSecretsManager;
3342
import org.apache.polaris.service.catalog.common.CatalogHandler;
3443
import org.apache.polaris.service.types.GenericTable;
3544
import org.apache.polaris.service.types.ListGenericTablesResponse;
3645
import org.apache.polaris.service.types.LoadGenericTableResponse;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
3748

3849
public class GenericTableCatalogHandler extends CatalogHandler {
50+
private static final Logger LOGGER = LoggerFactory.getLogger(GenericTableCatalogHandler.class);
3951

4052
private PolarisMetaStoreManager metaStoreManager;
4153

@@ -47,16 +59,58 @@ public GenericTableCatalogHandler(
4759
PolarisMetaStoreManager metaStoreManager,
4860
SecurityContext securityContext,
4961
String catalogName,
50-
PolarisAuthorizer authorizer) {
51-
super(callContext, resolutionManifestFactory, securityContext, catalogName, authorizer);
62+
PolarisAuthorizer authorizer,
63+
UserSecretsManager userSecretsManager,
64+
Instance<ExternalCatalogFactory> externalCatalogFactories) {
65+
super(
66+
callContext,
67+
resolutionManifestFactory,
68+
securityContext,
69+
catalogName,
70+
authorizer,
71+
userSecretsManager,
72+
externalCatalogFactories);
5273
this.metaStoreManager = metaStoreManager;
5374
}
5475

5576
@Override
5677
protected void initializeCatalog() {
57-
this.genericTableCatalog =
58-
new PolarisGenericTableCatalog(metaStoreManager, callContext, this.resolutionManifest);
59-
this.genericTableCatalog.initialize(catalogName, Map.of());
78+
CatalogEntity resolvedCatalogEntity =
79+
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
80+
ConnectionConfigInfoDpo connectionConfigInfoDpo =
81+
resolvedCatalogEntity.getConnectionConfigInfoDpo();
82+
if (connectionConfigInfoDpo != null) {
83+
LOGGER
84+
.atInfo()
85+
.addKeyValue("remoteUrl", connectionConfigInfoDpo.getUri())
86+
.log("Initializing federated catalog");
87+
FeatureConfiguration.enforceFeatureEnabledOrThrow(
88+
callContext.getRealmConfig(), FeatureConfiguration.ENABLE_CATALOG_FEDERATION);
89+
90+
GenericTableCatalog federatedCatalog;
91+
ConnectionType connectionType =
92+
ConnectionType.fromCode(connectionConfigInfoDpo.getConnectionTypeCode());
93+
94+
// Use the unified factory pattern for all external catalog types
95+
Instance<ExternalCatalogFactory> externalCatalogFactory =
96+
externalCatalogFactories.select(
97+
Identifier.Literal.of(connectionType.getFactoryIdentifier()));
98+
if (externalCatalogFactory.isResolvable()) {
99+
federatedCatalog =
100+
externalCatalogFactory
101+
.get()
102+
.createGenericCatalog(connectionConfigInfoDpo, getUserSecretsManager());
103+
} else {
104+
throw new UnsupportedOperationException(
105+
"External catalog factory for type '" + connectionType + "' is unavailable.");
106+
}
107+
this.genericTableCatalog = federatedCatalog;
108+
} else {
109+
LOGGER.atInfo().log("Initializing non-federated catalog");
110+
this.genericTableCatalog =
111+
new PolarisGenericTableCatalog(metaStoreManager, callContext, this.resolutionManifest);
112+
this.genericTableCatalog.initialize(catalogName, Map.of());
113+
}
60114
}
61115

62116
public ListGenericTablesResponse listGenericTables(Namespace parent) {

runtime/service/src/main/java/org/apache/polaris/service/catalog/generic/PolarisGenericTableCatalog.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iceberg.exceptions.AlreadyExistsException;
2626
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
2727
import org.apache.iceberg.exceptions.NoSuchTableException;
28+
import org.apache.polaris.core.catalog.GenericTableCatalog;
2829
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
2930
import org.apache.polaris.core.context.CallContext;
3031
import org.apache.polaris.core.entity.CatalogEntity;

0 commit comments

Comments
 (0)