diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index c52e9273f..4b3a23c84 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -1566,7 +1566,8 @@ private void revokeGrantRecord( PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { // get meta store session we should be using BasePersistence ms = callCtx.getMetaStore(); @@ -1608,7 +1609,8 @@ private void revokeGrantRecord( callCtx.getRealmConfig(), allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); return new ScopedCredentialsResult(accessConfig); } catch (Exception ex) { return new ScopedCredentialsResult( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index f707d1c29..1f38ebd8e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -315,7 +315,8 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { return delegate.getSubscopedCredsForEntity( callCtx, catalogId, @@ -323,7 +324,8 @@ public ScopedCredentialsResult getSubscopedCredsForEntity( entityType, allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); } @Override diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 7e1a4ce45..de27765e5 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -1976,7 +1976,8 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { // get meta store session we should be using TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); @@ -2009,7 +2010,8 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( callCtx.getRealmConfig(), allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); return new ScopedCredentialsResult(accessConfig); } catch (Exception ex) { return new ScopedCredentialsResult( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java index 8a30d7962..16eea08da 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java @@ -57,6 +57,17 @@ public String genericTables(Namespace ns) { "polaris", "v1", prefix, "namespaces", RESTUtil.encodeNamespace(ns), "generic-tables"); } + public String credentialsPath(TableIdentifier ident) { + return SLASH.join( + "v1", + prefix, + "namespaces", + RESTUtil.encodeNamespace(ident.namespace()), + "tables", + RESTUtil.encodeString(ident.name()), + "credentials"); + } + public String genericTable(TableIdentifier ident) { return SLASH.join( "polaris", diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java index 04022d233..a0060d43a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java @@ -19,6 +19,7 @@ package org.apache.polaris.core.storage; import jakarta.annotation.Nonnull; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.PolarisEntityType; @@ -47,5 +48,6 @@ ScopedCredentialsResult getSubscopedCredsForEntity( PolarisEntityType entityType, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations); + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java index c98982091..28df510ab 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nonnull; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.config.RealmConfig; @@ -61,7 +62,8 @@ public abstract AccessConfig getSubscopedCreds( @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations); + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint); /** * Validate access for the provided operation actions and locations. diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java index 33526d2e2..80c63c347 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageAccessProperty.java @@ -18,6 +18,8 @@ */ package org.apache.polaris.core.storage; +import org.apache.iceberg.aws.AwsClientProperties; + /** * A subset of Iceberg catalog properties recognized by Polaris. * @@ -39,6 +41,18 @@ public enum StorageAccessProperty { Boolean.class, "s3.path-style-access", "whether to use S3 path style access", false), CLIENT_REGION( String.class, "client.region", "region to configure client for making requests to AWS"), + AWS_REFRESH_CREDENTIALS_ENABLED( + Boolean.class, + AwsClientProperties.REFRESH_CREDENTIALS_ENABLED, + "whether to enable automatic refresh of credentials", + true, + false), + AWS_REFRESH_CREDENTIALS_ENDPOINT( + String.class, + AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, + "the endpoint to use for refreshing credentials", + true, + false), GCS_ACCESS_TOKEN(String.class, "gcs.oauth2.token", "the gcs scoped access token"), GCS_ACCESS_TOKEN_EXPIRES_AT( @@ -52,6 +66,18 @@ public enum StorageAccessProperty { // it expects for SAS AZURE_ACCESS_TOKEN(String.class, "", "the azure scoped access token"), AZURE_SAS_TOKEN(String.class, "adls.sas-token.", "an azure shared access signature token"), + AZURE_REFRESH_CREDENTIALS_ENABLED( + Boolean.class, + "adls.refresh-credentials-enabled", + "whether to enable automatic refresh of credentials", + true, + false), + AZURE_REFRESH_CREDENTIALS_ENDPOINT( + String.class, + "adls.refresh-credentials-endpoint", + "the endpoint to use for refreshing credentials", + true, + false), EXPIRATION_TIME( Long.class, "expiration-time", diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index 616fb1f4d..7c73b17c3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -74,7 +74,8 @@ public AccessConfig getSubscopedCreds( @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { int storageCredentialDurationSeconds = realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS); AwsStorageConfigurationInfo storageConfig = config(); @@ -120,6 +121,12 @@ public AccessConfig getSubscopedCreds( accessConfig.put(StorageAccessProperty.CLIENT_REGION, region); } + refreshCredentialsEndpoint.ifPresent( + endpoint -> { + accessConfig.put(StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENABLED, "true"); + accessConfig.put(StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENDPOINT, endpoint); + }); + URI endpointUri = storageConfig.getEndpointUri(); if (endpointUri != null) { accessConfig.put(StorageAccessProperty.AWS_ENDPOINT, endpointUri.toString()); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java index 50dd8c414..9c9469b2c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java @@ -46,6 +46,7 @@ import java.time.temporal.ChronoUnit; import java.util.HashSet; import java.util.Objects; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.storage.AccessConfig; @@ -76,7 +77,8 @@ public AccessConfig getSubscopedCreds( @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { String loc = !allowedWriteLocations.isEmpty() ? allowedWriteLocations.stream().findAny().orElse(null) @@ -169,15 +171,25 @@ public AccessConfig getSubscopedCreds( String.format("Endpoint %s not supported", location.getEndpoint())); } - return toAccessConfig(sasToken, storageDnsName, sanitizedEndTime.toInstant()); + return toAccessConfig( + sasToken, storageDnsName, sanitizedEndTime.toInstant(), refreshCredentialsEndpoint); } @VisibleForTesting - static AccessConfig toAccessConfig(String sasToken, String storageDnsName, Instant expiresAt) { + static AccessConfig toAccessConfig( + String sasToken, + String storageDnsName, + Instant expiresAt, + Optional refreshCredentialsEndpoint) { AccessConfig.Builder accessConfig = AccessConfig.builder(); handleAzureCredential(accessConfig, sasToken, storageDnsName); accessConfig.put( StorageAccessProperty.EXPIRATION_TIME, String.valueOf(expiresAt.toEpochMilli())); + refreshCredentialsEndpoint.ifPresent( + endpoint -> { + accessConfig.put(StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT, endpoint); + accessConfig.put(StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENABLED, "true"); + }); return accessConfig.build(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java index d8d88edc6..ab3b3cc4f 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java @@ -105,7 +105,8 @@ public AccessConfig getOrGenerateSubScopeCreds( @Nonnull PolarisEntity polarisEntity, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { if (!isTypeSupported(polarisEntity.getType())) { callCtx .getDiagServices() @@ -117,7 +118,8 @@ public AccessConfig getOrGenerateSubScopeCreds( polarisEntity, allowListOperation, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); LOGGER.atDebug().addKeyValue("key", key).log("subscopedCredsCache"); Function loader = k -> { @@ -130,7 +132,8 @@ public AccessConfig getOrGenerateSubScopeCreds( polarisEntity.getType(), k.allowedListAction(), k.allowedReadLocations(), - k.allowedWriteLocations()); + k.allowedWriteLocations(), + refreshCredentialsEndpoint); if (scopedCredentialsResult.isSuccess()) { long maxCacheDurationMs = maxCacheDurationMs(callCtx.getRealmConfig()); return new StorageCredentialCacheEntry( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java index 79eba7d1d..8b9d0542d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java @@ -19,6 +19,7 @@ package org.apache.polaris.core.storage.cache; import jakarta.annotation.Nullable; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; @@ -47,12 +48,16 @@ public interface StorageCredentialCacheKey { @Value.Parameter(order = 6) Set allowedWriteLocations(); + @Value.Parameter(order = 7) + Optional refreshCredentialsEndpoint(); + static StorageCredentialCacheKey of( String realmId, PolarisEntity entity, boolean allowedListAction, Set allowedReadLocations, - Set allowedWriteLocations) { + Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { String storageConfigSerializedStr = entity .getInternalPropertiesAsMap() @@ -63,6 +68,7 @@ static StorageCredentialCacheKey of( storageConfigSerializedStr, allowedListAction, allowedReadLocations, - allowedWriteLocations); + allowedWriteLocations, + refreshCredentialsEndpoint); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java index 0120df2b1..27b1581c7 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Stream; import org.apache.polaris.core.config.RealmConfig; @@ -75,7 +76,8 @@ public AccessConfig getSubscopedCreds( @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { try { sourceCredentials.refresh(); } catch (IOException e) { diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java index fa7777814..9ba5271ab 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java @@ -21,6 +21,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.config.RealmConfig; @@ -197,7 +198,8 @@ public AccessConfig getSubscopedCreds( @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { return null; } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java index 89b60dba5..fab57532e 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegrationTest.java @@ -22,7 +22,9 @@ import static org.apache.polaris.core.storage.azure.AzureCredentialsStorageIntegration.toAccessConfig; import java.time.Instant; +import java.util.Optional; import org.apache.polaris.core.storage.AccessConfig; +import org.apache.polaris.core.storage.StorageAccessProperty; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ -32,20 +34,36 @@ public class AzureCredentialsStorageIntegrationTest { public void testAzureCredentialFormatting() { Instant expiresAt = Instant.ofEpochMilli(Long.MAX_VALUE); - AccessConfig noSuffixResult = toAccessConfig("sasToken", "some_account", expiresAt); + AccessConfig noSuffixResult = + toAccessConfig("sasToken", "some_account", expiresAt, Optional.empty()); Assertions.assertThat(noSuffixResult.credentials()).hasSize(2); Assertions.assertThat(noSuffixResult.credentials()).containsKey("adls.sas-token.some_account"); + Assertions.assertThat(noSuffixResult.credentials()) + .doesNotContainKey( + StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName()); AccessConfig adlsSuffixResult = - toAccessConfig("sasToken", "some_account." + AzureLocation.ADLS_ENDPOINT, expiresAt); - Assertions.assertThat(adlsSuffixResult.credentials()).hasSize(3); + toAccessConfig( + "sasToken", + "some_account." + AzureLocation.ADLS_ENDPOINT, + expiresAt, + Optional.of("endpoint/credentials")); + Assertions.assertThat(adlsSuffixResult.credentials()).hasSize(5); Assertions.assertThat(adlsSuffixResult.credentials()) .containsKey("adls.sas-token.some_account"); Assertions.assertThat(adlsSuffixResult.credentials()) .containsKey("adls.sas-token.some_account." + AzureLocation.ADLS_ENDPOINT); + Assertions.assertThat(adlsSuffixResult.credentials()) + .containsEntry( + StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName(), + "endpoint/credentials"); + Assertions.assertThat(adlsSuffixResult.credentials()) + .containsEntry( + StorageAccessProperty.AZURE_REFRESH_CREDENTIALS_ENABLED.getPropertyName(), "true"); AccessConfig blobSuffixResult = - toAccessConfig("sasToken", "some_account." + AzureLocation.BLOB_ENDPOINT, expiresAt); + toAccessConfig( + "sasToken", "some_account." + AzureLocation.BLOB_ENDPOINT, expiresAt, Optional.empty()); Assertions.assertThat(blobSuffixResult.credentials()).hasSize(3); Assertions.assertThat(blobSuffixResult.credentials()) .containsKey("adls.sas-token.some_account"); diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java index 07b233228..e1611411e 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.exceptions.UnprocessableEntityException; import org.apache.polaris.core.PolarisCallContext; @@ -88,7 +89,8 @@ public void testBadResult() { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(badResult); PolarisEntity polarisEntity = new PolarisEntity( @@ -102,7 +104,8 @@ public void testBadResult() { polarisEntity, true, Set.of("s3://bucket1/path"), - Set.of("s3://bucket3/path"))) + Set.of("s3://bucket3/path"), + Optional.empty())) .isInstanceOf(UnprocessableEntityException.class) .hasMessage("Failed to get subscoped credentials: extra_error_info"); } @@ -120,7 +123,8 @@ public void testCacheHit() { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(1)); @@ -136,7 +140,8 @@ public void testCacheHit() { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); // subscope for the same entity and same allowed locations, will hit the cache @@ -146,7 +151,8 @@ public void testCacheHit() { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1); } @@ -163,7 +169,8 @@ public void testCacheEvict() throws InterruptedException { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); @@ -177,7 +184,8 @@ public void testCacheEvict() throws InterruptedException { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); // the entry will be evicted immediately because the token is expired storageCredentialCache.getOrGenerateSubScopeCreds( @@ -186,7 +194,8 @@ public void testCacheEvict() throws InterruptedException { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( @@ -195,7 +204,8 @@ public void testCacheEvict() throws InterruptedException { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); storageCredentialCache.getOrGenerateSubScopeCreds( @@ -204,7 +214,8 @@ public void testCacheEvict() throws InterruptedException { polarisEntity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); } @@ -221,7 +232,8 @@ public void testCacheGenerateNewEntries() { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); @@ -235,7 +247,8 @@ public void testCacheGenerateNewEntries() { entity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // update the entity's storage config, since StorageConfig changed, cache will generate new @@ -252,7 +265,8 @@ public void testCacheGenerateNewEntries() { PolarisEntity.of(updateEntity), /* allowedListAction= */ true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // allowedListAction changed to different value FALSE, will generate new entry @@ -263,7 +277,8 @@ public void testCacheGenerateNewEntries() { entity, /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedWriteLocations, will generate new entry @@ -274,7 +289,8 @@ public void testCacheGenerateNewEntries() { entity, /* allowedListAction= */ false, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://differentbucket/path")); + Set.of("s3://differentbucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedReadLocations, will generate new try @@ -290,7 +306,8 @@ public void testCacheGenerateNewEntries() { PolarisEntity.of(updateEntity), /* allowedListAction= */ false, Set.of("s3://differentbucket/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); + Set.of("s3://bucket/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } } @@ -309,7 +326,8 @@ public void testCacheNotAffectedBy() { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(mockedScopedCreds.get(0)) .thenReturn(mockedScopedCreds.get(1)) .thenReturn(mockedScopedCreds.get(2)); @@ -321,7 +339,8 @@ public void testCacheNotAffectedBy() { entity, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); } Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); @@ -333,7 +352,8 @@ public void testCacheNotAffectedBy() { new PolarisEntity(new PolarisBaseEntity.Builder(entity).id(1234).build()), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -345,7 +365,8 @@ public void testCacheNotAffectedBy() { new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } // order of the allowedReadLocations does not affect the cache @@ -356,7 +377,8 @@ public void testCacheNotAffectedBy() { new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket2/path", "s3://bucket1/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } @@ -368,7 +390,8 @@ public void testCacheNotAffectedBy() { new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), true, Set.of("s3://bucket2/path", "s3://bucket1/path"), - Set.of("s3://bucket4/path", "s3://bucket3/path")); + Set.of("s3://bucket4/path", "s3://bucket3/path"), + Optional.empty()); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); } } @@ -450,7 +473,8 @@ public void testExtraProperties() { Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), - Mockito.anySet())) + Mockito.anySet(), + Mockito.any())) .thenReturn(properties); List entityList = getPolarisEntities(); @@ -461,7 +485,8 @@ public void testExtraProperties() { entityList.get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); + Set.of("s3://bucket3/path", "s3://bucket4/path"), + Optional.empty()); Assertions.assertThat(config.credentials()) .containsExactly(Map.entry("s3.secret-access-key", "super-secret-123")); Assertions.assertThat(config.extraProperties()) diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java index 10ba4b908..44f3b86c5 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java @@ -23,6 +23,7 @@ import jakarta.annotation.Nonnull; import java.time.Instant; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; @@ -95,12 +96,18 @@ public void testGetSubscopedCreds(String scheme) { EMPTY_REALM_CONFIG, true, Set.of(warehouseDir + "/namespace/table"), - Set.of(warehouseDir + "/namespace/table")); + Set.of(warehouseDir + "/namespace/table"), + Optional.of("/namespace/table/credentials")); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") .containsEntry(StorageAccessProperty.AWS_KEY_ID.getPropertyName(), "accessKey") .containsEntry(StorageAccessProperty.AWS_SECRET_KEY.getPropertyName(), "secretKey") + .containsEntry( + StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENABLED.getPropertyName(), "true") + .containsEntry( + StorageAccessProperty.AWS_REFRESH_CREDENTIALS_ENDPOINT.getPropertyName(), + "/namespace/table/credentials") .containsEntry( StorageAccessProperty.AWS_SESSION_TOKEN_EXPIRES_AT_MS.getPropertyName(), String.valueOf(EXPIRE_TIME.toEpochMilli())); @@ -242,7 +249,8 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { EMPTY_REALM_CONFIG, true, Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of(s3Path(bucket, firstPath)))) + Set.of(s3Path(bucket, firstPath)), + null)) .isInstanceOf(IllegalArgumentException.class); break; case AWS_PARTITION: @@ -260,7 +268,8 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { EMPTY_REALM_CONFIG, true, Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of(s3Path(bucket, firstPath))); + Set.of(s3Path(bucket, firstPath)), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -360,7 +369,8 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { EMPTY_REALM_CONFIG, false, /* allowList = false*/ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of(s3Path(bucket, firstPath))); + Set.of(s3Path(bucket, firstPath)), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -454,7 +464,8 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() { EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), - Set.of()); + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -516,7 +527,12 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() { .region("us-east-2") .build(), stsClient) - .getSubscopedCreds(EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of()); + .getSubscopedCreds( + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), "sess") @@ -554,7 +570,11 @@ public void testClientRegion(String awsPartition) { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of())) + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty())) .isInstanceOf(IllegalArgumentException.class); break; case AWS_PARTITION: @@ -569,7 +589,11 @@ public void testClientRegion(String awsPartition) { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of()); + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .containsEntry(StorageAccessProperty.CLIENT_REGION.getPropertyName(), clientRegion); @@ -604,7 +628,11 @@ public void testNoClientRegion(String awsPartition) { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of()); + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty()); assertThat(accessConfig.credentials()) .isNotEmpty() .doesNotContainKey(StorageAccessProperty.CLIENT_REGION.getPropertyName()); @@ -621,7 +649,11 @@ public void testNoClientRegion(String awsPartition) { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, true, /* allowList = true */ Set.of(), Set.of())) + EMPTY_REALM_CONFIG, + true, /* allowList = true */ + Set.of(), + Set.of(), + Optional.empty())) .isInstanceOf(IllegalArgumentException.class); break; default: diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java index 768f4c330..f2818738f 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java @@ -44,6 +44,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; @@ -352,7 +353,8 @@ private AccessConfig subscopedCredsForOperations( EMPTY_REALM_CONFIG, allowListAction, new HashSet<>(allowedReadLoc), - new HashSet<>(allowedWriteLoc)); + new HashSet<>(allowedWriteLoc), + Optional.empty()); } private BlobContainerClient createContainerClient( diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java index da890627e..9df6ab926 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java @@ -41,6 +41,7 @@ import java.util.Date; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; @@ -170,7 +171,8 @@ private AccessConfig subscopedCredsForOperations( EMPTY_REALM_CONFIG, allowListAction, new HashSet<>(allowedReadLoc), - new HashSet<>(allowedWriteLoc)); + new HashSet<>(allowedWriteLoc), + Optional.empty()); } @Test diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java index 6a93da886..ef09478f8 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java @@ -42,12 +42,14 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.PositionOutputStream; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.auth.OAuth2Properties; +import org.apache.iceberg.rest.credentials.Credential; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.types.Types; import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; @@ -267,6 +269,15 @@ public LoadTableResponse doTestCreateTable(RESTCatalog restCatalog) throws IOExc LoadTableResponse loadTableResponse = catalogApi.loadTableWithAccessDelegation(catalogName, id, "ALL"); assertThat(loadTableResponse.config()).containsKey("s3.endpoint"); + assertThat(loadTableResponse.credentials().stream().map(Credential::config)) + .allSatisfy( + c -> + assertThat(c) + .containsEntry( + AwsClientProperties.REFRESH_CREDENTIALS_ENABLED, Boolean.TRUE.toString()) + .containsEntry( + AwsClientProperties.REFRESH_CREDENTIALS_ENDPOINT, + "v1/" + catalogName + "/namespaces/test-ns/tables/t1/credentials")); restCatalog.dropTable(id); assertThat(restCatalog.tableExists(id)).isFalse(); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 7eec03595..e40090562 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -824,7 +824,8 @@ public boolean sendNotification( public AccessConfig getAccessConfig( TableIdentifier tableIdentifier, TableMetadata tableMetadata, - Set storageActions) { + Set storageActions, + Optional refreshCredentialsEndpoint) { Optional storageInfo = findStorageInfo(tableIdentifier); if (storageInfo.isEmpty()) { LOGGER @@ -840,7 +841,8 @@ public AccessConfig getAccessConfig( tableIdentifier, StorageUtil.getLocationsAllowedToBeAccessed(tableMetadata), storageActions, - storageInfo.get()); + storageInfo.get(), + refreshCredentialsEndpoint); } private String buildPrefixedLocation(TableIdentifier tableIdentifier) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 76401582a..860476cf8 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -75,6 +75,7 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.persistence.resolver.ResolverStatus; import org.apache.polaris.core.rest.PolarisEndpoints; +import org.apache.polaris.core.rest.PolarisResourcePaths; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.service.catalog.AccessDelegationMode; import org.apache.polaris.service.catalog.CatalogPrefixParser; @@ -359,12 +360,18 @@ public Response createTable( securityContext, prefix, catalog -> { + Optional refreshCredentialsEndpoint = + getRefreshCredentialsEndpoint( + delegationModes, + prefix, + TableIdentifier.of(namespace, createTableRequest.name())); if (createTableRequest.stageCreate()) { if (delegationModes.isEmpty()) { return Response.ok(catalog.createTableStaged(ns, createTableRequest)).build(); } else { return Response.ok( - catalog.createTableStagedWithWriteDelegation(ns, createTableRequest)) + catalog.createTableStagedWithWriteDelegation( + ns, createTableRequest, refreshCredentialsEndpoint)) .build(); } } else if (delegationModes.isEmpty()) { @@ -374,7 +381,8 @@ public Response createTable( .build(); } else { LoadTableResponse response = - catalog.createTableDirectWithWriteDelegation(ns, createTableRequest); + catalog.createTableDirectWithWriteDelegation( + ns, createTableRequest, refreshCredentialsEndpoint); return tryInsertETagHeader( Response.ok(response), response, namespace, createTableRequest.name()) .build(); @@ -430,9 +438,12 @@ public Response loadTable( .loadTableIfStale(tableIdentifier, ifNoneMatch, snapshots) .orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED)); } else { + Optional refreshCredentialsEndpoint = + getRefreshCredentialsEndpoint(delegationModes, prefix, tableIdentifier); response = catalog - .loadTableWithAccessDelegationIfStale(tableIdentifier, ifNoneMatch, snapshots) + .loadTableWithAccessDelegationIfStale( + tableIdentifier, ifNoneMatch, snapshots, refreshCredentialsEndpoint) .orElseThrow(() -> new WebApplicationException(Response.Status.NOT_MODIFIED)); } @@ -440,6 +451,15 @@ public Response loadTable( }); } + private static Optional getRefreshCredentialsEndpoint( + EnumSet delegationModes, + String prefix, + TableIdentifier tableIdentifier) { + return delegationModes.contains(AccessDelegationMode.VENDED_CREDENTIALS) + ? Optional.of(new PolarisResourcePaths(prefix).credentialsPath(tableIdentifier)) + : Optional.empty(); + } + @Override public Response tableExists( String prefix, @@ -599,7 +619,10 @@ public Response loadCredentials( prefix, catalog -> { LoadTableResponse loadTableResponse = - catalog.loadTableWithAccessDelegation(tableIdentifier, "all"); + catalog.loadTableWithAccessDelegation( + tableIdentifier, + "all", + Optional.of(new PolarisResourcePaths(prefix).credentialsPath(tableIdentifier))); return Response.ok( ImmutableLoadCredentialsResponse.builder() .credentials(loadTableResponse.credentials()) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 33b4bef06..74c64202d 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -393,7 +393,9 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque * @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata */ public LoadTableResponse createTableDirectWithWriteDelegation( - Namespace namespace, CreateTableRequest request) { + Namespace namespace, + CreateTableRequest request, + Optional refreshCredentialsEndpoint) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION; authorizeCreateTableLikeUnderNamespaceOperationOrThrow( @@ -437,7 +439,8 @@ public LoadTableResponse createTableDirectWithWriteDelegation( PolarisStorageActions.READ, PolarisStorageActions.WRITE, PolarisStorageActions.LIST), - SNAPSHOTS_ALL) + SNAPSHOTS_ALL, + refreshCredentialsEndpoint) .build(); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now @@ -510,7 +513,9 @@ public LoadTableResponse createTableStaged(Namespace namespace, CreateTableReque } public LoadTableResponse createTableStagedWithWriteDelegation( - Namespace namespace, CreateTableRequest request) { + Namespace namespace, + CreateTableRequest request, + Optional refreshCredentialsEndpoint) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION; authorizeCreateTableLikeUnderNamespaceOperationOrThrow( @@ -529,7 +534,11 @@ public LoadTableResponse createTableStagedWithWriteDelegation( TableMetadata metadata = stageTableCreateHelper(namespace, request); return buildLoadTableResponseWithDelegationCredentials( - ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL) + ident, + metadata, + Set.of(PolarisStorageActions.ALL), + SNAPSHOTS_ALL, + refreshCredentialsEndpoint) .build(); } @@ -643,8 +652,12 @@ public Optional loadTableIfStale( } public LoadTableResponse loadTableWithAccessDelegation( - TableIdentifier tableIdentifier, String snapshots) { - return loadTableWithAccessDelegationIfStale(tableIdentifier, null, snapshots).get(); + TableIdentifier tableIdentifier, + String snapshots, + Optional refreshCredentialsEndpoint) { + return loadTableWithAccessDelegationIfStale( + tableIdentifier, null, snapshots, refreshCredentialsEndpoint) + .get(); } /** @@ -658,7 +671,10 @@ public LoadTableResponse loadTableWithAccessDelegation( * load table response, otherwise */ public Optional loadTableWithAccessDelegationIfStale( - TableIdentifier tableIdentifier, IfNoneMatch ifNoneMatch, String snapshots) { + TableIdentifier tableIdentifier, + IfNoneMatch ifNoneMatch, + String snapshots, + Optional refreshCredentialsEndpoint) { // Here we have a single method that falls through multiple candidate // PolarisAuthorizableOperations because instead of identifying the desired operation up-front // and @@ -729,7 +745,11 @@ public Optional loadTableWithAccessDelegationIfStale( TableMetadata tableMetadata = baseTable.operations().current(); return Optional.of( buildLoadTableResponseWithDelegationCredentials( - tableIdentifier, tableMetadata, actionsRequested, snapshots) + tableIdentifier, + tableMetadata, + actionsRequested, + snapshots, + refreshCredentialsEndpoint) .build()); } else if (table instanceof BaseMetadataTable) { // metadata tables are loaded on the client side, return NoSuchTableException for now @@ -743,7 +763,8 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential TableIdentifier tableIdentifier, TableMetadata tableMetadata, Set actions, - String snapshots) { + String snapshots, + Optional refreshCredentialsEndpoint) { LoadTableResponse.Builder responseBuilder = LoadTableResponse.builder().withTableMetadata(tableMetadata); if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) { @@ -753,7 +774,8 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential .addKeyValue("tableLocation", tableMetadata.location()) .log("Fetching client credentials for table"); AccessConfig accessConfig = - credentialDelegation.getAccessConfig(tableIdentifier, tableMetadata, actions); + credentialDelegation.getAccessConfig( + tableIdentifier, tableMetadata, actions, refreshCredentialsEndpoint); Map credentialConfig = accessConfig.credentials(); responseBuilder.addAllConfig(credentialConfig); responseBuilder.addAllConfig(accessConfig.extraProperties()); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java index 21ec380eb..b85973ed8 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.catalog.iceberg; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.TableIdentifier; @@ -35,5 +36,6 @@ public interface SupportsCredentialDelegation { AccessConfig getAccessConfig( TableIdentifier tableIdentifier, TableMetadata tableMetadata, - Set storageActions); + Set storageActions, + Optional refreshCredentialsEndpoint); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java index d2c73e268..81f6e7c8f 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java @@ -91,7 +91,8 @@ public FileIO loadFileIO( identifier, tableLocations, storageActions, - storageInfo)); + storageInfo, + Optional.empty())); // Update the FileIO with the subscoped credentials // Update with properties in case there are table-level overrides the credentials should diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java index c5ef12d78..f4a6320d6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java @@ -81,7 +81,8 @@ public static AccessConfig refreshAccessConfig( TableIdentifier tableIdentifier, Set tableLocations, Set storageActions, - PolarisEntity entity) { + PolarisEntity entity, + Optional refreshCredentialsEndpoint) { boolean skipCredentialSubscopingIndirection = callContext @@ -111,7 +112,8 @@ public static AccessConfig refreshAccessConfig( entity, allowList, tableLocations, - writeLocations); + writeLocations, + refreshCredentialsEndpoint); LOGGER .atDebug() .addKeyValue("tableIdentifier", tableIdentifier) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java index e07bdd082..e04a9525b 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java @@ -113,7 +113,8 @@ public AccessConfig getSubscopedCreds( @Nonnull RealmConfig realmConfig, boolean allowListOperation, @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { + @Nonnull Set allowedWriteLocations, + Optional refreshCredentialsEndpoint) { return AccessConfig.builder().build(); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java index 38c4db7e5..ca4bc8d6a 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Function; @@ -1828,7 +1829,8 @@ public void testDropTableWithPurge() { taskEntity.getType(), true, Set.of(tableMetadata.location()), - Set.of(tableMetadata.location())) + Set.of(tableMetadata.location()), + Optional.empty()) .getAccessConfig() .credentials(); Assertions.assertThat(credentials) diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java index a3ab18e3f..a8090f038 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/IcebergCatalogHandlerAuthzTest.java @@ -26,6 +26,7 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import org.apache.hadoop.conf.Configuration; @@ -616,7 +617,8 @@ public void testCreateTableDirectWithWriteDelegationAllSufficientPrivileges() { Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest); + .createTableDirectWithWriteDelegation( + NS2, createDirectWithWriteDelegationRequest, Optional.empty()); }, () -> { newWrapper(Set.of(PRINCIPAL_ROLE2)).dropTableWithPurge(newtable); @@ -646,7 +648,8 @@ public void testCreateTableDirectWithWriteDelegationInsufficientPermissions() { PolarisPrivilege.TABLE_LIST), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableDirectWithWriteDelegation(NS2, createDirectWithWriteDelegationRequest); + .createTableDirectWithWriteDelegation( + NS2, createDirectWithWriteDelegationRequest, Optional.empty()); }); } @@ -719,7 +722,8 @@ public void testCreateTableStagedWithWriteDelegationAllSufficientPrivileges() { Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableStagedWithWriteDelegation(NS2, createStagedWithWriteDelegationRequest); + .createTableStagedWithWriteDelegation( + NS2, createStagedWithWriteDelegationRequest, Optional.empty()); }, // createTableStagedWithWriteDelegation doesn't actually commit any metadata null, @@ -748,7 +752,8 @@ public void testCreateTableStagedWithWriteDelegationInsufficientPermissions() { PolarisPrivilege.TABLE_LIST), () -> { newWrapper(Set.of(PRINCIPAL_ROLE1)) - .createTableStagedWithWriteDelegation(NS2, createStagedWithWriteDelegationRequest); + .createTableStagedWithWriteDelegation( + NS2, createStagedWithWriteDelegationRequest, Optional.empty()); }); } @@ -892,7 +897,7 @@ public void testLoadTableWithReadAccessDelegationSufficientPrivileges() { PolarisPrivilege.TABLE_READ_DATA, PolarisPrivilege.TABLE_WRITE_DATA, PolarisPrivilege.CATALOG_MANAGE_CONTENT), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"), + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty()), null /* cleanupAction */); } @@ -908,7 +913,7 @@ public void testLoadTableWithReadAccessDelegationInsufficientPermissions() { PolarisPrivilege.TABLE_CREATE, PolarisPrivilege.TABLE_LIST, PolarisPrivilege.TABLE_DROP), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all")); + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty())); } @Test @@ -921,7 +926,7 @@ public void testLoadTableWithWriteAccessDelegationSufficientPrivileges() { PolarisPrivilege.TABLE_READ_DATA, PolarisPrivilege.TABLE_WRITE_DATA, PolarisPrivilege.CATALOG_MANAGE_CONTENT), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all"), + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty()), null /* cleanupAction */); } @@ -937,7 +942,7 @@ public void testLoadTableWithWriteAccessDelegationInsufficientPermissions() { PolarisPrivilege.TABLE_CREATE, PolarisPrivilege.TABLE_LIST, PolarisPrivilege.TABLE_DROP), - () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all")); + () -> newWrapper().loadTableWithAccessDelegation(TABLE_NS1A_2, "all", Optional.empty())); } @Test @@ -950,7 +955,7 @@ public void testLoadTableWithReadAccessDelegationIfStaleSufficientPrivileges() { () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"), + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty()), null /* cleanupAction */); } @@ -969,7 +974,7 @@ public void testLoadTableWithReadAccessDelegationIfStaleInsufficientPermissions( () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all")); + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty())); } @Test @@ -985,7 +990,7 @@ public void testLoadTableWithWriteAccessDelegationIfStaleSufficientPrivileges() () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all"), + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty()), null /* cleanupAction */); } @@ -1004,7 +1009,7 @@ public void testLoadTableWithWriteAccessDelegationIfStaleInsufficientPermissions () -> newWrapper() .loadTableWithAccessDelegationIfStale( - TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all")); + TABLE_NS1A_2, IfNoneMatch.fromHeader("W/\"0:0\""), "all", Optional.empty())); } @Test