Skip to content

Commit 68760b2

Browse files
committed
Refine async failure handling and retry record setup
- Use a snapshot-based pass for failed async records - simplify retry record initialization in invokeErrorHandler to avoid duplicate setup. - restore blank lines for readability Signed-off-by: MinChul-Son <smc5236@naver.com>
1 parent 3c79c09 commit 68760b2

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,11 +1514,10 @@ protected void pollAndInvoke() {
15141514
protected void handleAsyncFailure() {
15151515
// Process only the records present at loop start; failures added concurrently
15161516
// are handled in the next loop iteration.
1517-
int capturedRecordsCount = this.failedRecords.size();
1518-
for (int i = 0; i < capturedRecordsCount; i++) {
1519-
FailedRecordTuple<K, V> failedRecord = this.failedRecords.pollFirst();
1520-
if (failedRecord == null) {
1521-
break;
1517+
List<FailedRecordTuple<K, V>> failedRecordsSnapshot = new ArrayList<>(this.failedRecords);
1518+
for (FailedRecordTuple<K, V> failedRecord : failedRecordsSnapshot) {
1519+
if (!this.failedRecords.removeFirstOccurrence(failedRecord)) {
1520+
continue;
15221521
}
15231522
try {
15241523
failedRecord.observation.scoped(() -> invokeErrorHandlerBySingleRecord(failedRecord));
@@ -2582,6 +2581,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
25822581
List<ConsumerRecord<K, V>> list, RuntimeException rte) {
25832582

25842583
try {
2584+
25852585
if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling()
25862586
|| this.transactionManager != null || rte instanceof CommitFailedException) {
25872587
this.commonErrorHandler.handleBatch(rte, records, this.consumer,
@@ -2592,6 +2592,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
25922592
ConsumerRecords<K, V> afterHandling = this.commonErrorHandler.handleBatchAndReturnRemaining(rte,
25932593
records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer,
25942594
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
2595+
25952596
if (afterHandling != null && !afterHandling.isEmpty()) {
25962597
this.remainingRecords = afterHandling;
25972598
this.pauseForPending = true;
@@ -3048,10 +3049,12 @@ private void invokeErrorHandlerBySingleRecord(FailedRecordTuple<K, V> failedReco
30483049
private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
30493050
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
30503051

3051-
List<ConsumerRecord<?, ?>> retryRecords = List.of(cRecord);
3052+
List<ConsumerRecord<?, ?>> retryRecords = new ArrayList<>();
3053+
retryRecords.add(cRecord);
30523054
try {
30533055
if (Objects.requireNonNull(this.commonErrorHandler).seeksAfterHandling()
30543056
|| rte instanceof CommitFailedException) {
3057+
30553058
try {
30563059
if (this.producer == null) {
30573060
processCommits();
@@ -3060,8 +3063,6 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
30603063
catch (Exception ex) { // NO SONAR
30613064
this.logger.error(ex, "Failed to commit before handling error");
30623065
}
3063-
retryRecords = new ArrayList<>();
3064-
retryRecords.add(cRecord);
30653066
while (iterator.hasNext()) {
30663067
retryRecords.add(iterator.next());
30673068
}

0 commit comments

Comments
 (0)