From 36d420a4cfd2366d315e7e7b3530442daa70e329 Mon Sep 17 00:00:00 2001 From: Tony Baeg Date: Wed, 16 Jul 2025 00:28:57 -0400 Subject: [PATCH] Allow skipping of data deletion in expire_snapshots --- docs/src/main/sphinx/connector/iceberg.md | 6 ++- .../trino/plugin/iceberg/IcebergMetadata.java | 5 ++- .../ExpireSnapshotsTableProcedure.java | 6 +++ .../IcebergExpireSnapshotsHandle.java | 2 +- .../iceberg/BaseIcebergConnectorTest.java | 40 ++++++++++++++++++- 5 files changed, 55 insertions(+), 4 deletions(-) diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 94915bbb169e..d68ea41612e2 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -884,7 +884,7 @@ with the `retention_threshold` parameter. `expire_snapshots` can be run as follows: ```sql -ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d'); +ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d', delete_files => true); ``` The value for `retention_threshold` must be higher than or equal to @@ -893,6 +893,10 @@ procedure fails with a similar message: `Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d)`. The default value for this property is `7d`. +The value for `delete_files` can be `true` or `false`. When set to `false` +files associated with the expired snapshots will NOT be deleted. The default +value for this property is `true`. + (iceberg-remove-orphan-files)= ##### remove_orphan_files diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 87d7424aba68..df9498df38c3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -437,6 +437,7 @@ public class IcebergMetadata private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2; private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2; private static final String RETENTION_THRESHOLD = "retention_threshold"; + private static final String DELETE_FILES = "delete_files"; private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN"; public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.builder() .add(EXTRA_PROPERTIES_PROPERTY) @@ -1746,12 +1747,13 @@ private Optional getTableHandleForDropExtendedStats private Optional getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) { Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD); + boolean deleteFiles = (boolean) executeProperties.get(DELETE_FILES); Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); return Optional.of(new IcebergTableExecuteHandle( tableHandle.getSchemaTableName(), EXPIRE_SNAPSHOTS, - new IcebergExpireSnapshotsHandle(retentionThreshold), + new IcebergExpireSnapshotsHandle(retentionThreshold, deleteFiles), icebergTable.location(), icebergTable.io().properties())); } @@ -2208,6 +2210,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut // ForwardingFileIo handles bulk operations so no separate function implementation is needed table.expireSnapshots() + .cleanExpiredFiles(expireSnapshotsHandle.deleteFiles()) .expireOlderThan(session.getStart().toEpochMilli() - retention.toMillis()) .planWith(icebergScanExecutor) .commit(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java index 1aa84ea92887..a56e00fe2824 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java @@ -21,6 +21,7 @@ import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.booleanProperty; public class ExpireSnapshotsTableProcedure implements Provider @@ -36,6 +37,11 @@ public TableProcedureMetadata get() "retention_threshold", "Only snapshots older than threshold should be removed", Duration.valueOf("7d"), + false), + booleanProperty( + "delete_files", + "Delete underlying files associated to the expired snapshot(s)", + true, false))); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java index 0757a82d66e3..c52a9c7f891a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java @@ -17,7 +17,7 @@ import static java.util.Objects.requireNonNull; -public record IcebergExpireSnapshotsHandle(Duration retentionThreshold) +public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, boolean deleteFiles) implements IcebergProcedureHandle { public IcebergExpireSnapshotsHandle diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index bff0be542839..3811d4aadbcb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6549,6 +6549,44 @@ public void testExpireSnapshotsPartitionedTable() assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size()); } + @Test + public void testExpireSnapshotsKeepFiles() + throws Exception + { + try (TestTable table = newTrinoTable("test_expiring_snapshots_", "(key varchar, value integer)")) { + Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); + assertUpdate("INSERT INTO " + table.getName() + " VALUES ('one', 1)", 1); + assertUpdate("INSERT INTO " + table.getName() + " VALUES ('two', 2)", 1); + List initialFiles = getAllDataFilesFromTableDirectory(table.getName()); + + assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)"); + + List updatedDataFiles = getAllDataFilesFromTableDirectory(table.getName()); + assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles); + assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + table.getName())) + .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); + } + } + + @Test + public void testExpireSnapshotsPartitionedTableKeepFiles() + throws Exception + { + try (TestTable table = newTrinoTable("test_expiring_snapshots_partitioned_table", "(col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])")) { + Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); + assertUpdate("INSERT INTO " + table.getName() + " VALUES(1, 100), (1, 101), (1, 102), (2, 200), (2, 201), (3, 300)", 6); + assertUpdate("DELETE FROM " + table.getName() + " WHERE col1 = 1", 3); + assertUpdate("INSERT INTO " + table.getName() + " VALUES(4, 400)", 1); + List initialFiles = getAllDataFilesFromTableDirectory(table.getName()); + + assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)"); + + List updatedDataFiles = getAllDataFilesFromTableDirectory(table.getName()); + assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles); + assertQuery("SELECT sum(col2) FROM " + table.getName(), "SELECT 1101"); + } + } + @Test public void testExpireSnapshotsOnSnapshot() { @@ -6583,7 +6621,7 @@ public void testExplainExpireSnapshotOutput() assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); assertExplain("EXPLAIN ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')", - "SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s].*"); + "SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s, deleteFiles=true].*"); } @Test