feat: adding unauthenticated HDFS storage support to catalogs (#85) #2322

Draft · wants to merge 1 commit into base: main
4 changes: 3 additions & 1 deletion client/python/cli/command/__init__.py
@@ -82,7 +82,9 @@ def options_get(key, f=lambda x: x):
catalog_role_session_name=options_get(Arguments.CATALOG_ROLE_SESSION_NAME),
catalog_external_id=options_get(Arguments.CATALOG_EXTERNAL_ID),
catalog_signing_region=options_get(Arguments.CATALOG_SIGNING_REGION),
catalog_signing_name=options_get(Arguments.CATALOG_SIGNING_NAME)
catalog_signing_name=options_get(Arguments.CATALOG_SIGNING_NAME),
hadoop_resources=options_get(Arguments.HADOOP_RESOURCES),
hadoop_username=options_get(Arguments.HADOOP_USERNAME)
)
elif options.command == Commands.PRINCIPALS:
from cli.command.principals import PrincipalsCommand
30 changes: 29 additions & 1 deletion client/python/cli/command/catalogs.py
@@ -27,7 +27,7 @@
from cli.options.option_tree import Argument
from polaris.management import PolarisDefaultApi, CreateCatalogRequest, UpdateCatalogRequest, \
StorageConfigInfo, ExternalCatalog, AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo, \
PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, ImplicitAuthenticationParameters, \
HadoopStorageConfigInfo, PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, ImplicitAuthenticationParameters, \
OAuthClientCredentialsParameters, SigV4AuthenticationParameters, HadoopConnectionConfigInfo, \
IcebergRestConnectionConfigInfo, AwsIamServiceIdentityInfo

@@ -82,6 +82,8 @@ class CatalogsCommand(Command):
catalog_external_id: str
catalog_signing_region: str
catalog_signing_name: str
hadoop_resources: str = None
hadoop_username: str = None

def validate(self):
if self.catalogs_subcommand == Subcommands.CREATE:
@@ -158,10 +160,26 @@ def validate(self):
self._has_aws_storage_info()
or self._has_azure_storage_info()
or self._has_gcs_storage_info()
or self._has_hdfs_storage_info()
):
raise Exception(
"Storage type 'file' does not support any storage credentials"
)
elif self.storage_type == StorageType.HDFS.value:
if not self.hadoop_resources:
raise Exception(
f"Missing required argument for storage type 'hdfs':"
f" {Argument.to_flag_name(Arguments.HADOOP_RESOURCES)}"
)
if (
self._has_aws_storage_info()
or self._has_azure_storage_info()
or self._has_gcs_storage_info()
):
raise Exception(
"Storage type 'hdfs' supports the storage credential"
f" {Argument.to_flag_name(Arguments.HADOOP_RESOURCES)}"
)

def _has_aws_storage_info(self):
return self.role_arn or self.external_id or self.user_arn or self.region or self.endpoint or self.sts_endpoint or self.path_style_access
@@ -172,6 +190,9 @@ def _has_azure_storage_info(self):
def _has_gcs_storage_info(self):
return self.service_account

def _has_hdfs_storage_info(self):
return self.hadoop_resources or self.hadoop_username

def _build_storage_config_info(self):
config = None
if self.storage_type == StorageType.S3.value:
@@ -205,6 +226,13 @@ def _build_storage_config_info(self):
storage_type=self.storage_type.upper(),
allowed_locations=self.allowed_locations,
)
elif self.storage_type == StorageType.HDFS.value:
config = HadoopStorageConfigInfo(
storage_type=self.storage_type.upper(),
allowed_locations=self.allowed_locations,
resources=self.hadoop_resources,
username=self.hadoop_username,
)
return config

def _build_connection_config_info(self):
12 changes: 11 additions & 1 deletion client/python/cli/constants.py
@@ -22,13 +22,14 @@

class StorageType(Enum):
"""
Represents a Storage Type within the Polaris API -- `s3`, `azure`, `gcs`, or `file`.
Represents a Storage Type within the Polaris API -- `s3`, `azure`, `gcs`, `file`, or `hdfs`.
"""

S3 = "s3"
AZURE = "azure"
GCS = "gcs"
FILE = "file"
HDFS = "hdfs"


class CatalogType(Enum):
@@ -185,6 +186,8 @@ class Arguments:
CATALOG_EXTERNAL_ID = "catalog_external_id"
CATALOG_SIGNING_REGION = "catalog_signing_region"
CATALOG_SIGNING_NAME = "catalog_signing_name"
HADOOP_RESOURCES = "hadoop_resources"
HADOOP_USERNAME = "hadoop_username"


class Hints:
@@ -243,6 +246,13 @@ class Create:
"(Only for GCS) The service account to use when connecting to GCS"
)

HADOOP_RESOURCES = (
"(Required for HDFS) Comma-separated list of Hadoop configuration files (core-site.xml, hdfs-site.xml)"
)
HADOOP_USERNAME = (
"(Optional for HDFS) Username for HDFS operations. If not specified, uses process/global user authentication"
)

class Update:
DEFAULT_BASE_LOCATION = "A new default base location for the catalog"

2 changes: 2 additions & 0 deletions client/python/cli/options/option_tree.py
@@ -128,6 +128,8 @@ def get_tree() -> List[Option]:
Argument(Arguments.MULTI_TENANT_APP_NAME, str, Hints.Catalogs.Create.MULTI_TENANT_APP_NAME),
Argument(Arguments.CONSENT_URL, str, Hints.Catalogs.Create.CONSENT_URL),
Argument(Arguments.SERVICE_ACCOUNT, str, Hints.Catalogs.Create.SERVICE_ACCOUNT),
Argument(Arguments.HADOOP_RESOURCES, str, Hints.Catalogs.Create.HADOOP_RESOURCES),
Argument(Arguments.HADOOP_USERNAME, str, Hints.Catalogs.Create.HADOOP_USERNAME),
Argument(Arguments.PROPERTY, str, Hints.PROPERTY, allow_repeats=True),
] + OptionTree._FEDERATION_ARGS, input_name=Arguments.CATALOG),
Option(Subcommands.DELETE, input_name=Arguments.CATALOG),
@@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.iceberg.exceptions.BadRequestException;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.AzureStorageConfigInfo;
@@ -37,6 +38,7 @@
import org.apache.polaris.core.admin.model.ExternalCatalog;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.GcpStorageConfigInfo;
import org.apache.polaris.core.admin.model.HadoopStorageConfigInfo;
import org.apache.polaris.core.admin.model.PolarisCatalog;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.config.BehaviorChangeConfiguration;
@@ -48,6 +50,7 @@
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;

/**
* Catalog specific subclass of the {@link PolarisEntity} that handles conversion from the {@link
@@ -166,6 +169,14 @@ private StorageConfigInfo getStorageInfo(Map<String, String> internalProperties)
return new FileStorageConfigInfo(
StorageConfigInfo.StorageTypeEnum.FILE, fileConfigModel.getAllowedLocations());
}
if (configInfo instanceof HadoopStorageConfigurationInfo) {
HadoopStorageConfigurationInfo hdfsConfigModel = (HadoopStorageConfigurationInfo) configInfo;
return HadoopStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.HDFS)
.setAllowedLocations(hdfsConfigModel.getAllowedLocations())
.setResources(hdfsConfigModel.getResources())
.build();
}
return null;
}
return null;
@@ -303,6 +314,15 @@ public Builder setStorageConfigurationInfo(
config =
FileStorageConfigurationInfo.builder().allowedLocations(allowedLocations).build();
break;
case HDFS:
HadoopStorageConfigInfo hadoopConfigModel = (HadoopStorageConfigInfo) storageConfigModel;
config =
HadoopStorageConfigurationInfo.builder()
.allowedLocations(allowedLocations)
.resources(hadoopConfigModel.getResources())
.username(hadoopConfigModel.getUsername())
.build();
break;
default:
throw new IllegalStateException(
"Unsupported storage type: " + storageConfigModel.getStorageType());
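
For reference, the REST-side admin model that the two new branches above convert to and from can be built as follows. This is a hedged fragment (it assumes the admin-model imports already shown above): the setUsername setter on the generated builder is inferred from the getUsername() accessor read in the builder case, and all values are illustrative.

HadoopStorageConfigInfo storageConfig =
    HadoopStorageConfigInfo.builder()
        .setStorageType(StorageConfigInfo.StorageTypeEnum.HDFS)
        .setAllowedLocations(List.of("hdfs://namenode:8020/warehouse")) // illustrative
        .setResources("/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml")
        .setUsername("polaris") // assumed setter, mirroring getUsername() above
        .build();
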
@@ -47,6 +47,7 @@
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;
import org.immutables.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@
@JsonSubTypes.Type(value = AzureStorageConfigurationInfo.class),
@JsonSubTypes.Type(value = GcpStorageConfigurationInfo.class),
@JsonSubTypes.Type(value = FileStorageConfigurationInfo.class),
@JsonSubTypes.Type(value = HadoopStorageConfigurationInfo.class),
})
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class PolarisStorageConfigurationInfo {
@@ -218,6 +220,7 @@ public enum StorageType {
AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
GCS("gs://"),
FILE("file://"),
HDFS("hdfs://"),
;

private final List<String> prefixes;
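
Registering the new class as a @JsonSubTypes entry is what lets a persisted configuration be read back through the abstract base type. A minimal round-trip sketch, assuming the base class's existing @JsonTypeInfo setup and a plain ObjectMapper (values are illustrative):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;

class HdfsConfigRoundTripSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    HadoopStorageConfigurationInfo config =
        HadoopStorageConfigurationInfo.builder()
            .allowedLocations(List.of("hdfs://namenode:8020/warehouse")) // illustrative
            .resources("/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml")
            .username("polaris")
            .build();

    String json = mapper.writeValueAsString(config);

    // The subtype registration added above resolves the concrete HDFS class again.
    PolarisStorageConfigurationInfo roundTripped =
        mapper.readValue(json, PolarisStorageConfigurationInfo.class);
    System.out.println(roundTripped.getStorageType()); // HDFS
  }
}
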
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.storage;

/** Constants for internal properties used in storage integrations */
public class StorageInternalProperties {
/** Property key for identifying the storage type in internal properties */
public static final String STORAGE_TYPE_KEY = "storageType";

/** Property key for HDFS configuration resources in internal properties */
public static final String HDFS_CONFIG_RESOURCES_KEY = "hdfs.config-resources";

/** Property key for HDFS username in internal properties */
public static final String HDFS_USERNAME_KEY = "hdfs.username";
}
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.polaris.core.storage.hadoop;

import jakarta.annotation.Nonnull;
import java.util.Set;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.InMemoryStorageIntegration;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.StorageInternalProperties;

/**
 * Storage integration for unauthenticated HDFS. No credentials are vended; the Hadoop
 * configuration resources and optional username are forwarded to the FileIO layer as
 * {@link AccessConfig} internal properties.
 */
public class HadoopCredentialsStorageIntegration
extends InMemoryStorageIntegration<HadoopStorageConfigurationInfo> {

public HadoopCredentialsStorageIntegration(
HadoopStorageConfigurationInfo config
) {
super(config, HadoopCredentialsStorageIntegration.class.getName());
}

@Override
  public AccessConfig getSubscopedCreds(
      @Nonnull RealmConfig realmConfig,
      boolean allowListOperation,
      @Nonnull Set<String> allowedReadLocations,
      @Nonnull Set<String> allowedWriteLocations) {
AccessConfig.Builder accessConfig = AccessConfig.builder();

// Set storage type for DefaultFileIOFactory
accessConfig.putInternalProperty(StorageInternalProperties.STORAGE_TYPE_KEY, PolarisStorageConfigurationInfo.StorageType.HDFS.name());

// Add Hadoop configuration resources as internal property
String resources = config().getResources();
if (resources != null && !resources.trim().isEmpty()) {
accessConfig.putInternalProperty(StorageInternalProperties.HDFS_CONFIG_RESOURCES_KEY, resources);
}

// Add HDFS username as internal property if specified
String username = config().getUsername();
if (username != null && !username.trim().isEmpty()) {
accessConfig.putInternalProperty(StorageInternalProperties.HDFS_USERNAME_KEY, username);
}
return accessConfig.build();
}
}
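
The integration vends no credentials; it only forwards hints as AccessConfig internal properties. The consuming side is not part of this change, so the following is a hedged sketch of how a FileIO factory (the comment above mentions DefaultFileIOFactory, but its actual wiring is assumed here) could turn those properties into a Hadoop Configuration and an Iceberg HadoopFileIO. Using the HADOOP_USER_NAME system property for the unauthenticated username is likewise an assumption.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.storage.StorageInternalProperties;

// Hypothetical consumer of the internal properties emitted above.
final class HdfsFileIoSketch {
  static FileIO fileIoFor(Map<String, String> internalProperties) {
    Configuration conf = new Configuration();

    // Load the admin-supplied Hadoop configuration files (core-site.xml, hdfs-site.xml).
    String resources = internalProperties.get(StorageInternalProperties.HDFS_CONFIG_RESOURCES_KEY);
    if (resources != null) {
      for (String resource : resources.split(",")) {
        conf.addResource(new Path(resource.trim()));
      }
    }

    // Unauthenticated HDFS: act as the configured user under simple auth (assumption;
    // UserGroupInformation.createRemoteUser(username).doAs(...) is an alternative).
    String username = internalProperties.get(StorageInternalProperties.HDFS_USERNAME_KEY);
    if (username != null) {
      System.setProperty("HADOOP_USER_NAME", username);
    }

    return new HadoopFileIO(conf);
  }
}
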
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.polaris.core.storage.hadoop;

import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import jakarta.annotation.Nullable;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.immutables.PolarisImmutable;

/** HDFS Polaris Storage Configuration information */
@PolarisImmutable
@JsonSerialize(as = ImmutableHadoopStorageConfigurationInfo.class)
@JsonDeserialize(as = ImmutableHadoopStorageConfigurationInfo.class)
@JsonTypeName("HadoopStorageConfigurationInfo")
public abstract class HadoopStorageConfigurationInfo extends PolarisStorageConfigurationInfo {

public static ImmutableHadoopStorageConfigurationInfo.Builder builder() {
return ImmutableHadoopStorageConfigurationInfo.builder();
}

// TODO (RDS) : see if the validation can be done elsewhere
// /** Validate user supplied Hadoop resources are readable. */
// private void validateHadoopResources() {
// for (String resource : getResourcesArray()) {
// try {
// File file = new File(resource.trim());
// if (!file.exists()) {
// throw new IllegalArgumentException(
// "Hadoop resource supplied that does not exist: " + resource);
// }
// if (!file.canRead()) {
// throw new IllegalArgumentException(
// "Unreadable Hadoop resource supplied, please check permissions: " + resource);
// }
// } catch (IllegalArgumentException e) {
// throw e;
// } catch (Exception e) {
// throw new IllegalArgumentException("Error validating Hadoop resource: " + resource, e);
// }
// }
// }

@Override
public void validatePrefixForStorageType(String loc) {
if (!loc.startsWith("hdfs://")) {
throw new IllegalArgumentException(
String.format(
"Location prefix not allowed: '%s', expected prefix: hdfs://", loc));
}
}

@Override
public StorageType getStorageType() {
return StorageType.HDFS;
}

@Override
public String getFileIoImplClassName() {
return "org.apache.iceberg.hadoop.HadoopFileIO";
}

public abstract String getResources();

@Nullable
public abstract String getUsername();
}
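
A short usage sketch of the new configuration type (a fragment; it assumes java.util.List is imported and all values are illustrative), showing the hdfs:// prefix check and the FileIO implementation that tables under the catalog will use:

HadoopStorageConfigurationInfo config =
    HadoopStorageConfigurationInfo.builder()
        .allowedLocations(List.of("hdfs://namenode:8020/warehouse")) // illustrative
        .resources("/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml")
        .username("polaris") // optional; omit to fall back to the process user
        .build();

config.validatePrefixForStorageType("hdfs://namenode:8020/warehouse/db/tbl"); // accepted
// config.validatePrefixForStorageType("s3://bucket/path"); // throws IllegalArgumentException

String fileIo = config.getFileIoImplClassName(); // "org.apache.iceberg.hadoop.HadoopFileIO"
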
2 changes: 1 addition & 1 deletion runtime/server/build.gradle.kts
@@ -79,7 +79,7 @@ tasks.named<QuarkusRun>("quarkusRun") {
"-Dpolaris.bootstrap.credentials=POLARIS,root,s3cr3t",
"-Dquarkus.console.color=true",
"-Dpolaris.features.\"ALLOW_INSECURE_STORAGE_TYPES\"=true",
"-Dpolaris.features.\"SUPPORTED_CATALOG_STORAGE_TYPES\"=[\"FILE\",\"S3\",\"GCS\",\"AZURE\"]",
"-Dpolaris.features.\"SUPPORTED_CATALOG_STORAGE_TYPES\"=[\"FILE\",\"S3\",\"GCS\",\"AZURE\",\"HDFS\"]",
"-Dpolaris.readiness.ignore-severe-issues=true",
)
}