27
27
import org .apache .flink .connector .base .sink .writer .TestSinkInitContext ;
28
28
import org .apache .flink .metrics .Gauge ;
29
29
30
+ import co .elastic .clients .elasticsearch .core .bulk .IndexOperation ;
30
31
import co .elastic .clients .elasticsearch .core .bulk .UpdateOperation ;
31
32
import org .apache .http .HttpHost ;
32
33
import org .junit .jupiter .api .BeforeEach ;
37
38
import java .util .Collections ;
38
39
import java .util .List ;
39
40
import java .util .Optional ;
41
+ import java .util .concurrent .atomic .AtomicBoolean ;
40
42
import java .util .concurrent .locks .Condition ;
41
43
import java .util .concurrent .locks .Lock ;
42
44
import java .util .concurrent .locks .ReentrantLock ;
@@ -50,6 +52,7 @@ public class Elasticsearch8AsyncWriterITCase extends ElasticsearchSinkBaseITCase
50
52
private final Lock lock = new ReentrantLock ();
51
53
52
54
private final Condition completed = lock .newCondition ();
55
+ private final AtomicBoolean completedExceptionally = new AtomicBoolean (false );
53
56
54
57
@ BeforeEach
55
58
void setUp () {
@@ -171,8 +174,59 @@ public void testSendTimeMetric() throws Exception {
171
174
@ Timeout (5 )
172
175
public void testHandlePartiallyFailedBulk () throws Exception {
173
176
String index = "test-partially-failed-bulk" ;
177
+ int maxBatchSize = 3 ;
178
+
179
+ // First create a document to enable version conflict
180
+ try (final Elasticsearch8AsyncWriter <DummyData > setupWriter = createWriter (index , 1 )) {
181
+ setupWriter .write (new DummyData ("test-3" , "test-3" ), null );
182
+ await ();
183
+ }
184
+
185
+ // Create converter that triggers 409 version conflict for test-3
186
+ Elasticsearch8AsyncSinkBuilder .OperationConverter <DummyData > conflictConverter =
187
+ new Elasticsearch8AsyncSinkBuilder .OperationConverter <>(
188
+ (element , ctx ) -> {
189
+ if (element .getId ().equals ("test-3" )) {
190
+ // Use wrong version to trigger 409 conflict (retryable)
191
+ return new IndexOperation .Builder <>()
192
+ .id (element .getId ())
193
+ .index (index )
194
+ .document (element )
195
+ .ifSeqNo (999L ) // Wrong sequence number
196
+ .ifPrimaryTerm (1L )
197
+ .build ();
198
+ } else {
199
+ return new IndexOperation .Builder <>()
200
+ .id (element .getId ())
201
+ .index (index )
202
+ .document (element )
203
+ .build ();
204
+ }
205
+ });
206
+
207
+ try (final Elasticsearch8AsyncWriter <DummyData > writer =
208
+ createWriter (maxBatchSize , conflictConverter )) {
209
+ writer .write (new DummyData ("test-1" , "test-1" ), null );
210
+ writer .write (new DummyData ("test-2" , "test-2" ), null );
211
+ writer .write (new DummyData ("test-3" , "version-conflict" ), null );
212
+ }
213
+
214
+ await ();
215
+
216
+ // 409 is retryable, so test-3 should have not completed the rest handler exceptionally
217
+ assertThat (context .metricGroup ().getNumRecordsOutErrorsCounter ().getCount ()).isEqualTo (1 );
218
+ assertThat (completedExceptionally .get ()).isFalse ();
219
+ assertIdsAreWritten (index , new String [] {"test-1" , "test-2" });
220
+ }
221
+
222
+ @ TestTemplate
223
+ @ Timeout (5 )
224
+ public void testFailFastUponPartiallyFailedBulk () throws Exception {
225
+ String index = "test-fail-fast-partially-failed-bulk" ;
174
226
int maxBatchSize = 2 ;
175
227
228
+ // This simulates a scenario where some operations fail with non-retryable errors.
229
+ // test-1 gets docAsUpsert=false on non-existing doc (404 error).
176
230
Elasticsearch8AsyncSinkBuilder .OperationConverter <DummyData > elementConverter =
177
231
new Elasticsearch8AsyncSinkBuilder .OperationConverter <>(
178
232
(element , ctx ) ->
@@ -195,7 +249,9 @@ public void testHandlePartiallyFailedBulk() throws Exception {
195
249
196
250
await ();
197
251
252
+ // Verify that non-retryable error (404) increments error counter and fails fast
198
253
assertThat (context .metricGroup ().getNumRecordsOutErrorsCounter ().getCount ()).isEqualTo (1 );
254
+ assertThat (completedExceptionally .get ()).isTrue ();
199
255
assertIdsAreWritten (index , new String [] {"test-2" });
200
256
assertIdsAreNotWritten (index , new String [] {"test-1" });
201
257
}
@@ -264,6 +320,7 @@ public void complete() {
264
320
@ Override
265
321
public void completeExceptionally (Exception e ) {
266
322
resultHandler .completeExceptionally (e );
323
+ completedExceptionally .set (true );
267
324
signal ();
268
325
}
269
326
0 commit comments