Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,18 @@
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -80,6 +84,9 @@ public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, O
"Could not connect to Elasticsearch cluster using the provided hosts",
err)));

/**
 * HTTP statuses for which a failed bulk item is considered non-retryable: 400 (Bad Request,
 * e.g. a malformed document/mapping error) and 404 (Not Found). Any other failure status is
 * treated as retryable. Wrapped as unmodifiable so this shared static set cannot be mutated.
 */
private static final Set<Integer> ELASTICSEARCH_NON_RETRYABLE_STATUS =
        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(400, 404)));

public Elasticsearch8AsyncWriter(
ElementConverter<InputT, Operation> elementConverter,
WriterInitContext context,
Expand Down Expand Up @@ -160,21 +167,45 @@ private void handlePartiallyFailedRequest(
ResultHandler<Operation> resultHandler,
BulkResponse response) {
LOG.debug("The BulkRequest has failed partially. Response: {}", response);
ArrayList<Operation> failedItems = new ArrayList<>();

ArrayList<Operation> failedItemsToRetry = new ArrayList<>();
int totalFailedItems = 0;
FlinkRuntimeException nonRetryableException = null;

for (int i = 0; i < response.items().size(); i++) {
if (response.items().get(i).error() != null) {
failedItems.add(requestEntries.get(i));
BulkResponseItem responseItem = response.items().get(i);
if (responseItem.error() != null) {
totalFailedItems++;
if (isOperationRetryable(responseItem.status())) {
failedItemsToRetry.add(requestEntries.get(i));
} else {
LOG.error(
"Failed to process non-retryable operation: {}, response: {}",
requestEntries.get(i),
responseItem);
nonRetryableException =
new FlinkRuntimeException(
"Failed to process non-retryable operation, reason=%s"
+ responseItem.error().reason());
break;
}
}
}

numRecordsOutErrorsCounter.inc(failedItems.size());
numRecordsSendPartialFailureCounter.inc(failedItems.size());
numRecordsOutErrorsCounter.inc(totalFailedItems);
LOG.info(
"The BulkRequest with {} operation(s) has {} failure(s). It took {}ms",
"The BulkRequest with {} operation(s) has {} failure(s), {} retryable. It took {}ms",
requestEntries.size(),
failedItems.size(),
totalFailedItems,
failedItemsToRetry.size(),
response.took());
resultHandler.retryForEntries(failedItems);

if (nonRetryableException != null) {
resultHandler.completeExceptionally(nonRetryableException);
} else {
numRecordsSendPartialFailureCounter.inc(failedItemsToRetry.size());
resultHandler.retryForEntries(failedItemsToRetry);
}
}

private void handleSuccessfulRequest(
Expand All @@ -190,6 +221,11 @@ private boolean isRetryable(Throwable error) {
return !ELASTICSEARCH_FATAL_EXCEPTION_CLASSIFIER.isFatal(error, getFatalExceptionCons());
}

/**
 * Given the response status of a single bulk item, decide whether the operation may be
 * retried. Any status outside the explicit non-retryable set is considered retryable.
 */
private static boolean isOperationRetryable(int status) {
    boolean isNonRetryable = ELASTICSEARCH_NON_RETRYABLE_STATUS.contains(status);
    return !isNonRetryable;
}

@Override
protected long getSizeInBytes(Operation requestEntry) {
return new OperationSerializer().size(requestEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.metrics.Gauge;

import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
import org.apache.http.HttpHost;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -37,6 +38,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -50,6 +52,7 @@ public class Elasticsearch8AsyncWriterITCase extends ElasticsearchSinkBaseITCase
private final Lock lock = new ReentrantLock();

private final Condition completed = lock.newCondition();
private final AtomicBoolean completedExceptionally = new AtomicBoolean(false);

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -171,8 +174,59 @@ public void testSendTimeMetric() throws Exception {
@Timeout(5)
public void testHandlePartiallyFailedBulk() throws Exception {
String index = "test-partially-failed-bulk";
int maxBatchSize = 3;

// Seed the index with a "test-3" document so that a later conditional index (wrong
// seq_no/primary_term) produces a 409 version conflict for that id.
try (final Elasticsearch8AsyncWriter<DummyData> setupWriter = createWriter(index, 1)) {
setupWriter.write(new DummyData("test-3", "test-3"), null);
await();
}

// Converter that triggers a 409 version conflict (a retryable status) for "test-3" only;
// all other elements are indexed unconditionally and should succeed.
Elasticsearch8AsyncSinkBuilder.OperationConverter<DummyData> conflictConverter =
new Elasticsearch8AsyncSinkBuilder.OperationConverter<>(
(element, ctx) -> {
if (element.getId().equals("test-3")) {
// Use wrong version to trigger 409 conflict (retryable)
return new IndexOperation.Builder<>()
.id(element.getId())
.index(index)
.document(element)
.ifSeqNo(999L) // Deliberately wrong sequence number
.ifPrimaryTerm(1L)
.build();
} else {
return new IndexOperation.Builder<>()
.id(element.getId())
.index(index)
.document(element)
.build();
}
});

try (final Elasticsearch8AsyncWriter<DummyData> writer =
createWriter(maxBatchSize, conflictConverter)) {
writer.write(new DummyData("test-1", "test-1"), null);
writer.write(new DummyData("test-2", "test-2"), null);
writer.write(new DummyData("test-3", "version-conflict"), null);
}

await();

// 409 is retryable, so "test-3" must NOT have completed the result handler exceptionally;
// exactly one error is counted and the other two documents are written.
assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1);
assertThat(completedExceptionally.get()).isFalse();
assertIdsAreWritten(index, new String[] {"test-1", "test-2"});
}

@TestTemplate
@Timeout(5)
public void testFailFastUponPartiallyFailedBulk() throws Exception {
String index = "test-fail-fast-partially-failed-bulk";
int maxBatchSize = 2;

// This simulates a scenario where some operations fail with non-retryable errors.
// test-1 gets docAsUpsert=false on non-existing doc (404 error).
Elasticsearch8AsyncSinkBuilder.OperationConverter<DummyData> elementConverter =
new Elasticsearch8AsyncSinkBuilder.OperationConverter<>(
(element, ctx) ->
Expand All @@ -195,7 +249,9 @@ public void testHandlePartiallyFailedBulk() throws Exception {

await();

// Verify that non-retryable error (404) increments error counter and fails fast
assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1);
assertThat(completedExceptionally.get()).isTrue();
assertIdsAreWritten(index, new String[] {"test-2"});
assertIdsAreNotWritten(index, new String[] {"test-1"});
}
Expand Down Expand Up @@ -264,6 +320,7 @@ public void complete() {
// Records that the delegate handler failed, so tests can assert on fail-fast behavior.
@Override
public void completeExceptionally(Exception e) {
// Delegate first so the wrapped handler observes the failure.
resultHandler.completeExceptionally(e);
// Flag read by the fail-fast assertions (e.g. completedExceptionally.get()).
completedExceptionally.set(true);
// NOTE(review): signal() presumably wakes the await() used by the tests — confirm.
signal();
}

Expand Down