Skip to content

Commit 9e87d70

Browse files
committed
Add test for failing fast case
1 parent 191b4cb commit 9e87d70

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Collections;
3838
import java.util.List;
3939
import java.util.Optional;
40+
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.locks.Condition;
4142
import java.util.concurrent.locks.Lock;
4243
import java.util.concurrent.locks.ReentrantLock;
@@ -50,6 +51,7 @@ public class Elasticsearch8AsyncWriterITCase extends ElasticsearchSinkBaseITCase
5051
private final Lock lock = new ReentrantLock();
5152

5253
private final Condition completed = lock.newCondition();
54+
private final AtomicBoolean completedExceptionally = new AtomicBoolean(false);
5355

5456
@BeforeEach
5557
void setUp() {
@@ -171,8 +173,59 @@ public void testSendTimeMetric() throws Exception {
171173
@Timeout(5)
172174
public void testHandlePartiallyFailedBulk() throws Exception {
173175
String index = "test-partially-failed-bulk";
176+
int maxBatchSize = 3;
177+
178+
// First create a document to enable version conflict
179+
try (final Elasticsearch8AsyncWriter<DummyData> setupWriter = createWriter(index, 1)) {
180+
setupWriter.write(new DummyData("test-3", "test-3"), null);
181+
await();
182+
}
183+
184+
// Create converter that triggers 409 version conflict for test-3
185+
Elasticsearch8AsyncSinkBuilder.OperationConverter<DummyData> conflictConverter =
186+
new Elasticsearch8AsyncSinkBuilder.OperationConverter<>(
187+
(element, ctx) -> {
188+
if (element.getId().equals("test-3")) {
189+
// Use wrong version to trigger 409 conflict (retryable)
190+
return new co.elastic.clients.elasticsearch.core.bulk.IndexOperation.Builder<>()
191+
.id(element.getId())
192+
.index(index)
193+
.document(element)
194+
.ifSeqNo(999L) // Wrong sequence number
195+
.ifPrimaryTerm(1L)
196+
.build();
197+
} else {
198+
return new co.elastic.clients.elasticsearch.core.bulk.IndexOperation.Builder<>()
199+
.id(element.getId())
200+
.index(index)
201+
.document(element)
202+
.build();
203+
}
204+
});
205+
206+
try (final Elasticsearch8AsyncWriter<DummyData> writer =
207+
createWriter(maxBatchSize, conflictConverter)) {
208+
writer.write(new DummyData("test-1", "test-1"), null);
209+
writer.write(new DummyData("test-2", "test-2"), null);
210+
writer.write(new DummyData("test-3", "version-conflict"), null);
211+
}
212+
213+
await();
214+
215+
// 409 is retryable, so test-3 should have not completed the rest handler exceptionally
216+
assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1);
217+
assertThat(completedExceptionally.get()).isFalse();
218+
assertIdsAreWritten(index, new String[] {"test-1", "test-2"});
219+
}
220+
221+
@TestTemplate
222+
@Timeout(5)
223+
public void testFailFastUponPartiallyFailedBulk() throws Exception {
224+
String index = "test-fail-fast-partially-failed-bulk";
174225
int maxBatchSize = 2;
175226

227+
// This simulates a scenario where some operations fail with non-retryable errors.
228+
// test-1 gets docAsUpsert=false on non-existing doc (404 error).
176229
Elasticsearch8AsyncSinkBuilder.OperationConverter<DummyData> elementConverter =
177230
new Elasticsearch8AsyncSinkBuilder.OperationConverter<>(
178231
(element, ctx) ->
@@ -195,7 +248,9 @@ public void testHandlePartiallyFailedBulk() throws Exception {
195248

196249
await();
197250

251+
// Verify that non-retryable error (404) increments error counter and fails fast
198252
assertThat(context.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(1);
253+
assertThat(completedExceptionally.get()).isTrue();
199254
assertIdsAreWritten(index, new String[] {"test-2"});
200255
assertIdsAreNotWritten(index, new String[] {"test-1"});
201256
}
@@ -264,6 +319,7 @@ public void complete() {
264319
@Override
265320
public void completeExceptionally(Exception e) {
266321
resultHandler.completeExceptionally(e);
322+
completedExceptionally.set(true);
267323
signal();
268324
}
269325

0 commit comments

Comments
 (0)