diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
index 0c20fd1b..76acd94a 100644
--- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
+++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java
@@ -36,14 +36,18 @@
 import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;

 import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -80,6 +84,9 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, Operation> {

+    private static final Set<Integer> ELASTICSEARCH_NON_RETRYABLE_STATUS =
+            new HashSet<>(Arrays.asList(400, 404));
+
     public Elasticsearch8AsyncWriter(
             ElementConverter<InputT, Operation> elementConverter,
             WriterInitContext context,
@@ -160,21 +167,45 @@ private void handlePartiallyFailedRequest(
             ResultHandler<Operation> resultHandler,
             BulkResponse response) {
         LOG.debug("The BulkRequest has failed partially. Response: {}", response);
-        ArrayList<Operation> failedItems = new ArrayList<>();
+
+        ArrayList<Operation> failedItemsToRetry = new ArrayList<>();
+        int totalFailedItems = 0;
+        FlinkRuntimeException nonRetryableException = null;
+
         for (int i = 0; i < response.items().size(); i++) {
-            if (response.items().get(i).error() != null) {
-                failedItems.add(requestEntries.get(i));
+            BulkResponseItem responseItem = response.items().get(i);
+            if (responseItem.error() != null) {
+                totalFailedItems++;
+                if (isOperationRetryable(responseItem.status())) {
+                    failedItemsToRetry.add(requestEntries.get(i));
+                } else {
+                    LOG.error(
+                            "Failed to process non-retryable operation: {}, response: {}",
+                            requestEntries.get(i),
+                            responseItem);
+                    nonRetryableException =
+                            new FlinkRuntimeException(
+                                    "Failed to process non-retryable operation, reason="
+                                            + responseItem.error().reason());
+                    break;
+                }
             }
         }

-        numRecordsOutErrorsCounter.inc(failedItems.size());
-        numRecordsSendPartialFailureCounter.inc(failedItems.size());
+        numRecordsOutErrorsCounter.inc(totalFailedItems);
         LOG.info(
-                "The BulkRequest with {} operation(s) has {} failure(s). It took {}ms",
+                "The BulkRequest with {} operation(s) has {} failure(s), {} retryable. It took {}ms",
                 requestEntries.size(),
-                failedItems.size(),
+                totalFailedItems,
+                failedItemsToRetry.size(),
                 response.took());
-        resultHandler.retryForEntries(failedItems);
+
+        if (nonRetryableException != null) {
+            resultHandler.completeExceptionally(nonRetryableException);
+        } else {
+            numRecordsSendPartialFailureCounter.inc(failedItemsToRetry.size());
+            resultHandler.retryForEntries(failedItemsToRetry);
+        }
     }

     private void handleSuccessfulRequest(
@@ -190,6 +221,11 @@ private boolean isRetryable(Throwable error) {
         return !ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(error, getFatalExceptionCons());
     }

+    /** Given the response status, check if an operation is retryable. */
+    private static boolean isOperationRetryable(int status) {
+        return !ELASTICSEARCH_NON_RETRYABLE_STATUS.contains(status);
+    }
+
     @Override
     protected long getSizeInBytes(Operation requestEntry) {
         return new OperationSerializer().size(requestEntry);
diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java
index c401e761..c8f6db5d 100644
--- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java
+++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java
@@ -27,6 +27,7 @@
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.metrics.Gauge;

+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
 import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
 import org.apache.http.HttpHost;
 import org.junit.jupiter.api.BeforeEach;
@@ -37,6 +38,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -50,6 +52,7 @@ public class Elasticsearch8AsyncWriterITCase extends ElasticsearchSinkBaseITCase {

     private final Lock lock = new ReentrantLock();
     private final Condition completed = lock.newCondition();
+    private final AtomicBoolean completedExceptionally = new AtomicBoolean(false);

     @BeforeEach
     void setUp() {
@@ -171,8 +174,59 @@ public void testSendTimeMetric() throws Exception {
     @Timeout(5)
     public void testHandlePartiallyFailedBulk() throws Exception {
         String index = "test-partially-failed-bulk";
+        int maxBatchSize = 3;
+
+        // First create a document to enable a version conflict
+        try (final Elasticsearch8AsyncWriter<DummyData> setupWriter = createWriter(index, 1)) {
+            setupWriter.write(new DummyData("test-3", "test-3"), null);
+            await();
+        }
+
+        // Create a converter that triggers a 409 version conflict for test-3
+        Elasticsearch8AsyncSinkBuilder.OperationConverter<DummyData> conflictConverter =
+                new Elasticsearch8AsyncSinkBuilder.OperationConverter<>(
+                        (element, ctx) -> {
+                            if (element.getId().equals("test-3")) {
+                                // Use a wrong sequence number to trigger a 409 conflict (retryable)
+                                return new IndexOperation.Builder<>()
+                                        .id(element.getId())
+                                        .index(index)
+                                        .document(element)
+                                        .ifSeqNo(999L) // Wrong sequence number
+                                        .ifPrimaryTerm(1L)
+                                        .build();
+                            } else {
+                                return new IndexOperation.Builder<>()
+                                        .id(element.getId())
+                                        .index(index)
+                                        .document(element)
+                                        .build();
+                            }
+                        });
+
+        try (final Elasticsearch8AsyncWriter<DummyData> writer =
+                createWriter(maxBatchSize, conflictConverter)) {
+            writer.write(new DummyData("test-1", "test-1"), null);
+            writer.write(new DummyData("test-2", "test-2"), null);
+            writer.write(new DummyData("test-3", "version-conflict"), null);
+        }
+
+        await();
+
+        // 409 is retryable, so test-3 should not have completed the result handler exceptionally
+        assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1);
+        assertThat(completedExceptionally.get()).isFalse();
+        assertIdsAreWritten(index, new String[] {"test-1", "test-2"});
+    }
+
+    @TestTemplate
+    @Timeout(5)
+    public void testFailFastUponPartiallyFailedBulk() throws Exception {
+        String index = "test-fail-fast-partially-failed-bulk";
"test-fail-fast-partially-failed-bulk"; int maxBatchSize = 2; + // This simulates a scenario where some operations fail with non-retryable errors. + // test-1 gets docAsUpsert=false on non-existing doc (404 error). Elasticsearch8AsyncSinkBuilder.OperationConverter elementConverter = new Elasticsearch8AsyncSinkBuilder.OperationConverter<>( (element, ctx) -> @@ -195,7 +249,9 @@ public void testHandlePartiallyFailedBulk() throws Exception { await(); + // Verify that non-retryable error (404) increments error counter and fails fast assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1); + assertThat(completedExceptionally.get()).isTrue(); assertIdsAreWritten(index, new String[] {"test-2"}); assertIdsAreNotWritten(index, new String[] {"test-1"}); } @@ -264,6 +320,7 @@ public void complete() { @Override public void completeExceptionally(Exception e) { resultHandler.completeExceptionally(e); + completedExceptionally.set(true); signal(); }