From 2fed1006e62801823fa230472fde60be9c62b4fc Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 22 Jun 2025 08:23:22 -0400 Subject: [PATCH 1/8] Added logic to scan for orphan files and remove them as part of maintenance schedule --- .../maintenance/MaintenanceScheduler.java | 4 + .../maintenance/OrphanFileScanner.java | 148 ++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index 954fd66..7cb6f10 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -119,6 +119,10 @@ public void performMaintenance() { table.rewriteManifests().rewriteIf(manifest -> true).commit(); table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); + + // Remove orphans. + OrphanFileScanner orphanFileScanner = new OrphanFileScanner(table); + orphanFileScanner.removeOrphanedFiles(olderThanMillis, false); } } logger.info("Maintenance operations completed for catalog: {}", catalog.name()); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java new file mode 100644 index 0000000..4063774 --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package com.altinity.ice.rest.catalog.internal.maintenance; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OrphanFileScanner { + private static final Logger LOG = LoggerFactory.getLogger(OrphanFileScanner.class); + private final Table table; + + public OrphanFileScanner(Table table) { + this.table = table; + } + + private Set getAllKnownFiles() { + Set knownFiles = new HashSet<>(); + + for (Snapshot snapshot : table.snapshots()) { + // Manifest list file + if (snapshot.manifestListLocation() != null) { + knownFiles.add(snapshot.manifestListLocation()); + } + + // Manifest files + FileIO io = table.io(); + for (ManifestFile manifest : snapshot.dataManifests(io)) { + knownFiles.add(manifest.path()); + } + } + + return knownFiles; + } + + public Set findOrphanedFiles(String location, long olderThanMillis) throws IOException { + Set knownFiles = getAllKnownFiles(); + + FileIO fileIO = table.io(); + if (!(fileIO instanceof HadoopFileIO)) { + throw new IllegalArgumentException("Only HadoopFileIO supported in this 
implementation"); + } + + Configuration conf = ((HadoopFileIO) fileIO).conf(); + Path rootPath = new Path(location); + FileSystem fs = rootPath.getFileSystem(conf); + + RemoteIterator files = fs.listFiles(rootPath, true); + + long cutoffTime = System.currentTimeMillis() - olderThanMillis; + + Set orphanedFiles = new HashSet<>(); + + while (files.hasNext()) { + LocatedFileStatus status = files.next(); + long modTime = status.getModificationTime(); + + if (modTime < cutoffTime) { + String filePath = status.getPath().toString(); + if (!knownFiles.contains(filePath)) { + orphanedFiles.add(filePath); + } + } + } + + return orphanedFiles; + } + + public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOException { + String location = table.location(); + Set orphanedFiles = findOrphanedFiles(location, olderThanMillis); + + LOG.info("Found {} orphaned files at {}!", orphanedFiles.size(), location); + + if (orphanedFiles.isEmpty()) { + LOG.info("No orphaned files found at {}!", location); + return; + } + + if (dryRun) { + LOG.info("(Dry Run) Would delete {} orphaned files at {}!", orphanedFiles.size(), location); + orphanedFiles.forEach(f -> LOG.info("Orphaned file: {}", f)); + } else { + ExecutorService executor = Executors.newFixedThreadPool(8); + List> futures = + orphanedFiles.stream() + .map( + file -> + executor.submit( + () -> { + try { + table.io().deleteFile(file); + return file; + } catch (Exception e) { + LOG.warn("Failed to delete file {}", file, e); + return null; + } + })) + .collect(Collectors.toList()); + + executor.shutdown(); + + List deletedFiles = new ArrayList<>(); + for (Future future : futures) { + try { + String result = future.get(); + if (result != null) { + deletedFiles.add(result); + } + } catch (Exception e) { + LOG.error("Error during file deletion", e); + } + } + + LOG.info("Deleted {} orphaned files at {}!", deletedFiles.size(), location); + if (!deletedFiles.isEmpty()) { + deletedFiles.forEach(f -> LOG.info("Deleted: 
{}", f)); + } + + executor.shutdownNow(); + } + } +} From 483fdfae475f28f138e1c64717bed8aa075efa2e Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 23 Jun 2025 07:46:09 -0400 Subject: [PATCH 2/8] Added logic to scan for orphan files and remove them as part of maintenance schedule --- examples/scratch/.ice-rest-catalog.yaml | 2 ++ .../internal/maintenance/MaintenanceScheduler.java | 10 +++++++++- .../internal/maintenance/OrphanFileScanner.java | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml index 51fe0e2..a619480 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -15,6 +15,8 @@ s3: bearerTokens: - value: foo +maintenanceSchedule: "every 2 mins" + anonymousAccess: enabled: true accessConfig: {} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index 7cb6f10..49f5b9a 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -22,6 +22,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,8 +110,15 @@ public void performMaintenance() { for (TableIdentifier tableIdent : tables) { long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays); - Table table = catalog.loadTable(tableIdent); + // This throws a Location does not exist error. 
+ Table table = null; + try { + table = catalog.loadTable(tableIdent); + } catch (NotFoundException ne) { + logger.warn("Table {} location not found, skipping maintenance", tableIdent, ne); + continue; + } // Check if table has any snapshots before performing maintenance if (table.currentSnapshot() == null) { logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java index 4063774..cb05886 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java @@ -58,6 +58,7 @@ private Set getAllKnownFiles() { return knownFiles; } + public Set findOrphanedFiles(String location, long olderThanMillis) throws IOException { Set knownFiles = getAllKnownFiles(); From 0214f290f2579fd7420c4e430fdec589f98f6245 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 23 Jun 2025 13:08:42 -0400 Subject: [PATCH 3/8] Replaced HadoopFileIO with S3FileIO --- examples/scratch/.ice-rest-catalog.yaml | 2 +- .../maintenance/OrphanFileScanner.java | 49 ++++++++----------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml index a619480..55768d8 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -15,7 +15,7 @@ s3: bearerTokens: - value: foo -maintenanceSchedule: "every 2 mins" +maintenanceSchedule: "every 1 minutes" anonymousAccess: enabled: true diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java 
b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java index cb05886..9230805 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java @@ -9,6 +9,7 @@ */ package com.altinity.ice.rest.catalog.internal.maintenance; +import com.altinity.ice.cli.internal.s3.S3; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -18,18 +19,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; public class OrphanFileScanner { private static final Logger LOG = LoggerFactory.getLogger(OrphanFileScanner.class); @@ -58,42 +56,37 @@ private Set getAllKnownFiles() { return knownFiles; } - public Set findOrphanedFiles(String location, long olderThanMillis) throws IOException { Set knownFiles = getAllKnownFiles(); - FileIO fileIO = table.io(); - if (!(fileIO instanceof HadoopFileIO)) { - throw new IllegalArgumentException("Only HadoopFileIO supported in this implementation"); - } + String bucket = location.replace("s3://", "").split("/")[0]; + String prefix = location.replace("s3://" + bucket + "/", ""); - Configuration conf = 
((HadoopFileIO) fileIO).conf(); - Path rootPath = new Path(location); - FileSystem fs = rootPath.getFileSystem(conf); + S3Client s3 = S3.newClient(true); - RemoteIterator files = fs.listFiles(rootPath, true); + ListObjectsV2Request listRequest = + ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build(); - long cutoffTime = System.currentTimeMillis() - olderThanMillis; + Set allFiles = new HashSet<>(); - Set orphanedFiles = new HashSet<>(); + ListObjectsV2Response listResponse; + do { + listResponse = s3.listObjectsV2(listRequest); + listResponse.contents().forEach(obj -> allFiles.add("s3://" + bucket + "/" + obj.key())); + listRequest = + listRequest.toBuilder().continuationToken(listResponse.nextContinuationToken()).build(); + } while (listResponse.isTruncated()); - while (files.hasNext()) { - LocatedFileStatus status = files.next(); - long modTime = status.getModificationTime(); + // Set orphanedFiles = new HashSet<>(); - if (modTime < cutoffTime) { - String filePath = status.getPath().toString(); - if (!knownFiles.contains(filePath)) { - orphanedFiles.add(filePath); - } - } - } + allFiles.remove(knownFiles); - return orphanedFiles; + return allFiles; } public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOException { String location = table.location(); + LOG.info("Looking for Orphaned files in location {}", location); Set orphanedFiles = findOrphanedFiles(location, olderThanMillis); LOG.info("Found {} orphaned files at {}!", orphanedFiles.size(), location); From b3ee8d387bd7b8c51058a50931032b96101479fb Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 25 Jun 2025 17:28:13 -0400 Subject: [PATCH 4/8] Addressed PR review comments. 
--- .../maintenance/MaintenanceScheduler.java | 18 +++++++-- .../maintenance/OrphanFileScanner.java | 39 ++++++++++++------- 2 files changed, 40 insertions(+), 17 deletions(-) diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index 49f5b9a..f11fcc3 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -128,9 +128,21 @@ public void performMaintenance() { table.rewriteManifests().rewriteIf(manifest -> true).commit(); table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); - // Remove orphans. - OrphanFileScanner orphanFileScanner = new OrphanFileScanner(table); - orphanFileScanner.removeOrphanedFiles(olderThanMillis, false); + // Remove orphans only for S3-based tables + String tableLocation = table.location(); + if (tableLocation != null && tableLocation.startsWith("s3://")) { + OrphanFileScanner orphanFileScanner = new OrphanFileScanner(table); + try { + orphanFileScanner.removeOrphanedFiles(olderThanMillis, false); + } catch (Exception e) { + logger.warn("Failed to remove orphan files for table {}", tableIdent, e); + } + } else { + logger.debug( + "Skipping orphan file removal for non-S3 table: {} (location: {})", + tableIdent, + tableLocation); + } } } logger.info("Maintenance operations completed for catalog: {}", catalog.name()); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java index 9230805..7928dc9 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java +++ 
b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java @@ -9,7 +9,6 @@ */ package com.altinity.ice.rest.catalog.internal.maintenance; -import com.altinity.ice.cli.internal.s3.S3; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -19,9 +18,14 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.Collectors; + +import com.altinity.ice.cli.internal.s3.S3; +import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +34,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; public class OrphanFileScanner { - private static final Logger LOG = LoggerFactory.getLogger(OrphanFileScanner.class); + private static final Logger logger = LoggerFactory.getLogger(OrphanFileScanner.class); private final Table table; public OrphanFileScanner(Table table) { @@ -50,6 +54,14 @@ private Set getAllKnownFiles() { FileIO io = table.io(); for (ManifestFile manifest : snapshot.dataManifests(io)) { knownFiles.add(manifest.path()); + // Add data files inside each manifest + try (CloseableIterable files = ManifestFiles.read(manifest, table.io())) { + for (DataFile dataFile : files) { + knownFiles.add(dataFile.path().toString()); + } + } catch (Exception e) { + logger.error("Error getting list of data files", e); + } } } @@ -79,26 +91,27 @@ public Set findOrphanedFiles(String location, long olderThanMillis) thro // Set orphanedFiles = new HashSet<>(); - allFiles.remove(knownFiles); + allFiles.removeAll(knownFiles); return allFiles; } public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOException { String location = table.location(); - 
LOG.info("Looking for Orphaned files in location {}", location); + logger.info("Looking for Orphaned files in location {}", location); Set orphanedFiles = findOrphanedFiles(location, olderThanMillis); - LOG.info("Found {} orphaned files at {}!", orphanedFiles.size(), location); + logger.info("Found {} orphaned files at {}!", orphanedFiles.size(), location); if (orphanedFiles.isEmpty()) { - LOG.info("No orphaned files found at {}!", location); + logger.info("No orphaned files found at {}!", location); return; } if (dryRun) { - LOG.info("(Dry Run) Would delete {} orphaned files at {}!", orphanedFiles.size(), location); - orphanedFiles.forEach(f -> LOG.info("Orphaned file: {}", f)); + logger.info( + "(Dry Run) Would delete {} orphaned files at {}!", orphanedFiles.size(), location); + orphanedFiles.forEach(f -> logger.info("Orphaned file: {}", f)); } else { ExecutorService executor = Executors.newFixedThreadPool(8); List> futures = @@ -111,7 +124,7 @@ public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOE table.io().deleteFile(file); return file; } catch (Exception e) { - LOG.warn("Failed to delete file {}", file, e); + logger.warn("Failed to delete file {}", file, e); return null; } })) @@ -127,16 +140,14 @@ public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOE deletedFiles.add(result); } } catch (Exception e) { - LOG.error("Error during file deletion", e); + logger.error("Error during file deletion", e); } } - LOG.info("Deleted {} orphaned files at {}!", deletedFiles.size(), location); + logger.info("Deleted {} orphaned files at {}!", deletedFiles.size(), location); if (!deletedFiles.isEmpty()) { - deletedFiles.forEach(f -> LOG.info("Deleted: {}", f)); + deletedFiles.forEach(f -> logger.info("Deleted: {}", f)); } - - executor.shutdownNow(); } } } From ac4455fb2ad3d3c51873ea00a9e3802b27089a55 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 3 Jul 2025 16:49:06 -0500 Subject: [PATCH 5/8] Created new 
config for orphanFileExpiration TTL. Added logic to retrieve snapshot metadata JSON files for orphan file deletion. --- .../com/altinity/ice/rest/catalog/Main.java | 5 +- .../rest/catalog/internal/config/Config.java | 3 + .../maintenance/MaintenanceScheduler.java | 13 +++- .../maintenance/OrphanFileScanner.java | 65 ++++++++++++------- 4 files changed, 58 insertions(+), 28 deletions(-) diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java index feaab56..588981a 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java @@ -359,7 +359,10 @@ private void initializeMaintenanceScheduler(Catalog catalog, Config config) { try { MaintenanceScheduler scheduler = new MaintenanceScheduler( - catalog, config.maintenanceSchedule(), config.snapshotTTLInDays()); + catalog, + config.maintenanceSchedule(), + config.snapshotTTLInDays(), + config.orphanFileExpirationDays()); scheduler.startScheduledMaintenance(); logger.info( "Maintenance scheduler initialized with schedule: {}", config.maintenanceSchedule()); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java index 21012f5..49871b0 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java @@ -53,6 +53,7 @@ public record Config( "Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". 
Empty schedule disables automatic maintenance (default)") String maintenanceSchedule, @JsonPropertyDescription("TTL for snapshots in days.") int snapshotTTLInDays, + @JsonPropertyDescription("TTL for orphan files in days.") int orphanFileExpirationDays, @JsonPropertyDescription( "(experimental) Extra properties to include in loadTable REST response.") Map loadTableProperties, @@ -77,6 +78,7 @@ public Config( AnonymousAccess anonymousAccess, String maintenanceSchedule, int snapshotTTLInDays, + int orphanFileExpirationDays, Map loadTableProperties, @JsonProperty("iceberg") Map icebergProperties) { this.addr = Strings.orDefault(addr, DEFAULT_ADDR); @@ -91,6 +93,7 @@ public Config( Objects.requireNonNullElse(anonymousAccess, new AnonymousAccess(false, null)); this.maintenanceSchedule = maintenanceSchedule; this.snapshotTTLInDays = snapshotTTLInDays; + this.orphanFileExpirationDays = orphanFileExpirationDays; this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of()); this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of()); } diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index f11fcc3..5b12cdc 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -37,13 +37,16 @@ public class MaintenanceScheduler { private ScheduledFuture currentTask; private final Integer snapshotExpirationDays; + private final Integer orphanFileExpirationDays; - public MaintenanceScheduler(Catalog catalog, String schedule, int snapshotExpirationDays) { + public MaintenanceScheduler( + Catalog catalog, String schedule, int snapshotExpirationDays, int orphanFileExpirationDays) { this.catalog 
= catalog; this.executor = new ScheduledThreadPoolExecutor(1); ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true); this.schedule = Schedule.parse(schedule); this.snapshotExpirationDays = snapshotExpirationDays; + this.orphanFileExpirationDays = orphanFileExpirationDays; } public void startScheduledMaintenance() { @@ -128,12 +131,18 @@ public void performMaintenance() { table.rewriteManifests().rewriteIf(manifest -> true).commit(); table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); + if (orphanFileExpirationDays == 0) { + logger.info("Skipping orphan file removal for table {}", tableIdent); + continue; + } + long orphanCutOffMillis = + System.currentTimeMillis() - TimeUnit.DAYS.toMillis(orphanFileExpirationDays); // Remove orphans only for S3-based tables String tableLocation = table.location(); if (tableLocation != null && tableLocation.startsWith("s3://")) { OrphanFileScanner orphanFileScanner = new OrphanFileScanner(table); try { - orphanFileScanner.removeOrphanedFiles(olderThanMillis, false); + orphanFileScanner.removeOrphanedFiles(orphanCutOffMillis, false); } catch (Exception e) { logger.warn("Failed to remove orphan files for table {}", tableIdent, e); } diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java index 7928dc9..548ab50 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java @@ -9,6 +9,7 @@ */ package com.altinity.ice.rest.catalog.internal.maintenance; +import com.altinity.ice.internal.iceberg.io.SchemeFileIO; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -18,20 +19,19 @@ import java.util.concurrent.Executors; import 
java.util.concurrent.Future; import java.util.stream.Collectors; - -import com.altinity.ice.cli.internal.s3.S3; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; public class OrphanFileScanner { private static final Logger logger = LoggerFactory.getLogger(OrphanFileScanner.class); @@ -41,26 +41,39 @@ public OrphanFileScanner(Table table) { this.table = table; } - private Set getAllKnownFiles() { + private Set getAllKnownFiles() throws IOException { Set knownFiles = new HashSet<>(); for (Snapshot snapshot : table.snapshots()) { - // Manifest list file if (snapshot.manifestListLocation() != null) { knownFiles.add(snapshot.manifestListLocation()); } - // Manifest files FileIO io = table.io(); + + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + + String currentMetadataFile = meta.metadataFileLocation(); + // Current metadata json file + knownFiles.add(currentMetadataFile); + + // All the previous metadata JSON(is there a chance there might be + // json files that are not physically present?. 
+ for (TableMetadata.MetadataLogEntry previousFile : meta.previousFiles()) { + knownFiles.add(previousFile.file()); + } + for (ManifestFile manifest : snapshot.dataManifests(io)) { knownFiles.add(manifest.path()); + // Add data files inside each manifest try (CloseableIterable files = ManifestFiles.read(manifest, table.io())) { for (DataFile dataFile : files) { knownFiles.add(dataFile.path().toString()); } } catch (Exception e) { - logger.error("Error getting list of data files", e); + throw e; } } } @@ -71,25 +84,23 @@ private Set getAllKnownFiles() { public Set findOrphanedFiles(String location, long olderThanMillis) throws IOException { Set knownFiles = getAllKnownFiles(); - String bucket = location.replace("s3://", "").split("/")[0]; - String prefix = location.replace("s3://" + bucket + "/", ""); - - S3Client s3 = S3.newClient(true); + FileIO tableIO = table.io(); - ListObjectsV2Request listRequest = - ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build(); + SchemeFileIO schemeFileIO; + if (tableIO instanceof SchemeFileIO) { + schemeFileIO = (SchemeFileIO) tableIO; + } else { + throw new UnsupportedOperationException("SchemeFileIO is required for S3 locations"); + } Set allFiles = new HashSet<>(); - ListObjectsV2Response listResponse; - do { - listResponse = s3.listObjectsV2(listRequest); - listResponse.contents().forEach(obj -> allFiles.add("s3://" + bucket + "/" + obj.key())); - listRequest = - listRequest.toBuilder().continuationToken(listResponse.nextContinuationToken()).build(); - } while (listResponse.isTruncated()); - - // Set orphanedFiles = new HashSet<>(); + Iterable fileInfos = schemeFileIO.listPrefix(location); + for (FileInfo fileInfo : fileInfos) { + if (fileInfo.createdAtMillis() > olderThanMillis) { + allFiles.add(fileInfo.location()); + } + } allFiles.removeAll(knownFiles); @@ -113,7 +124,10 @@ public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOE "(Dry Run) Would delete {} orphaned files at {}!", 
orphanedFiles.size(), location); orphanedFiles.forEach(f -> logger.info("Orphaned file: {}", f)); } else { - ExecutorService executor = Executors.newFixedThreadPool(8); + + int numThreads = Math.min(8, orphanedFiles.size()); + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); List> futures = orphanedFiles.stream() .map( @@ -141,6 +155,7 @@ public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOE } } catch (Exception e) { logger.error("Error during file deletion", e); + return; } } From f5b068f99dfea876936dbafd0d3c8609eb512403 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 3 Jul 2025 17:54:28 -0500 Subject: [PATCH 6/8] Fixed info message. --- .../catalog/internal/maintenance/MaintenanceScheduler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index 5b12cdc..2d39749 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -132,7 +132,9 @@ public void performMaintenance() { table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); if (orphanFileExpirationDays == 0) { - logger.info("Skipping orphan file removal for table {}", tableIdent); + logger.info( + "Skipping orphan file removal for table {} since orphanFileExpirationDays config was not set", + tableIdent); continue; } long orphanCutOffMillis = From c891d389f09bd415f0e43fe7817e7a343f62903d Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 3 Jul 2025 19:16:23 -0500 Subject: [PATCH 7/8] Fixed info message. 
--- examples/scratch/.ice-rest-catalog.yaml | 4 ++-- .../rest/catalog/internal/maintenance/OrphanFileScanner.java | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml index 55768d8..c3898a6 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -15,8 +15,8 @@ s3: bearerTokens: - value: foo -maintenanceSchedule: "every 1 minutes" - +maintenanceSchedule: "every 1 minutes" +orphanFileExpirationDays: 1 anonymousAccess: enabled: true accessConfig: {} diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java index 548ab50..32fcd7b 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanFileScanner.java @@ -97,7 +97,7 @@ public Set findOrphanedFiles(String location, long olderThanMillis) thro Iterable fileInfos = schemeFileIO.listPrefix(location); for (FileInfo fileInfo : fileInfos) { - if (fileInfo.createdAtMillis() > olderThanMillis) { + if (fileInfo.createdAtMillis() < olderThanMillis) { allFiles.add(fileInfo.location()); } } @@ -113,6 +113,8 @@ public void removeOrphanedFiles(long olderThanMillis, boolean dryRun) throws IOE Set orphanedFiles = findOrphanedFiles(location, olderThanMillis); logger.info("Found {} orphaned files at {}!", orphanedFiles.size(), location); + // log all the orphaned files + orphanedFiles.forEach(f -> logger.info("Orphaned file: {}", f)); if (orphanedFiles.isEmpty()) { logger.info("No orphaned files found at {}!", location); From 68d27a4cee0268db130232376fc896fcd2b8545a Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Thu, 10 Jul 2025 13:14:31 -0500 Subject: [PATCH 8/8] set 
default orphan expiration days to 10 --- .../altinity/ice/rest/catalog/internal/config/Config.java | 6 +++++- .../catalog/internal/maintenance/MaintenanceScheduler.java | 6 ------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java index 49871b0..2923ab3 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java @@ -64,6 +64,7 @@ public record Config( private static final String DEFAULT_ADDR = "0.0.0.0:5000"; private static final String DEFAULT_DEBUG_ADDR = "0.0.0.0:5001"; + private static final int DEFAULT_ORPHAN_FILE_EXPIRATION_DAYS = 10; @JsonCreator public Config( @@ -93,7 +94,10 @@ public Config( Objects.requireNonNullElse(anonymousAccess, new AnonymousAccess(false, null)); this.maintenanceSchedule = maintenanceSchedule; this.snapshotTTLInDays = snapshotTTLInDays; - this.orphanFileExpirationDays = orphanFileExpirationDays; + this.orphanFileExpirationDays = + orphanFileExpirationDays == 0 + ? 
DEFAULT_ORPHAN_FILE_EXPIRATION_DAYS + : orphanFileExpirationDays; this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of()); this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of()); } diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java index 2d39749..00f9103 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -131,12 +131,6 @@ public void performMaintenance() { table.rewriteManifests().rewriteIf(manifest -> true).commit(); table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); - if (orphanFileExpirationDays == 0) { - logger.info( - "Skipping orphan file removal for table {} since orphanFileExpirationDays config was not set", - tableIdent); - continue; - } long orphanCutOffMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(orphanFileExpirationDays); // Remove orphans only for S3-based tables