From 8ead33a0bcdd3e0070a64f3857aeda1e6fe48146 Mon Sep 17 00:00:00 2001 From: mipo256 Date: Sat, 12 Jul 2025 20:30:22 +0300 Subject: [PATCH] GH-3989 Consider the custom name of the reply topic name in sendAndReceive Signed-off-by: mipo256 --- .../kafka/requestreply/CorrelationKey.java | 8 ++ .../requestreply/ReplyingKafkaTemplate.java | 10 +- .../ReplyingKafkaTemplateTests.java | 112 +++++++++++++++++- 3 files changed, 124 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java index 33ed61683d..0039b1b5c5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/CorrelationKey.java @@ -35,8 +35,16 @@ public final class CorrelationKey { private final byte[] correlationId; + /** + * Cached hex representation of the {@link #correlationId}. + * TODO: Migrate to stable values JEP 502 + */ private @Nullable String asString; + /** + * Cached hash code. + * TODO: Migrate to stable values JEP 502 + */ private volatile @Nullable Integer hashCode; public CorrelationKey(byte[] correlationId) { // NOSONAR array reference diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java index b03b64527a..2bdf3ae80d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java @@ -73,6 +73,7 @@ * @author Artem Bilan * @author Borahm Lee * @author Francois Rosiere + * @author Mikhail Polivakha * * @since 2.1.3 * @@ -422,12 +423,13 @@ public RequestReplyFuture sendAndReceive(ProducerRecord record, @ CorrelationKey correlationId = this.correlationStrategy.apply(record); Assert.notNull(correlationId, "the created 'correlationId' cannot be null"); Headers headers = record.headers(); - boolean hasReplyTopic = headers.lastHeader(KafkaHeaders.REPLY_TOPIC) != null; + boolean hasReplyTopic = headers.lastHeader(this.replyTopicHeaderName) != null; if (!hasReplyTopic && this.replyTopic != null) { headers.add(new RecordHeader(this.replyTopicHeaderName, this.replyTopic)); - if (this.replyPartition != null) { - headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition)); - } + } + boolean hasReplyPartition = headers.lastHeader(this.replyPartitionHeaderName) != null; + if (!hasReplyPartition && this.replyPartition != null) { + headers.add(new RecordHeader(this.replyPartitionHeaderName, this.replyPartition)); } Object correlation = this.binaryCorrelation ? correlationId : correlationId.toString(); byte[] correlationValue = this.binaryCorrelation diff --git a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java index 6201665538..bbf454a0c4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java @@ -102,12 +102,14 @@ * @author Gary Russell * @author Nathan Xu * @author Soby Chacko + * @author Mikhail Polivakha * @since 2.1.3 * */ @SpringJUnitConfig @DirtiesContext -@EmbeddedKafka(partitions = 5, topics = { ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST, +@EmbeddedKafka(partitions = 5, topics = { + ReplyingKafkaTemplateTests.A_REPLY, ReplyingKafkaTemplateTests.A_REQUEST, ReplyingKafkaTemplateTests.B_REPLY, ReplyingKafkaTemplateTests.B_REQUEST, ReplyingKafkaTemplateTests.C_REPLY, ReplyingKafkaTemplateTests.C_REQUEST, ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST, @@ -119,7 +121,10 @@ ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST, ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST, ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST, - ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST }) + ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST, + ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_REQUEST, + ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY, ReplyingKafkaTemplateTests.CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST +}) public class ReplyingKafkaTemplateTests { public static final String A_REPLY = "aReply"; @@ -174,6 +179,14 @@ public class ReplyingKafkaTemplateTests { public static final String M_REQUEST = "mRequest"; + public static final String CUSTOM_REPLY_HEADER_REPLY = "CUSTOM_REPLY_HEADER_REPLY"; + + public static final String CUSTOM_REPLY_HEADER_REQUEST = "CUSTOM_REPLY_HEADER_REQUEST"; + + public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY"; + + public static final String CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST = "CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST"; + @Autowired private EmbeddedKafkaBroker embeddedKafka; @@ -365,6 +378,54 @@ public void testHandlerReturn() throws Exception { } } + @Test + public void testCustomReplyTopicHeaderIsNotDuplicated() throws Exception { + String customReplyHeaderName = "X-Custom-Reply-Header"; + ReplyingKafkaTemplate template = createTemplate(CUSTOM_REPLY_HEADER_REPLY); + template.setReplyTopicHeaderName(customReplyHeaderName); + try { + Message message = MessageBuilder.withPayload("expected_message") + .setHeader(customReplyHeaderName, CUSTOM_REPLY_HEADER_REPLY) + .setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_REQUEST) + .build(); + + RequestReplyMessageFuture future = template.sendAndReceive(message, Duration.ofSeconds(30)); + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok + Message resultingMessage = future.get(30, TimeUnit.SECONDS); + assertThat(resultingMessage.getPayload()).isEqualTo("OK"); + } + finally { + template.stop(); + template.destroy(); + } + } + + @Test + public void testCustomReplyHeadersAreNotDuplicated() throws Exception { + String customReplyTopicHeaderName = "X-Custom-Reply-Header"; + String customReplyPartitionHeaderName = "X-Custom-Reply-Partition"; + ReplyingKafkaTemplate template = createTemplate(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY); + template.setReplyTopicHeaderName(customReplyTopicHeaderName); + template.setReplyPartitionHeaderName(customReplyPartitionHeaderName); + + try { + Message message = MessageBuilder.withPayload("expected_message") + .setHeader(customReplyTopicHeaderName, CUSTOM_REPLY_HEADER_REPLY) + .setHeader(customReplyPartitionHeaderName, "test-partition") + .setHeader(KafkaHeaders.TOPIC, CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST) + .build(); + + RequestReplyMessageFuture future = template.sendAndReceive(message, Duration.ofSeconds(30)); + future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok + Message resultingMessage = future.get(30, TimeUnit.SECONDS); + assertThat(resultingMessage.getPayload()).isEqualTo("OK"); + } + finally { + template.stop(); + template.destroy(); + } + } + @Test public void testMessageReturnNoHeadersProvidedByListener() throws Exception { ReplyingKafkaTemplate template = createTemplate(H_REPLY); @@ -871,6 +932,14 @@ void testMessageIterableReturn() throws Exception { } } + private static int length(Iterable iterable) { + int counter = 0; + for (Object o : iterable) { + counter++; + } + return counter; + } + @Configuration @EnableKafka public static class Config { @@ -1046,6 +1115,45 @@ public List> handleM(String in) throws InterruptedException { return Collections.singletonList(message); } + @KafkaListener(id = CUSTOM_REPLY_HEADER_REQUEST, topics = CUSTOM_REPLY_HEADER_REQUEST) + @SendTo(CUSTOM_REPLY_HEADER_REPLY) // send to custom topic back + public String handleCustomReplyHeaderNoReplyPartition(ConsumerRecord inputMessage) { + Headers headers = inputMessage.headers(); + + if (length(headers.headers("X-Custom-Reply-Header")) != 1) { + return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once"; + } + + if (length(headers.headers(KafkaHeaders.REPLY_PARTITION)) != 0) { + return "It is expected that the user does NOT specify the reply partition in this test case"; + } + + if (!"expected_message".equals(inputMessage.value())) { + return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value()); + } + + return "OK"; + } + + @KafkaListener(id = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST, topics = CUSTOM_REPLY_HEADER_WITH_PARTITION_REQUEST) + @SendTo(CUSTOM_REPLY_HEADER_WITH_PARTITION_REPLY) // send to custom topic back + public String handleCustomReplyHeaderDefaultPartitionHeader(ConsumerRecord inputMessage) { + Headers headers = inputMessage.headers(); + + if (length(headers.headers("X-Custom-Reply-Header")) != 1) { + return "The X-Custom-Reply-Header header that signify the custom reply topic header name is duplicated. It is supposed to present only once"; + } + + if (length(headers.headers("X-Custom-Reply-Partition")) != 1) { + return "Executed a single reply partition header '%s' in the incoming message".formatted(KafkaHeaders.REPLY_PARTITION); + } + + if (!"expected_message".equals(inputMessage.value())) { + return "Expected message is 'expected_message', but got %s".formatted(inputMessage.value()); + } + + return "OK"; + } } @KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)