-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Open
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.10
Describe the bug
Conversion is not performed when using a SmartMessageConverter in a batch Listener.
To Reproduce
Steps to reproduce the behavior.
- Set up a batch Kafka listener
- Configure a
SmartMessageConverterusing thecontentTypeConverterproperty of the@KafkaListener - Send a record that should be converted by the
SmartMessageConverterconfigured
--> A ClassCastException is thrown by the listener method as no conversion has been done
Expected behavior
Messages should be converted with the configured SmartMessageConverter when using a batch listener.
Sample
Here is a test case to reproduce this behavior :
@SpringJUnitConfig
@EmbeddedKafka(partitions = 1, topics = {ListenerConvertionTest.TOPIC1, ListenerConvertionTest.TOPIC2})
public class ListenerConvertionTest {
public static final String TOPIC1 = "topic1";
public static final String TOPIC2 = "topic2";
@Autowired
private Config config;
@Autowired
private KafkaTemplate<Integer, byte[]> template;
@Test
public void testContentTypeConversionInRecordListener() throws Exception {
RecordListener listener = this.config.recordListener();
template.send(TOPIC1, "foo".getBytes());
template.send(TOPIC1, "bar".getBytes());
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.received).hasSize(2).containsExactly("foo", "bar");
}
@Test
public void testContentTypeConversionInBatchListener() throws Exception {
BatchListener listener = this.config.batchListener();
template.send(TOPIC2, "foo".getBytes());
template.send(TOPIC2, "bar".getBytes());
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.received).hasSize(2).containsExactly("foo", "bar");
}
@Configuration
@EnableKafka
public static class Config {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<Integer, byte[]> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test_group", "false", this.embeddedKafka);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return consumerProps;
}
@Bean
public KafkaTemplate<Integer, byte[]> template() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<Integer, byte[]> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = KafkaTestUtils.producerProps(this.embeddedKafka);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
return props;
}
@Bean
public SmartMessageConverter stringMessageConverter() {
return new StringMessageConverter();
}
@Bean
public RecordListener recordListener() {
return new RecordListener();
}
@Bean
public BatchListener batchListener() {
return new BatchListener();
}
}
public static class RecordListener {
private final CountDownLatch latch = new CountDownLatch(2);
private final List<String> received = new ArrayList<>();
@KafkaListener(
id = "recordListener",
topics = TOPIC1,
contentTypeConverter = "stringMessageConverter" // Works fine as expected
)
public void listen(String message) {
received.add(message);
latch.countDown();
}
}
public static class BatchListener {
private final CountDownLatch latch = new CountDownLatch(2);
private final List<String> received = new ArrayList<>();
@KafkaListener(
id = "batchListener",
topics = TOPIC2,
contentTypeConverter = "stringMessageConverter", // converter is not invoked when in batch mode
batch = "true"
)
public void listen(List<String> messages) {
messages.forEach(message -> {
received.add(message);
latch.countDown();
}
);
}
}
}