diff --git a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/EclipseLinkPolarisMetaStoreManagerFactory.java b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/EclipseLinkPolarisMetaStoreManagerFactory.java index 16c1bb8f44..46fd33821f 100644 --- a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/EclipseLinkPolarisMetaStoreManagerFactory.java +++ b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/EclipseLinkPolarisMetaStoreManagerFactory.java @@ -31,7 +31,6 @@ import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet; import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; -import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; /** * The implementation of Configuration interface for configuring the {@link PolarisMetaStoreManager} @@ -43,17 +42,20 @@ public class EclipseLinkPolarisMetaStoreManagerFactory extends LocalPolarisMetaStoreManagerFactory { - @Inject EclipseLinkConfiguration eclipseLinkConfiguration; - @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; + private final EclipseLinkConfiguration eclipseLinkConfiguration; @SuppressWarnings("unused") // Required by CDI protected EclipseLinkPolarisMetaStoreManagerFactory() { - this(null, null); + this(null, null, null); } @Inject - protected EclipseLinkPolarisMetaStoreManagerFactory(Clock clock, PolarisDiagnostics diagnostics) { + protected EclipseLinkPolarisMetaStoreManagerFactory( + Clock clock, + PolarisDiagnostics diagnostics, + EclipseLinkConfiguration eclipseLinkConfiguration) { super(clock, diagnostics); + this.eclipseLinkConfiguration = eclipseLinkConfiguration; } @Override @@ -69,7 +71,6 @@ protected TransactionalPersistence createMetaStoreSession( @Nonnull PolarisDiagnostics diagnostics) { return new PolarisEclipseLinkMetaStoreSessionImpl( store, - storageIntegrationProvider, realmContext, configurationFile(), persistenceUnitName(), diff --git a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index cf2e9053b1..61857de131 100644 --- a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -53,7 +53,6 @@ import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.exceptions.AlreadyExistsException; -import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; import org.apache.polaris.core.persistence.pagination.EntityIdToken; @@ -61,9 +60,6 @@ import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; -import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.extension.persistence.impl.eclipselink.models.ModelEntity; import org.apache.polaris.extension.persistence.impl.eclipselink.models.ModelEntityActive; import org.apache.polaris.extension.persistence.impl.eclipselink.models.ModelEntityChangeTracking; @@ -93,21 +89,18 @@ public class PolarisEclipseLinkMetaStoreSessionImpl extends AbstractTransactiona private final ThreadLocal localSession = new ThreadLocal<>(); private final PolarisEclipseLinkStore store; - private final PolarisStorageIntegrationProvider storageIntegrationProvider; private final PrincipalSecretsGenerator secretsGenerator; /** * Create a meta store session against provided realm. Each realm has its own database. * * @param store Backing store of EclipseLink implementation - * @param storageIntegrationProvider Storage integration provider * @param realmContext Realm context used to communicate with different database. * @param confFile Optional EclipseLink configuration file. Default to 'META-INF/persistence.xml'. * @param persistenceUnitName Optional persistence-unit name in confFile. Default to 'polaris'. */ public PolarisEclipseLinkMetaStoreSessionImpl( @Nonnull PolarisEclipseLinkStore store, - @Nonnull PolarisStorageIntegrationProvider storageIntegrationProvider, @Nonnull RealmContext realmContext, @Nullable String confFile, @Nullable String persistenceUnitName, @@ -121,7 +114,6 @@ public PolarisEclipseLinkMetaStoreSessionImpl( try (EntityManager session = emf.createEntityManager()) { this.store.initialize(session); } - this.storageIntegrationProvider = storageIntegrationProvider; this.secretsGenerator = secretsGenerator; } @@ -276,16 +268,6 @@ public void writeToEntitiesInCurrentTxn( this.store.writeToEntities(localSession.get(), entity); } - /** {@inheritDoc} */ - @Override - public - void persistStorageIntegrationIfNeededInCurrentTxn( - @Nonnull PolarisCallContext callContext, - @Nonnull PolarisBaseEntity entity, - @Nullable PolarisStorageIntegration storageIntegration) { - // not implemented for eclipselink store - } - /** {@inheritDoc} */ @Override public void writeToEntitiesActiveInCurrentTxn( @@ -661,28 +643,6 @@ public void deletePrincipalSecretsInCurrentTxn( this.store.deletePrincipalSecrets(localSession.get(), clientId); } - /** {@inheritDoc} */ - @Override - public @Nullable - PolarisStorageIntegration createStorageIntegrationInCurrentTxn( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) { - return storageIntegrationProvider.getStorageIntegrationForConfig( - polarisStorageConfigurationInfo); - } - - /** {@inheritDoc} */ - @Override - public @Nullable - PolarisStorageIntegration loadPolarisStorageIntegrationInCurrentTxn( - @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) { - PolarisStorageConfigurationInfo storageConfig = - BaseMetaStoreManager.extractStorageConfiguration(callCtx.getDiagServices(), entity); - return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig); - } - /** {@inheritDoc} */ @Override public void writeToPolicyMappingRecordsInCurrentTxn( diff --git a/persistence/eclipselink/src/test/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreManagerTest.java b/persistence/eclipselink/src/test/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreManagerTest.java index aa3a6a3d55..d6a99e269f 100644 --- a/persistence/eclipselink/src/test/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreManagerTest.java +++ b/persistence/eclipselink/src/test/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreManagerTest.java @@ -48,7 +48,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.Mockito; /** * Integration test for EclipseLink based metastore implementation @@ -86,7 +85,7 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { RealmContext realmContext = () -> "realm"; PolarisEclipseLinkMetaStoreSessionImpl session = new PolarisEclipseLinkMetaStoreSessionImpl( - store, Mockito.mock(), realmContext, null, "polaris", RANDOM_SECRETS); + store, realmContext, null, "polaris", RANDOM_SECRETS); TransactionalMetaStoreManagerImpl metaStoreManager = new TransactionalMetaStoreManagerImpl(clock); PolarisCallContext callCtx = new PolarisCallContext(realmContext, session, diagServices); @@ -104,7 +103,7 @@ void testCreateStoreSession(String confFile, boolean success) { try { var session = new PolarisEclipseLinkMetaStoreSessionImpl( - store, Mockito.mock(), () -> "realm", confFile, "polaris", RANDOM_SECRETS); + store, () -> "realm", confFile, "polaris", RANDOM_SECRETS); assertNotNull(session); assertTrue(success); } catch (Exception e) { diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 76da5fa92a..247c141a34 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -47,7 +47,6 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; -import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.EntityAlreadyExistsException; import org.apache.polaris.core.persistence.IntegrationPersistence; @@ -60,9 +59,6 @@ import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; -import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageLocation; import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity; import org.apache.polaris.persistence.relational.jdbc.models.ModelGrantRecord; @@ -78,7 +74,6 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers private final DatasourceOperations datasourceOperations; private final PrincipalSecretsGenerator secretsGenerator; - private final PolarisStorageIntegrationProvider storageIntegrationProvider; private final String realmId; private final int schemaVersion; @@ -88,12 +83,10 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers public JdbcBasePersistenceImpl( DatasourceOperations databaseOperations, PrincipalSecretsGenerator secretsGenerator, - PolarisStorageIntegrationProvider storageIntegrationProvider, String realmId, int schemaVersion) { this.datasourceOperations = databaseOperations; this.secretsGenerator = secretsGenerator; - this.storageIntegrationProvider = storageIntegrationProvider; this.realmId = realmId; this.schemaVersion = schemaVersion; } @@ -1103,34 +1096,6 @@ private List fetchPolicyMappingRecords( } } - @Nullable - @Override - public - PolarisStorageIntegration createStorageIntegration( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) { - return storageIntegrationProvider.getStorageIntegrationForConfig( - polarisStorageConfigurationInfo); - } - - @Override - public void persistStorageIntegrationIfNeeded( - @Nonnull PolarisCallContext callContext, - @Nonnull PolarisBaseEntity entity, - @Nullable PolarisStorageIntegration storageIntegration) {} - - @Nullable - @Override - public - PolarisStorageIntegration loadPolarisStorageIntegration( - @Nonnull PolarisCallContext callContext, @Nonnull PolarisBaseEntity entity) { - PolarisStorageConfigurationInfo storageConfig = - BaseMetaStoreManager.extractStorageConfiguration(callContext.getDiagServices(), entity); - return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig); - } - @FunctionalInterface private interface QueryAction { Integer apply(Connection connection, QueryGenerator.PreparedQuery query) throws SQLException; diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index c8f05e3b33..f13791915a 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -105,7 +105,6 @@ private void initializeForRealm( new JdbcBasePersistenceImpl( datasourceOperations, secretsGenerator(realmId, rootCredentialsSet), - storageIntegrationProvider, realmId, schemaVersion)); diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java index c389709943..85fd9580c7 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/AtomicMetastoreManagerWithJdbcBasePersistenceImplTest.java @@ -32,7 +32,6 @@ import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest; import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager; import org.h2.jdbcx.JdbcConnectionPool; -import org.mockito.Mockito; public class AtomicMetastoreManagerWithJdbcBasePersistenceImplTest extends BasePolarisMetaStoreManagerTest { @@ -64,11 +63,7 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { RealmContext realmContext = () -> "REALM"; JdbcBasePersistenceImpl basePersistence = new JdbcBasePersistenceImpl( - datasourceOperations, - RANDOM_SECRETS, - Mockito.mock(), - realmContext.getRealmIdentifier(), - schemaVersion); + datasourceOperations, RANDOM_SECRETS, realmContext.getRealmIdentifier(), schemaVersion); AtomicOperationMetaStoreManager metaStoreManager = new AtomicOperationMetaStoreManager(clock); PolarisCallContext callCtx = new PolarisCallContext(realmContext, basePersistence, diagServices); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java index 622df1fca6..a3343c7a04 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/CatalogEntity.java @@ -141,6 +141,7 @@ private StorageConfigInfo getStorageInfo(Map internalProperties) .setEndpoint(awsConfig.getEndpoint()) .setStsEndpoint(awsConfig.getStsEndpoint()) .setPathStyleAccess(awsConfig.getPathStyleAccess()) + .setEndpointInternal(awsConfig.getEndpointInternal()) .build(); } if (configInfo instanceof AzureStorageConfigurationInfo) { diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 0a09b20ea3..b9b2972900 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -64,16 +64,12 @@ import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; -import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyMappingUtil; import org.apache.polaris.core.policy.PolicyType; -import org.apache.polaris.core.storage.AccessConfig; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -433,26 +429,6 @@ private void revokeGrantRecord( // validate input callCtx.getDiagServices().checkNotNull(catalog, "unexpected_null_catalog"); - Map internalProp = getInternalPropertyMap(catalog); - String integrationIdentifierOrId = - internalProp.get(PolarisEntityConstants.getStorageIntegrationIdentifierPropertyName()); - String storageConfigInfoStr = - internalProp.get(PolarisEntityConstants.getStorageConfigInfoPropertyName()); - PolarisStorageIntegration integration; - // storageConfigInfo's presence is needed to create a storage integration - // and the catalog should not have an internal property of storage identifier or id yet - if (storageConfigInfoStr != null && integrationIdentifierOrId == null) { - integration = - ((IntegrationPersistence) ms) - .createStorageIntegration( - callCtx, - catalog.getCatalogId(), - catalog.getId(), - PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr)); - } else { - integration = null; - } - // check if that catalog has already been created // This can be done safely as a separate atomic operation before trying to create the catalog // because same-id idempotent-retry collisions of this sort are necessarily sequential, so @@ -489,7 +465,6 @@ private void revokeGrantRecord( // done, return the existing catalog return new CreateCatalogResult(refreshCatalog, catalogAdminRole); } - ((IntegrationPersistence) ms).persistStorageIntegrationIfNeeded(callCtx, catalog, integration); // now create and persist new catalog entity EntityResult lowLevelResult = this.persistNewEntity(callCtx, ms, catalog); @@ -1568,65 +1543,6 @@ private void revokeGrantRecord( return EntitiesResult.fromPage(Page.fromItems(loadedTasks)); } - /** {@inheritDoc} */ - @Override - public @Nonnull ScopedCredentialsResult getSubscopedCredsForEntity( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisEntityType entityType, - boolean allowListOperation, - @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { - - // get meta store session we should be using - BasePersistence ms = callCtx.getMetaStore(); - callCtx - .getDiagServices() - .check( - !allowedReadLocations.isEmpty() || !allowedWriteLocations.isEmpty(), - "allowed_locations_to_subscope_is_required"); - - // reload the entity, error out if not found - EntityResult reloadedEntity = loadEntity(callCtx, catalogId, entityId, entityType); - if (reloadedEntity.getReturnStatus() != BaseResult.ReturnStatus.SUCCESS) { - return new ScopedCredentialsResult( - reloadedEntity.getReturnStatus(), reloadedEntity.getExtraInformation()); - } - - // TODO: Consider whether this independent lookup is safe for the model already or whether - // we need better atomicity semantics between the base entity and the embedded storage - // integration. - - // get storage integration - PolarisStorageIntegration storageIntegration = - ((IntegrationPersistence) ms) - .loadPolarisStorageIntegration(callCtx, reloadedEntity.getEntity()); - - // cannot be null - callCtx - .getDiagServices() - .checkNotNull( - storageIntegration, - "storage_integration_not_exists", - "catalogId={}, entityId={}", - catalogId, - entityId); - - try { - AccessConfig accessConfig = - storageIntegration.getSubscopedCreds( - callCtx.getRealmConfig(), - allowListOperation, - allowedReadLocations, - allowedWriteLocations); - return new ScopedCredentialsResult(accessConfig); - } catch (Exception ex) { - return new ScopedCredentialsResult( - BaseResult.ReturnStatus.SUBSCOPE_CREDS_ERROR, ex.getMessage()); - } - } - /** * Get the internal property map for an entity * diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BaseMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BaseMetaStoreManager.java index 9820d8950c..990583635f 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BaseMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BaseMetaStoreManager.java @@ -24,35 +24,17 @@ import jakarta.annotation.Nonnull; import java.util.Map; import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; /** Shared basic PolarisMetaStoreManager logic for transactional and non-transactional impls. */ public abstract class BaseMetaStoreManager implements PolarisMetaStoreManager { /** mapper, allows to serialize/deserialize properties to/from JSON */ private static final ObjectMapper MAPPER = new ObjectMapper(); - public static PolarisStorageConfigurationInfo extractStorageConfiguration( - @Nonnull PolarisDiagnostics diagnostics, PolarisBaseEntity reloadedEntity) { - Map propMap = - PolarisObjectMapperUtil.deserializeProperties(reloadedEntity.getInternalProperties()); - String storageConfigInfoStr = - propMap.get(PolarisEntityConstants.getStorageConfigInfoPropertyName()); - - diagnostics.check( - storageConfigInfoStr != null, - "missing_storage_configuration_info", - "catalogId={}, entityId={}", - reloadedEntity.getCatalogId(), - reloadedEntity.getId()); - return PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr); - } - /** * Given the internal property as a map of key/value pairs, serialize it to a String * diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IntegrationPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IntegrationPersistence.java index c9374182b6..4ae5036fc7 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/IntegrationPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/IntegrationPersistence.java @@ -21,10 +21,7 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; /** * Interface for the necessary "peripheral integration" objects that are logically attached to core @@ -90,44 +87,4 @@ PolarisPrincipalSecrets rotatePrincipalSecrets( */ void deletePrincipalSecrets( @Nonnull PolarisCallContext callCtx, @Nonnull String clientId, long principalId); - - /** - * Create an in-memory storage integration - * - * @param callCtx the polaris calllctx - * @param catalogId the catalog id - * @param entityId the entity id - * @param polarisStorageConfigurationInfo the storage configuration information - * @return a storage integration object - */ - @Nullable - PolarisStorageIntegration createStorageIntegration( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisStorageConfigurationInfo polarisStorageConfigurationInfo); - - /** - * Persist a storage integration in the metastore - * - * @param callContext the polaris call context - * @param entity the entity of the object - * @param storageIntegration the storage integration to persist - */ - void persistStorageIntegrationIfNeeded( - @Nonnull PolarisCallContext callContext, - @Nonnull PolarisBaseEntity entity, - @Nullable PolarisStorageIntegration storageIntegration); - - /** - * Load the polaris storage integration for a polaris entity (Catalog,Namespace,Table,View) - * - * @param callContext the polaris call context - * @param entity the polaris entity - * @return a polaris storage integration - */ - @Nullable - - PolarisStorageIntegration loadPolarisStorageIntegration( - @Nonnull PolarisCallContext callContext, @Nonnull PolarisBaseEntity entity); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java index ea247fdfb5..063b08f76a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java @@ -53,13 +53,12 @@ public abstract class LocalPolarisMetaStoreManagerFactory final Map entityCacheMap = new HashMap<>(); final Map backingStoreMap = new HashMap<>(); final Map> sessionSupplierMap = new HashMap<>(); + private final Clock clock; + private final PolarisDiagnostics diagnostics; private static final Logger LOGGER = LoggerFactory.getLogger(LocalPolarisMetaStoreManagerFactory.class); - private final Clock clock; - private final PolarisDiagnostics diagnostics; - protected LocalPolarisMetaStoreManagerFactory( @Nonnull Clock clock, @Nonnull PolarisDiagnostics diagnostics) { this.clock = clock; diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java index 67175e21fb..edda8ea13e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java @@ -49,17 +49,13 @@ import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingManager; -import org.apache.polaris.core.storage.PolarisCredentialVendor; /** * Polaris Metastore Manager manages all Polaris entities and associated grant records metadata for * authorization. It uses the underlying persistent metastore to store and retrieve Polaris metadata */ public interface PolarisMetaStoreManager - extends PolarisSecretsManager, - PolarisGrantManager, - PolarisCredentialVendor, - PolarisPolicyMappingManager { + extends PolarisSecretsManager, PolarisGrantManager, PolarisPolicyMappingManager { /** * Bootstrap the Polaris service, creating the root catalog, root principal, and associated diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index a7138c9fe2..f65337ea9e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.LocationBasedEntity; import org.apache.polaris.core.entity.PolarisBaseEntity; @@ -51,7 +50,6 @@ import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; -import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; @@ -329,25 +327,6 @@ public EntitiesResult loadTasks( return null; } - @Override - public ScopedCredentialsResult getSubscopedCredsForEntity( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisEntityType entityType, - boolean allowListOperation, - @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { - return delegate.getSubscopedCredsForEntity( - callCtx, - catalogId, - entityId, - entityType, - allowListOperation, - allowedReadLocations, - allowedWriteLocations); - } - @Override public ResolvedEntityResult loadResolvedEntityById( @Nonnull PolarisCallContext callCtx, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java index f08b85e122..3c5e8e5fe9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java @@ -41,8 +41,6 @@ import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyType; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; /** * Extends BasePersistence to express a more "transaction-oriented" control flow for backing stores @@ -513,45 +511,6 @@ public void deletePrincipalSecrets( callCtx, () -> this.deletePrincipalSecretsInCurrentTxn(callCtx, clientId, principalId)); } - /** {@inheritDoc} */ - @Override - @Nullable - public - PolarisStorageIntegration createStorageIntegration( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) { - return runInTransaction( - callCtx, - () -> - this.createStorageIntegrationInCurrentTxn( - callCtx, catalogId, entityId, polarisStorageConfigurationInfo)); - } - - /** {@inheritDoc} */ - @Override - public void persistStorageIntegrationIfNeeded( - @Nonnull PolarisCallContext callCtx, - @Nonnull PolarisBaseEntity entity, - @Nullable PolarisStorageIntegration storageIntegration) { - runActionInTransaction( - callCtx, - () -> - this.persistStorageIntegrationIfNeededInCurrentTxn( - callCtx, entity, storageIntegration)); - } - - /** {@inheritDoc} */ - @Override - @Nullable - public - PolarisStorageIntegration loadPolarisStorageIntegration( - @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) { - return runInReadTransaction( - callCtx, () -> this.loadPolarisStorageIntegrationInCurrentTxn(callCtx, entity)); - } - // // Implementations of the in-transaction versions for basic write/delete/lookup using the // slice-based model supported by this class. diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 34e2396227..61b5d1332c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -69,16 +69,12 @@ import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; -import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyMappingUtil; import org.apache.polaris.core.policy.PolicyType; -import org.apache.polaris.core.storage.AccessConfig; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -416,8 +412,6 @@ private void revokeGrantRecord( * @param callCtx call context * @param ms meta store in read/write mode * @param catalog the catalog entity to create - * @param integration the storage integration that should be attached to the catalog. If null, do - * nothing, otherwise persist the integration. * @param principalRoles once the catalog has been created, list of principal roles to grant its * catalog_admin role to. If no principal role is specified, we will grant the catalog_admin * role of the newly created catalog to the service admin role. @@ -428,7 +422,6 @@ private void revokeGrantRecord( @Nonnull PolarisCallContext callCtx, @Nonnull TransactionalPersistence ms, @Nonnull PolarisBaseEntity catalog, - @Nullable PolarisStorageIntegration integration, @Nonnull List principalRoles) { // validate input callCtx.getDiagServices().checkNotNull(catalog, "unexpected_null_catalog"); @@ -480,8 +473,6 @@ private void revokeGrantRecord( return new CreateCatalogResult(BaseResult.ReturnStatus.ENTITY_ALREADY_EXISTS, null); } - ms.persistStorageIntegrationIfNeededInCurrentTxn(callCtx, catalog, integration); - // now create and persist new catalog entity this.persistNewEntity(callCtx, ms, catalog); @@ -968,27 +959,9 @@ private void bootstrapPolarisService( // get metastore we should be using TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); - Map internalProp = getInternalPropertyMap(catalog); - String integrationIdentifierOrId = - internalProp.get(PolarisEntityConstants.getStorageIntegrationIdentifierPropertyName()); - String storageConfigInfoStr = - internalProp.get(PolarisEntityConstants.getStorageConfigInfoPropertyName()); - PolarisStorageIntegration integration; - // storageConfigInfo's presence is needed to create a storage integration - // and the catalog should not have an internal property of storage identifier or id yet - if (storageConfigInfoStr != null && integrationIdentifierOrId == null) { - integration = - ms.createStorageIntegrationInCurrentTxn( - callCtx, - catalog.getCatalogId(), - catalog.getId(), - PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr)); - } else { - integration = null; - } // need to run inside a read/write transaction return ms.runInTransaction( - callCtx, () -> this.createCatalog(callCtx, ms, catalog, integration, principalRoles)); + callCtx, () -> this.createCatalog(callCtx, ms, catalog, principalRoles)); } /** {@link #createEntityIfNotExists(PolarisCallContext, List, PolarisBaseEntity)} */ @@ -2020,60 +1993,6 @@ private PolarisEntityResolver resolveSecurableToRoleGrant( return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, pageToken)); } - /** {@inheritDoc} */ - @Override - public @Nonnull ScopedCredentialsResult getSubscopedCredsForEntity( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisEntityType entityType, - boolean allowListOperation, - @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations) { - - // get meta store session we should be using - TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); - callCtx - .getDiagServices() - .check( - !allowedReadLocations.isEmpty() || !allowedWriteLocations.isEmpty(), - "allowed_locations_to_subscope_is_required"); - - // reload the entity, error out if not found - EntityResult reloadedEntity = loadEntity(callCtx, catalogId, entityId, entityType); - if (reloadedEntity.getReturnStatus() != BaseResult.ReturnStatus.SUCCESS) { - return new ScopedCredentialsResult( - reloadedEntity.getReturnStatus(), reloadedEntity.getExtraInformation()); - } - - // get storage integration - PolarisStorageIntegration storageIntegration = - ms.loadPolarisStorageIntegrationInCurrentTxn(callCtx, reloadedEntity.getEntity()); - - // cannot be null - callCtx - .getDiagServices() - .checkNotNull( - storageIntegration, - "storage_integration_not_exists", - "catalogId={}, entityId={}", - catalogId, - entityId); - - try { - AccessConfig accessConfig = - storageIntegration.getSubscopedCreds( - callCtx.getRealmConfig(), - allowListOperation, - allowedReadLocations, - allowedWriteLocations); - return new ScopedCredentialsResult(accessConfig); - } catch (Exception ex) { - return new ScopedCredentialsResult( - BaseResult.ReturnStatus.SUBSCOPE_CREDS_ERROR, ex.getMessage()); - } - } - /** * Get the internal property map for an entity * diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java index 1c58334d55..3ec87f0ed9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java @@ -39,8 +39,6 @@ import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.TransactionalPolicyMappingPersistence; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; /** * Extends BasePersistence to express a more "transaction-oriented" control flow for backing stores @@ -297,33 +295,4 @@ PolarisPrincipalSecrets rotatePrincipalSecretsInCurrentTxn( */ void deletePrincipalSecretsInCurrentTxn( @Nonnull PolarisCallContext callCtx, @Nonnull String clientId, long principalId); - - /** - * See {@link org.apache.polaris.core.persistence.IntegrationPersistence#createStorageIntegration} - */ - @Nullable - - PolarisStorageIntegration createStorageIntegrationInCurrentTxn( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisStorageConfigurationInfo polarisStorageConfigurationInfo); - - /** - * See {@link - * org.apache.polaris.core.persistence.IntegrationPersistence#persistStorageIntegrationIfNeeded} - */ - void persistStorageIntegrationIfNeededInCurrentTxn( - @Nonnull PolarisCallContext callContext, - @Nonnull PolarisBaseEntity entity, - @Nullable PolarisStorageIntegration storageIntegration); - - /** - * See {@link - * org.apache.polaris.core.persistence.IntegrationPersistence#loadPolarisStorageIntegration} - */ - @Nullable - - PolarisStorageIntegration loadPolarisStorageIntegrationInCurrentTxn( - @Nonnull PolarisCallContext callContext, @Nonnull PolarisBaseEntity entity); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java index 6ebb18e8ce..4b157877a8 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java @@ -43,33 +43,23 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; -import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.pagination.EntityIdToken; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; -import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; -import org.apache.polaris.core.storage.PolarisStorageIntegration; -import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageLocation; public class TreeMapTransactionalPersistenceImpl extends AbstractTransactionalPersistence { // the TreeMap store to use private final TreeMapMetaStore store; - private final PolarisStorageIntegrationProvider storageIntegrationProvider; private final PrincipalSecretsGenerator secretsGenerator; public TreeMapTransactionalPersistenceImpl( - @Nonnull TreeMapMetaStore store, - @Nonnull PolarisStorageIntegrationProvider storageIntegrationProvider, - @Nonnull PrincipalSecretsGenerator secretsGenerator) { - - // init store + @Nonnull TreeMapMetaStore store, @Nonnull PrincipalSecretsGenerator secretsGenerator) { this.store = store; - this.storageIntegrationProvider = storageIntegrationProvider; this.secretsGenerator = secretsGenerator; } @@ -124,16 +114,6 @@ public void writeToEntitiesInCurrentTxn( this.store.getSliceEntities().write(entity); } - /** {@inheritDoc} */ - @Override - public - void persistStorageIntegrationIfNeededInCurrentTxn( - @Nonnull PolarisCallContext callContext, - @Nonnull PolarisBaseEntity entity, - @Nullable PolarisStorageIntegration storageIntegration) { - // not implemented for in-memory store - } - /** {@inheritDoc} */ @Override public void writeToEntitiesActiveInCurrentTxn( @@ -559,28 +539,6 @@ public void deletePrincipalSecretsInCurrentTxn( this.store.getSlicePrincipalSecrets().delete(clientId); } - /** {@inheritDoc} */ - @Override - public @Nullable - PolarisStorageIntegration createStorageIntegrationInCurrentTxn( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisStorageConfigurationInfo polarisStorageConfigurationInfo) { - return storageIntegrationProvider.getStorageIntegrationForConfig( - polarisStorageConfigurationInfo); - } - - /** {@inheritDoc} */ - @Override - public @Nullable - PolarisStorageIntegration loadPolarisStorageIntegrationInCurrentTxn( - @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) { - PolarisStorageConfigurationInfo storageConfig = - BaseMetaStoreManager.extractStorageConfiguration(callCtx.getDiagServices(), entity); - return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig); - } - @Override public void rollback() { this.store.rollback(); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java deleted file mode 100644 index 04022d233c..0000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.core.storage; - -import jakarta.annotation.Nonnull; -import java.util.Set; -import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; - -/** Manage credentials for storage locations. */ -public interface PolarisCredentialVendor { - /** - * Get a sub-scoped credentials for an entity against the provided allowed read and write - * locations. - * - * @param callCtx the polaris call context - * @param catalogId the catalog id - * @param entityId the entity id - * @param allowListOperation whether to allow LIST operation on the allowedReadLocations and - * allowedWriteLocations - * @param allowedReadLocations a set of allowed to read locations - * @param allowedWriteLocations a set of allowed to write locations - * @return an enum map containing the scoped credentials - */ - @Nonnull - ScopedCredentialsResult getSubscopedCredsForEntity( - @Nonnull PolarisCallContext callCtx, - long catalogId, - long entityId, - PolarisEntityType entityType, - boolean allowListOperation, - @Nonnull Set allowedReadLocations, - @Nonnull Set allowedWriteLocations); -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java index d8d88edc64..d0ba8996e7 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java @@ -18,26 +18,25 @@ */ package org.apache.polaris.core.storage.cache; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.LoadingCache; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; -import jakarta.annotation.Nullable; import java.time.Duration; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import org.apache.iceberg.exceptions.UnprocessableEntityException; -import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; -import org.apache.polaris.core.entity.PolarisEntity; -import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; +import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.storage.AccessConfig; -import org.apache.polaris.core.storage.PolarisCredentialVendor; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.PolarisStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,10 +46,14 @@ public class StorageCredentialCache { private static final Logger LOGGER = LoggerFactory.getLogger(StorageCredentialCache.class); private final LoadingCache cache; + private final PolarisStorageIntegrationProvider storageIntegrationProvider; /** Initialize the creds cache */ - public StorageCredentialCache(StorageCredentialCacheConfig cacheConfig) { - cache = + public StorageCredentialCache( + StorageCredentialCacheConfig cacheConfig, + PolarisStorageIntegrationProvider storageIntegrationProvider) { + this.storageIntegrationProvider = storageIntegrationProvider; + this.cache = Caffeine.newBuilder() .maximumSize(cacheConfig.maxEntries()) .expireAfter( @@ -91,30 +94,23 @@ private long maxCacheDurationMs(RealmConfig realmConfig) { /** * Either get from the cache or generate a new entry for a scoped creds * - * @param credentialVendor the credential vendor used to generate a new scoped creds if needed - * @param callCtx the call context - * @param polarisEntity the polaris entity that is going to scoped creds + * @param callContext the call context + * @param storageConfigurationInfo storage configuration * @param allowListOperation whether allow list action on the provided read and write locations * @param allowedReadLocations a set of allowed to read locations * @param allowedWriteLocations a set of allowed to write locations. * @return the a map of string containing the scoped creds information */ public AccessConfig getOrGenerateSubScopeCreds( - @Nonnull PolarisCredentialVendor credentialVendor, - @Nonnull PolarisCallContext callCtx, - @Nonnull PolarisEntity polarisEntity, + @Nonnull CallContext callContext, + @Nonnull PolarisStorageConfigurationInfo storageConfigurationInfo, boolean allowListOperation, @Nonnull Set allowedReadLocations, @Nonnull Set allowedWriteLocations) { - if (!isTypeSupported(polarisEntity.getType())) { - callCtx - .getDiagServices() - .fail("entity_type_not_suppported_to_scope_creds", "type={}", polarisEntity.getType()); - } StorageCredentialCacheKey key = StorageCredentialCacheKey.of( - callCtx.getRealmContext().getRealmIdentifier(), - polarisEntity, + callContext.getRealmContext().getRealmIdentifier(), + storageConfigurationInfo, allowListOperation, allowedReadLocations, allowedWriteLocations); @@ -122,52 +118,60 @@ public AccessConfig getOrGenerateSubScopeCreds( Function loader = k -> { LOGGER.atDebug().log("StorageCredentialCache::load"); - ScopedCredentialsResult scopedCredentialsResult = - credentialVendor.getSubscopedCredsForEntity( - callCtx, - k.catalogId(), - polarisEntity.getId(), - polarisEntity.getType(), - k.allowedListAction(), - k.allowedReadLocations(), - k.allowedWriteLocations()); - if (scopedCredentialsResult.isSuccess()) { - long maxCacheDurationMs = maxCacheDurationMs(callCtx.getRealmConfig()); - return new StorageCredentialCacheEntry( - scopedCredentialsResult.getAccessConfig(), maxCacheDurationMs); - } - LOGGER - .atDebug() - .addKeyValue("errorMessage", scopedCredentialsResult.getExtraInformation()) - .log("Failed to get subscoped credentials"); - throw new UnprocessableEntityException( - "Failed to get subscoped credentials: %s", - scopedCredentialsResult.getExtraInformation()); + checkArgument( + !allowedReadLocations.isEmpty() || !allowedWriteLocations.isEmpty(), + "allowed_locations_to_subscope_is_required"); + + // get storage integration + var storageIntegration = getStorageIntegrationForConfig(storageConfigurationInfo); + + return buildSubscopedCacheEntry( + callContext, + allowListOperation, + allowedReadLocations, + allowedWriteLocations, + storageIntegration); }; - return cache.get(key, loader).toAccessConfig(); + return requireNonNull(cache.get(key, loader)).toAccessConfig(); } @VisibleForTesting - @Nullable - Map getIfPresent(StorageCredentialCacheKey key) { - return getAccessConfig(key).map(AccessConfig::credentials).orElse(null); + StorageCredentialCacheEntry buildSubscopedCacheEntry( + CallContext callContext, + boolean allowListOperation, + Set allowedReadLocations, + Set allowedWriteLocations, + PolarisStorageIntegration storageIntegration) { + try { + var realmConfig = callContext.getRealmConfig(); + var accessConfig = + storageIntegration.getSubscopedCreds( + realmConfig, allowListOperation, allowedReadLocations, allowedWriteLocations); + long maxCacheDurationMs = maxCacheDurationMs(realmConfig); + return new StorageCredentialCacheEntry(accessConfig, maxCacheDurationMs); + } catch (Exception ex) { + LOGGER + .atDebug() + .addKeyValue("errorMessage", ex.getMessage()) + .log("Failed to get subscoped credentials"); + throw new UnprocessableEntityException( + "Failed to get subscoped credentials: %s", ex.getMessage()); + } } @VisibleForTesting - Optional getAccessConfig(StorageCredentialCacheKey key) { - return Optional.ofNullable(cache.getIfPresent(key)) - .map(StorageCredentialCacheEntry::toAccessConfig); + PolarisStorageIntegration getStorageIntegrationForConfig( + PolarisStorageConfigurationInfo storageConfigurationInfo) { + return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfigurationInfo); } - private boolean isTypeSupported(PolarisEntityType type) { - return type == PolarisEntityType.CATALOG - || type == PolarisEntityType.NAMESPACE - || type == PolarisEntityType.TABLE_LIKE - || type == PolarisEntityType.TASK; + @VisibleForTesting + boolean isPresent(StorageCredentialCacheKey key) { + return cache.getIfPresent(key) != null; } @VisibleForTesting - public long getEstimatedSize() { + long getEstimatedSize() { return this.cache.estimatedSize(); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java index 79eba7d1dc..26c38b768a 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java @@ -18,10 +18,8 @@ */ package org.apache.polaris.core.storage.cache; -import jakarta.annotation.Nullable; import java.util.Set; -import org.apache.polaris.core.entity.PolarisEntity; -import org.apache.polaris.core.entity.PolarisEntityConstants; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.immutables.PolarisImmutable; import org.immutables.value.Value; @@ -32,35 +30,26 @@ public interface StorageCredentialCacheKey { String realmId(); @Value.Parameter(order = 2) - long catalogId(); + PolarisStorageConfigurationInfo storageConfigurationInfo(); @Value.Parameter(order = 3) - @Nullable - String storageConfigSerializedStr(); - - @Value.Parameter(order = 4) boolean allowedListAction(); - @Value.Parameter(order = 5) + @Value.Parameter(order = 4) Set allowedReadLocations(); - @Value.Parameter(order = 6) + @Value.Parameter(order = 5) Set allowedWriteLocations(); static StorageCredentialCacheKey of( String realmId, - PolarisEntity entity, + PolarisStorageConfigurationInfo storageConfigurationInfo, boolean allowedListAction, Set allowedReadLocations, Set allowedWriteLocations) { - String storageConfigSerializedStr = - entity - .getInternalPropertiesAsMap() - .get(PolarisEntityConstants.getStorageConfigInfoPropertyName()); return ImmutableStorageCredentialCacheKey.of( realmId, - entity.getCatalogId(), - storageConfigSerializedStr, + storageConfigurationInfo, allowedListAction, allowedReadLocations, allowedWriteLocations); diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapAtomicOperationMetaStoreManagerTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapAtomicOperationMetaStoreManagerTest.java index ee103ab103..72aeada677 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapAtomicOperationMetaStoreManagerTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapAtomicOperationMetaStoreManagerTest.java @@ -25,7 +25,6 @@ import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; -import org.mockito.Mockito; public class PolarisTreeMapAtomicOperationMetaStoreManagerTest extends BasePolarisMetaStoreManagerTest { @@ -37,7 +36,7 @@ public PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { PolarisCallContext callCtx = new PolarisCallContext( () -> "testRealm", - new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS), + new TreeMapTransactionalPersistenceImpl(store, RANDOM_SECRETS), diagServices); return new PolarisTestMetaStoreManager(metaStoreManager, callCtx); } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreManagerTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreManagerTest.java index 41030b0146..6276b3358b 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreManagerTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/PolarisTreeMapMetaStoreManagerTest.java @@ -26,7 +26,6 @@ import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl; import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; -import org.mockito.Mockito; public class PolarisTreeMapMetaStoreManagerTest extends BasePolarisMetaStoreManagerTest { @Override @@ -38,7 +37,7 @@ public PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() { PolarisCallContext callCtx = new PolarisCallContext( () -> "testRealm", - new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS), + new TreeMapTransactionalPersistenceImpl(store, RANDOM_SECRETS), diagServices); return new PolarisTestMetaStoreManager(metaStoreManager, callCtx); } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java index ba7b202fef..445a18125a 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java @@ -26,7 +26,6 @@ import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl; import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; -import org.mockito.Mockito; public class ResolverTest extends BaseResolverTest { @@ -41,7 +40,7 @@ protected PolarisCallContext callCtx() { PolarisDefaultDiagServiceImpl diagServices = new PolarisDefaultDiagServiceImpl(); TreeMapMetaStore store = new TreeMapMetaStore(diagServices); TreeMapTransactionalPersistenceImpl metaStore = - new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS); + new TreeMapTransactionalPersistenceImpl(store, RANDOM_SECRETS); callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices); } return callCtx; diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java index 1184e55a0f..a89aa0ffac 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java @@ -43,7 +43,6 @@ import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; /** Unit testing of the entity cache */ public class InMemoryEntityCacheTest { @@ -79,7 +78,7 @@ public InMemoryEntityCacheTest() { diagServices = new PolarisDefaultDiagServiceImpl(); TreeMapMetaStore store = new TreeMapMetaStore(diagServices); TransactionalPersistence metaStore = - new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS); + new TreeMapTransactionalPersistenceImpl(store, RANDOM_SECRETS); metaStoreManager = new TransactionalMetaStoreManagerImpl(Clock.systemUTC()); callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices); diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java index 5364fa8433..9c15a88916 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java @@ -18,89 +18,86 @@ */ package org.apache.polaris.core.storage.cache; -import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; - import jakarta.annotation.Nonnull; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.exceptions.UnprocessableEntityException; import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; -import org.apache.polaris.core.PolarisDiagnostics; -import org.apache.polaris.core.entity.PolarisBaseEntity; -import org.apache.polaris.core.entity.PolarisEntity; -import org.apache.polaris.core.entity.PolarisEntityConstants; -import org.apache.polaris.core.entity.PolarisEntitySubType; -import org.apache.polaris.core.entity.PolarisEntityType; -import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.core.persistence.PolarisObjectMapperUtil; -import org.apache.polaris.core.persistence.dao.entity.BaseResult; -import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; -import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; -import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; -import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.config.RealmConfigImpl; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.storage.AccessConfig; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; +import org.apache.polaris.core.storage.PolarisStorageIntegration; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageAccessProperty; +import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo; +import org.apache.polaris.core.storage.azure.AzureStorageConfigurationInfo; +import org.apache.polaris.core.storage.gcp.GcpStorageConfigurationInfo; import org.assertj.core.api.Assertions; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.mockito.Mockito; public class StorageCredentialCacheTest { - private final PolarisCallContext callCtx; - private final StorageCredentialCacheConfig storageCredentialCacheConfig; - private final PolarisMetaStoreManager metaStoreManager; - + private CallContext callCtx; private StorageCredentialCache storageCredentialCache; + private PolarisStorageIntegrationProvider storageIntegrationProvider; - public StorageCredentialCacheTest() { - // diag services - PolarisDiagnostics diagServices = new PolarisDefaultDiagServiceImpl(); - // the entity store, use treemap implementation - TreeMapMetaStore store = new TreeMapMetaStore(diagServices); + @BeforeEach + public void setup() { // to interact with the metastore - TransactionalPersistence metaStore = - new TreeMapTransactionalPersistenceImpl(store, Mockito.mock(), RANDOM_SECRETS); - callCtx = new PolarisCallContext(() -> "testRealm", metaStore, diagServices); - storageCredentialCacheConfig = () -> 10_000; - metaStoreManager = Mockito.mock(PolarisMetaStoreManager.class); - storageCredentialCache = newStorageCredentialCache(); - } + RealmContext realmContext = () -> "testRealm"; + callCtx = + new CallContext() { + @Override + public RealmConfig getRealmConfig() { + return new RealmConfigImpl(new PolarisConfigurationStore() {}, realmContext); + } + + @Override + public RealmContext getRealmContext() { + return realmContext; + } - private StorageCredentialCache newStorageCredentialCache() { - return new StorageCredentialCache(storageCredentialCacheConfig); + @Override + public PolarisCallContext getPolarisCallContext() { + throw new UnsupportedOperationException(); + } + + @Override + public CallContext copy() { + throw new UnsupportedOperationException(); + } + }; + + storageIntegrationProvider = Mockito.mock(); + storageCredentialCache = new StorageCredentialCache(() -> 10_000, storageIntegrationProvider); } @Test public void testBadResult() { - storageCredentialCache = newStorageCredentialCache(); - ScopedCredentialsResult badResult = - new ScopedCredentialsResult( - BaseResult.ReturnStatus.SUBSCOPE_CREDS_ERROR, "extra_error_info"); + PolarisStorageIntegration integration = mockedIntegration(); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), - Mockito.any(), - Mockito.anyBoolean(), - Mockito.anySet(), - Mockito.anySet())) - .thenReturn(badResult); - PolarisEntity polarisEntity = - new PolarisEntity( - new PolarisBaseEntity( - 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name")); + integration.getSubscopedCreds( + Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet())) + .thenThrow(new RuntimeException("extra_error_info")); + Mockito.when(storageIntegrationProvider.getStorageIntegrationForConfig(Mockito.any())) + .thenReturn(integration); + Assertions.assertThatThrownBy( () -> storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - polarisEntity, + storageConfigs().get(0), true, Set.of("s3://bucket1/path"), Set.of("s3://bucket3/path"))) @@ -110,31 +107,12 @@ public void testBadResult() { @Test public void testCacheHit() { - storageCredentialCache = newStorageCredentialCache(); - List mockedScopedCreds = - getFakeScopedCreds(3, /* expireSoon= */ false); - Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), - Mockito.any(), - Mockito.anyBoolean(), - Mockito.anySet(), - Mockito.anySet())) - .thenReturn(mockedScopedCreds.get(0)) - .thenReturn(mockedScopedCreds.get(1)) - .thenReturn(mockedScopedCreds.get(1)); - PolarisBaseEntity baseEntity = - new PolarisBaseEntity( - 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name"); - PolarisEntity polarisEntity = new PolarisEntity(baseEntity); + fakedIntegrations(false); // add an item to the cache storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - polarisEntity, + storageConfigs().get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path")); @@ -142,9 +120,8 @@ public void testCacheHit() { // subscope for the same entity and same allowed locations, will hit the cache storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - polarisEntity, + storageConfigs().get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path")); @@ -152,148 +129,70 @@ public void testCacheHit() { } @RepeatedTest(10) - public void testCacheEvict() throws InterruptedException { - storageCredentialCache = newStorageCredentialCache(); - List mockedScopedCreds = getFakeScopedCreds(3, /* expireSoon= */ true); + public void testCacheEvict() { + fakedIntegrations(true); - Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), - Mockito.any(), - Mockito.anyBoolean(), - Mockito.anySet(), - Mockito.anySet())) - .thenReturn(mockedScopedCreds.get(0)) - .thenReturn(mockedScopedCreds.get(1)) - .thenReturn(mockedScopedCreds.get(2)); - PolarisBaseEntity baseEntity = - new PolarisBaseEntity( - 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name"); - PolarisEntity polarisEntity = new PolarisEntity(baseEntity); StorageCredentialCacheKey cacheKey = StorageCredentialCacheKey.of( callCtx.getRealmContext().getRealmIdentifier(), - polarisEntity, + storageConfigs().get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path")); // the entry will be evicted immediately because the token is expired storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - polarisEntity, + storageConfigs().get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path")); - Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); - - storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, - polarisEntity, - true, - Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); - Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); - - storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, - polarisEntity, - true, - Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); - Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull(); + Assertions.assertThat(cacheKey) + .extracting(storageCredentialCache::isPresent, InstanceOfAssertFactories.BOOLEAN) + .isFalse(); } @Test public void testCacheGenerateNewEntries() { - storageCredentialCache = newStorageCredentialCache(); - List mockedScopedCreds = - getFakeScopedCreds(3, /* expireSoon= */ false); - Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), - Mockito.any(), - Mockito.anyBoolean(), - Mockito.anySet(), - Mockito.anySet())) - .thenReturn(mockedScopedCreds.get(0)) - .thenReturn(mockedScopedCreds.get(1)) - .thenReturn(mockedScopedCreds.get(2)); - List entityList = getPolarisEntities(); + fakedIntegrations(false); + int cacheSize = 0; // different catalog will generate new cache entries - for (PolarisEntity entity : entityList) { + for (var storageConfigStr : storageConfigs()) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - entity, + storageConfigStr, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path")); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } - // update the entity's storage config, since StorageConfig changed, cache will generate new - // entry - for (PolarisEntity entity : entityList) { - Map internalMap = entity.getPropertiesAsMap(); - internalMap.put( - PolarisEntityConstants.getStorageConfigInfoPropertyName(), "newStorageConfig"); - PolarisBaseEntity updateEntity = - new PolarisBaseEntity.Builder(entity) - .internalProperties(PolarisObjectMapperUtil.serializeProperties(internalMap)) - .build(); - storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, - PolarisEntity.of(updateEntity), - /* allowedListAction= */ true, - Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket/path")); - Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); - } // allowedListAction changed to different value FALSE, will generate new entry - for (PolarisEntity entity : entityList) { + for (var storageConfigStr : storageConfigs()) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - entity, - /* allowedListAction= */ false, + /* allowedListAction= */ storageConfigStr, + false, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket/path")); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedWriteLocations, will generate new entry - for (PolarisEntity entity : entityList) { + for (var storageConfigStr : storageConfigs()) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - entity, - /* allowedListAction= */ false, + /* allowedListAction= */ storageConfigStr, + false, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://differentbucket/path")); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); } // different allowedReadLocations, will generate new try - for (PolarisEntity entity : entityList) { - Map internalMap = entity.getPropertiesAsMap(); - internalMap.put( - PolarisEntityConstants.getStorageConfigInfoPropertyName(), "newStorageConfig"); - PolarisBaseEntity updateEntity = - new PolarisBaseEntity.Builder(entity) - .internalProperties(PolarisObjectMapperUtil.serializeProperties(internalMap)) - .build(); + for (var storageConfigStr : storageConfigs()) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - PolarisEntity.of(updateEntity), - /* allowedListAction= */ false, + /* allowedListAction= */ storageConfigStr, + false, Set.of("s3://differentbucket/path", "s3://bucket2/path"), Set.of("s3://bucket/path")); Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize); @@ -302,168 +201,137 @@ public void testCacheGenerateNewEntries() { @Test public void testCacheNotAffectedBy() { - storageCredentialCache = newStorageCredentialCache(); - List mockedScopedCreds = - getFakeScopedCreds(3, /* expireSoon= */ false); + fakedIntegrations(false); - Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), - Mockito.any(), - Mockito.anyBoolean(), - Mockito.anySet(), - Mockito.anySet())) - .thenReturn(mockedScopedCreds.get(0)) - .thenReturn(mockedScopedCreds.get(1)) - .thenReturn(mockedScopedCreds.get(2)); - List entityList = getPolarisEntities(); - for (PolarisEntity entity : entityList) { + List configValues = storageConfigs(); + for (var storageConfigStr : configValues) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - entity, + storageConfigStr, true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path")); } - Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); + Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(configValues.size()); - // entity ID does not affect the cache - for (PolarisEntity entity : entityList) { - storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, - new PolarisEntity(new PolarisBaseEntity.Builder(entity).id(1234).build()), - true, - Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); - Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); - } - - // other property changes does not affect the cache - for (PolarisEntity entity : entityList) { - storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, - callCtx, - new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), - true, - Set.of("s3://bucket1/path", "s3://bucket2/path"), - Set.of("s3://bucket3/path", "s3://bucket4/path")); - Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); - } // order of the allowedReadLocations does not affect the cache - for (PolarisEntity entity : entityList) { + for (var storageConfigStr : configValues) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), + storageConfigStr, true, Set.of("s3://bucket2/path", "s3://bucket1/path"), Set.of("s3://bucket3/path", "s3://bucket4/path")); - Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); + Assertions.assertThat(storageCredentialCache.getEstimatedSize()) + .isEqualTo(configValues.size()); } // order of the allowedWriteLocations does not affect the cache - for (PolarisEntity entity : entityList) { + for (var storageConfigStr : configValues) { storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - new PolarisEntity(new PolarisBaseEntity.Builder(entity).entityVersion(5).build()), + storageConfigStr, true, Set.of("s3://bucket2/path", "s3://bucket1/path"), Set.of("s3://bucket4/path", "s3://bucket3/path")); - Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size()); + Assertions.assertThat(storageCredentialCache.getEstimatedSize()) + .isEqualTo(configValues.size()); } } - private static List getFakeScopedCreds(int number, boolean expireSoon) { - List res = new ArrayList<>(); - for (int i = 1; i <= number; i = i + 3) { - int finalI = i; - // NOTE: The default behavior of the Caffeine cache seems to have a bug; if our - // expireAfter definition in the StorageCredentialCache constructor doesn't clip - // the returned time to minimum of 0, and we set the expiration time to more than - // 1 second in the past, it seems the cache fails to remove the expired entries - // no matter how long we wait. This is possibly related to the implementation-specific - // "minimum difference between the scheduled executions" documented in Caffeine.java - // to be 1 second. - String expireTime = - expireSoon - ? String.valueOf(System.currentTimeMillis() - 100) - : String.valueOf(Long.MAX_VALUE); - res.add( - new ScopedCredentialsResult( - AccessConfig.builder() - .put(StorageAccessProperty.AWS_KEY_ID, "key_id_" + finalI) - .put(StorageAccessProperty.AWS_SECRET_KEY, "key_secret_" + finalI) - .put(StorageAccessProperty.AWS_SESSION_TOKEN_EXPIRES_AT_MS, expireTime) - .put(StorageAccessProperty.EXPIRATION_TIME, expireTime) - .build())); - if (res.size() == number) return res; - res.add( - new ScopedCredentialsResult( - AccessConfig.builder() - .put(StorageAccessProperty.AZURE_SAS_TOKEN, "sas_token_" + finalI) - .put(StorageAccessProperty.EXPIRATION_TIME, expireTime) - .build())); - if (res.size() == number) return res; - res.add( - new ScopedCredentialsResult( - AccessConfig.builder() - .put(StorageAccessProperty.GCS_ACCESS_TOKEN, "gcs_token_" + finalI) - .put(StorageAccessProperty.GCS_ACCESS_TOKEN_EXPIRES_AT, expireTime) - .build())); - } + private void fakedIntegrations(boolean expireSoon) { + List mockedAccessConfigs = getFakeAccessConfigs(expireSoon); + var integrations = + mockedAccessConfigs.stream() + .map( + accessConfig -> { + var integration = mockedIntegration(); + Mockito.when( + integration.getSubscopedCreds( + Mockito.any(), + Mockito.anyBoolean(), + Mockito.anySet(), + Mockito.anySet())) + .thenReturn(accessConfig); + return integration; + }) + .collect(Collectors.toList()); + Mockito.when(storageIntegrationProvider.getStorageIntegrationForConfig(Mockito.any())) + .thenReturn(integrations.get(0)) + .thenReturn(integrations.get(1)) + .thenReturn(integrations.get(2)); + } + + private static List getFakeAccessConfigs(boolean expireSoon) { + // NOTE: The default behavior of the Caffeine cache seems to have a bug; if our + // expireAfter definition in the StorageCredentialCache constructor doesn't clip + // the returned time to minimum of 0, and we set the expiration time to more than + // 1 second in the past, it seems the cache fails to remove the expired entries + // no matter how long we wait. This is possibly related to the implementation-specific + // "minimum difference between the scheduled executions" documented in Caffeine.java + // to be 1 second. + String expireTime = + expireSoon + ? String.valueOf(System.currentTimeMillis() - 100) + : String.valueOf(Long.MAX_VALUE); + List res = new ArrayList<>(); + res.add( + AccessConfig.builder() + .put(StorageAccessProperty.AWS_KEY_ID, "key_id_1") + .put(StorageAccessProperty.AWS_SECRET_KEY, "key_secret_1") + .put(StorageAccessProperty.AWS_SESSION_TOKEN_EXPIRES_AT_MS, expireTime) + .put(StorageAccessProperty.EXPIRATION_TIME, expireTime) + .build()); + res.add( + AccessConfig.builder() + .put(StorageAccessProperty.AZURE_SAS_TOKEN, "sas_token_1") + .put(StorageAccessProperty.EXPIRATION_TIME, expireTime) + .build()); + res.add( + AccessConfig.builder() + .put(StorageAccessProperty.GCS_ACCESS_TOKEN, "gcs_token_1") + .put(StorageAccessProperty.GCS_ACCESS_TOKEN_EXPIRES_AT, expireTime) + .build()); return res; } @Nonnull - private static List getPolarisEntities() { - PolarisEntity polarisEntity1 = - new PolarisEntity( - new PolarisBaseEntity( - 1, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name")); - PolarisEntity polarisEntity2 = - new PolarisEntity( - new PolarisBaseEntity( - 2, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name")); - PolarisEntity polarisEntity3 = - new PolarisEntity( - new PolarisBaseEntity( - 3, 2, PolarisEntityType.CATALOG, PolarisEntitySubType.ICEBERG_TABLE, 0, "name")); - - return Arrays.asList(polarisEntity1, polarisEntity2, polarisEntity3); + private static List storageConfigs() { + return List.of( + AwsStorageConfigurationInfo.builder() + .addAllowedLocation("s3://foo/bar") + .roleARN("arn:foo") + .region("no-where-1") + .build(), + GcpStorageConfigurationInfo.builder().addAllowedLocation("gs://foo/bar").build(), + AzureStorageConfigurationInfo.builder() + .addAllowedLocation("abfs://foo@baz.foo/bar") + .tenantId("someone") + .build()); } @Test public void testExtraProperties() { - storageCredentialCache = newStorageCredentialCache(); - ScopedCredentialsResult properties = - new ScopedCredentialsResult( - AccessConfig.builder() - .put(StorageAccessProperty.AWS_SECRET_KEY, "super-secret-123") - .put(StorageAccessProperty.AWS_ENDPOINT, "test-endpoint1") - .put(StorageAccessProperty.AWS_PATH_STYLE_ACCESS, "true") - .build()); + var accessConfig = + AccessConfig.builder() + .put(StorageAccessProperty.AWS_SECRET_KEY, "super-secret-123") + .put(StorageAccessProperty.AWS_ENDPOINT, "test-endpoint1") + .put(StorageAccessProperty.AWS_PATH_STYLE_ACCESS, "true") + .build(); + var integration = mockedIntegration(); Mockito.when( - metaStoreManager.getSubscopedCredsForEntity( - Mockito.any(), - Mockito.anyLong(), - Mockito.anyLong(), - Mockito.any(), - Mockito.anyBoolean(), - Mockito.anySet(), - Mockito.anySet())) - .thenReturn(properties); - List entityList = getPolarisEntities(); + integration.getSubscopedCreds( + Mockito.any(), Mockito.anyBoolean(), Mockito.anySet(), Mockito.anySet())) + .thenReturn(accessConfig); + Mockito.when(storageIntegrationProvider.getStorageIntegrationForConfig(Mockito.any())) + .thenReturn(integration); + + List configValues = storageConfigs(); AccessConfig config = storageCredentialCache.getOrGenerateSubScopeCreds( - metaStoreManager, callCtx, - entityList.get(0), + configValues.get(0), true, Set.of("s3://bucket1/path", "s3://bucket2/path"), Set.of("s3://bucket3/path", "s3://bucket4/path")); @@ -473,4 +341,9 @@ public void testExtraProperties() { .containsExactlyInAnyOrderEntriesOf( Map.of("s3.endpoint", "test-endpoint1", "s3.path-style-access", "true")); } + + @SuppressWarnings("unchecked") + private static PolarisStorageIntegration mockedIntegration() { + return Mockito.mock(PolarisStorageIntegration.class); + } } diff --git a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java index 9d2f22be7d..b4d171d1b6 100644 --- a/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java +++ b/runtime/service/src/intTest/java/org/apache/polaris/service/it/RestCatalogMinIOSpecialIT.java @@ -230,6 +230,15 @@ public void testInternalEndpoints() throws IOException { Optional.of(endpoint), false, Optional.of(endpoint))) { + StorageConfigInfo storageConfig = + managementApi.getCatalog(catalogName).getStorageConfigInfo(); + assertThat((AwsStorageConfigInfo) storageConfig) + .extracting( + AwsStorageConfigInfo::getEndpoint, + AwsStorageConfigInfo::getStsEndpoint, + AwsStorageConfigInfo::getEndpointInternal, + AwsStorageConfigInfo::getPathStyleAccess) + .containsExactly("http://s3.example.com", endpoint, endpoint, false); LoadTableResponse loadTableResponse = doTestCreateTable(restCatalog); assertThat(loadTableResponse.config()).containsEntry("s3.endpoint", "http://s3.example.com"); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 310a3cd328..fb9654bba9 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -98,6 +98,7 @@ import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.LocationBasedEntity; import org.apache.polaris.core.entity.NamespaceEntity; +import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.entity.PolarisEntitySubType; @@ -121,7 +122,6 @@ import org.apache.polaris.core.persistence.resolver.ResolverStatus; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.InMemoryStorageIntegration; -import org.apache.polaris.core.storage.PolarisCredentialVendor; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; @@ -857,22 +857,29 @@ public AccessConfig getAccessConfig( TableIdentifier tableIdentifier, TableMetadata tableMetadata, Set storageActions) { - Optional storageInfo = findStorageInfo(tableIdentifier); - if (storageInfo.isEmpty()) { - LOGGER - .atWarn() - .addKeyValue("tableIdentifier", tableIdentifier) - .log("Table entity has no storage configuration in its hierarchy"); - return AccessConfig.builder().build(); - } - return FileIOUtil.refreshAccessConfig( - callContext, - storageCredentialCache, - getCredentialVendor(), - tableIdentifier, - getLocationsAllowedToBeAccessed(tableMetadata), - storageActions, - storageInfo.get()); + return findStorageInfo(tableIdentifier) + .map( + e -> + e.getInternalPropertiesAsMap() + .get(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + .map(PolarisStorageConfigurationInfo::deserialize) + .map( + storageConfigInfo -> + FileIOUtil.getAccessConfig( + callContext, + storageCredentialCache, + tableIdentifier, + getLocationsAllowedToBeAccessed(tableMetadata), + storageActions, + storageConfigInfo)) + .orElseGet( + () -> { + LOGGER + .atWarn() + .addKeyValue("tableIdentifier", tableIdentifier) + .log("Table entity has no storage configuration in its hierarchy"); + return AccessConfig.builder().build(); + }); } private String buildPrefixedLocation(TableIdentifier tableIdentifier) { @@ -2087,6 +2094,10 @@ private FileIO loadFileIOForTableLike( PolarisResolvedPathWrapper resolvedStorageEntity, Map tableProperties, Set storageActions) { + + PolarisEntity entity = + FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity).orElseThrow(); + // Reload fileIO based on table specific context FileIO fileIO = fileIOFactory.loadFileIO( @@ -2096,7 +2107,7 @@ private FileIO loadFileIOForTableLike( identifier, readLocations, storageActions, - resolvedStorageEntity); + polarisStorageConfigurationInfoFromEntity(entity)); // ensure the new fileIO is closed when the catalog is closed closeableGroup.addCloseable(fileIO); return fileIO; @@ -2110,10 +2121,6 @@ private PolarisMetaStoreManager getMetaStoreManager() { return metaStoreManager; } - private PolarisCredentialVendor getCredentialVendor() { - return metaStoreManager; - } - @VisibleForTesting public void setFileIOFactory(FileIOFactory newFactory) { this.fileIOFactory = newFactory; @@ -2611,19 +2618,25 @@ protected FileIO loadFileIO(String ioImpl, Map properties) { new ResolvedPolarisEntity(catalogEntity, List.of(), List.of()); PolarisResolvedPathWrapper resolvedPath = new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity)); + PolarisBaseEntity entity = FileIOUtil.findStorageInfoFromHierarchy(resolvedPath).orElseThrow(); Set storageActions = Set.of(PolarisStorageActions.ALL); return fileIOFactory.loadFileIO( - callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath); + callContext, + ioImpl, + properties, + identifier, + locations, + storageActions, + polarisStorageConfigurationInfoFromEntity(entity)); } - private void blockedUserSpecifiedWriteLocation(Map properties) { - if (properties != null - && (properties.containsKey(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY) - || properties.containsKey( - IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY))) { - throw new ForbiddenException( - "Delegate access to table with user-specified write location is temporarily not supported."); - } + private static Optional + polarisStorageConfigurationInfoFromEntity(PolarisBaseEntity entity) { + return Optional.ofNullable( + entity + .getInternalPropertiesAsMap() + .get(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + .map(PolarisStorageConfigurationInfo::deserialize); } private int getMaxMetadataRefreshRetries() { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java index d2c73e2684..9333c7c4fb 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/DefaultFileIOFactory.java @@ -33,12 +33,10 @@ import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; -import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.AccessConfig; -import org.apache.polaris.core.storage.PolarisCredentialVendor; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.cache.StorageCredentialCache; /** @@ -72,38 +70,31 @@ public FileIO loadFileIO( @Nonnull TableIdentifier identifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { - RealmContext realmContext = callContext.getRealmContext(); - PolarisCredentialVendor credentialVendor = - metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); + Optional storageConfigurationInfo) { - // Get subcoped creds - properties = new HashMap<>(properties); - Optional storageInfoEntity = - FileIOUtil.findStorageInfoFromHierarchy(resolvedEntityPath); - Optional accessConfig = - storageInfoEntity.map( - storageInfo -> - FileIOUtil.refreshAccessConfig( - callContext, - storageCredentialCache, - credentialVendor, - identifier, - tableLocations, - storageActions, - storageInfo)); + Map updatedProperties = new HashMap<>(properties); + storageConfigurationInfo.ifPresent( + config -> { + RealmContext realmContext = callContext.getRealmContext(); + AccessConfig accessConfig = + FileIOUtil.getAccessConfig( + callContext, + storageCredentialCache, + identifier, + tableLocations, + storageActions, + config); - // Update the FileIO with the subscoped credentials - // Update with properties in case there are table-level overrides the credentials should - // always override table-level properties, since storage configuration will be found at - // whatever entity defines it - if (accessConfig.isPresent()) { - properties.putAll(accessConfig.get().credentials()); - properties.putAll(accessConfig.get().extraProperties()); - properties.putAll(accessConfig.get().internalProperties()); - } + // Update the FileIO with the subscoped credentials + // Update with properties in case there are table-level overrides the credentials should + // always override table-level properties, since storage configuration will be found at + // whatever entity defines it + updatedProperties.putAll(accessConfig.credentials()); + updatedProperties.putAll(accessConfig.extraProperties()); + updatedProperties.putAll(accessConfig.internalProperties()); + }); - return loadFileIOInternal(ioImplClassName, properties); + return loadFileIOInternal(ioImplClassName, updatedProperties); } @VisibleForTesting diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java index f3e0d6b98b..ca4572c71e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOFactory.java @@ -21,12 +21,13 @@ import jakarta.annotation.Nonnull; import jakarta.enterprise.context.ApplicationScoped; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; /** * Interface for providing a way to construct FileIO objects, such as for reading/writing S3. @@ -47,7 +48,7 @@ public interface FileIOFactory { * @param identifier the table identifier. * @param tableLocations locations associated with the table. * @param storageActions storage actions allowed for the table. - * @param resolvedEntityPath resolved paths for the entities. + * @param storageConfigurationInfo storage configuration * @return a configured FileIO instance. */ FileIO loadFileIO( @@ -57,5 +58,5 @@ FileIO loadFileIO( @Nonnull TableIdentifier identifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath); + Optional storageConfigurationInfo); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java index c5ef12d784..24d8775703 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/FileIOUtil.java @@ -27,8 +27,8 @@ import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.AccessConfig; -import org.apache.polaris.core.storage.PolarisCredentialVendor; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.slf4j.Logger; @@ -74,14 +74,13 @@ public static Optional findStorageInfoFromHierarchy( * and read/write metadata JSON files. * */ - public static AccessConfig refreshAccessConfig( + public static AccessConfig getAccessConfig( CallContext callContext, StorageCredentialCache storageCredentialCache, - PolarisCredentialVendor credentialVendor, TableIdentifier tableIdentifier, Set tableLocations, Set storageActions, - PolarisEntity entity) { + PolarisStorageConfigurationInfo storageConfigurationInfo) { boolean skipCredentialSubscopingIndirection = callContext @@ -106,12 +105,7 @@ public static AccessConfig refreshAccessConfig( : Set.of(); AccessConfig accessConfig = storageCredentialCache.getOrGenerateSubScopeCreds( - credentialVendor, - callContext.getPolarisCallContext(), - entity, - allowList, - tableLocations, - writeLocations); + callContext, storageConfigurationInfo, allowList, tableLocations, writeLocations); LOGGER .atDebug() .addKeyValue("tableIdentifier", tableIdentifier) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java index 048e19bb41..23a25b3907 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/WasbTranslatingFileIOFactory.java @@ -23,13 +23,14 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.cache.StorageCredentialCache; /** A {@link FileIOFactory} that translates WASB paths to ABFS ones */ @@ -55,7 +56,7 @@ public FileIO loadFileIO( @Nonnull TableIdentifier identifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + Optional storageConfigurationInfo) { return new WasbTranslatingFileIO( defaultFileIOFactory.loadFileIO( callContext, @@ -64,6 +65,6 @@ public FileIO loadFileIO( identifier, tableLocations, storageActions, - resolvedEntityPath)); + storageConfigurationInfo)); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java index b83b68c0f6..54a262133a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java @@ -55,6 +55,7 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; +import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.cache.StorageCredentialCache; import org.apache.polaris.core.storage.cache.StorageCredentialCacheConfig; import org.apache.polaris.service.auth.ActiveRolesProvider; @@ -105,8 +106,9 @@ public Clock clock() { @Produces @ApplicationScoped public StorageCredentialCache storageCredentialCache( - StorageCredentialCacheConfig storageCredentialCacheConfig) { - return new StorageCredentialCache(storageCredentialCacheConfig); + StorageCredentialCacheConfig storageCredentialCacheConfig, + PolarisStorageIntegrationProvider storageIntegrationProvider) { + return new StorageCredentialCache(storageCredentialCacheConfig, storageIntegrationProvider); } @Produces diff --git a/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryAtomicOperationMetaStoreManagerFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryAtomicOperationMetaStoreManagerFactory.java index 703ad1e380..2e571697c3 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryAtomicOperationMetaStoreManagerFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryAtomicOperationMetaStoreManagerFactory.java @@ -25,7 +25,6 @@ import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.persistence.AtomicOperationMetaStoreManager; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; /** * Uses a PolarisTreeMapStore for the underlying persistence layer but uses it to initialize an @@ -38,15 +37,13 @@ public class InMemoryAtomicOperationMetaStoreManagerFactory @SuppressWarnings("unused") // Required by CDI protected InMemoryAtomicOperationMetaStoreManagerFactory() { - this(null, null, null); + this(null, null); } @Inject public InMemoryAtomicOperationMetaStoreManagerFactory( - Clock clock, - PolarisDiagnostics diagnostics, - PolarisStorageIntegrationProvider storageIntegration) { - super(clock, diagnostics, storageIntegration); + Clock clock, PolarisDiagnostics diagnostics) { + super(clock, diagnostics); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryPolarisMetaStoreManagerFactory.java b/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryPolarisMetaStoreManagerFactory.java index fb846aef52..189ee42280 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryPolarisMetaStoreManagerFactory.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/persistence/InMemoryPolarisMetaStoreManagerFactory.java @@ -37,28 +37,22 @@ import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; -import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; @ApplicationScoped @Identifier("in-memory") public class InMemoryPolarisMetaStoreManagerFactory extends LocalPolarisMetaStoreManagerFactory { - private final PolarisStorageIntegrationProvider storageIntegration; private final Set bootstrappedRealms = new HashSet<>(); @SuppressWarnings("unused") // Required by CDI protected InMemoryPolarisMetaStoreManagerFactory() { - this(null, null, null); + this(null, null); } @Inject - public InMemoryPolarisMetaStoreManagerFactory( - Clock clock, - PolarisDiagnostics diagnostics, - PolarisStorageIntegrationProvider storageIntegration) { + public InMemoryPolarisMetaStoreManagerFactory(Clock clock, PolarisDiagnostics diagnostics) { super(clock, diagnostics); - this.storageIntegration = storageIntegration; } @Override @@ -73,7 +67,7 @@ protected TransactionalPersistence createMetaStoreSession( @Nullable RootCredentialsSet rootCredentialsSet, @Nonnull PolarisDiagnostics diagnostics) { return new TreeMapTransactionalPersistenceImpl( - store, storageIntegration, secretsGenerator(realmContext, rootCredentialsSet)); + store, secretsGenerator(realmContext, rootCredentialsSet)); } @Override diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java index e3a1ddd48f..af59e23c6a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java @@ -23,19 +23,23 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.entity.PolarisTaskConstants; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.catalog.io.FileIOUtil; @ApplicationScoped public class TaskFileIOSupplier implements BiFunction { @@ -60,11 +64,24 @@ public FileIO apply(TaskEntity task, CallContext callContext) { new ResolvedPolarisEntity(task, List.of(), List.of()); PolarisResolvedPathWrapper resolvedPath = new PolarisResolvedPathWrapper(List.of(resolvedTaskEntity)); + Optional polarisStorageConfigurationInfo = + FileIOUtil.findStorageInfoFromHierarchy(resolvedPath) + .map( + e -> + e.getInternalPropertiesAsMap() + .get(PolarisEntityConstants.getStorageConfigInfoPropertyName())) + .map(PolarisStorageConfigurationInfo::deserialize); String ioImpl = properties.getOrDefault( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO"); return fileIOFactory.loadFileIO( - callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath); + callContext, + ioImpl, + properties, + identifier, + locations, + storageActions, + polarisStorageConfigurationInfo); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java index b37447616a..37423624c5 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java @@ -46,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Function; @@ -102,13 +103,13 @@ import org.apache.polaris.core.entity.CatalogEntity; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityConstants; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PrincipalEntity; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.exceptions.CommitConflictException; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.cache.EntityCache; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; @@ -121,6 +122,7 @@ import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -1820,17 +1822,18 @@ public void testDropTableWithPurge() { .getEntities(); Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); + String storageConfigInfoStr = + taskEntity + .getInternalPropertiesAsMap() + .get(PolarisEntityConstants.getStorageConfigInfoPropertyName()); Map credentials = - metaStoreManager - .getSubscopedCredsForEntity( + storageCredentialCache + .getOrGenerateSubScopeCreds( polarisContext, - 0, - taskEntity.getId(), - taskEntity.getType(), + PolarisStorageConfigurationInfo.deserialize(storageConfigInfoStr), true, Set.of(tableMetadata.location()), Set.of(tableMetadata.location())) - .getAccessConfig() .credentials(); Assertions.assertThat(credentials) .isNotNull() @@ -2016,7 +2019,7 @@ public FileIO loadFileIO( @Nonnull TableIdentifier identifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + Optional storageConfigurationInfo) { return measured.loadFileIO( callContext, "org.apache.iceberg.inmemory.InMemoryFileIO", @@ -2024,7 +2027,7 @@ public FileIO loadFileIO( TABLE, Set.of(table.location()), Set.of(PolarisStorageActions.ALL), - Mockito.mock()); + storageConfigurationInfo); } }); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java b/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java index a9969d04a7..4b509f13d8 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/entity/CatalogEntityTest.java @@ -60,7 +60,7 @@ public class CatalogEntityTest { public void setup() { RealmContext realmContext = () -> "realm"; MetaStoreManagerFactory metaStoreManagerFactory = - new InMemoryPolarisMetaStoreManagerFactory(clock, diagnostics, null); + new InMemoryPolarisMetaStoreManagerFactory(clock, diagnostics); BasePersistence metaStore = metaStoreManagerFactory.getOrCreateSession(realmContext); this.callContext = new PolarisCallContext(realmContext, metaStore, diagnostics); } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java index 67d15a94ca..d1685154d3 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestFileIOFactory.java @@ -21,12 +21,13 @@ import jakarta.annotation.Nonnull; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.service.catalog.io.FileIOFactory; /** A FileIOFactory that always returns the same FileIO instance. */ @@ -46,7 +47,7 @@ public FileIO loadFileIO( @Nonnull TableIdentifier identifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + Optional storageConfigurationInfo) { return fileIO; } } diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java index d8ec777889..5e95e3641a 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -160,12 +160,11 @@ public TestServices build() { Optional.empty(), () -> GoogleCredentials.create(new AccessToken(GCP_ACCESS_TOKEN, new Date()))); InMemoryPolarisMetaStoreManagerFactory metaStoreManagerFactory = - new InMemoryPolarisMetaStoreManagerFactory( - clock, polarisDiagnostics, storageIntegrationProvider); + new InMemoryPolarisMetaStoreManagerFactory(clock, polarisDiagnostics); StorageCredentialCacheConfig storageCredentialCacheConfig = () -> 10_000; StorageCredentialCache storageCredentialCache = - new StorageCredentialCache(storageCredentialCacheConfig); + new StorageCredentialCache(storageCredentialCacheConfig, storageIntegrationProvider); UserSecretsManagerFactory userSecretsManagerFactory = new UnsafeInMemorySecretsManagerFactory(); diff --git a/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java b/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java index c4bf40ca92..a93454c4b8 100644 --- a/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java +++ b/runtime/service/src/testFixtures/java/org/apache/polaris/service/catalog/io/MeasuredFileIOFactory.java @@ -31,8 +31,8 @@ import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; -import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.cache.StorageCredentialCache; /** @@ -67,7 +67,7 @@ public FileIO loadFileIO( @Nonnull TableIdentifier identifier, @Nonnull Set tableLocations, @Nonnull Set storageActions, - @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + Optional storageConfigurationInfo) { loadFileIOExceptionSupplier.ifPresent( s -> { throw s.get(); @@ -82,7 +82,7 @@ public FileIO loadFileIO( identifier, tableLocations, storageActions, - resolvedEntityPath), + storageConfigurationInfo), newInputFileExceptionSupplier, newOutputFileExceptionSupplier, getLengthExceptionSupplier);