Skip to content

Commit b4e97f9

Browse files
committed
Fix reply-to handling logic for AMQP 1.0
The `reply-to` property from request messages is parsed to the `org.springframework.amqp.core.Address` object. This one removes the first `/` leaving `routingKey`, based on the mentioned `reply-to`, with a `queues/` prefix. The value in the `reply-to` is already encoded, so we don't need to go extra parsing logic in the `RabbitAmqpTemplate.toAmqpMessage()` in regards `com.rabbitmq.client.amqp.Message.MessageAddressBuilder` * Fix `RabbitAmqpTemplate.toAmqpMessage()` to check for `queues/` prefix before going down to the `com.rabbitmq.client.amqp.Message.MessageAddressBuilder` logic. Instead, use the `queue` value as is in the `com.rabbitmq.client.amqp.Message.to()` property adding required `/` at the beginning of the value * Revert `RabbitAmqpMessageListenerAdapter.sendResponse()` logic to `this.rabbitAmqpTemplate.send()` without modifying `replyToRoutingKey` value * Fix typos in the `Address` Javadocs
1 parent f4adbf5 commit b4e97f9

File tree

3 files changed

+22
-22
lines changed

3 files changed

+22
-22
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/Address.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
/**
2626
* Represents an address for publication of an AMQP message. The AMQP 0.9
2727
* specification has an unstructured string that is used as a "reply to" address.
28-
* There are however conventions in use and this class makes it easier to
28+
* There are, however, conventions in use, and this class makes it easier to
2929
* follow these conventions, which can be easily summarised as:
3030
*
3131
* <pre class="code">
@@ -35,7 +35,7 @@
3535
* Here we also the exchange name to default to empty
3636
* (so just a routing key will work as a queue name).
3737
* <p>
38-
* For AMQP 1.0, only routing key is treated as target destination.
38+
* For AMQP 1.0, only a routing key is treated as a target destination.
3939
*
4040
*
4141
* @author Mark Pollack
@@ -65,7 +65,8 @@ public class Address {
6565
* (exchange)/(routingKey)
6666
* </pre>
6767
* .
68-
* If exchange is parsed to empty string, then routing key is treated as a queue name.
68+
* If exchange is parsed to an empty string, then a routing key is treated as a queue name.
69+
* The {@link #AMQ_RABBITMQ_REPLY_TO} matching address is treated as a routing key.
6970
* @param address a structured string.
7071
*/
7172
public Address(String address) {

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpTemplate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -663,13 +663,26 @@ private static com.rabbitmq.client.amqp.Message toAmqpMessage(@Nullable String e
663663
amqpMessageSupplier.get()
664664
.toAddress();
665665

666-
JavaUtils.INSTANCE
667-
.acceptIfNotNull(exchange, address::exchange)
668-
.acceptIfNotNull(routingKey, address::key)
669-
.acceptIfNotNull(queue, address::queue);
666+
/*
667+
* The 'reply-to' property in the request comes already encoded and with the '/queues/' prefix.
668+
* However, the 'org.springframework.amqp.core.Address' removes the first '/' on parsing.
669+
* Use this value as is, skipping 'com.rabbitmq.client.amqp.Message.MessageAddressBuilder' logic.
670+
*/
671+
boolean replyToProperty = queue != null && queue.startsWith("queues/");
672+
673+
if (!replyToProperty) {
674+
JavaUtils.INSTANCE
675+
.acceptIfNotNull(exchange, address::exchange)
676+
.acceptIfNotNull(routingKey, address::key)
677+
.acceptIfNotNull(queue, address::queue);
678+
}
670679

671680
com.rabbitmq.client.amqp.Message amqpMessage = address.message();
672681

682+
if (replyToProperty) {
683+
amqpMessage.to('/' + queue);
684+
}
685+
673686
RabbitAmqpUtils.toAmqpMessage(message, amqpMessage);
674687

675688
return amqpMessage;

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpMessageListenerAdapter.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,12 @@
2424

2525
import com.rabbitmq.client.Channel;
2626
import com.rabbitmq.client.amqp.Consumer;
27-
import com.rabbitmq.client.amqp.Publisher;
2827
import org.jspecify.annotations.Nullable;
2928

3029
import org.springframework.amqp.core.Address;
3130
import org.springframework.amqp.core.AmqpAcknowledgment;
3231
import org.springframework.amqp.core.Message;
3332
import org.springframework.amqp.core.MessagePostProcessor;
34-
import org.springframework.amqp.rabbit.core.AmqpNackReceivedException;
3533
import org.springframework.amqp.rabbit.listener.adapter.InvocationResult;
3634
import org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter;
3735
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
@@ -157,19 +155,7 @@ protected void sendResponse(@Nullable Channel channel, Address replyTo, Message
157155
sendFuture = this.rabbitAmqpTemplate.send(replyToExchange, replyToRoutingKey, replyMessage);
158156
}
159157
else {
160-
Assert.hasText(replyToRoutingKey, "The 'replyTo' must be provided, in request message or in @SendTo.");
161-
Publisher publisher = this.rabbitAmqpTemplate.getPublisher();
162-
com.rabbitmq.client.amqp.Message amqpMessage = publisher.message();
163-
RabbitAmqpUtils.toAmqpMessage(replyMessage, amqpMessage);
164-
amqpMessage.to("/queues/" + replyToRoutingKey.replaceFirst("queues/", ""));
165-
sendFuture = new CompletableFuture<>();
166-
publisher.publish(amqpMessage, (context) -> {
167-
switch (context.status()) {
168-
case ACCEPTED -> sendFuture.complete(true);
169-
case REJECTED, RELEASED -> sendFuture.completeExceptionally(
170-
new AmqpNackReceivedException("The message was rejected", messageIn));
171-
}
172-
});
158+
sendFuture = this.rabbitAmqpTemplate.send(replyToRoutingKey, replyMessage);
173159
}
174160

175161
sendFuture.join();

0 commit comments

Comments
 (0)