diff --git a/client/python/cli/command/__init__.py b/client/python/cli/command/__init__.py
index 659a9b9e2c..d22f1ebb5d 100644
--- a/client/python/cli/command/__init__.py
+++ b/client/python/cli/command/__init__.py
@@ -82,7 +82,9 @@ def options_get(key, f=lambda x: x):
             catalog_role_session_name=options_get(Arguments.CATALOG_ROLE_SESSION_NAME),
             catalog_external_id=options_get(Arguments.CATALOG_EXTERNAL_ID),
             catalog_signing_region=options_get(Arguments.CATALOG_SIGNING_REGION),
-            catalog_signing_name=options_get(Arguments.CATALOG_SIGNING_NAME)
+            catalog_signing_name=options_get(Arguments.CATALOG_SIGNING_NAME),
+            hadoop_resources=options_get(Arguments.HADOOP_RESOURCES),
+            hadoop_username=options_get(Arguments.HADOOP_USERNAME)
         )
     elif options.command == Commands.PRINCIPALS:
         from cli.command.principals import PrincipalsCommand
diff --git a/client/python/cli/command/catalogs.py b/client/python/cli/command/catalogs.py
index 3708bb5d63..c69354cebd 100644
--- a/client/python/cli/command/catalogs.py
+++ b/client/python/cli/command/catalogs.py
@@ -27,7 +27,7 @@ from cli.options.option_tree import Argument
 from polaris.management import PolarisDefaultApi, CreateCatalogRequest, UpdateCatalogRequest, \
     StorageConfigInfo, ExternalCatalog, AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo, \
-    PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, ImplicitAuthenticationParameters, \
+    HadoopStorageConfigInfo, PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, ImplicitAuthenticationParameters, \
     OAuthClientCredentialsParameters, SigV4AuthenticationParameters, HadoopConnectionConfigInfo, \
     IcebergRestConnectionConfigInfo, AwsIamServiceIdentityInfo
@@ -82,6 +82,8 @@ class CatalogsCommand(Command):
     catalog_external_id: str
     catalog_signing_region: str
     catalog_signing_name: str
+    hadoop_resources: str = None
+    hadoop_username: str = None
 
     def validate(self):
         if self.catalogs_subcommand == Subcommands.CREATE:
@@ -158,10 +160,26 @@ def validate(self):
                     self._has_aws_storage_info()
                     or self._has_azure_storage_info()
                     or self._has_gcs_storage_info()
+                    or self._has_hdfs_storage_info()
                 ):
                     raise Exception(
                         "Storage type 'file' does not support any storage credentials"
                     )
+        elif self.storage_type == StorageType.HDFS.value:
+            if not self.hadoop_resources:
+                raise Exception(
+                    f"Missing required argument for storage type 'hdfs':"
+                    f" {Argument.to_flag_name(Arguments.HADOOP_RESOURCES)}"
+                )
+            if (
+                self._has_aws_storage_info()
+                or self._has_azure_storage_info()
+                or self._has_gcs_storage_info()
+            ):
+                raise Exception(
+                    "Storage type 'hdfs' only supports the storage credential"
+                    f" {Argument.to_flag_name(Arguments.HADOOP_RESOURCES)}"
+                )
 
     def _has_aws_storage_info(self):
         return self.role_arn or self.external_id or self.user_arn or self.region or self.endpoint or self.sts_endpoint or self.path_style_access
@@ -172,6 +190,9 @@ def _has_azure_storage_info(self):
     def _has_gcs_storage_info(self):
         return self.service_account
 
+    def _has_hdfs_storage_info(self):
+        return self.hadoop_resources or self.hadoop_username
+
     def _build_storage_config_info(self):
         config = None
         if self.storage_type == StorageType.S3.value:
@@ -205,6 +226,13 @@ def _build_storage_config_info(self):
                 storage_type=self.storage_type.upper(),
                 allowed_locations=self.allowed_locations,
             )
+        elif self.storage_type == StorageType.HDFS.value:
+            config = HadoopStorageConfigInfo(
+                storage_type=self.storage_type.upper(),
+                allowed_locations=self.allowed_locations,
+                resources=self.hadoop_resources,
+                username=self.hadoop_username,
+            )
         return config
 
     def _build_connection_config_info(self):
diff --git a/client/python/cli/constants.py b/client/python/cli/constants.py
index d3027009a4..e85ade5619 100644
--- a/client/python/cli/constants.py
+++ b/client/python/cli/constants.py
@@ -22,13 +22,14 @@
 
 class StorageType(Enum):
     """
-    Represents a Storage Type within the Polaris API -- `s3`, `azure`, `gcs`, or `file`.
+    Represents a Storage Type within the Polaris API -- `s3`, `azure`, `gcs`, `file`, or `hdfs`.
     """
 
     S3 = "s3"
     AZURE = "azure"
     GCS = "gcs"
     FILE = "file"
+    HDFS = "hdfs"
 
 
 class CatalogType(Enum):
@@ -185,6 +186,8 @@ class Arguments:
     CATALOG_EXTERNAL_ID = "catalog_external_id"
     CATALOG_SIGNING_REGION = "catalog_signing_region"
     CATALOG_SIGNING_NAME = "catalog_signing_name"
+    HADOOP_RESOURCES = "hadoop_resources"
+    HADOOP_USERNAME = "hadoop_username"
 
 
 class Hints:
@@ -243,6 +246,13 @@ class Create:
                 "(Only for GCS) The service account to use when connecting to GCS"
             )
 
+            HADOOP_RESOURCES = (
+                "(Required for HDFS) Comma-separated list of Hadoop configuration files (core-site.xml, hdfs-site.xml)"
+            )
+            HADOOP_USERNAME = (
+                "(Optional for HDFS) Username for HDFS operations. If not specified, uses process/global user authentication"
+            )
+
         class Update:
             DEFAULT_BASE_LOCATION = "A new default base location for the catalog"
 
diff --git a/client/python/cli/options/option_tree.py b/client/python/cli/options/option_tree.py
index 7b10a64ea6..4def3bccee 100644
--- a/client/python/cli/options/option_tree.py
+++ b/client/python/cli/options/option_tree.py
@@ -128,6 +128,8 @@ def get_tree() -> List[Option]:
                     Argument(Arguments.MULTI_TENANT_APP_NAME, str, Hints.Catalogs.Create.MULTI_TENANT_APP_NAME),
                     Argument(Arguments.CONSENT_URL, str, Hints.Catalogs.Create.CONSENT_URL),
                     Argument(Arguments.SERVICE_ACCOUNT, str, Hints.Catalogs.Create.SERVICE_ACCOUNT),
+                    Argument(Arguments.HADOOP_RESOURCES, str, Hints.Catalogs.Create.HADOOP_RESOURCES),
+                    Argument(Arguments.HADOOP_USERNAME, str, Hints.Catalogs.Create.HADOOP_USERNAME),
                     Argument(Arguments.PROPERTY, str, Hints.PROPERTY, allow_repeats=True),
                 ] + OptionTree._FEDERATION_ARGS, input_name=Arguments.CATALOG),
             Option(Subcommands.DELETE, input_name=Arguments.CATALOG),
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
index 622df1fca6..dcf3eb2f77 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+
 import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
 import org.apache.polaris.core.admin.model.AzureStorageConfigInfo;
@@ -37,6 +38,7 @@
 import org.apache.polaris.core.admin.model.ExternalCatalog;
 import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.GcpStorageConfigInfo;
+import org.apache.polaris.core.admin.model.HadoopStorageConfigInfo;
 import org.apache.polaris.core.admin.model.PolarisCatalog;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.core.config.BehaviorChangeConfiguration;
@@ -48,6 +50,7 @@
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
 import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
+import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;
 
 /**
  * Catalog specific subclass of the {@link PolarisEntity} that handles conversion from the {@link
@@ -166,6 +169,14 @@ private StorageConfigInfo getStorageInfo(Map<String, String> internalProperties)
         return new FileStorageConfigInfo(
             StorageConfigInfo.StorageTypeEnum.FILE, fileConfigModel.getAllowedLocations());
       }
+      if (configInfo instanceof HadoopStorageConfigurationInfo) {
+        HadoopStorageConfigurationInfo hdfsConfigModel = (HadoopStorageConfigurationInfo) configInfo;
+        return HadoopStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.HDFS)
+            .setAllowedLocations(hdfsConfigModel.getAllowedLocations())
+            .setResources(hdfsConfigModel.getResources())
+            .build();
+      }
       return null;
     }
     return null;
@@ -303,6 +314,15 @@ public Builder setStorageConfigurationInfo(
           config =
               FileStorageConfigurationInfo.builder().allowedLocations(allowedLocations).build();
           break;
+        case HDFS:
+          HadoopStorageConfigInfo hadoopConfigModel = (HadoopStorageConfigInfo) storageConfigModel;
+          config =
+              HadoopStorageConfigurationInfo.builder()
+                  .allowedLocations(allowedLocations)
+                  .resources(hadoopConfigModel.getResources())
+                  .username(hadoopConfigModel.getUsername())
+                  .build();
+          break;
         default:
           throw new IllegalStateException(
               "Unsupported storage type: " + storageConfigModel.getStorageType());
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
index 414156fb48..1477c7d08d 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
@@ -47,6 +47,7 @@
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
 import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
+import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;
 import org.immutables.value.Value;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@
   @JsonSubTypes.Type(value = AzureStorageConfigurationInfo.class),
   @JsonSubTypes.Type(value = GcpStorageConfigurationInfo.class),
   @JsonSubTypes.Type(value = FileStorageConfigurationInfo.class),
+  @JsonSubTypes.Type(value = HadoopStorageConfigurationInfo.class),
 })
 @JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class PolarisStorageConfigurationInfo {
@@ -218,6 +220,7 @@
     AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
     GCS("gs://"),
     FILE("file://"),
+    HDFS("hdfs://"),
     ;
 
     private final List<String> prefixes;
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageInternalProperties.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageInternalProperties.java
new file mode 100644
index 0000000000..2ba334a5e6
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageInternalProperties.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage;
+
+/** Constants for internal properties used in storage integrations */
+public class StorageInternalProperties {
+  /** Property key for identifying the storage type in internal properties */
+  public static final String STORAGE_TYPE_KEY = "storageType";
+
+  /** Property key for HDFS configuration resources in internal properties */
+  public static final String HDFS_CONFIG_RESOURCES_KEY = "hdfs.config-resources";
+
+  /** Property key for HDFS username in internal properties */
+  public static final String HDFS_USERNAME_KEY = "hdfs.username";
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopCredentialsStorageIntegration.java
new file mode 100644
index 0000000000..2860e9d71e
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopCredentialsStorageIntegration.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.storage.hadoop;
+
+import jakarta.annotation.Nonnull;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.storage.AccessConfig;
+import org.apache.polaris.core.storage.InMemoryStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageInternalProperties;
+
+import java.util.Set;
+
+/** Placeholder for Hadoop credential handling */
+public class HadoopCredentialsStorageIntegration
+    extends InMemoryStorageIntegration<HadoopStorageConfigurationInfo> {
+
+  public HadoopCredentialsStorageIntegration(
+      HadoopStorageConfigurationInfo config
+  ) {
+    super(config, HadoopCredentialsStorageIntegration.class.getName());
+  }
+
+  @Override
+  public AccessConfig getSubscopedCreds(@Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, @Nonnull Set<String> allowedWriteLocations) {
+    AccessConfig.Builder accessConfig = AccessConfig.builder();
+
+    // Set storage type for DefaultFileIOFactory
+    accessConfig.putInternalProperty(StorageInternalProperties.STORAGE_TYPE_KEY, PolarisStorageConfigurationInfo.StorageType.HDFS.name());
+
+    // Add Hadoop configuration resources as internal property
+    String resources = config().getResources();
+    if (resources != null && !resources.trim().isEmpty()) {
+      accessConfig.putInternalProperty(StorageInternalProperties.HDFS_CONFIG_RESOURCES_KEY, resources);
+    }
+
+    // Add HDFS username as internal property if specified
+    String username = config().getUsername();
+    if (username != null && !username.trim().isEmpty()) {
+      accessConfig.putInternalProperty(StorageInternalProperties.HDFS_USERNAME_KEY, username);
+    }
+    return accessConfig.build();
+  }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopStorageConfigurationInfo.java
new file mode 100644
index 0000000000..6db8e23906
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopStorageConfigurationInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.storage.hadoop;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** HDFS Polaris Storage Configuration information */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableHadoopStorageConfigurationInfo.class)
+@JsonDeserialize(as = ImmutableHadoopStorageConfigurationInfo.class)
+@JsonTypeName("HadoopStorageConfigurationInfo")
+public abstract class HadoopStorageConfigurationInfo extends PolarisStorageConfigurationInfo {
+
+  public static ImmutableHadoopStorageConfigurationInfo.Builder builder() {
+    return ImmutableHadoopStorageConfigurationInfo.builder();
+  }
+
+  // TODO (RDS) : see if the validation can be done elsewhere
+  //  /** Validate user supplied Hadoop resources are readable. */
+  //  private void validateHadoopResources() {
+  //    for (String resource : getResourcesArray()) {
+  //      try {
+  //        File file = new File(resource.trim());
+  //        if (!file.exists()) {
+  //          throw new IllegalArgumentException(
+  //              "Hadoop resource supplied that does not exist: " + resource);
+  //        }
+  //        if (!file.canRead()) {
+  //          throw new IllegalArgumentException(
+  //              "Unreadable Hadoop resource supplied, please check permissions: " + resource);
+  //        }
+  //      } catch (IllegalArgumentException e) {
+  //        throw e;
+  //      } catch (Exception e) {
+  //        throw new IllegalArgumentException("Error validating Hadoop resource: " + resource, e);
+  //      }
+  //    }
+  //  }
+
+  @Override
+  public void validatePrefixForStorageType(String loc) {
+    if (!loc.startsWith("hdfs://")) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Location prefix not allowed: '%s', expected prefix: hdfs://", loc));
+    }
+  }
+
+  @Override
+  public StorageType getStorageType() {
+    return StorageType.HDFS;
+  }
+
+  @Override
+  public String getFileIoImplClassName() {
+    return "org.apache.iceberg.hadoop.HadoopFileIO";
+  }
+
+  public abstract String getResources();
+
+  @Nullable
+  public abstract String getUsername();
+}
diff --git a/runtime/server/build.gradle.kts b/runtime/server/build.gradle.kts
index 2ebd153812..d7719d1842 100644
--- a/runtime/server/build.gradle.kts
+++ b/runtime/server/build.gradle.kts
@@ -79,7 +79,7 @@ tasks.named("quarkusRun") {
       "-Dpolaris.bootstrap.credentials=POLARIS,root,s3cr3t",
       "-Dquarkus.console.color=true",
       "-Dpolaris.features.\"ALLOW_INSECURE_STORAGE_TYPES\"=true",
-      "-Dpolaris.features.\"SUPPORTED_CATALOG_STORAGE_TYPES\"=[\"FILE\",\"S3\",\"GCS\",\"AZURE\"]",
+      "-Dpolaris.features.\"SUPPORTED_CATALOG_STORAGE_TYPES\"=[\"FILE\",\"S3\",\"GCS\",\"AZURE\",\"HDFS\"]",
       "-Dpolaris.readiness.ignore-severe-issues=true",
     )
 }
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
index d2c73e2684..1559b35bfc 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java
@@ -28,6 +28,7 @@
 import java.util.Optional;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.FileIO;
@@ -39,6 +40,8 @@
 import org.apache.polaris.core.storage.AccessConfig;
 import org.apache.polaris.core.storage.PolarisCredentialVendor;
 import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageInternalProperties;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
 
 /**
@@ -109,7 +112,39 @@ public FileIO loadFileIO(
   @VisibleForTesting
   FileIO loadFileIOInternal(
       @Nonnull String ioImplClassName, @Nonnull Map<String, String> properties) {
-    return new ExceptionMappingFileIO(
-        CatalogUtil.loadFileIO(ioImplClassName, properties, new Configuration()));
+    Configuration hadoopConfig = createHadoopConfiguration(properties);
+    FileIO fileIO = CatalogUtil.loadFileIO(ioImplClassName, properties, hadoopConfig);
+
+    // Check for HDFS username and wrap with impersonation if present
+    String hdfsUsername = properties.get(StorageInternalProperties.HDFS_USERNAME_KEY);
+    if (hdfsUsername != null && !hdfsUsername.trim().isEmpty()) {
+      fileIO = new HadoopImpersonatingFileIO(fileIO, hdfsUsername);
+    }
+
+    return new ExceptionMappingFileIO(fileIO);
+  }
+
+  private Configuration createHadoopConfiguration(@Nonnull Map<String, String> properties) {
+    String storageType = properties.get(StorageInternalProperties.STORAGE_TYPE_KEY);
+    String resources = properties.get(StorageInternalProperties.HDFS_CONFIG_RESOURCES_KEY);
+
+    if (storageType != null &&
+        PolarisStorageConfigurationInfo.StorageType.HDFS.name().equalsIgnoreCase(storageType) &&
+        resources != null && !resources.trim().isEmpty()) {
+
+      Configuration hadoopConfig = new Configuration(false);
+
+      String[] resourcePaths = resources.split(",");
+      for (String resourcePath : resourcePaths) {
+        String trimmedPath = resourcePath.trim();
+        if (!trimmedPath.isEmpty()) {
+          hadoopConfig.addResource(new Path(trimmedPath));
+        }
+      }
+
+      return hadoopConfig;
+    }
+
+    return new Configuration(true);
   }
 }
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/HadoopImpersonatingFileIO.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/HadoopImpersonatingFileIO.java
new file mode 100644
index 0000000000..acf27dc946
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/HadoopImpersonatingFileIO.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.io;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+
+/**
+ * A delegating FileIO wrapper that executes all operations under a specific HDFS user context
+ * using UserGroupInformation.doAs(). This enables per-catalog user impersonation for HDFS access.
+ */
+public class HadoopImpersonatingFileIO implements FileIO {
+
+  private final FileIO delegate;
+  private final UserGroupInformation ugi;
+
+  public HadoopImpersonatingFileIO(FileIO delegate, String username) {
+    this.delegate = delegate;
+    this.ugi = UserGroupInformation.createRemoteUser(username);
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    try {
+      return ugi.doAs((PrivilegedExceptionAction<InputFile>) () -> delegate.newInputFile(path));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create InputFile with user impersonation", e);
+    }
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    try {
+      return ugi.doAs((PrivilegedExceptionAction<OutputFile>) () -> delegate.newOutputFile(path));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create OutputFile with user impersonation", e);
+    }
+  }
+
+  @Override
+  public void deleteFile(String path) {
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        delegate.deleteFile(path);
+        return null;
+      });
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to delete file with user impersonation", e);
+    }
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    try {
+      return ugi.doAs((PrivilegedExceptionAction<Map<String, String>>) () -> delegate.properties());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get properties with user impersonation", e);
+    }
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        delegate.initialize(properties);
+        return null;
+      });
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize with user impersonation", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        delegate.close();
+        return null;
+      });
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to close with user impersonation", e);
+    }
+  }
+}
+
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/validation/StorageTypeFileIO.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/validation/StorageTypeFileIO.java
index 55684ab2d2..e237d411bb 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/validation/StorageTypeFileIO.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/validation/StorageTypeFileIO.java
@@ -32,6 +32,8 @@ enum StorageTypeFileIO {
 
   FILE("org.apache.iceberg.hadoop.HadoopFileIO", false),
 
+  HDFS("org.apache.iceberg.hadoop.HadoopFileIO", false),
+
   // Iceberg tests
   IN_MEMORY("org.apache.iceberg.inmemory.InMemoryFileIO", false, false),
   ;
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
index e07bdd0823..4161671c6f 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
@@ -44,6 +44,8 @@
 import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
 import org.apache.polaris.core.storage.gcp.GcpCredentialsStorageIntegration;
 import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
+import org.apache.polaris.core.storage.hadoop.HadoopCredentialsStorageIntegration;
+import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
 @ApplicationScoped
@@ -105,6 +107,12 @@ public PolarisStorageIntegrationProviderImpl(
               new AzureCredentialsStorageIntegration(
                   (AzureStorageConfigurationInfo) polarisStorageConfigurationInfo);
           break;
+        case HDFS:
+          storageIntegration =
+              (PolarisStorageIntegration<T>)
+                  new HadoopCredentialsStorageIntegration(
+                      (HadoopStorageConfigurationInfo) polarisStorageConfigurationInfo);
+          break;
         case FILE:
           storageIntegration =
               new PolarisStorageIntegration<>((T) polarisStorageConfigurationInfo, "file") {
diff --git a/spec/polaris-management-service.yml b/spec/polaris-management-service.yml
index a7398bd392..c9b7002f23 100644
--- a/spec/polaris-management-service.yml
+++ b/spec/polaris-management-service.yml
@@ -1010,12 +1010,13 @@ components:
             - GCS
            - AZURE
            - FILE
-          description: The cloud provider type this storage is built on. FILE is supported for testing purposes only
+            - HDFS
+          description: The provider type this storage is built on. FILE is supported for testing purposes only
         allowedLocations:
           type: array
           items:
             type: string
-          example: "For AWS [s3://bucketname/prefix/], for AZURE [abfss://container@storageaccount.blob.core.windows.net/prefix/], for GCP [gs://bucketname/prefix/]"
+          example: "For AWS [s3://bucketname/prefix/], for AZURE [abfss://container@storageaccount.blob.core.windows.net/prefix/], for GCP [gs://bucketname/prefix/], for HDFS [hdfs://nameservice/prefix/]"
       required:
         - storageType
       discriminator:
@@ -1025,6 +1026,7 @@
           AZURE: "#/components/schemas/AzureStorageConfigInfo"
           GCS: "#/components/schemas/GcpStorageConfigInfo"
           FILE: "#/components/schemas/FileStorageConfigInfo"
+          HDFS: "#/components/schemas/HadoopStorageConfigInfo"
 
     AwsStorageConfigInfo:
       type: object
@@ -1140,6 +1142,19 @@
           example: "arn:aws:iam::111122223333:user/polaris-service-user"
       required:
         - iamArn
+    HadoopStorageConfigInfo:
+      type: object
+      description: hadoop storage configuration info
+      allOf:
+        - $ref: '#/components/schemas/StorageConfigInfo'
+        - type: object
+          properties:
+            resources:
+              type: string
+              description: an optional comma-separated list of Hadoop configuration files
+            username:
+              type: string
+              description: an optional username for HDFS operations
 
     UpdateCatalogRequest:
       description: Updates to apply to a Catalog. Any fields which are required in the Catalog
@@ -1602,3 +1617,4 @@ components:
           $ref: "#/components/schemas/GrantResource"
       required:
         - grants
+
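Illustrative usage (not part of the diff): the new arguments should surface on the `catalogs create` subcommand through the option tree above, with flag names derived from `hadoop_resources` / `hadoop_username` by the CLI's usual underscore-to-dash mapping. The catalog name, HDFS paths, and configuration file locations below are placeholders, not values taken from this change:

  polaris catalogs create hdfs_catalog \
    --storage-type hdfs \
    --default-base-location hdfs://nameservice1/warehouse/polaris \
    --hadoop-resources /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml \
    --hadoop-username polaris

The equivalent management-API request would carry a `storageConfigInfo` with `storageType: HDFS` plus the optional `resources` and `username` fields defined by the new `HadoopStorageConfigInfo` schema.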