diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java index d47725351..fdb4ef5e0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java @@ -53,7 +53,7 @@ public boolean handleTask(TaskEntity task, CallContext callContext) { BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class); TableIdentifier tableId = cleanupTask.tableId(); List batchFiles = cleanupTask.batchFiles(); - try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) { + try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) { List validFiles = batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList(); if (validFiles.isEmpty()) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java index 3173dc25e..59e9be8dd 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java @@ -63,7 +63,7 @@ public boolean canHandleTask(TaskEntity task) { public boolean handleTask(TaskEntity task, CallContext callContext) { ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class); TableIdentifier tableId = cleanupTask.tableId(); - try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) { + try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) { ManifestFile manifestFile = decodeManifestData(cleanupTask.manifestFileData()); return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index 20bce48d4..db8b335ba 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -21,6 +21,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -36,7 +37,6 @@ import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.AsyncTaskType; import org.apache.polaris.core.entity.PolarisBaseEntity; -import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; @@ -71,21 +71,19 @@ public TableCleanupTaskHandler( @Override public boolean canHandleTask(TaskEntity task) { - return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER && taskEntityIsTable(task); + return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER + && tryGetTableEntity(task).isPresent(); } - private boolean taskEntityIsTable(TaskEntity task) { - PolarisEntity entity = PolarisEntity.of((task.readData(PolarisBaseEntity.class))); - return entity.getType().equals(PolarisEntityType.TABLE_LIKE); + private Optional tryGetTableEntity(TaskEntity task) { + return Optional.ofNullable(task.readData(PolarisBaseEntity.class)) + .filter(entity -> entity.getType().equals(PolarisEntityType.TABLE_LIKE)) + .map(IcebergTableLikeEntity::of); } @Override public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { - PolarisBaseEntity entity = cleanupTask.readData(PolarisBaseEntity.class); - PolarisMetaStoreManager metaStoreManager = - metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext()); - IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(entity); - PolarisCallContext polarisCallContext = callContext.getPolarisCallContext(); + IcebergTableLikeEntity tableEntity = tryGetTableEntity(cleanupTask).orElseThrow(); LOGGER .atInfo() .addKeyValue("tableIdentifier", tableEntity.getTableIdentifier()) @@ -95,7 +93,8 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { // It's likely the cleanupTask has already been completed, but wasn't dropped successfully. // Log a // warning and move on - try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) { + try (FileIO fileIO = + fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier(), callContext)) { if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) { LOGGER .atWarn() @@ -108,6 +107,10 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) { TableMetadata tableMetadata = TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation()); + PolarisMetaStoreManager metaStoreManager = + metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext()); + PolarisCallContext polarisCallContext = callContext.getPolarisCallContext(); + Stream manifestCleanupTasks = getManifestTaskStream( cleanupTask, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java index e3a1ddd48..44de36210 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java @@ -24,21 +24,21 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; +import org.apache.commons.lang3.function.TriFunction; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.PolarisTaskConstants; import org.apache.polaris.core.entity.TaskEntity; -import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.service.catalog.io.FileIOFactory; @ApplicationScoped -public class TaskFileIOSupplier implements BiFunction { +public class TaskFileIOSupplier + implements TriFunction { private final FileIOFactory fileIOFactory; @Inject @@ -47,12 +47,10 @@ public TaskFileIOSupplier(FileIOFactory fileIOFactory) { } @Override - public FileIO apply(TaskEntity task, CallContext callContext) { + public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) { Map internalProperties = task.getInternalPropertiesAsMap(); Map properties = new HashMap<>(internalProperties); - IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(task); - TableIdentifier identifier = tableEntity.getTableIdentifier(); String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION); Set locations = Set.of(location); Set storageActions = Set.of(PolarisStorageActions.ALL); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java index b37447616..114425faf 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java @@ -1841,7 +1841,7 @@ public void testDropTableWithPurge() { FileIO fileIO = new TaskFileIOSupplier( new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory)) - .apply(taskEntity, polarisContext); + .apply(taskEntity, TABLE, polarisContext); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class); Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo()) .isInstanceOf(InMemoryFileIO.class); diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index fd6e79f12..5c72f1506 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -176,7 +176,7 @@ public void testLoadFileIOForCleanupTask(String scheme) { Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); FileIO fileIO = - new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, callContext); + new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, TABLE, callContext); Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class); Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo()) .isInstanceOf(InMemoryFileIO.class);