6 changes: 5 additions & 1 deletion docs/src/main/sphinx/connector/iceberg.md
@@ -884,7 +884,7 @@ with the `retention_threshold` parameter.
`expire_snapshots` can be run as follows:

```sql
ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d');
ALTER TABLE test_table EXECUTE expire_snapshots(retention_threshold => '7d', delete_files => true);
```
**Member:**

I'd like to understand why this is needed. We don't see this in https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots
What is the "more efficient out-of-band" way of doing deletes? Is there something we can improve about how we execute deletes in the current implementation to avoid the need for a flag to disable it?

**Member Author:**

> I'd like to understand why this is needed. We don't see this in https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots

So a few quick reasons:

  1. To meet data retention policy requirements.
  2. To reduce coordinator workload pressure.
  3. Flexibility in how table maintenance is performed.
  4. Faster `expire_snapshots` execution.

The main use case I am targeting is speeding up execution of `expire_snapshots` for large tables.

For a small number of tables with a reasonable amount of data, there isn't a ton of benefit (hence the default is `true`). But I have found that at certain scales (i.e., petabytes of data and/or trillions of rows), the call to `expire_snapshots` can take an extremely long time, with most of that time spent deleting files.

Additionally, depending on how many tables are being curated concurrently, there can be a fair bit of pressure on the coordinator.

With this parameter, we have more control over when we actually delete the underlying data while still benefiting from minimizing metadata for query planning.

> What is the "more efficient out-of-band" way of doing deletes?

What is "more efficient" depends on underlying storage, but a simple example would be leveraging S3 bucket lifecycle policies. Assuming you had a time based retention policy for data in a table, the lifecycle policy would be a convenient and more efficient way to delete the files.

> Is there something we can improve about how we execute deletes in the current implementation to avoid the need for a flag to disable it?

Parallelizing/distributing the deletes would certainly be an improvement. I believe the current implementation is single threaded.

**Member:**

> Assuming you had a time-based retention policy for data in a table, the lifecycle policy would be a convenient and more efficient way to delete the files.

If we want to rely on a lifecycle policy, then we need to "soft delete" the files by adding a `deleted` tag that the lifecycle policy can act on. Relying on just a time-based policy risks either deleting live files or keeping unnecessary files around for too long. It's not easy to keep the table's expiration interval aligned with a time-based lifecycle policy.
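A rough sketch of that soft-delete flow, again with the AWS SDK for Java v2 (the `deleted` tag key, bucket, and object key are hypothetical; none of this is implemented by the PR):

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.LifecycleRuleFilter;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.Tagging;

public class SoftDelete
{
    private static final Tag DELETED = Tag.builder().key("deleted").value("true").build();

    // Tag an expired file instead of deleting it; the lifecycle rule below
    // performs the physical deletion later.
    static void markDeleted(S3Client s3, String bucket, String key)
    {
        s3.putObjectTagging(PutObjectTaggingRequest.builder()
                .bucket(bucket)
                .key(key)
                .tagging(Tagging.builder().tagSet(DELETED).build())
                .build());
    }

    // Lifecycle filter matching only soft-deleted objects, so live files are
    // never at risk regardless of the table's expiration interval.
    static LifecycleRuleFilter deletedFilter()
    {
        return LifecycleRuleFilter.builder().tag(DELETED).build();
    }
}
```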

> Parallelizing/distributing the deletes would certainly be an improvement. I believe the current implementation is single threaded.

Why don't we use a threadpool in org.apache.iceberg.ExpireSnapshots#executeDeleteWith to get more parallelism in deletes?

**Member Author:**

> If we want to rely on a lifecycle policy, then we need to "soft delete" the files by adding a `deleted` tag that the lifecycle policy can act on. Relying on just a time-based policy risks either deleting live files or keeping unnecessary files around for too long. It's not easy to keep the table's expiration interval aligned with a time-based lifecycle policy.

Yeah, definitely some nuance I intentionally didn't touch on, but in principle the point remains.

> Why don't we use a threadpool in org.apache.iceberg.ExpireSnapshots#executeDeleteWith to get more parallelism in deletes?

I think we should, but the property still adds value for points 1, 2, and 3. I am also happy to throw something together to use a threadpool as well.
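For reference, a rough sketch of what that could look like (assuming a `table` handle, a `planningExecutor` like the one the current implementation passes to `planWith`, and an arbitrary pool size; this is not the PR's implementation):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Fan the file deletions out over a pool instead of the default single thread.
ExecutorService deleteExecutor = Executors.newFixedThreadPool(8);
try {
    table.expireSnapshots()
            .expireOlderThan(System.currentTimeMillis() - retentionMillis)
            .planWith(planningExecutor)
            .executeDeleteWith(deleteExecutor)
            .commit();
}
finally {
    deleteExecutor.shutdown();
}
```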


The value for `retention_threshold` must be higher than or equal to
@@ -893,6 +893,10 @@
procedure fails with a similar message: `Retention specified (1.00d) is shorter
than the minimum retention configured in the system (7.00d)`. The default value
for this property is `7d`.

The value for `delete_files` can be `true` or `false`. When set to `false`,
files associated with the expired snapshots are NOT deleted. The default
value for this property is `true`.

(iceberg-remove-orphan-files)=
##### remove_orphan_files

@@ -437,6 +437,7 @@ public class IcebergMetadata
private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
private static final String RETENTION_THRESHOLD = "retention_threshold";
private static final String DELETE_FILES = "delete_files";
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.<String>builder()
.add(EXTRA_PROPERTIES_PROPERTY)
@@ -1746,12 +1747,13 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats
private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
boolean deleteFiles = (boolean) executeProperties.get(DELETE_FILES);
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

return Optional.of(new IcebergTableExecuteHandle(
tableHandle.getSchemaTableName(),
EXPIRE_SNAPSHOTS,
new IcebergExpireSnapshotsHandle(retentionThreshold),
new IcebergExpireSnapshotsHandle(retentionThreshold, deleteFiles),
icebergTable.location(),
icebergTable.io().properties()));
}
@@ -2208,6 +2210,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut

// ForwardingFileIo handles bulk operations so no separate function implementation is needed
table.expireSnapshots()
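// cleanExpiredFiles(false) drops expired snapshots from table metadata without deleting the underlying files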
.cleanExpiredFiles(expireSnapshotsHandle.deleteFiles())
**Contributor:**

Will this make the table harder to clean up in the future, since the first maintenance task for removing expired snapshots is always slow?

**Member Author:**

I'm not sure I'm following what you mean, could you clarify?

.expireOlderThan(session.getStart().toEpochMilli() - retention.toMillis())
.planWith(icebergScanExecutor)
.commit();
@@ -21,6 +21,7 @@
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;

public class ExpireSnapshotsTableProcedure
implements Provider<TableProcedureMetadata>
@@ -36,6 +37,11 @@ public TableProcedureMetadata get()
"retention_threshold",
"Only snapshots older than threshold should be removed",
Duration.valueOf("7d"),
false),
booleanProperty(
"delete_files",
"Delete underlying files associated to the expired snapshot(s)",
true,
false)));
}
}
@@ -17,7 +17,7 @@

import static java.util.Objects.requireNonNull;

public record IcebergExpireSnapshotsHandle(Duration retentionThreshold)
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, boolean deleteFiles)
implements IcebergProcedureHandle
{
public IcebergExpireSnapshotsHandle
@@ -6549,6 +6549,44 @@ public void testExpireSnapshotsPartitionedTable()
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
}

@Test
public void testExpireSnapshotsKeepFiles()
throws Exception
{
try (TestTable table = newTrinoTable("test_expiring_snapshots_", "(key varchar, value integer)")) {
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
assertUpdate("INSERT INTO " + table.getName() + " VALUES ('one', 1)", 1);
assertUpdate("INSERT INTO " + table.getName() + " VALUES ('two', 2)", 1);
List<String> initialFiles = getAllDataFilesFromTableDirectory(table.getName());

assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");

List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(table.getName());
assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles);
assertThat(query("SELECT sum(value), listagg(key, ' ') WITHIN GROUP (ORDER BY key) FROM " + table.getName()))
.matches("VALUES (BIGINT '3', VARCHAR 'one two')");
}
}

@Test
public void testExpireSnapshotsPartitionedTableKeepFiles()
throws Exception
{
try (TestTable table = newTrinoTable("test_expiring_snapshots_partitioned_table", "(col1 BIGINT, col2 BIGINT) WITH (partitioning = ARRAY['col1'])")) {
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
assertUpdate("INSERT INTO " + table.getName() + " VALUES(1, 100), (1, 101), (1, 102), (2, 200), (2, 201), (3, 300)", 6);
assertUpdate("DELETE FROM " + table.getName() + " WHERE col1 = 1", 3);
assertUpdate("INSERT INTO " + table.getName() + " VALUES(4, 400)", 1);
List<String> initialFiles = getAllDataFilesFromTableDirectory(table.getName());

assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s', delete_files => false)");

List<String> updatedDataFiles = getAllDataFilesFromTableDirectory(table.getName());
assertThat(updatedDataFiles).containsExactlyInAnyOrderElementsOf(initialFiles);
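// Rows with col1 = 1 were deleted above, so sum(col2) = 200 + 201 + 300 + 400 = 1101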
assertQuery("SELECT sum(col2) FROM " + table.getName(), "SELECT 1101");
**Contributor:**

Please add a comment explaining why the result is 1101 -- it's not obvious otherwise. Consider using `SELECT col2 ...` to check each row instead.

**Member Author:**

This is the same test as testExpireSnapshotsPartitionedTable. It also feels redundant to list out and explain each SQL query that was executed to reach this value. Could this be done in a follow-up?

}
}

@Test
public void testExpireSnapshotsOnSnapshot()
{
@@ -6583,7 +6621,7 @@ public void testExplainExpireSnapshotOutput()
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);

assertExplain("EXPLAIN ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')",
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s].*");
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s, deleteFiles=true].*");
}

@Test