
Commit 040a855

feat: adding unauthenticated HDFS storage support to catalogs (#85)
Initially contains per-catalog resource and username configuration that enables Polaris to configure a Hadoop client and impersonate the given user against an HDFS cluster.
1 parent af69d9f commit 040a855

15 files changed: +409 -8 lines changed

client/python/cli/command/__init__.py

Lines changed: 3 additions & 1 deletion
@@ -82,7 +82,9 @@ def options_get(key, f=lambda x: x):
             catalog_role_session_name=options_get(Arguments.CATALOG_ROLE_SESSION_NAME),
             catalog_external_id=options_get(Arguments.CATALOG_EXTERNAL_ID),
             catalog_signing_region=options_get(Arguments.CATALOG_SIGNING_REGION),
-            catalog_signing_name=options_get(Arguments.CATALOG_SIGNING_NAME)
+            catalog_signing_name=options_get(Arguments.CATALOG_SIGNING_NAME),
+            hadoop_resources=options_get(Arguments.HADOOP_RESOURCES),
+            hadoop_username=options_get(Arguments.HADOOP_USERNAME)
         )
     elif options.command == Commands.PRINCIPALS:
         from cli.command.principals import PrincipalsCommand

client/python/cli/command/catalogs.py

Lines changed: 29 additions & 1 deletion
@@ -27,7 +27,7 @@
 from cli.options.option_tree import Argument
 from polaris.management import PolarisDefaultApi, CreateCatalogRequest, UpdateCatalogRequest, \
     StorageConfigInfo, ExternalCatalog, AwsStorageConfigInfo, AzureStorageConfigInfo, GcpStorageConfigInfo, \
-    PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, ImplicitAuthenticationParameters, \
+    HadoopStorageConfigInfo, PolarisCatalog, CatalogProperties, BearerAuthenticationParameters, ImplicitAuthenticationParameters, \
     OAuthClientCredentialsParameters, SigV4AuthenticationParameters, HadoopConnectionConfigInfo, \
     IcebergRestConnectionConfigInfo, AwsIamServiceIdentityInfo

@@ -82,6 +82,8 @@ class CatalogsCommand(Command):
     catalog_external_id: str
     catalog_signing_region: str
     catalog_signing_name: str
+    hadoop_resources: str = None
+    hadoop_username: str = None

     def validate(self):
         if self.catalogs_subcommand == Subcommands.CREATE:
@@ -158,10 +160,26 @@ def validate(self):
                 self._has_aws_storage_info()
                 or self._has_azure_storage_info()
                 or self._has_gcs_storage_info()
+                or self._has_hdfs_storage_info()
             ):
                 raise Exception(
                     "Storage type 'file' does not support any storage credentials"
                 )
+        elif self.storage_type == StorageType.HDFS.value:
+            if not self.hadoop_resources:
+                raise Exception(
+                    f"Missing required argument for storage type 'hdfs':"
+                    f" {Argument.to_flag_name(Arguments.HADOOP_RESOURCES)}"
+                )
+            if (
+                self._has_aws_storage_info()
+                or self._has_azure_storage_info()
+                or self._has_gcs_storage_info()
+            ):
+                raise Exception(
+                    "Storage type 'hdfs' supports the storage credential"
+                    f" {Argument.to_flag_name(Arguments.HADOOP_RESOURCES)}"
+                )

     def _has_aws_storage_info(self):
         return self.role_arn or self.external_id or self.user_arn or self.region or self.endpoint or self.sts_endpoint or self.path_style_access
@@ -172,6 +190,9 @@ def _has_azure_storage_info(self):
     def _has_gcs_storage_info(self):
         return self.service_account

+    def _has_hdfs_storage_info(self):
+        return self.hadoop_resources or self.hadoop_username
+
     def _build_storage_config_info(self):
         config = None
         if self.storage_type == StorageType.S3.value:
@@ -205,6 +226,13 @@ def _build_storage_config_info(self):
                 storage_type=self.storage_type.upper(),
                 allowed_locations=self.allowed_locations,
             )
+        elif self.storage_type == StorageType.HDFS.value:
+            config = HadoopStorageConfigInfo(
+                storage_type=self.storage_type.upper(),
+                allowed_locations=self.allowed_locations,
+                resources=self.hadoop_resources,
+                username=self.hadoop_username,
+            )
         return config

     def _build_connection_config_info(self):

client/python/cli/constants.py

Lines changed: 11 additions & 1 deletion
@@ -22,13 +22,14 @@

 class StorageType(Enum):
     """
-    Represents a Storage Type within the Polaris API -- `s3`, `azure`, `gcs`, or `file`.
+    Represents a Storage Type within the Polaris API -- `s3`, `azure`, `gcs`, `file`, or `hdfs`.
     """

     S3 = "s3"
     AZURE = "azure"
     GCS = "gcs"
     FILE = "file"
+    HDFS = "hdfs"


 class CatalogType(Enum):
@@ -185,6 +186,8 @@ class Arguments:
     CATALOG_EXTERNAL_ID = "catalog_external_id"
     CATALOG_SIGNING_REGION = "catalog_signing_region"
     CATALOG_SIGNING_NAME = "catalog_signing_name"
+    HADOOP_RESOURCES = "hadoop_resources"
+    HADOOP_USERNAME = "hadoop_username"


 class Hints:
@@ -243,6 +246,13 @@ class Create:
             "(Only for GCS) The service account to use when connecting to GCS"
         )

+        HADOOP_RESOURCES = (
+            "(Required for HDFS) Comma-separated list of Hadoop configuration files (core-site.xml, hdfs-site.xml)"
+        )
+        HADOOP_USERNAME = (
+            "(Optional for HDFS) Username for HDFS operations. If not specified, uses process/global user authentication"
+        )
+
     class Update:
         DEFAULT_BASE_LOCATION = "A new default base location for the catalog"
client/python/cli/options/option_tree.py

Lines changed: 2 additions & 0 deletions
@@ -128,6 +128,8 @@ def get_tree() -> List[Option]:
             Argument(Arguments.MULTI_TENANT_APP_NAME, str, Hints.Catalogs.Create.MULTI_TENANT_APP_NAME),
             Argument(Arguments.CONSENT_URL, str, Hints.Catalogs.Create.CONSENT_URL),
             Argument(Arguments.SERVICE_ACCOUNT, str, Hints.Catalogs.Create.SERVICE_ACCOUNT),
+            Argument(Arguments.HADOOP_RESOURCES, str, Hints.Catalogs.Create.HADOOP_RESOURCES),
+            Argument(Arguments.HADOOP_USERNAME, str, Hints.Catalogs.Create.HADOOP_USERNAME),
             Argument(Arguments.PROPERTY, str, Hints.PROPERTY, allow_repeats=True),
         ] + OptionTree._FEDERATION_ARGS, input_name=Arguments.CATALOG),
         Option(Subcommands.DELETE, input_name=Arguments.CATALOG),

polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java

Lines changed: 20 additions & 0 deletions
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+
 import org.apache.iceberg.exceptions.BadRequestException;
 import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
 import org.apache.polaris.core.admin.model.AzureStorageConfigInfo;
@@ -37,6 +38,7 @@
 import org.apache.polaris.core.admin.model.ExternalCatalog;
 import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.GcpStorageConfigInfo;
+import org.apache.polaris.core.admin.model.HadoopStorageConfigInfo;
 import org.apache.polaris.core.admin.model.PolarisCatalog;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.core.config.BehaviorChangeConfiguration;
@@ -48,6 +50,7 @@
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
 import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
+import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;

 /**
  * Catalog specific subclass of the {@link PolarisEntity} that handles conversion from the {@link
@@ -166,6 +169,14 @@ private StorageConfigInfo getStorageInfo(Map<String, String> internalProperties)
       return new FileStorageConfigInfo(
           StorageConfigInfo.StorageTypeEnum.FILE, fileConfigModel.getAllowedLocations());
     }
+    if (configInfo instanceof HadoopStorageConfigurationInfo) {
+      HadoopStorageConfigurationInfo hdfsConfigModel = (HadoopStorageConfigurationInfo) configInfo;
+      return HadoopStorageConfigInfo.builder()
+          .setStorageType(StorageConfigInfo.StorageTypeEnum.HDFS)
+          .setAllowedLocations(hdfsConfigModel.getAllowedLocations())
+          .setResources(hdfsConfigModel.getResources())
+          .build();
+    }
     return null;
   }
   return null;
@@ -303,6 +314,15 @@ public Builder setStorageConfigurationInfo(
         config =
             FileStorageConfigurationInfo.builder().allowedLocations(allowedLocations).build();
         break;
+      case HDFS:
+        HadoopStorageConfigInfo hadoopConfigModel = (HadoopStorageConfigInfo) storageConfigModel;
+        config =
+            HadoopStorageConfigurationInfo.builder()
+                .allowedLocations(allowedLocations)
+                .resources(hadoopConfigModel.getResources())
+                .username(hadoopConfigModel.getUsername())
+                .build();
+      break;
      default:
        throw new IllegalStateException(
            "Unsupported storage type: " + storageConfigModel.getStorageType());

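Note: the following is a minimal sketch of the admin-model payload that the new HDFS branches above consume, using only the builder calls visible in these hunks; the namenode address and configuration-file paths are illustrative, not taken from the commit. Also worth noting: getStorageInfo above does not copy the username back into the admin model, so a read-back of the catalog omits it.

import java.util.List;
import org.apache.polaris.core.admin.model.HadoopStorageConfigInfo;
import org.apache.polaris.core.admin.model.StorageConfigInfo;

class HdfsStorageConfigSketch {
  public static void main(String[] args) {
    // Admin-model payload for an HDFS catalog; CatalogEntity.Builder maps it
    // onto the internal HadoopStorageConfigurationInfo in the HDFS case above.
    HadoopStorageConfigInfo storageConfig =
        HadoopStorageConfigInfo.builder()
            .setStorageType(StorageConfigInfo.StorageTypeEnum.HDFS)
            .setAllowedLocations(List.of("hdfs://namenode:8020/warehouse"))
            .setResources("/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml")
            .build();
    System.out.println(storageConfig);
  }
}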
polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,7 @@
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo;
 import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo;
+import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;
 import org.immutables.value.Value;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +67,7 @@
   @JsonSubTypes.Type(value = AzureStorageConfigurationInfo.class),
   @JsonSubTypes.Type(value = GcpStorageConfigurationInfo.class),
   @JsonSubTypes.Type(value = FileStorageConfigurationInfo.class),
+  @JsonSubTypes.Type(value = HadoopStorageConfigurationInfo.class),
 })
 @JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class PolarisStorageConfigurationInfo {
@@ -218,6 +220,7 @@ public enum StorageType {
   AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
   GCS("gs://"),
   FILE("file://"),
+  HDFS("hdfs://"),
   ;

   private final List<String> prefixes;
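A minimal round-trip sketch of what the new subtype registration enables, under the assumption (not shown in this hunk) that the abstract base class carries Jackson type information resolved through each subclass's @JsonTypeName:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;

class HdfsConfigRoundTripSketch {
  public static void main(String[] args) throws Exception {
    PolarisStorageConfigurationInfo cfg =
        HadoopStorageConfigurationInfo.builder()
            .allowedLocations(List.of("hdfs://namenode:8020/warehouse")) // illustrative
            .resources("/etc/hadoop/conf/core-site.xml")
            .username("etl_user")
            .build();
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(cfg);
    // The @JsonSubTypes entry added above lets Jackson restore the HDFS
    // subtype when reading back through the abstract base class.
    PolarisStorageConfigurationInfo restored =
        mapper.readValue(json, PolarisStorageConfigurationInfo.class);
    System.out.println(restored.getStorageType()); // HDFS
  }
}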
polaris-core/src/main/java/org/apache/polaris/core/storage/StorageInternalProperties.java

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage;
+
+/** Constants for internal properties used in storage integrations */
+public class StorageInternalProperties {
+  /** Property key for identifying the storage type in internal properties */
+  public static final String STORAGE_TYPE_KEY = "storageType";
+
+  /** Property key for HDFS configuration resources in internal properties */
+  public static final String HDFS_CONFIG_RESOURCES_KEY = "hdfs.config-resources";
+
+  /** Property key for HDFS username in internal properties */
+  public static final String HDFS_USERNAME_KEY = "hdfs.username";
+}
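This commit does not show the consumer of these keys; the in-code comment in HadoopCredentialsStorageIntegration (next file) points at DefaultFileIOFactory. A hypothetical consumer-side sketch (the class and method names are illustrative, not part of this commit) of turning hdfs.config-resources into a Hadoop Configuration:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.polaris.core.storage.StorageInternalProperties;

class HdfsFileIoWiringSketch {
  // Hypothetical: build a Hadoop Configuration from the internal properties
  // that HadoopCredentialsStorageIntegration publishes.
  static Configuration toHadoopConf(Map<String, String> internalProperties) {
    Configuration conf = new Configuration(false);
    String resources =
        internalProperties.get(StorageInternalProperties.HDFS_CONFIG_RESOURCES_KEY);
    if (resources != null) {
      // Each comma-separated entry is a path to a Hadoop config file,
      // e.g. core-site.xml or hdfs-site.xml.
      for (String resource : resources.split(",")) {
        conf.addResource(new Path(resource.trim()));
      }
    }
    return conf;
  }
}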
polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopCredentialsStorageIntegration.java

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.storage.hadoop;
+
+import jakarta.annotation.Nonnull;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.storage.AccessConfig;
+import org.apache.polaris.core.storage.InMemoryStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageInternalProperties;
+
+import java.util.Set;
+
+/** Placeholder for Hadoop credential handling */
+public class HadoopCredentialsStorageIntegration
+    extends InMemoryStorageIntegration<HadoopStorageConfigurationInfo> {
+
+  public HadoopCredentialsStorageIntegration(
+      HadoopStorageConfigurationInfo config
+  ) {
+    super(config, HadoopCredentialsStorageIntegration.class.getName());
+  }
+
+  @Override
+  public AccessConfig getSubscopedCreds(@Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set<String> allowedReadLocations, @Nonnull Set<String> allowedWriteLocations) {
+    AccessConfig.Builder accessConfig = AccessConfig.builder();
+
+    // Set storage type for DefaultFileIOFactory
+    accessConfig.putInternalProperty(StorageInternalProperties.STORAGE_TYPE_KEY, PolarisStorageConfigurationInfo.StorageType.HDFS.name());
+
+    // Add Hadoop configuration resources as internal property
+    String resources = config().getResources();
+    if (resources != null && !resources.trim().isEmpty()) {
+      accessConfig.putInternalProperty(StorageInternalProperties.HDFS_CONFIG_RESOURCES_KEY, resources);
+    }
+
+    // Add HDFS username as internal property if specified
+    String username = config().getUsername();
+    if (username != null && !username.trim().isEmpty()) {
+      accessConfig.putInternalProperty(StorageInternalProperties.HDFS_USERNAME_KEY, username);
+    }
+    return accessConfig.build();
+  }
+}
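A minimal usage sketch of the integration above; it assumes an existing RealmConfig instance, and the namenode address and username are illustrative. No credentials are vended for unauthenticated HDFS; the returned AccessConfig only carries the internal properties set above.

import java.util.List;
import java.util.Set;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.hadoop.HadoopCredentialsStorageIntegration;
import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;

class HdfsAccessConfigSketch {
  static AccessConfig subscope(RealmConfig realmConfig) {
    HadoopStorageConfigurationInfo config =
        HadoopStorageConfigurationInfo.builder()
            .allowedLocations(List.of("hdfs://namenode:8020/warehouse"))
            .resources("/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml")
            .username("etl_user")
            .build();
    // Expected internal properties on the result, per the code above:
    //   storageType           -> "HDFS"
    //   hdfs.config-resources -> "/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml"
    //   hdfs.username         -> "etl_user"
    return new HadoopCredentialsStorageIntegration(config)
        .getSubscopedCreds(
            realmConfig,
            /* allowListOperation= */ true,
            Set.of("hdfs://namenode:8020/warehouse"),
            Set.of("hdfs://namenode:8020/warehouse"));
  }
}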
polaris-core/src/main/java/org/apache/polaris/core/storage/hadoop/HadoopStorageConfigurationInfo.java

Lines changed: 81 additions & 0 deletions

@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.polaris.core.storage.hadoop;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/** HDFS Polaris Storage Configuration information */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableHadoopStorageConfigurationInfo.class)
+@JsonDeserialize(as = ImmutableHadoopStorageConfigurationInfo.class)
+@JsonTypeName("HadoopStorageConfigurationInfo")
+public abstract class HadoopStorageConfigurationInfo extends PolarisStorageConfigurationInfo {
+
+  public static ImmutableHadoopStorageConfigurationInfo.Builder builder() {
+    return ImmutableHadoopStorageConfigurationInfo.builder();
+  }
+
+  // TODO (RDS) : see if the validation can be done elsewhere
+  //  /** Validate user supplied Hadoop resources are readable. */
+  //  private void validateHadoopResources() {
+  //    for (String resource : getResourcesArray()) {
+  //      try {
+  //        File file = new File(resource.trim());
+  //        if (!file.exists()) {
+  //          throw new IllegalArgumentException(
+  //              "Hadoop resource supplied that does not exist: " + resource);
+  //        }
+  //        if (!file.canRead()) {
+  //          throw new IllegalArgumentException(
+  //              "Unreadable Hadoop resource supplied, please check permissions: " + resource);
+  //        }
+  //      } catch (IllegalArgumentException e) {
+  //        throw e;
+  //      } catch (Exception e) {
+  //        throw new IllegalArgumentException("Error validating Hadoop resource: " + resource, e);
+  //      }
+  //    }
+  //  }
+
+  @Override
+  public void validatePrefixForStorageType(String loc) {
+    if (!loc.startsWith("hdfs://")) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Location prefix not allowed: '%s', expected prefix: hdfs://", loc));
+    }
+  }
+
+  @Override
+  public StorageType getStorageType() {
+    return StorageType.HDFS;
+  }
+
+  @Override
+  public String getFileIoImplClassName() {
+    return "org.apache.iceberg.hadoop.HadoopFileIO";
+  }
+
+  public abstract String getResources();
+
+  @Nullable
+  public abstract String getUsername();
+}
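A short sketch of the prefix check above (locations are illustrative): only hdfs:// locations pass, and anything else raises IllegalArgumentException.

import java.util.List;
import org.apache.polaris.core.storage.hadoop.HadoopStorageConfigurationInfo;

class HdfsPrefixValidationSketch {
  public static void main(String[] args) {
    HadoopStorageConfigurationInfo config =
        HadoopStorageConfigurationInfo.builder()
            .allowedLocations(List.of("hdfs://namenode:8020/warehouse"))
            .resources("/etc/hadoop/conf/core-site.xml") // username is @Nullable and omitted here
            .build();
    config.validatePrefixForStorageType("hdfs://namenode:8020/warehouse/db/table"); // passes
    try {
      config.validatePrefixForStorageType("s3://bucket/table"); // rejected
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}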

runtime/server/build.gradle.kts

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ tasks.named<QuarkusRun>("quarkusRun") {
     "-Dpolaris.bootstrap.credentials=POLARIS,root,s3cr3t",
     "-Dquarkus.console.color=true",
     "-Dpolaris.features.\"ALLOW_INSECURE_STORAGE_TYPES\"=true",
-    "-Dpolaris.features.\"SUPPORTED_CATALOG_STORAGE_TYPES\"=[\"FILE\",\"S3\",\"GCS\",\"AZURE\"]",
+    "-Dpolaris.features.\"SUPPORTED_CATALOG_STORAGE_TYPES\"=[\"FILE\",\"S3\",\"GCS\",\"AZURE\",\"HDFS\"]",
     "-Dpolaris.readiness.ignore-severe-issues=true",
   )
 }
