Skip to content

Commit cebd276

Browse files
Replace ConsumerGroupMetadata initialization with mock
ConsumerGroupMetadata will become an interface in Kafka 5.0, so replace direct initialization with a mock implementation. Fixes gh-4337
1 parent 19070c3 commit cebd276

10 files changed

+36
-11
lines changed

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaSaslHandlerClassloadingTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.io.Serial;
1920
import java.util.HashMap;
2021
import java.util.Map;
2122
import java.util.concurrent.atomic.AtomicReference;
@@ -141,6 +142,9 @@ static class TestableProducerFactory<K, V> extends DefaultKafkaProducerFactory<K
141142
* Marker exception to exit producer creation after capturing classloader.
142143
*/
143144
static class TestAbortedException extends RuntimeException {
145+
146+
@Serial
147+
private static final long serialVersionUID = -3667799411878547727L;
144148
}
145149

146150
/**

spring-kafka/src/test/java/org/springframework/kafka/listener/CommitOnAssignmentTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,9 @@ public Consumer consumer() {
196196
this.closeLatch.countDown();
197197
return null;
198198
}).given(consumer).close();
199-
willReturn(new ConsumerGroupMetadata("")).given(consumer).groupMetadata();
199+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
200+
given(consumer.groupMetadata()).willReturn(metadata);
201+
given(metadata.groupId()).willReturn("");
200202
return consumer;
201203
}
202204

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,9 @@ void testClassifier() {
7979
IllegalStateException illegalState = new IllegalStateException();
8080
@SuppressWarnings("unchecked")
8181
Consumer<String, String> consumer = mock(Consumer.class);
82-
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
82+
ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
83+
given(consumer.groupMetadata()).willReturn(metadata);
84+
given(metadata.groupId()).willReturn("foo");
8385
MessageListenerContainer container = mock(MessageListenerContainer.class);
8486
given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
8587
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
@@ -130,7 +132,9 @@ void testBackOffNoBatchRecover() {
130132
IllegalStateException illegalState = new IllegalStateException();
131133
@SuppressWarnings("unchecked")
132134
Consumer<String, String> consumer = mock(Consumer.class);
133-
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
135+
ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
136+
given(consumer.groupMetadata()).willReturn(metadata);
137+
given(metadata.groupId()).willReturn("foo");
134138
MessageListenerContainer container = mock(MessageListenerContainer.class);
135139
given(container.isRunning()).willReturn(true);
136140
processor.processBatch(consumerRecords, records, consumer, container, illegalState, false, EOSMode.V2);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,9 @@ public Consumer consumer() {
218218
this.closeLatch.countDown();
219219
return null;
220220
}).given(consumer).close();
221-
willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
221+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
222+
given(consumer.groupMetadata()).willReturn(metadata);
223+
given(metadata.groupId()).willReturn(CONTAINER_ID);
222224
return consumer;
223225
}
224226

@@ -250,7 +252,9 @@ public Consumer consumer2() {
250252
return new ConsumerRecords(Collections.emptyMap(), Map.of());
251253
}
252254
}).given(consumer).poll(any());
253-
willReturn(new ConsumerGroupMetadata(CONTAINER_ID_2)).given(consumer).groupMetadata();
255+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
256+
given(consumer.groupMetadata()).willReturn(metadata);
257+
given(metadata.groupId()).willReturn(CONTAINER_ID_2);
254258
return consumer;
255259
}
256260

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerSeekAfterCommitExceptionBatchListenerTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,9 @@ public Consumer consumer() {
200200
this.closeLatch.countDown();
201201
return null;
202202
}).given(consumer).close();
203-
willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
203+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
204+
given(consumer.groupMetadata()).willReturn(metadata);
205+
given(metadata.groupId()).willReturn(CONTAINER_ID);
204206
return consumer;
205207
}
206208

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,9 @@ public Consumer consumer() {
236236
this.closeLatch.countDown();
237237
return null;
238238
}).given(consumer).close();
239-
willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
239+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
240+
given(consumer.groupMetadata()).willReturn(metadata);
241+
given(metadata.groupId()).willReturn(CONTAINER_ID);
240242
return consumer;
241243
}
242244

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,9 @@ public Consumer consumer() {
237237
this.closeLatch.countDown();
238238
return null;
239239
}).given(consumer).close();
240-
willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
240+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
241+
given(consumer.groupMetadata()).willReturn(metadata);
242+
given(metadata.groupId()).willReturn(CONTAINER_ID);
241243
return consumer;
242244
}
243245

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxRollbackTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,9 @@ public Consumer consumer() {
210210
this.closeLatch.countDown();
211211
return null;
212212
}).given(consumer).close();
213-
willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
213+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
214+
given(consumer.groupMetadata()).willReturn(metadata);
215+
given(metadata.groupId()).willReturn(CONTAINER_ID);
214216
return consumer;
215217
}
216218

spring-kafka/src/test/java/org/springframework/kafka/listener/SubBatchPerPartitionTxTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,9 @@ public Consumer consumer() {
196196
this.closeLatch.countDown();
197197
return null;
198198
}).given(consumer).close();
199-
willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata();
199+
final ConsumerGroupMetadata metadata = mock(ConsumerGroupMetadata.class);
200+
given(consumer.groupMetadata()).willReturn(metadata);
201+
given(metadata.groupId()).willReturn(CONTAINER_ID);
200202
return consumer;
201203
}
202204

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,9 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a
244244
props.setAssignmentCommitOption(AssignmentCommitOption.ALWAYS);
245245
props.setEosMode(eosMode);
246246
props.setStopContainerWhenFenced(stopWhenFenced);
247-
ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata("group");
247+
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
248248
given(consumer.groupMetadata()).willReturn(consumerGroupMetadata);
249+
given(consumerGroupMetadata.groupId()).willReturn("group");
249250
final KafkaTemplate template = new KafkaTemplate(pf);
250251
if (AckMode.MANUAL_IMMEDIATE.equals(ackMode)) {
251252
props.setMessageListener((AcknowledgingMessageListener<Object, Object>) (data, acknowledgment) -> {

0 commit comments

Comments
 (0)