Skip to content

Commit 30c4b43

Browse files
committed
Refactor async retry error handling and add offset cleanup test
* Use common RecordInRetryException try/catch in invokeBatchErrorHandler and invokeErrorHandler
* Keep single-record retry list creation with List.of(cRecord)
* Add removeOffsetsInBatchForRetryRecords to verify offsetsInThisBatch/deferredOffsets cleanup
* Remove IDE-specific SpringJavaInjectionPointsAutowiringInspection suppressions from async mono retry test

Signed-off-by: MinChul-Son <smc5236@naver.com>
1 parent 970ff25 commit 30c4b43

File tree

3 files changed

+133
-62
lines changed

3 files changed

+133
-62
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 52 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2581,35 +2581,28 @@ private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
25812581
private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
25822582
List<ConsumerRecord<K, V>> list, RuntimeException rte) {
25832583

2584-
if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling() || this.transactionManager != null
2585-
|| rte instanceof CommitFailedException) {
2586-
2587-
try {
2584+
try {
2585+
if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling()
2586+
|| this.transactionManager != null || rte instanceof CommitFailedException) {
25882587
this.commonErrorHandler.handleBatch(rte, records, this.consumer,
25892588
KafkaMessageListenerContainer.this.thisOrParentContainer,
25902589
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
25912590
}
2592-
catch (RecordInRetryException e) {
2593-
removeOffsetsInBatch(list);
2594-
throw e;
2595-
}
2596-
}
25972591
else {
2598-
try {
2599-
ConsumerRecords<K, V> afterHandling = this.commonErrorHandler.handleBatchAndReturnRemaining(rte,
2600-
records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer,
2601-
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
2602-
if (afterHandling != null && !afterHandling.isEmpty()) {
2603-
this.remainingRecords = afterHandling;
2604-
this.pauseForPending = true;
2605-
}
2606-
}
2607-
catch (RecordInRetryException e) {
2608-
removeOffsetsInBatch(list);
2609-
throw e;
2592+
ConsumerRecords<K, V> afterHandling = this.commonErrorHandler.handleBatchAndReturnRemaining(rte,
2593+
records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer,
2594+
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
2595+
if (afterHandling != null && !afterHandling.isEmpty()) {
2596+
this.remainingRecords = afterHandling;
2597+
this.pauseForPending = true;
26102598
}
26112599
}
26122600
}
2601+
catch (RecordInRetryException e) {
2602+
removeOffsetsInBatch(list);
2603+
throw e;
2604+
}
2605+
}
26132606

26142607
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
26152608
if (this.transactionTemplate != null) {
@@ -3021,8 +3014,7 @@ private void invokeErrorHandlerBySingleRecord(FailedRecordTuple<K, V> failedReco
30213014
catch (Exception ex) { // NO SONAR
30223015
this.logger.error(ex, "Failed to commit before handling error");
30233016
}
3024-
List<ConsumerRecord<?, ?>> retryRecords = new ArrayList<>();
3025-
retryRecords.add(cRecord);
3017+
List<ConsumerRecord<?, ?>> retryRecords = List.of(cRecord);
30263018
try {
30273019
this.commonErrorHandler.handleRemaining(rte, retryRecords, this.consumer,
30283020
KafkaMessageListenerContainer.this.thisOrParentContainer);
@@ -3056,54 +3048,54 @@ private void invokeErrorHandlerBySingleRecord(FailedRecordTuple<K, V> failedReco
30563048
private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
30573049
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
30583050

3059-
if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling()
3060-
|| rte instanceof CommitFailedException) {
3061-
3062-
try {
3063-
if (this.producer == null) {
3064-
processCommits();
3051+
List<ConsumerRecord<?, ?>> retryRecords = List.of(cRecord);
3052+
try {
3053+
if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling()
3054+
|| rte instanceof CommitFailedException) {
3055+
try {
3056+
if (this.producer == null) {
3057+
processCommits();
3058+
}
30653059
}
3066-
}
3067-
catch (Exception ex) { // NO SONAR
3068-
this.logger.error(ex, "Failed to commit before handling error");
3069-
}
3070-
List<ConsumerRecord<?, ?>> retryRecords = new ArrayList<>();
3060+
catch (Exception ex) { // NO SONAR
3061+
this.logger.error(ex, "Failed to commit before handling error");
3062+
}
3063+
retryRecords = new ArrayList<>();
30713064
retryRecords.add(cRecord);
30723065
while (iterator.hasNext()) {
30733066
retryRecords.add(iterator.next());
30743067
}
3068+
this.commonErrorHandler.handleRemaining(rte, retryRecords, this.consumer,
3069+
KafkaMessageListenerContainer.this.thisOrParentContainer);
3070+
}
3071+
else {
3072+
boolean handled = false;
30753073
try {
3076-
this.commonErrorHandler.handleRemaining(rte, retryRecords, this.consumer,
3074+
handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
30773075
KafkaMessageListenerContainer.this.thisOrParentContainer);
30783076
}
3079-
catch (RecordInRetryException e) {
3080-
removeOffsetsInBatch(retryRecords);
3081-
throw e;
3077+
catch (Exception ex) {
3078+
this.logger.error(ex, "ErrorHandler threw unexpected exception");
30823079
}
3083-
}
3084-
else {
3085-
boolean handled = false;
3086-
try {
3087-
handled = this.commonErrorHandler.handleOne(rte, cRecord, this.consumer,
3088-
KafkaMessageListenerContainer.this.thisOrParentContainer);
3089-
}
3090-
catch (Exception ex) {
3091-
this.logger.error(ex, "ErrorHandler threw unexpected exception");
3092-
}
3093-
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
3094-
if (!handled) {
3095-
records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
3096-
tp -> new ArrayList<>()).add(cRecord);
3097-
while (iterator.hasNext()) {
3098-
ConsumerRecord<K, V> next = iterator.next();
3099-
records.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
3100-
tp -> new ArrayList<>()).add(next);
3080+
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
3081+
if (!handled) {
3082+
records.computeIfAbsent(new TopicPartition(cRecord.topic(), cRecord.partition()),
3083+
tp -> new ArrayList<>()).add(cRecord);
3084+
while (iterator.hasNext()) {
3085+
ConsumerRecord<K, V> next = iterator.next();
3086+
records.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
3087+
tp -> new ArrayList<>()).add(next);
3088+
}
3089+
}
3090+
if (!records.isEmpty()) {
3091+
this.remainingRecords = new ConsumerRecords<>(records, Map.of());
3092+
this.pauseForPending = true;
31013093
}
31023094
}
3103-
if (!records.isEmpty()) {
3104-
this.remainingRecords = new ConsumerRecords<>(records, Map.of());
3105-
this.pauseForPending = true;
3106-
}
3095+
}
3096+
catch (RecordInRetryException e) {
3097+
removeOffsetsInBatch(retryRecords);
3098+
throw e;
31073099
}
31083100
}
31093101

spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncMonoDefaultErrorHandlerRetryTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,11 @@ Listener listener() {
9191
}
9292

9393
@Bean
94-
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
9594
KafkaTemplate<String, String> kafkaTemplate(EmbeddedKafkaBroker embeddedKafka) {
9695
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka)));
9796
}
9897

9998
@Bean
100-
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
10199
DefaultKafkaConsumerFactory<String, String> consumerFactory(EmbeddedKafkaBroker embeddedKafka) {
102100
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka,
103101
"async-mono-default-eh-group", false);

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
* @author Gary Russell
7979
* @author Wang Zhiyang
8080
* @author Soby Chacko
81+
* @author Minchul Son
8182
*
8283
* @since 2.2.4
8384
*
@@ -1285,6 +1286,86 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int
12851286
container.stop();
12861287
}
12871288

1289+
@SuppressWarnings({ "unchecked", "rawtypes" })
1290+
@Test
1291+
void removeOffsetsInBatchForRetryRecords() throws InterruptedException {
1292+
TopicPartition topicPartition = new TopicPartition("foo", 0);
1293+
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new LinkedHashMap<>();
1294+
recordMap.put(topicPartition,
1295+
List.of(new ConsumerRecord("foo", 0, 0, null, "bar-0"),
1296+
new ConsumerRecord("foo", 0, 1, null, "bar-1")));
1297+
ConsumerRecords polledRecords = new ConsumerRecords<>(recordMap, Map.of());
1298+
AtomicInteger pollCount = new AtomicInteger();
1299+
1300+
Consumer consumer = mock(Consumer.class);
1301+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
1302+
CountDownLatch subscribeLatch = new CountDownLatch(1);
1303+
willAnswer(invocation -> {
1304+
rebal.set(invocation.getArgument(1));
1305+
subscribeLatch.countDown();
1306+
return null;
1307+
}).given(consumer).subscribe(any(Collection.class), any());
1308+
willAnswer(invocation -> {
1309+
if (pollCount.getAndIncrement() == 0) {
1310+
rebal.get().onPartitionsAssigned(List.of(topicPartition));
1311+
return polledRecords;
1312+
}
1313+
Thread.sleep(50);
1314+
return ConsumerRecords.empty();
1315+
}).given(consumer).poll(any());
1316+
ConsumerFactory cf = mock(ConsumerFactory.class);
1317+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
1318+
given(cf.getConfigurationProperties())
1319+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
1320+
ContainerProperties containerProperties = new ContainerProperties("foo");
1321+
containerProperties.setGroupId("grp");
1322+
containerProperties.setAckMode(AckMode.MANUAL);
1323+
containerProperties.setAsyncAcks(true);
1324+
containerProperties.setMessageListener((MessageListener) rec -> {
1325+
throw new RuntimeException("test");
1326+
});
1327+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf, containerProperties);
1328+
CountDownLatch handleRemainingLatch = new CountDownLatch(1);
1329+
container.setCommonErrorHandler(new CommonErrorHandler() {
1330+
1331+
@Override
1332+
public boolean seeksAfterHandling() {
1333+
return true;
1334+
}
1335+
1336+
@Override
1337+
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> failedRecords,
1338+
Consumer<?, ?> kafkaConsumer, MessageListenerContainer listenerContainer) {
1339+
1340+
handleRemainingLatch.countDown();
1341+
throw new RecordInRetryException("retrying", thrownException);
1342+
}
1343+
1344+
});
1345+
container.start();
1346+
try {
1347+
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1348+
assertThat(handleRemainingLatch.await(10, TimeUnit.SECONDS)).isTrue();
1349+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
1350+
.getPropertyValue(container, "containers", List.class).get(0);
1351+
Map offsets = null;
1352+
Map deferred = null;
1353+
for (int i = 0; i < 20; i++) {
1354+
offsets = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.offsetsInThisBatch", Map.class);
1355+
deferred = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.deferredOffsets", Map.class);
1356+
if ((offsets == null || offsets.isEmpty()) && (deferred == null || deferred.isEmpty())) {
1357+
break;
1358+
}
1359+
Thread.sleep(50);
1360+
}
1361+
assertThat(offsets).isNullOrEmpty();
1362+
assertThat(deferred).isNullOrEmpty();
1363+
}
1364+
finally {
1365+
container.stop();
1366+
}
1367+
}
1368+
12881369
@SuppressWarnings("rawtypes")
12891370
private AcknowledgingMessageListener ackOffset1() {
12901371
return new AcknowledgingMessageListener() {

0 commit comments

Comments (0)