Skip to content

Commit daf396e

Browse files
authored
Upgrade to Kafka 4.1.0 and update API usage
- Bump kafkaVersion from 4.0.0 to 4.1.0 in build.gradle - Remove broker properties no longer needed in embedded Kafka tests: * unstable.api.versions.enable=true * group.coordinator.rebalance.protocols=classic,share - Replace AcknowledgeType.ACCEPT with commitSync() in share consumer tests - Remove explicit acknowledge() calls in implicit mode tests since 4.1.0 client disallowed that - Leverage implicit acknowledgment where records are automatically treated as ACCEPT - Use commitSync() for explicit commit timing instead of relying on next poll() auto-commit - Maintains same test semantics while following Kafka 4.1 implicit mode best practices - Update SerializationIntegrationTests to use Plugin-based deserializer access - Update SerializationIntegrationTests to use Plugin-based deserializer access in 4.1.0 client - Replace direct deserializer access with plugin mechanism retrieval - Use KafkaTestUtils.getPropertyValue() to access valueDeserializerPlugin from consumer internals - Verify plugin returns expected delegating deserializer instance Signed-off-by: Soby Chacko <[email protected]>
1 parent fde7d31 commit daf396e

File tree

5 files changed

+12
-11
lines changed

5 files changed

+12
-11
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ ext {
6060
jaywayJsonPathVersion = '2.9.0'
6161
junit4Version = '4.13.2'
6262
junitJupiterVersion = '5.13.4'
63-
kafkaVersion = '4.0.0'
63+
kafkaVersion = '4.1.0'
6464
kotlinCoroutinesVersion = '1.10.2'
6565
log4jVersion = '2.25.1'
6666
micrometerDocsVersion = '1.0.4'

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.kafka.clients.admin.AdminClient;
3232
import org.apache.kafka.clients.admin.AlterConfigOp;
3333
import org.apache.kafka.clients.admin.ConfigEntry;
34-
import org.apache.kafka.clients.consumer.AcknowledgeType;
3534
import org.apache.kafka.clients.consumer.ShareConsumer;
3635
import org.apache.kafka.clients.producer.KafkaProducer;
3736
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -56,8 +55,6 @@
5655
@EmbeddedKafka(
5756
topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1,
5857
brokerProperties = {
59-
"unstable.api.versions.enable=true",
60-
"group.coordinator.rebalance.protocols=classic,share",
6158
"share.coordinator.state.topic.replication.factor=1",
6259
"share.coordinator.state.topic.min.isr=1"
6360
})
@@ -248,7 +245,9 @@ private static List<String> runSharedConsumerTest(String topic, String groupId,
248245
var records = consumer.poll(Duration.ofMillis(200));
249246
for (var r : records) {
250247
allReceived.add(r.value());
251-
consumer.acknowledge(r, AcknowledgeType.ACCEPT);
248+
// Leverage implicit acknowledgment where records are automatically treated as ACCEPT
249+
// Use commitSync() for explicit commit timing instead of relying on next poll() auto-commit
250+
consumer.commitSync(Duration.ofMillis(10000));
252251
latch.countDown();
253252
}
254253
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,6 @@
6363
@DirtiesContext
6464
@EmbeddedKafka(topics = "share-listener-integration-test",
6565
brokerProperties = {
66-
"unstable.api.versions.enable=true",
67-
"group.coordinator.rebalance.protocols=classic,share",
6866
"share.coordinator.state.topic.replication.factor=1",
6967
"share.coordinator.state.topic.min.isr=1"
7068
})

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@
5050
@EmbeddedKafka(
5151
topics = {"share-listener-integration-test"}, partitions = 1,
5252
brokerProperties = {
53-
"unstable.api.versions.enable=true",
54-
"group.coordinator.rebalance.protocols=classic,share",
5553
"share.coordinator.state.topic.replication.factor=1",
5654
"share.coordinator.state.topic.min.isr=1"
5755
}

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.regex.Pattern;
2121

2222
import org.apache.kafka.common.header.Headers;
23+
import org.apache.kafka.common.internals.Plugin;
2324
import org.apache.kafka.common.serialization.Deserializer;
2425
import org.apache.kafka.common.serialization.StringDeserializer;
2526
import org.junit.jupiter.api.Test;
@@ -37,6 +38,7 @@
3738

3839
/**
3940
* @author Gary Russell
41+
* @author Soby Chacko
4042
* @since 2.8.1
4143
*
4244
*/
@@ -46,6 +48,7 @@ public class SerializationIntegrationTests {
4648
public static final String DBTD_TOPIC = "dbtd";
4749

4850
@Test
51+
@SuppressWarnings("unchecked")
4952
void configurePreLoadedDelegates() {
5053
Map<String, Object> consumerProps =
5154
KafkaTestUtils.consumerProps(EmbeddedKafkaCondition.getBroker(), DBTD_TOPIC, false);
@@ -63,8 +66,11 @@ void configurePreLoadedDelegates() {
6366
props.setMessageListener(mock(MessageListener.class));
6467
KafkaMessageListenerContainer<String, Object> container = new KafkaMessageListenerContainer<>(cFact, props);
6568
container.start();
66-
assertThat(KafkaTestUtils.getPropertyValue(container,
67-
"listenerConsumer.consumer.delegate.deserializers.valueDeserializer"))
69+
//The Deserializers class uses a plugin mechanism to retrieve the actual deserializer.
70+
Plugin<Deserializer<?>> valueDeserializerPlugin = (Plugin<Deserializer<?>>) KafkaTestUtils.getPropertyValue(container,
71+
"listenerConsumer.consumer.delegate.deserializers.valueDeserializerPlugin");
72+
assertThat(valueDeserializerPlugin).isNotNull();
73+
assertThat(valueDeserializerPlugin.get())
6874
.isSameAs(delegating);
6975
Map<?, ?> delegates = KafkaTestUtils.getPropertyValue(delegating, "delegates", Map.class);
7076
assertThat(delegates).hasSize(1);

0 commit comments

Comments
 (0)