Skip to content

Commit aa5894a

Browse files
authored
Core: Load snapshot after it has been committed to prevent accidental cleanup of files (apache#15511)
1 parent 3af95aa commit aa5894a

File tree

4 files changed

+92
-19
lines changed

4 files changed

+92
-19
lines changed

core/src/main/java/org/apache/iceberg/SnapshotProducer.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import java.util.UUID;
4545
import java.util.concurrent.ExecutorService;
4646
import java.util.concurrent.atomic.AtomicInteger;
47-
import java.util.concurrent.atomic.AtomicReference;
47+
import java.util.concurrent.atomic.AtomicLong;
4848
import java.util.concurrent.atomic.AtomicReferenceArray;
4949
import java.util.function.Consumer;
5050
import java.util.function.Function;
@@ -455,8 +455,8 @@ protected TableMetadata refresh() {
455455
@Override
456456
@SuppressWarnings("checkstyle:CyclomaticComplexity")
457457
public void commit() {
458-
// this is always set to the latest commit attempt's snapshot
459-
AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
458+
// this is always set to the latest commit attempt's snapshot id.
459+
AtomicLong newSnapshotId = new AtomicLong(-1L);
460460
try (Timed ignore = commitMetrics().totalDuration().start()) {
461461
try {
462462
Tasks.foreach(ops)
@@ -471,7 +471,7 @@ public void commit() {
471471
.run(
472472
taskOps -> {
473473
Snapshot newSnapshot = apply();
474-
stagedSnapshot.set(newSnapshot);
474+
newSnapshotId.set(newSnapshot.snapshotId());
475475
TableMetadata.Builder update = TableMetadata.buildFrom(base);
476476
if (base.snapshot(newSnapshot.snapshotId()) != null) {
477477
// this is a rollback operation
@@ -509,22 +509,29 @@ public void commit() {
509509
throw e;
510510
}
511511

512-
// at this point, the commit must have succeeded so the stagedSnapshot is committed
513-
Snapshot committedSnapshot = stagedSnapshot.get();
514512
try {
515-
LOG.info(
516-
"Committed snapshot {} ({})",
517-
committedSnapshot.snapshotId(),
518-
getClass().getSimpleName());
513+
LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());
514+
515+
// at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
516+
// id in case another commit was added between this commit and the refresh.
517+
// it might not be known which commit attempt succeeded in some cases, so this only cleans
518+
// up the one that actually did succeed.
519+
Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
520+
if (saved != null) {
521+
if (cleanupAfterCommit()) {
522+
cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
523+
}
519524

520-
if (cleanupAfterCommit()) {
521-
cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
522-
}
523-
// also clean up unused manifest lists created by multiple attempts
524-
for (String manifestList : manifestLists) {
525-
if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
526-
deleteFile(manifestList);
525+
// also clean up unused manifest lists created by multiple attempts
526+
for (String manifestList : manifestLists) {
527+
if (!saved.manifestListLocation().equals(manifestList)) {
528+
deleteFile(manifestList);
529+
}
527530
}
531+
} else {
532+
// saved may not be present if the latest metadata couldn't be loaded due to eventual
533+
// consistency problems in refresh. in that case, don't clean up.
534+
LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
528535
}
529536
} catch (Throwable e) {
530537
LOG.warn(

core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import static org.assertj.core.api.Assertions.assertThat;
2323
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2424

25+
import java.io.File;
2526
import java.io.IOException;
27+
import java.nio.file.Paths;
2628
import java.util.List;
2729
import java.util.stream.Collectors;
2830
import javax.annotation.Nonnull;
@@ -158,4 +160,68 @@ public void testCommitValidationWithCustomSummaryProperties() throws IOException
158160
// Verify the table wasn't updated
159161
assertThat(table.snapshots()).hasSize(1);
160162
}
163+
164+
@TestTemplate
165+
public void manifestNotCleanedUpWhenSnapshotNotLoadableAfterCommit() {
166+
// Uses a custom TableOps that returns stale metadata (without the new snapshot) on the
167+
// first refresh() after commit, simulating eventual consistency. Verifies that commit succeeds
168+
// and that the committed data is visible once the table is refreshed again
169+
String tableName = "stale-table-on-first-refresh";
170+
TestTables.TestTableOperations ops = opsWithStaleRefreshAfterCommit(tableName, tableDir);
171+
TestTables.TestTable tableWithStaleRefresh =
172+
TestTables.create(
173+
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, ops);
174+
175+
// the first refresh() after the commit will return stale metadata (without this snapshot), so
176+
// SnapshotProducer will skip cleanup to avoid accidentally deleting files that are part of the
177+
// committed snapshot but commit still succeeds
178+
tableWithStaleRefresh.newAppend().appendFile(FILE_A).commit();
179+
180+
// Refresh again to get the real metadata; the snapshot must be visible now
181+
tableWithStaleRefresh.ops().refresh();
182+
Snapshot snapshot = tableWithStaleRefresh.currentSnapshot();
183+
assertThat(snapshot)
184+
.as("Committed snapshot must be visible after refresh (eventual consistency resolved)")
185+
.isNotNull();
186+
187+
File metadata = Paths.get(tableDir.getPath(), "metadata").toFile();
188+
assertThat(snapshot.allManifests(tableWithStaleRefresh.io()))
189+
.isNotEmpty()
190+
.allSatisfy(
191+
manifest -> assertThat(metadata.listFiles()).contains(new File(manifest.path())));
192+
}
193+
194+
/**
195+
* Creates a TableOperations that returns stale metadata (without the newly committed snapshot) on
196+
* the first refresh() after a commit. This simulates eventual consistency where the committed
197+
* snapshot is not yet visible. Used to verify that when the snapshot cannot be loaded after
198+
* commit, cleanup is skipped to avoid accidentally deleting files that are part of the committed
199+
* snapshot.
200+
*/
201+
private static TestTables.TestTableOperations opsWithStaleRefreshAfterCommit(
202+
String name, File location) {
203+
return new TestTables.TestTableOperations(name, location) {
204+
private TableMetadata metadataToReturnOnNextRefresh;
205+
206+
@Override
207+
public void commit(TableMetadata base, TableMetadata updatedMetadata) {
208+
super.commit(base, updatedMetadata);
209+
if (base != null) {
210+
// return stale metadata on the first refresh() call
211+
this.metadataToReturnOnNextRefresh = base;
212+
}
213+
}
214+
215+
@Override
216+
public TableMetadata refresh() {
217+
if (metadataToReturnOnNextRefresh != null) {
218+
this.current = metadataToReturnOnNextRefresh;
219+
this.metadataToReturnOnNextRefresh = null;
220+
return current;
221+
}
222+
223+
return super.refresh();
224+
}
225+
};
226+
}
161227
}

core/src/test/java/org/apache/iceberg/TestTables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public static class TestTableOperations implements TableOperations {
276276
private final String tableName;
277277
private final File metadata;
278278
private final FileIO fileIO;
279-
private TableMetadata current = null;
279+
protected TableMetadata current = null;
280280
private long lastSnapshotId = 0;
281281
private int failCommits = 0;
282282

core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3659,7 +3659,7 @@ public void testNumLoadTableCallsForMergeAppend() {
36593659
table.newAppend().appendFile(FILE_A).commit();
36603660

36613661
// loadTable is executed once
3662-
Mockito.verify(adapter)
3662+
Mockito.verify(adapter, times(2))
36633663
.execute(matches(HTTPMethod.GET, RESOURCE_PATHS.table(TABLE)), any(), any(), any());
36643664

36653665
// CommitReport reflects the table state after the commit

0 commit comments

Comments
 (0)