Skip to content

Commit ed029d1

Browse files
authored
[Catalog Federation] Block credential vending for remote tables outside allowed location list (#2791)
1 parent 491a9e3 commit ed029d1

File tree

4 files changed

+164
-45
lines changed

4 files changed

+164
-45
lines changed

integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ public class CatalogFederationIntegrationTest {
8686
private static String federatedCatalogName;
8787
private static String localCatalogRoleName;
8888
private static String federatedCatalogRoleName;
89-
private static URI storageBase;
89+
private static URI localStorageBase;
90+
private static URI remoteStorageBase;
91+
private static URI remoteStorageExtraAllowedLocationNs1;
92+
private static URI remoteStorageExtraAllowedLocationNs2;
9093
private static String endpoint;
9194

9295
private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
@@ -99,7 +102,6 @@ public class CatalogFederationIntegrationTest {
99102

100103
@TempDir static java.nio.file.Path warehouseDir;
101104

102-
private URI baseLocation;
103105
private PrincipalWithCredentials newUserCredentials;
104106

105107
@BeforeAll
@@ -112,8 +114,15 @@ static void setup(
112114
String adminToken = client.obtainToken(credentials);
113115
managementApi = client.managementApi(adminToken);
114116
catalogApi = client.catalogApi(adminToken);
115-
storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX);
116117
endpoint = minioAccess.s3endpoint();
118+
119+
localStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog");
120+
remoteStorageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/federated_catalog");
121+
// Allow credential vending for tables located under ns1
122+
remoteStorageExtraAllowedLocationNs1 =
123+
minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns1");
124+
remoteStorageExtraAllowedLocationNs2 =
125+
minioAccess.s3BucketUri(BUCKET_URI_PREFIX + "/local_catalog/ns2");
117126
}
118127

119128
@AfterAll
@@ -144,18 +153,17 @@ void after() {
144153
}
145154

146155
private void setupCatalogs() {
147-
baseLocation = storageBase;
148156
newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME);
149157

150158
AwsStorageConfigInfo storageConfig =
151159
AwsStorageConfigInfo.builder()
152160
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
153161
.setPathStyleAccess(true)
154162
.setEndpoint(endpoint)
155-
.setAllowedLocations(List.of(baseLocation.toString()))
163+
.setAllowedLocations(List.of(localStorageBase.toString()))
156164
.build();
157165

158-
CatalogProperties catalogProperties = new CatalogProperties(baseLocation.toString());
166+
CatalogProperties catalogProperties = new CatalogProperties(localStorageBase.toString());
159167

160168
localCatalogName = "test_catalog_local_" + UUID.randomUUID().toString().replace("-", "");
161169
localCatalogRoleName = "test-catalog-role_" + UUID.randomUUID().toString().replace("-", "");
@@ -193,13 +201,26 @@ private void setupCatalogs() {
193201
.setRemoteCatalogName(localCatalogName)
194202
.setAuthenticationParameters(authParams)
195203
.build();
204+
CatalogProperties externalCatalogProperties =
205+
new CatalogProperties(remoteStorageBase.toString());
206+
AwsStorageConfigInfo externalStorageConfig =
207+
AwsStorageConfigInfo.builder()
208+
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
209+
.setPathStyleAccess(true)
210+
.setEndpoint(endpoint)
211+
.setAllowedLocations(
212+
List.of(
213+
remoteStorageBase.toString(),
214+
remoteStorageExtraAllowedLocationNs1.toString(),
215+
remoteStorageExtraAllowedLocationNs2.toString()))
216+
.build();
196217
ExternalCatalog externalCatalog =
197218
ExternalCatalog.builder()
198219
.setType(Catalog.TypeEnum.EXTERNAL)
199220
.setName(federatedCatalogName)
200221
.setConnectionConfigInfo(connectionConfig)
201-
.setProperties(catalogProperties)
202-
.setStorageConfigInfo(storageConfig)
222+
.setProperties(externalCatalogProperties)
223+
.setStorageConfigInfo(externalStorageConfig)
203224
.build();
204225
managementApi.createCatalog(externalCatalog);
205226
managementApi.createCatalogRole(federatedCatalogName, federatedCatalogRoleName);
@@ -244,6 +265,11 @@ private void setupExampleNamespacesAndTables() {
244265
spark.sql("INSERT INTO ns2.test_table VALUES (1, 'Apache Spark')");
245266
spark.sql("INSERT INTO ns2.test_table VALUES (2, 'Apache Iceberg')");
246267

268+
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns3");
269+
spark.sql("CREATE TABLE IF NOT EXISTS ns3.test_table (id int, name string)");
270+
spark.sql("INSERT INTO ns3.test_table VALUES (1, 'Apache Spark')");
271+
spark.sql("INSERT INTO ns3.test_table VALUES (2, 'Apache Iceberg')");
272+
247273
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1.ns1a");
248274
spark.sql("CREATE TABLE IF NOT EXISTS ns1.ns1a.test_table (id int, name string)");
249275
spark.sql("INSERT INTO ns1.ns1a.test_table VALUES (1, 'Alice')");
@@ -256,7 +282,7 @@ private void setupExampleNamespacesAndTables() {
256282
void testFederatedCatalogBasicReadWriteOperations() {
257283
spark.sql("USE " + federatedCatalogName);
258284
List<Row> namespaces = spark.sql("SHOW NAMESPACES").collectAsList();
259-
assertThat(namespaces).hasSize(2);
285+
assertThat(namespaces).hasSize(3);
260286
List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
261287
List<Row> refNs1Data =
262288
spark
@@ -428,4 +454,45 @@ void testFederatedCatalogWithCredentialVending() {
428454
assertThat(localData.get(2).getInt(0)).isEqualTo(3);
429455
assertThat(localData.get(2).getString(1)).isEqualTo("Charlie");
430456
}
457+
458+
@Test
459+
void testFederatedCatalogNotVendCredentialForTablesOutsideAllowedLocations() {
460+
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
461+
462+
spark.sql("USE " + federatedCatalogName);
463+
464+
// Case 1: Only have TABLE_READ_DATA privilege
465+
TableGrant tableReadDataGrant =
466+
TableGrant.builder()
467+
.setType(GrantResource.TypeEnum.TABLE)
468+
.setPrivilege(TablePrivilege.TABLE_READ_DATA)
469+
.setNamespace(List.of("ns3"))
470+
.setTableName("test_table")
471+
.build();
472+
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);
473+
474+
// Verify that credential vending is blocked for table under ns3, even with enough privilege
475+
assertThatThrownBy(() -> spark.sql("SELECT * FROM ns3.test_table ORDER BY id").collectAsList())
476+
.isInstanceOf(ForbiddenException.class)
477+
.hasMessageContaining(
478+
"Table 'ns3.test_table' in remote catalog has locations outside catalog's allowed locations:");
479+
480+
// Case 3: TABLE_WRITE_DATA
481+
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);
482+
TableGrant tableWriteDataGrant =
483+
TableGrant.builder()
484+
.setType(GrantResource.TypeEnum.TABLE)
485+
.setPrivilege(TablePrivilege.TABLE_WRITE_DATA)
486+
.setNamespace(List.of("ns3"))
487+
.setTableName("test_table")
488+
.build();
489+
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableWriteDataGrant);
490+
491+
// Verify that credential vending is blocked for table under ns3, even with enough privilege
492+
assertThatThrownBy(
493+
() -> spark.sql("INSERT INTO ns3.test_table VALUES (3, 'Charlie')").collectAsList())
494+
.isInstanceOf(ForbiddenException.class)
495+
.hasMessageContaining(
496+
"Table 'ns3.test_table' in remote catalog has locations outside catalog's allowed locations:");
497+
}
431498
}

runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogUtils.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@
1919

2020
package org.apache.polaris.service.catalog.common;
2121

22+
import java.util.List;
23+
import java.util.Set;
24+
import java.util.stream.Collectors;
2225
import org.apache.iceberg.catalog.TableIdentifier;
26+
import org.apache.iceberg.exceptions.ForbiddenException;
27+
import org.apache.polaris.core.admin.model.StorageConfigInfo;
28+
import org.apache.polaris.core.config.RealmConfig;
2329
import org.apache.polaris.core.entity.PolarisEntitySubType;
2430
import org.apache.polaris.core.entity.PolarisEntityType;
2531
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
2632
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
33+
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
2734

2835
/** Utility methods for working with Polaris catalog entities. */
2936
public class CatalogUtils {
@@ -45,4 +52,45 @@ public static PolarisResolvedPathWrapper findResolvedStorageEntity(
4552
}
4653
return resolvedEntityView.getResolvedPath(tableIdentifier.namespace());
4754
}
55+
56+
/**
57+
* Validates that the specified {@code locations} are valid for whatever storage config is found
58+
* for the given entity's parent hierarchy.
59+
*
60+
* @param realmConfig the realm configuration
61+
* @param identifier the table identifier (for error messages)
62+
* @param locations the set of locations to validate (base location + write.data.path +
63+
* write.metadata.path)
64+
* @param resolvedStorageEntity the resolved path wrapper containing storage configuration
65+
* @throws ForbiddenException if any location is outside the allowed locations or if file
66+
* locations are not allowed
67+
*/
68+
public static void validateLocationsForTableLike(
69+
RealmConfig realmConfig,
70+
TableIdentifier identifier,
71+
Set<String> locations,
72+
PolarisResolvedPathWrapper resolvedStorageEntity) {
73+
74+
PolarisStorageConfigurationInfo.forEntityPath(
75+
realmConfig, resolvedStorageEntity.getRawFullPath())
76+
.ifPresentOrElse(
77+
restrictions -> restrictions.validate(realmConfig, identifier, locations),
78+
() -> {
79+
List<String> allowedStorageTypes =
80+
realmConfig.getConfig("SUPPORTED_CATALOG_STORAGE_TYPES");
81+
if (allowedStorageTypes != null
82+
&& !allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
83+
List<String> invalidLocations =
84+
locations.stream()
85+
.filter(
86+
location -> location.startsWith("file:") || location.startsWith("http"))
87+
.collect(Collectors.toList());
88+
if (!invalidLocations.isEmpty()) {
89+
throw new ForbiddenException(
90+
"Invalid locations '%s' for identifier '%s': File locations are not allowed",
91+
invalidLocations, identifier);
92+
}
93+
}
94+
});
95+
}
4896
}

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090
import org.apache.iceberg.view.ViewUtil;
9191
import org.apache.polaris.core.PolarisCallContext;
9292
import org.apache.polaris.core.PolarisDiagnostics;
93-
import org.apache.polaris.core.admin.model.StorageConfigInfo;
9493
import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
9594
import org.apache.polaris.core.config.BehaviorChangeConfiguration;
9695
import org.apache.polaris.core.config.FeatureConfiguration;
@@ -122,7 +121,6 @@
122121
import org.apache.polaris.core.persistence.resolver.ResolverPath;
123122
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
124123
import org.apache.polaris.core.storage.PolarisStorageActions;
125-
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
126124
import org.apache.polaris.core.storage.StorageLocation;
127125
import org.apache.polaris.core.storage.StorageUtil;
128126
import org.apache.polaris.service.catalog.SupportsNotifications;
@@ -959,38 +957,8 @@ private void validateLocationForTableLike(
959957
TableIdentifier identifier,
960958
String location,
961959
PolarisResolvedPathWrapper resolvedStorageEntity) {
962-
validateLocationsForTableLike(identifier, Set.of(location), resolvedStorageEntity);
963-
}
964-
965-
/**
966-
* Validates that the specified {@code locations} are valid for whatever storage config is found
967-
* for this TableLike's parent hierarchy.
968-
*/
969-
private void validateLocationsForTableLike(
970-
TableIdentifier identifier,
971-
Set<String> locations,
972-
PolarisResolvedPathWrapper resolvedStorageEntity) {
973-
974-
PolarisStorageConfigurationInfo.forEntityPath(
975-
realmConfig, resolvedStorageEntity.getRawFullPath())
976-
.ifPresentOrElse(
977-
restrictions -> restrictions.validate(realmConfig, identifier, locations),
978-
() -> {
979-
List<String> allowedStorageTypes =
980-
realmConfig.getConfig(FeatureConfiguration.SUPPORTED_CATALOG_STORAGE_TYPES);
981-
if (!allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
982-
List<String> invalidLocations =
983-
locations.stream()
984-
.filter(
985-
location -> location.startsWith("file:") || location.startsWith("http"))
986-
.collect(Collectors.toList());
987-
if (!invalidLocations.isEmpty()) {
988-
throw new ForbiddenException(
989-
"Invalid locations '%s' for identifier '%s': File locations are not allowed",
990-
invalidLocations, identifier);
991-
}
992-
}
993-
});
960+
CatalogUtils.validateLocationsForTableLike(
961+
realmConfig, identifier, Set.of(location), resolvedStorageEntity);
994962
}
995963

996964
/**
@@ -1486,7 +1454,8 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
14861454
// for the storage configuration inherited under this entity's path.
14871455
Set<String> dataLocations =
14881456
StorageUtil.getLocationsUsedByTable(metadata.location(), metadata.properties());
1489-
validateLocationsForTableLike(tableIdentifier, dataLocations, resolvedStorageEntity);
1457+
CatalogUtils.validateLocationsForTableLike(
1458+
realmConfig, tableIdentifier, dataLocations, resolvedStorageEntity);
14901459
// also validate that the table location doesn't overlap an existing table
14911460
dataLocations.forEach(
14921461
location ->

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -810,11 +810,19 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
810810
if (baseCatalog instanceof IcebergCatalog
811811
|| realmConfig.getConfig(
812812
ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) {
813+
814+
Set<String> tableLocations = StorageUtil.getLocationsUsedByTable(tableMetadata);
815+
816+
// For non polaris' catalog, validate that table locations are within allowed locations
817+
if (!(baseCatalog instanceof IcebergCatalog)) {
818+
validateRemoteTableLocations(tableIdentifier, tableLocations, resolvedStoragePath);
819+
}
820+
813821
AccessConfig accessConfig =
814822
accessConfigProvider.getAccessConfig(
815823
callContext,
816824
tableIdentifier,
817-
StorageUtil.getLocationsUsedByTable(tableMetadata),
825+
tableLocations,
818826
actions,
819827
refreshCredentialsEndpoint,
820828
resolvedStoragePath);
@@ -842,6 +850,33 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) {
842850
return responseBuilder;
843851
}
844852

853+
private void validateRemoteTableLocations(
854+
TableIdentifier tableIdentifier,
855+
Set<String> tableLocations,
856+
PolarisResolvedPathWrapper resolvedStoragePath) {
857+
858+
try {
859+
// Delegate to common validation logic
860+
CatalogUtils.validateLocationsForTableLike(
861+
realmConfig, tableIdentifier, tableLocations, resolvedStoragePath);
862+
863+
LOGGER
864+
.atInfo()
865+
.addKeyValue("tableIdentifier", tableIdentifier)
866+
.addKeyValue("tableLocations", tableLocations)
867+
.log("Validated federated table locations");
868+
} catch (ForbiddenException e) {
869+
LOGGER
870+
.atError()
871+
.addKeyValue("tableIdentifier", tableIdentifier)
872+
.addKeyValue("tableLocations", tableLocations)
873+
.log("Federated table locations validation failed");
874+
throw new ForbiddenException(
875+
"Table '%s' in remote catalog has locations outside catalog's allowed locations: %s",
876+
tableIdentifier, e.getMessage());
877+
}
878+
}
879+
845880
private UpdateTableRequest applyUpdateFilters(UpdateTableRequest request) {
846881
// Certain MetadataUpdates need to be explicitly transformed to achieve the same behavior
847882
// as using a local Catalog client via TableBuilder.

0 commit comments

Comments
 (0)