Fix TableIdentifier in TaskFileIOSupplier #2304


Status: Open — wants to merge 1 commit into base: main
@@ -53,7 +53,7 @@ public boolean handleTask(TaskEntity task, CallContext callContext) {
BatchFileCleanupTask cleanupTask = task.readData(BatchFileCleanupTask.class);
TableIdentifier tableId = cleanupTask.tableId();
List<String> batchFiles = cleanupTask.batchFiles();
- try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
+ try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
Contributor:
Should the tableId just be a part of the TaskEntity?

Contributor Author:
Depends on whether there will ever be tasks that don't operate on a single Iceberg table... currently the TaskEntity design seems to leave this open.

Contributor Author:
Thinking about this again: since the TableIdentifier is downstream only used for logging, it would seem a disproportionate amount of effort to change the TaskEntity to avoid passing in the TableIdentifier here (which is already available in each task handler).

Contributor:
Why would the existence of a task that involves multiple tables mean that a given task can't include a table name in its properties?

Contributor Author:
Sorry, I might not be fully understanding what you have been asking for.
Could you please provide a more detailed design of how we would make the tableId part of the TaskEntity?
And also, why would that be a better solution than simply passing in the tableId that is already available in the task handlers? (It is taken from TaskEntity.readData, so in a way it is already part of the TaskEntity.)

Contributor:
Ah, sorry, I completely missed that. Anyway, here is a one-line fix that doesn't change any APIs. From my debugger:
[Screenshot: debugger output, 2025-08-13 at 11:15 AM]

Contributor:
This correctly extracts the TableIdentifier (via the TableLikeEntity) from the TASK_DATA like I mentioned above.

Contributor Author (@XN137, Aug 13, 2025):

> Anyway, here is a one-line fix that doesn't change any APIs

AFAICT your "one-line fix" only works for table cleanup tasks, where we happen to store an IcebergTableLikeEntity into the TASK_DATA:

Map<String, String> properties = new HashMap<>();
properties.put(
    PolarisTaskConstants.TASK_TYPE,
    String.valueOf(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER.typeCode()));
properties.put("data", PolarisObjectMapperUtil.serialize(refreshEntityToDrop));

For other tasks it would throw an error, since we store something else there, for example:

ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
TableIdentifier tableId = cleanupTask.tableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {

Contributor:

Yeah, that's what I was trying to say above -- if there is a TableIdentifier in the TASK_DATA we should be extracting and using that. The current "cast" of a TaskEntity into a TableLikeEntity is definitely wrong, but if we can fix this the easy way we should.

If there is a task that doesn't have a TableIdentifier in TASK_DATA we should either add it there or remove the need for it (i.e. pass in null to loadFileIO and mark it Nullable) if that's easier.

Contributor Author:

> if there is a TableIdentifier in the TASK_DATA we should be extracting and using that.

It seems we have circled back to an earlier point in our discussion, so I am posting my reply again:

So I can only guess that you want to add something like an Optional<TableIdentifier> getTableIdentifier() method to TaskEntity?
But when we think about how it would need to be implemented, it would have to contain full knowledge of ALL types of tasks and what kind of object each one stores in TASK_DATA, in order to de-serialize them and get the TableIdentifier.

But that same knowledge ALREADY EXISTS in the task handlers (and that is where we already have the TableIdentifier alongside the other task parameters), so again, just letting the task handlers pass that value into the TaskFileIOSupplier seems like the right approach to me.

I still don't have an answer as to why changing TaskFileIOSupplier is a no-go when all callers already have the TableIdentifier and the underlying FileIOFactory (in its current form) requires it.
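
The trade-off being argued can be sketched in isolation. The names below (TaskType, BatchFileCleanupTask, Task.tableIdentifier()) are illustrative stand-ins, not real Polaris APIs: a centralized accessor on the task entity must enumerate every task type and know how each payload encodes (or omits) the table identifier, which is exactly the knowledge the handlers already hold.

```java
import java.util.Optional;

class CentralizedAccessorSketch {

  enum TaskType { BATCH_FILE_CLEANUP, MANIFEST_FILE_CLEANUP, ENTITY_CLEANUP_SCHEDULER }

  // Simplified stand-ins for the per-task payloads stored in TASK_DATA.
  record BatchFileCleanupTask(String tableId) {}
  record ManifestCleanupTask(String tableId) {}

  record Task(TaskType type, Object data) {
    // The centralized design: one method that must switch over ALL task types
    // to recover the table identifier from each payload shape.
    Optional<String> tableIdentifier() {
      return switch (type) {
        case BATCH_FILE_CLEANUP -> Optional.of(((BatchFileCleanupTask) data).tableId());
        case MANIFEST_FILE_CLEANUP -> Optional.of(((ManifestCleanupTask) data).tableId());
        // This payload stores a serialized entity, not a direct tableId field,
        // so the accessor would additionally need deserialization logic here.
        case ENTITY_CLEANUP_SCHEDULER -> Optional.empty();
      };
    }
  }

  public static void main(String[] args) {
    Task t = new Task(TaskType.BATCH_FILE_CLEANUP, new BatchFileCleanupTask("ns.tbl"));
    System.out.println(t.tableIdentifier()); // Optional[ns.tbl]
  }
}
```

Every new task type would force an edit to this one method, whereas in the merged approach each handler simply passes the identifier it already deserialized.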

List<String> validFiles =
    batchFiles.stream().filter(file -> TaskUtils.exists(file, authorizedFileIO)).toList();
if (validFiles.isEmpty()) {
@@ -63,7 +63,7 @@ public boolean canHandleTask(TaskEntity task) {
public boolean handleTask(TaskEntity task, CallContext callContext) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
TableIdentifier tableId = cleanupTask.tableId();
- try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
+ try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
ManifestFile manifestFile = decodeManifestData(cleanupTask.manifestFileData());
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
}
@@ -21,6 +21,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
+ import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -36,7 +37,6 @@
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.AsyncTaskType;
import org.apache.polaris.core.entity.PolarisBaseEntity;
- import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
@@ -71,21 +71,19 @@ public TableCleanupTaskHandler(

@Override
public boolean canHandleTask(TaskEntity task) {
- return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER && taskEntityIsTable(task);
+ return task.getTaskType() == AsyncTaskType.ENTITY_CLEANUP_SCHEDULER
+     && tryGetTableEntity(task).isPresent();
  }

- private boolean taskEntityIsTable(TaskEntity task) {
-   PolarisEntity entity = PolarisEntity.of((task.readData(PolarisBaseEntity.class)));
-   return entity.getType().equals(PolarisEntityType.TABLE_LIKE);
+ private Optional<IcebergTableLikeEntity> tryGetTableEntity(TaskEntity task) {
+   return Optional.ofNullable(task.readData(PolarisBaseEntity.class))
+       .filter(entity -> entity.getType().equals(PolarisEntityType.TABLE_LIKE))
+       .map(IcebergTableLikeEntity::of);
  }

  @Override
  public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
- PolarisBaseEntity entity = cleanupTask.readData(PolarisBaseEntity.class);
- PolarisMetaStoreManager metaStoreManager =
-     metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
- IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(entity);
- PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();
+ IcebergTableLikeEntity tableEntity = tryGetTableEntity(cleanupTask).orElseThrow();
LOGGER
.atInfo()
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
@@ -95,7 +93,8 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
// It's likely the cleanupTask has already been completed, but wasn't dropped successfully.
// Log a
// warning and move on
- try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) {
+ try (FileIO fileIO =
+     fileIOSupplier.apply(cleanupTask, tableEntity.getTableIdentifier(), callContext)) {
if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) {
LOGGER
.atWarn()
@@ -108,6 +107,10 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
TableMetadata tableMetadata =
TableMetadataParser.read(fileIO, tableEntity.getMetadataLocation());

+ PolarisMetaStoreManager metaStoreManager =
+     metaStoreManagerFactory.getOrCreateMetaStoreManager(callContext.getRealmContext());
+ PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();

Stream<TaskEntity> manifestCleanupTasks =
getManifestTaskStream(
cleanupTask,
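
The tryGetTableEntity change above replaces a blind cast with a null-safe Optional pipeline. A minimal self-contained sketch of the same pattern, using simplified stand-ins (BaseEntity, TableLikeEntity, EntityType) rather than the real Polaris classes:

```java
import java.util.Optional;

class TryGetTableEntitySketch {

  enum EntityType { TABLE_LIKE, NAMESPACE }

  // Stand-in for the payload read out of TASK_DATA, which may be any entity type.
  record BaseEntity(EntityType type, String name) {}

  record TableLikeEntity(String tableIdentifier) {
    static TableLikeEntity of(BaseEntity e) { return new TableLikeEntity(e.name()); }
  }

  // Returns empty when the payload is missing or is not a table-like entity,
  // instead of throwing from an unconditional conversion.
  static Optional<TableLikeEntity> tryGetTableEntity(BaseEntity payload) {
    return Optional.ofNullable(payload)
        .filter(e -> e.type() == EntityType.TABLE_LIKE)
        .map(TableLikeEntity::of);
  }

  public static void main(String[] args) {
    System.out.println(tryGetTableEntity(new BaseEntity(EntityType.TABLE_LIKE, "ns.tbl")).isPresent()); // true
    System.out.println(tryGetTableEntity(new BaseEntity(EntityType.NAMESPACE, "ns")).isPresent());      // false
    System.out.println(tryGetTableEntity(null).isPresent());                                            // false
  }
}
```

This also makes canHandleTask and handleTask agree: both now go through the same predicate, so a task that passes canHandleTask cannot fail the conversion in handleTask.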
@@ -24,21 +24,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
- import java.util.function.BiFunction;
+ import org.apache.commons.lang3.function.TriFunction;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.PolarisTaskConstants;
import org.apache.polaris.core.entity.TaskEntity;
- import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.io.FileIOFactory;

@ApplicationScoped
- public class TaskFileIOSupplier implements BiFunction<TaskEntity, CallContext, FileIO> {
+ public class TaskFileIOSupplier
+     implements TriFunction<TaskEntity, TableIdentifier, CallContext, FileIO> {
private final FileIOFactory fileIOFactory;

@Inject
@@ -47,12 +47,10 @@ public TaskFileIOSupplier(FileIOFactory fileIOFactory) {
}

@Override
- public FileIO apply(TaskEntity task, CallContext callContext) {
+ public FileIO apply(TaskEntity task, TableIdentifier identifier, CallContext callContext) {
Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
Map<String, String> properties = new HashMap<>(internalProperties);

- IcebergTableLikeEntity tableEntity = IcebergTableLikeEntity.of(task);
- TableIdentifier identifier = tableEntity.getTableIdentifier();
String location = properties.get(PolarisTaskConstants.STORAGE_LOCATION);
Set<String> locations = Set.of(location);
Set<PolarisStorageActions> storageActions = Set.of(PolarisStorageActions.ALL);
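
The core API change is BiFunction → TriFunction. commons-lang3's TriFunction (available since 3.12) is just a three-argument functional interface like the one defined below; this self-contained sketch (with illustrative stand-ins for TaskEntity, TableIdentifier, and CallContext) shows the shape of the new apply(task, tableId, callContext) call site:

```java
class TriFunctionSketch {

  // Equivalent in shape to org.apache.commons.lang3.function.TriFunction.
  @FunctionalInterface
  interface TriFunction<A, B, C, R> {
    R apply(A a, B b, C c);
  }

  // Illustrative stand-ins for the real parameter types.
  record Task(String name) {}
  record TableId(String id) {}
  record Context(String realm) {}

  // A supplier keyed on all three inputs, mirroring the new signature:
  // the caller (task handler) provides the identifier it already holds.
  static final TriFunction<Task, TableId, Context, String> fileIoSupplier =
      (task, tableId, ctx) ->
          "FileIO[" + task.name() + "," + tableId.id() + "," + ctx.realm() + "]";

  public static void main(String[] args) {
    String io = fileIoSupplier.apply(new Task("cleanup"), new TableId("ns.tbl"), new Context("realm1"));
    System.out.println(io); // FileIO[cleanup,ns.tbl,realm1]
  }
}
```

Widening the functional interface (rather than deriving the identifier inside the supplier) is what lets the now-removed IcebergTableLikeEntity.of(task) cast disappear from the apply body.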
@@ -1841,7 +1841,7 @@ public void testDropTableWithPurge() {
FileIO fileIO =
new TaskFileIOSupplier(
new DefaultFileIOFactory(storageCredentialCache, metaStoreManagerFactory))
-     .apply(taskEntity, polarisContext);
+     .apply(taskEntity, TABLE, polarisContext);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
.isInstanceOf(InMemoryFileIO.class);
@@ -176,7 +176,7 @@ public void testLoadFileIOForCleanupTask(String scheme) {
Assertions.assertThat(tasks).hasSize(1);
TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
FileIO fileIO =
- new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, callContext);
+ new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, TABLE, callContext);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(ExceptionMappingFileIO.class);
Assertions.assertThat(((ExceptionMappingFileIO) fileIO).getInnerIo())
.isInstanceOf(InMemoryFileIO.class);