Skip to content

Commit 26fdfd2

Browse files
committed
Allow skipping of data deletion in expire_snapshots
1 parent e933971 commit 26fdfd2

File tree

5 files changed

+52
-4
lines changed

5 files changed

+52
-4
lines changed

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ with the `retention_threshold` parameter.
881881
`expire_snapshots` can be run as follows:
882882

883883
```sql
884-
ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d');
884+
ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d', delete_files => true);
885885
```
886886

887887
The value for `retention_threshold` must be higher than or equal to
@@ -890,6 +890,10 @@ procedure fails with a similar message: `Retention specified (1.00d) is shorter
890890
than the minimum retention configured in the system (7.00d)`. The default value
891891
for this property is `7d`.
892892

893+
The value for `delete_files` can be `true` or `false`. When set to `false`
894+
files associated with the expired snapshots will NOT be deleted. The default
895+
value for this property is `true`.
896+
893897
(iceberg-remove-orphan-files)=
894898
##### remove_orphan_files
895899

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ public class IcebergMetadata
416416
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
417417
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
418418
private static final String RETENTION_THRESHOLD = "retention_threshold";
419+
private static final String DELETE_FILES = "delete_files";
419420
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
420421
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.<String>builder()
421422
.add(EXTRA_PROPERTIES_PROPERTY)
@@ -1721,12 +1722,13 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats
17211722
private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
17221723
{
17231724
Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
1725+
boolean deleteFiles = (boolean) executeProperties.get(DELETE_FILES);
17241726
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
17251727

17261728
return Optional.of(new IcebergTableExecuteHandle(
17271729
tableHandle.getSchemaTableName(),
17281730
EXPIRE_SNAPSHOTS,
1729-
new IcebergExpireSnapshotsHandle(retentionThreshold),
1731+
new IcebergExpireSnapshotsHandle(retentionThreshold, deleteFiles),
17301732
icebergTable.location(),
17311733
icebergTable.io().properties()));
17321734
}
@@ -2198,6 +2200,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21982200
table.expireSnapshots()
21992201
.expireOlderThan(expireTimestampMillis)
22002202
.deleteWith(deleteFunction)
2203+
.cleanExpiredFiles(expireSnapshotsHandle.deleteFiles())
22012204
.commit();
22022205

22032206
fileSystem.deleteFiles(pathsToDelete);

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
2222
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
2323
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
24+
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
2425

2526
public class ExpireSnapshotsTableProcedure
2627
implements Provider<TableProcedureMetadata>
@@ -36,6 +37,11 @@ public TableProcedureMetadata get()
3637
"retention_threshold",
3738
"Only snapshots older than threshold should be removed",
3839
Duration.valueOf("7d"),
40+
false),
41+
booleanProperty(
42+
"delete_files",
43+
"Delete underlying files associated to the expired snapshot(s)",
44+
true,
3945
false)));
4046
}
4147
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import static java.util.Objects.requireNonNull;
1919

20-
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold)
20+
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, boolean deleteFiles)
2121
implements IcebergProcedureHandle
2222
{
2323
public IcebergExpireSnapshotsHandle

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6550,6 +6550,41 @@ public void testExpireSnapshotsPartitionedTable()
65506550
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
65516551
}
65526552

6553+
@Test
6554+
public void testExpireSnapshotsKeepFiles()
6555+
throws Exception
6556+
{
6557+
String tableName = "test_expiring_snapshots_" + randomNameSuffix();
6558+
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
6559+
assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)");
6560+
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
6561+
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
6562+
List<String> initialFiles = getAllDataFilesFromTableDirectory(tableName);
6563+
6564+
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");
6565+
6566+
List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(tableName);
6567+
assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles);
6568+
}
6569+
6570+
@Test
6571+
public void testExpireSnapshotsPartitionedTableKeepFiles()
6572+
throws Exception
6573+
{
6574+
String tableName = "test_expiring_snapshots_partitioned_table" + randomNameSuffix();
6575+
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
6576+
assertUpdate("CREATE TABLE " + tableName + " (col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])");
6577+
assertUpdate("INSERT INTO " + tableName + " VALUES(1, 100), (1, 101), (1, 102), (2, 200), (2, 201), (3, 300)", 6);
6578+
assertUpdate("DELETE FROM " + tableName + " WHERE col1 = 1", 3);
6579+
assertUpdate("INSERT INTO " + tableName + " VALUES(4, 400)", 1);
6580+
List<String> initialFiles = getAllDataFilesFromTableDirectory(tableName);
6581+
6582+
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");
6583+
6584+
List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(tableName);
6585+
assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles);
6586+
}
6587+
65536588
@Test
65546589
public void testExpireSnapshotsOnSnapshot()
65556590
{
@@ -6584,7 +6619,7 @@ public void testExplainExpireSnapshotOutput()
65846619
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
65856620

65866621
assertExplain("EXPLAIN ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')",
6587-
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s].*");
6622+
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s, deleteFiles=true].*");
65886623
}
65896624

65906625
@Test

0 commit comments

Comments
 (0)