Skip to content

Commit 4427dc8

Browse files
committed
Fix TableIdentifier in TaskFileIOSupplier
We can't simply convert a `TaskEntity` to an `IcebergTableLikeEntity`: its `getTableIdentifier()` method will not return a correct value, because it builds the identifier from the name of the task and its parent namespace (which is empty?). Task handlers instead need to pass in the `TableIdentifier` that they already inferred via `TaskEntity.readData`.
1 parent cfff798 commit 4427dc8

File tree

6 files changed

+22
-21
lines changed

6 files changed

+22
-21
lines changed

runtime/service/src/main/java/org/apache/polaris/service/task/BatchFileCleanupTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public boolean handleTask(TaskEntity task, CallContext callContext) {
5353
BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class);
5454
TableIdentifier tableId = cleanupTask.tableId();
5555
List<String> batchFiles = cleanupTask.batchFiles();
56-
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
56+
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
5757
List<String> validFiles =
5858
batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList();
5959
if (validFiles.isEmpty()) {

runtime/service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public boolean canHandleTask(TaskEntity task) {
6363
public boolean handleTask(TaskEntity task, CallContext callContext) {
6464
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
6565
TableIdentifier tableId = cleanupTask.tableId();
66-
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
66+
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
6767
ManifestFile manifestFile = decodeManifestData(cleanupTask.manifestFileData());
6868
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
6969
}

runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.time.Clock;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.Optional;
2425
import java.util.UUID;
2526
import java.util.function.Function;
2627
import java.util.stream.Collectors;
@@ -36,7 +37,6 @@
3637
import org.apache.polaris.core.context.CallContext;
3738
import org.apache.polaris.core.entity.AsyncTaskType;
3839
import org.apache.polaris.core.entity.PolarisBaseEntity;
39-
import org.apache.polaris.core.entity.PolarisEntity;
4040
import org.apache.polaris.core.entity.PolarisEntityType;
4141
import org.apache.polaris.core.entity.TaskEntity;
4242
import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
@@ -71,21 +71,19 @@ public TableCleanupTaskHandler(
7171

7272
@Override
7373
public boolean canHandleTask(TaskEntity task) {
74-
return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER && taskEntityIsTable(task);
74+
return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER
75+
&& tryGetTableEntity(task).isPresent();
7576
}
7677

77-
private boolean taskEntityIsTable(TaskEntity task) {
78-
PolarisEntity entity = PolarisEntity.of((task.readData(PolarisBaseEntity.class)));
79-
return entity.getType().equals(PolarisEntityType.TABLE_LIKE);
78+
private Optional<IcebergTableLikeEntity> tryGetTableEntity(TaskEntity task) {
79+
return Optional.ofNullable(task.readData(PolarisBaseEntity.class))
80+
.filter(entity -> entity.getType().equals(PolarisEntityType.TABLE_LIKE))
81+
.map(IcebergTableLikeEntity::of);
8082
}
8183

8284
@Override
8385
public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
84-
PolarisBaseEntity entity = cleanupTask.readData(PolarisBaseEntity.class);
85-
PolarisMetaStoreManager metaStoreManager =
86-
metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
87-
IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(entity);
88-
PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();
86+
IcebergTableLikeEntity tableEntity = tryGetTableEntity(cleanupTask).orElseThrow();
8987
LOGGER
9088
.atInfo()
9189
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
@@ -95,7 +93,8 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
9593
// It's likely the cleanupTask has already been completed, but wasn't dropped successfully.
9694
// Log a
9795
// warning and move on
98-
try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) {
96+
try (FileIO fileIO =
97+
fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier(), callContext)) {
9998
if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) {
10099
LOGGER
101100
.atWarn()
@@ -108,6 +107,10 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
108107
TableMetadata tableMetadata =
109108
TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());
110109

110+
PolarisMetaStoreManager metaStoreManager =
111+
metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
112+
PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();
113+
111114
Stream<TaskEntity> manifestCleanupTasks =
112115
getManifestTaskStream(
113116
cleanupTask,

runtime/service/src/main/java/org/apache/polaris/service/task/TaskFileIOSupplier.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,21 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27-
import java.util.function.BiFunction;
27+
import org.apache.commons.lang3.function.TriFunction;
2828
import org.apache.iceberg.CatalogProperties;
2929
import org.apache.iceberg.catalog.TableIdentifier;
3030
import org.apache.iceberg.io.FileIO;
3131
import org.apache.polaris.core.context.CallContext;
3232
import org.apache.polaris.core.entity.PolarisTaskConstants;
3333
import org.apache.polaris.core.entity.TaskEntity;
34-
import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
3534
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
3635
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
3736
import org.apache.polaris.core.storage.PolarisStorageActions;
3837
import org.apache.polaris.service.catalog.io.FileIOFactory;
3938

4039
@ApplicationScoped
41-
public class TaskFileIOSupplier implements BiFunction<TaskEntity, CallContext, FileIO> {
40+
public class TaskFileIOSupplier
41+
implements TriFunction<TaskEntity, TableIdentifier, CallContext, FileIO> {
4242
private final FileIOFactory fileIOFactory;
4343

4444
@Inject
@@ -47,12 +47,10 @@ public TaskFileIOSupplier(FileIOFactory fileIOFactory) {
4747
}
4848

4949
@Override
50-
public FileIO apply(TaskEntity task, CallContext callContext) {
50+
public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) {
5151
Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
5252
Map<String, String> properties = new HashMap<>(internalProperties);
5353

54-
IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(task);
55-
TableIdentifier identifier = tableEntity.getTableIdentifier();
5654
String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION);
5755
Set<String> locations = Set.of(location);
5856
Set<PolarisStorageActions> storageActions = Set.of(PolarisStorageActions.ALL);

runtime/service/src/test/java/org/apache/polaris/service/catalog/AbstractIcebergCatalogTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1841,7 +1841,7 @@ public void testDropTableWithPurge() {
18411841
FileIO fileIO =
18421842
new TaskFileIOSupplier(
18431843
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory))
1844-
.apply(taskEntity, polarisContext);
1844+
.apply(taskEntity, TABLE, polarisContext);
18451845
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
18461846
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
18471847
.isInstanceOf(InMemoryFileIO.class);

runtime/service/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void testLoadFileIOForCleanupTask(String scheme) {
176176
Assertions.assertThat(tasks).hasSize(1);
177177
TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
178178
FileIO fileIO =
179-
new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, callContext);
179+
new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, TABLE, callContext);
180180
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
181181
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
182182
.isInstanceOf(InMemoryFileIO.class);

0 commit comments

Comments
 (0)