|
33 | 33 | import java.util.Optional;
|
34 | 34 | import java.util.Set;
|
35 | 35 | import java.util.stream.Collectors;
|
| 36 | +import org.apache.hadoop.conf.Configuration; |
36 | 37 | import org.apache.iceberg.BaseMetadataTable;
|
37 | 38 | import org.apache.iceberg.BaseTable;
|
38 | 39 | import org.apache.iceberg.MetadataUpdate;
|
|
76 | 77 | import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
|
77 | 78 | import org.apache.polaris.core.auth.PolarisAuthorizer;
|
78 | 79 | import org.apache.polaris.core.config.FeatureConfiguration;
|
| 80 | +import org.apache.polaris.core.connection.AuthenticationParametersDpo; |
| 81 | +import org.apache.polaris.core.connection.AuthenticationType; |
79 | 82 | import org.apache.polaris.core.connection.ConnectionConfigInfoDpo;
|
80 | 83 | import org.apache.polaris.core.connection.ConnectionType;
|
81 | 84 | import org.apache.polaris.core.connection.hadoop.HadoopConnectionConfigInfoDpo;
|
@@ -232,9 +235,22 @@ protected void initializeCatalog() {
|
232 | 235 | connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
|
233 | 236 | break;
|
234 | 237 | case HADOOP:
|
235 |
| - federatedCatalog = new HadoopCatalog(); |
| 238 | + // Currently, Polaris supports Hadoop federation only via IMPLICIT authentication. |
| 239 | + // Hence, prior to initializing the configuration, ensure that the catalog uses |
| 240 | + // IMPLICIT authentication. |
| 241 | + AuthenticationParametersDpo authenticationParametersDpo = |
| 242 | + connectionConfigInfoDpo.getAuthenticationParameters(); |
| 243 | + if (authenticationParametersDpo.getAuthenticationTypeCode() |
| 244 | + != AuthenticationType.IMPLICIT.getCode()) { |
| 245 | + throw new IllegalStateException( |
| 246 | + "Hadoop federation only supports IMPLICIT authentication."); |
| 247 | + } |
| 248 | + Configuration conf = new Configuration(); |
| 249 | + String warehouse = |
| 250 | + ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(); |
| 251 | + federatedCatalog = new HadoopCatalog(conf, warehouse); |
236 | 252 | federatedCatalog.initialize(
|
237 |
| - ((HadoopConnectionConfigInfoDpo) connectionConfigInfoDpo).getWarehouse(), |
| 253 | + warehouse, |
238 | 254 | connectionConfigInfoDpo.asIcebergCatalogProperties(getUserSecretsManager()));
|
239 | 255 | break;
|
240 | 256 | default:
|
|
0 commit comments