Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -80,7 +81,7 @@ public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Met
* @param errorHandler the error handler.
*/
public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method,
@Nullable KafkaListenerErrorHandler errorHandler) {
@Nullable KafkaListenerErrorHandler errorHandler) {

super(bean, method, errorHandler);
}
Expand All @@ -107,6 +108,24 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdap
this.batchToRecordAdapter = batchToRecordAdapter;
}

/**
* Set the {@link SmartMessageConverter} to use with both the default record converter
* and the batch message converter.
* <p>
* When a {@code SmartMessageConverter} is configured via
* {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is
* properly propagated to both the record converter (via the parent class) and the
* batch converter to support message conversion in batch listeners.
* @param messageConverter the converter to set
*/
@Override
public void setMessagingConverter(SmartMessageConverter messageConverter) {
super.setMessagingConverter(messageConverter);
if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) {
batchConverter.setMessagingConverter(messageConverter);
}
}

/**
* Return the {@link BatchMessagingMessageConverter} for this listener,
* being able to convert {@link org.springframework.messaging.Message}.
Expand Down Expand Up @@ -170,7 +189,7 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme

@SuppressWarnings({ "unchecked", "rawtypes" })
protected Message<?> toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {
@Nullable Consumer<?, ?> consumer) {

return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;

/**
Expand Down Expand Up @@ -78,6 +79,8 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {
@Nullable
private final RecordMessageConverter recordConverter;

private @Nullable SmartMessageConverter messagingConverter;

private boolean generateMessageId = false;

private boolean generateTimestamp = false;
Expand Down Expand Up @@ -142,6 +145,20 @@ public RecordMessageConverter getRecordMessageConverter() {
return this.recordConverter;
}

/**
* Set a spring-messaging {@link SmartMessageConverter} to convert the record value to
* the desired type.
* @param messagingConverter the converter.
* @since 3.3.11
*/
public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) {
this.messagingConverter = messagingConverter;

if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) {
messagingRecordConverter.setMessagingConverter(messagingConverter);
}
}

/**
* Set to true to add the raw {@code List<ConsumerRecord<?, ?>>} as a header
* {@link KafkaHeaders#RAW_DATA}.
Expand All @@ -154,7 +171,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) {

@Override // NOSONAR
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer, Type type) {
@Nullable Consumer<?, ?> consumer, Type type) {

KafkaMessageHeaders kafkaMessageHeaders =
new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);
Expand Down Expand Up @@ -275,13 +292,14 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
* @param type the type - must be a {@link ParameterizedType} with a single generic
* type parameter.
* @param conversionFailures Conversion failures.
* @return the converted payload.
* @return the converted payload, potentially further processed by a {@link SmartMessageConverter}.
*/
protected @Nullable Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
try {
if (this.recordConverter != null) {
Type actualType = ((ParameterizedType) type).getActualTypeArguments()[0];
Object payload = this.recordConverter
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
.toMessage(record, null, null, actualType).getPayload();
conversionFailures.add(null);
return payload;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2016-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;

import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.MessageBuilder;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for SmartMessageConverter support in batch listeners.
* Reproduces the issue described in GH-4097.
*
* @author Jujuwryy
* @since 3.3.11
*/
class BatchSmartMessageConverterTests {

@Test
void testSmartMessageConverterWorksInBatchConversion() {
// Given: A BatchMessagingMessageConverter with a record converter and SmartMessageConverter
MessagingMessageConverter recordConverter = new MessagingMessageConverter();
BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);

// Set up SmartMessageConverter that converts byte[] to String
TestStringMessageConverter smartConverter = new TestStringMessageConverter();
batchConverter.setMessagingConverter(smartConverter);

// Create test records with byte[] values that need conversion to String
List<ConsumerRecord<?, ?>> records = Arrays.asList(
new ConsumerRecord<>("topic", 0, 0, "key", "hello".getBytes()),
new ConsumerRecord<>("topic", 0, 1, "key", "world".getBytes())
);

// When: Convert batch with List<String> target type
Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});
Message<?> result = batchConverter.toMessage(records, null, null, targetType);

// Then: Verify the SmartMessageConverter was applied and byte[] was converted to String
assertThat(result).isNotNull();
assertThat(result.getPayload()).isInstanceOf(List.class);

List<?> payloads = (List<?>) result.getPayload();
assertThat(payloads).hasSize(2);
assertThat(payloads.get(0)).isEqualTo("hello");
assertThat(payloads.get(1)).isEqualTo("world");
}

@Test
void testBatchConversionWithoutSmartMessageConverter() {
// Given: A BatchMessagingMessageConverter without SmartMessageConverter
MessagingMessageConverter recordConverter = new MessagingMessageConverter();
BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter);

// Create test records with byte[] values
List<ConsumerRecord<?, ?>> records = Arrays.asList(
new ConsumerRecord<>("topic", 0, 0, "key", "test".getBytes())
);

// When: Convert batch
Type targetType = new TestParameterizedType(List.class, new Type[]{String.class});
Message<?> result = batchConverter.toMessage(records, null, null, targetType);

// Then: Should work but payloads remain as byte[]
assertThat(result).isNotNull();
List<?> payloads = (List<?>) result.getPayload();
assertThat(payloads.get(0)).isInstanceOf(byte[].class);
}

/**
* Test SmartMessageConverter that converts byte[] to String.
*/
static class TestStringMessageConverter implements SmartMessageConverter {

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return convertPayload(message.getPayload());
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass, Object conversionHint) {
return convertPayload(message.getPayload());
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
return toMessage(payload, headers);
}

private Object convertPayload(Object payload) {
// Convert byte[] to String - this is the core functionality being tested
if (payload instanceof byte[] bytes) {
return new String(bytes);
}
return payload;
}
}

/**
* Helper class for creating parameterized types for testing.
*/
static class TestParameterizedType implements java.lang.reflect.ParameterizedType {

private final Type rawType;

private final Type[] typeArguments;

TestParameterizedType(Type rawType, Type[] typeArguments) {
this.rawType = rawType;
this.typeArguments = typeArguments;
}

public Type[] getActualTypeArguments() {
return typeArguments;
}

public Type getRawType() {
return rawType;
}

public Type getOwnerType() {
return null;
}
}
}