Skip to content

Commit 35912d2

Browse files
singhpk234 authored and RussellSpitzer committed
Core: Avoid table corruption from 409 on self conflicts after 5xx retries by throwing CommitStateUnknown (apache#12818)
1 parent f40208a commit 35912d2

File tree

6 files changed

+95
-16
lines changed

6 files changed

+95
-16
lines changed

core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,19 @@ public void accept(ErrorResponse error) {
9191
case 404:
9292
throw new NoSuchTableException("%s", error.message());
9393
case 409:
94-
throw new CommitFailedException("Commit failed: %s", error.message());
94+
if (error.wasRetried()) {
95+
// If a retried request finally fails with 409,
96+
// the IRC service may have persisted the commit
97+
// despite initial 5xx errors, resulting in a self-conflict on retry
98+
// due to the base changing.
99+
// Mark this failure as commit state unknown rather than failed to prevent file cleanup.
100+
throw new CommitStateUnknownException(
101+
new RESTException(
102+
"Commit status unknown, due to retries: %s: %s",
103+
error.code(), error.message()));
104+
} else {
105+
throw new CommitFailedException("Commit failed: %s", error.message());
106+
}
95107
case 500:
96108
case 502:
97109
case 504:

core/src/main/java/org/apache/iceberg/rest/ExponentialHttpRequestRetryStrategy.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,20 @@ public boolean retryRequest(
118118
}
119119

120120
// Retry if the request is considered idempotent
121-
return Method.isIdempotent(request.getMethod());
121+
boolean shouldRetry = Method.isIdempotent(request.getMethod());
122+
if (shouldRetry && context != null) {
123+
context.setAttribute("was-retried", Boolean.TRUE);
124+
}
125+
return shouldRetry;
122126
}
123127

124128
@Override
125129
public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
126-
return execCount <= maxRetries && retriableCodes.contains(response.getCode());
130+
boolean shouldRetry = execCount <= maxRetries && retriableCodes.contains(response.getCode());
131+
if (shouldRetry && context != null) {
132+
context.setAttribute("was-retried", Boolean.TRUE);
133+
}
134+
return shouldRetry;
127135
}
128136

129137
@Override

core/src/main/java/org/apache/iceberg/rest/HTTPClient.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog;
4646
import org.apache.hc.core5.http.io.entity.EntityUtils;
4747
import org.apache.hc.core5.http.io.entity.StringEntity;
48+
import org.apache.hc.core5.http.protocol.BasicHttpContext;
49+
import org.apache.hc.core5.http.protocol.HttpContext;
4850
import org.apache.hc.core5.io.CloseMode;
4951
import org.apache.iceberg.IcebergBuild;
5052
import org.apache.iceberg.exceptions.RESTException;
@@ -175,7 +177,10 @@ private static ErrorResponse buildDefaultErrorResponse(CloseableHttpResponse res
175177
// Process a failed response through the provided errorHandler, and throw a RESTException if the
176178
// provided error handler doesn't already throw.
177179
private static void throwFailure(
178-
CloseableHttpResponse response, String responseBody, Consumer<ErrorResponse> errorHandler) {
180+
CloseableHttpResponse response,
181+
String responseBody,
182+
Consumer<ErrorResponse> errorHandler,
183+
Object wasRetried) {
179184
ErrorResponse errorResponse = null;
180185

181186
if (responseBody != null) {
@@ -212,10 +217,19 @@ private static void throwFailure(
212217
errorResponse = buildDefaultErrorResponse(response);
213218
}
214219

215-
errorHandler.accept(errorResponse);
220+
ErrorResponse enrichedErrorResponse =
221+
ErrorResponse.builder()
222+
.wasRetried(wasRetried == Boolean.TRUE)
223+
.responseCode(errorResponse.code())
224+
.withMessage(errorResponse.message())
225+
.withType(errorResponse.type())
226+
.withStackTrace(errorResponse.stack())
227+
.build();
228+
229+
errorHandler.accept(enrichedErrorResponse);
216230

217231
// Throw an exception in case the provided error handler does not throw.
218-
throw new RESTException("Unhandled error: %s", errorResponse);
232+
throw new RESTException("Unhandled error: %s", enrichedErrorResponse);
219233
}
220234

221235
@Override
@@ -278,7 +292,8 @@ protected <T extends RESTResponse> T execute(
278292
request.setEntity(new StringEntity(encodedBody));
279293
}
280294

281-
try (CloseableHttpResponse response = httpClient.execute(request)) {
295+
HttpContext context = new BasicHttpContext();
296+
try (CloseableHttpResponse response = httpClient.execute(request, context)) {
282297
Map<String, String> respHeaders = Maps.newHashMap();
283298
for (Header header : response.getHeaders()) {
284299
respHeaders.put(header.getName(), header.getValue());
@@ -296,7 +311,7 @@ protected <T extends RESTResponse> T execute(
296311

297312
if (!isSuccessful(response)) {
298313
// The provided error handler is expected to throw, but a RESTException is thrown if not.
299-
throwFailure(response, responseBody, errorHandler);
314+
throwFailure(response, responseBody, errorHandler, context.getAttribute("was-retried"));
300315
}
301316

302317
if (responseBody == null) {

core/src/main/java/org/apache/iceberg/rest/responses/ErrorResponse.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,15 @@ public class ErrorResponse implements RESTResponse {
3232
private final String type;
3333
private final int code;
3434
private final List<String> stack;
35+
private final boolean wasRetried;
3536

36-
private ErrorResponse(String message, String type, int code, List<String> stack) {
37+
private ErrorResponse(
38+
String message, String type, int code, List<String> stack, boolean wasRetried) {
3739
this.message = message;
3840
this.type = type;
3941
this.code = code;
4042
this.stack = stack;
43+
this.wasRetried = wasRetried;
4144
validate();
4245
}
4346

@@ -62,6 +65,10 @@ public List<String> stack() {
6265
return stack;
6366
}
6467

68+
public boolean wasRetried() {
69+
return wasRetried;
70+
}
71+
6572
@Override
6673
public String toString() {
6774
StringBuilder sb = new StringBuilder();
@@ -92,6 +99,7 @@ public static class Builder {
9299
private String type;
93100
private Integer code;
94101
private List<String> stack;
102+
private boolean wasRetried;
95103

96104
private Builder() {}
97105

@@ -126,9 +134,14 @@ public Builder responseCode(Integer responseCode) {
126134
return this;
127135
}
128136

137+
public Builder wasRetried(boolean hasBeenRetried) {
138+
this.wasRetried = hasBeenRetried;
139+
return this;
140+
}
141+
129142
public ErrorResponse build() {
130143
Preconditions.checkArgument(code != null, "Invalid response, missing field: code");
131-
return new ErrorResponse(message, type, code, stack);
144+
return new ErrorResponse(message, type, code, stack, wasRetried);
132145
}
133146
}
134147
}

core/src/test/java/org/apache/iceberg/rest/TestExponentialHttpRequestRetryStrategy.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.apache.hc.core5.http.HttpHeaders;
3838
import org.apache.hc.core5.http.HttpResponse;
3939
import org.apache.hc.core5.http.message.BasicHttpResponse;
40+
import org.apache.hc.core5.http.protocol.BasicHttpContext;
41+
import org.apache.hc.core5.http.protocol.HttpContext;
4042
import org.junit.jupiter.api.Test;
4143
import org.junit.jupiter.params.ParameterizedTest;
4244
import org.junit.jupiter.params.provider.ValueSource;
@@ -80,14 +82,20 @@ public void exponentialRetry() {
8082

8183
@Test
8284
public void basicRetry() {
85+
HttpContext context503 = new BasicHttpContext();
8386
BasicHttpResponse response503 = new BasicHttpResponse(503, "Oopsie");
84-
assertThat(retryStrategy.retryRequest(response503, 3, null)).isTrue();
87+
assertThat(retryStrategy.retryRequest(response503, 3, context503)).isTrue();
88+
assertThat(context503.getAttribute("was-retried") == Boolean.TRUE).isTrue();
8589

90+
HttpContext context429 = new BasicHttpContext();
8691
BasicHttpResponse response429 = new BasicHttpResponse(429, "Oopsie");
87-
assertThat(retryStrategy.retryRequest(response429, 3, null)).isTrue();
92+
assertThat(retryStrategy.retryRequest(response429, 3, context429)).isTrue();
93+
assertThat(context429.getAttribute("was-retried") == Boolean.TRUE).isTrue();
8894

95+
HttpContext context404 = new BasicHttpContext();
8996
BasicHttpResponse response404 = new BasicHttpResponse(404, "Oopsie");
90-
assertThat(retryStrategy.retryRequest(response404, 3, null)).isFalse();
97+
assertThat(retryStrategy.retryRequest(response404, 3, context404)).isFalse();
98+
assertThat(context429.getAttribute("was-retried") == Boolean.TRUE).isTrue();
9199
}
92100

93101
@Test
@@ -155,8 +163,9 @@ public void noRetryOnAbortedRequests() {
155163
@Test
156164
public void retryOnNonAbortedRequests() {
157165
HttpGet request = new HttpGet("/");
166+
HttpContext context = new BasicHttpContext();
158167

159-
assertThat(retryStrategy.retryRequest(request, new IOException(), 1, null)).isTrue();
168+
assertThat(retryStrategy.retryRequest(request, new IOException(), 1, context)).isTrue();
160169
}
161170

162171
@Test
@@ -199,13 +208,15 @@ public void invalidRetryAfterHeader() {
199208

200209
@Test
201210
public void testRetryBadGateway() {
211+
HttpContext context = new BasicHttpContext();
202212
BasicHttpResponse response502 = new BasicHttpResponse(502, "Bad gateway failure");
203-
assertThat(retryStrategy.retryRequest(response502, 3, null)).isTrue();
213+
assertThat(retryStrategy.retryRequest(response502, 3, context)).isTrue();
204214
}
205215

206216
@Test
207217
public void testRetryGatewayTimeout() {
218+
HttpContext context = new BasicHttpContext();
208219
BasicHttpResponse response504 = new BasicHttpResponse(504, "Gateway timeout");
209-
assertThat(retryStrategy.retryRequest(response504, 3, null)).isTrue();
220+
assertThat(retryStrategy.retryRequest(response504, 3, context)).isTrue();
210221
}
211222
}

core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.iceberg.catalog.TableCommit;
6363
import org.apache.iceberg.catalog.TableIdentifier;
6464
import org.apache.iceberg.exceptions.CommitFailedException;
65+
import org.apache.iceberg.exceptions.CommitStateUnknownException;
6566
import org.apache.iceberg.exceptions.NotAuthorizedException;
6667
import org.apache.iceberg.exceptions.NotFoundException;
6768
import org.apache.iceberg.exceptions.ServiceFailureException;
@@ -92,6 +93,7 @@
9293
import org.eclipse.jetty.servlet.ServletContextHandler;
9394
import org.eclipse.jetty.servlet.ServletHolder;
9495
import org.junit.jupiter.api.AfterEach;
96+
import org.junit.jupiter.api.Assertions;
9597
import org.junit.jupiter.api.BeforeEach;
9698
import org.junit.jupiter.api.Test;
9799
import org.junit.jupiter.api.io.TempDir;
@@ -2648,6 +2650,24 @@ public void testTableExistsFallbackToGETRequest() {
26482650
ConfigResponse.builder().withEndpoints(ImmutableList.of(Endpoint.V1_LOAD_TABLE)).build());
26492651
}
26502652

2653+
@Test
2654+
public void testErrorHandlingForConflicts() {
2655+
Consumer<ErrorResponse> errorResponseConsumer = ErrorHandlers.tableCommitHandler();
2656+
// server returning 409 with client without retrying
2657+
ErrorResponse errorResponse409WithoutRetries =
2658+
ErrorResponse.builder().responseCode(409).wasRetried(false).build();
2659+
Assertions.assertThrows(
2660+
CommitFailedException.class,
2661+
() -> errorResponseConsumer.accept(errorResponse409WithoutRetries));
2662+
2663+
// server returning 409, with retries.
2664+
ErrorResponse errorResponse409WithRetries =
2665+
ErrorResponse.builder().responseCode(409).wasRetried(true).build();
2666+
Assertions.assertThrows(
2667+
CommitStateUnknownException.class,
2668+
() -> errorResponseConsumer.accept(errorResponse409WithRetries));
2669+
}
2670+
26512671
private void verifyTableExistsFallbackToGETRequest(ConfigResponse configResponse) {
26522672
RESTCatalogAdapter adapter =
26532673
Mockito.spy(

0 commit comments

Comments
 (0)