Skip to content

Commit 1079868

Browse files
committed
Support discard and requeue with annotations
Maps to modified outcome.
1 parent 3ace5d0 commit 1079868

File tree

5 files changed

+382
-1
lines changed

5 files changed

+382
-1
lines changed

src/main/java/com/rabbitmq/client/amqp/Consumer.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
1818
package com.rabbitmq.client.amqp;
1919

20+
import java.util.Map;
21+
2022
/**
2123
* API to consume messages from a RabbitMQ queue.
2224
*
@@ -75,12 +77,48 @@ interface Context {
7577
*/
7678
void discard();
7779

80+
/**
81+
* Discard the message with annotations to combine with the existing message annotations.
82+
*
83+
* <p>This means the message cannot be processed because it is invalid, the broker can drop it
84+
* or dead-letter it if it is configured.
85+
*
86+
* <p>Annotation keys must start with the <code>x-opt-</code> prefix.
87+
*
88+
* <p>This maps to the AMQP 1.0 <code>
89+
* modified{delivery-failed = true, undeliverable-here = true}</code> outcome.
90+
*
91+
* @param annotations message annotations to combine with existing ones
92+
* @see <a
93+
* href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
94+
* 1.0 <code>modified</code> outcome</a>
95+
*/
96+
void discard(Map<String, Object> annotations);
97+
7898
/**
7999
* Requeue the message (AMQP 1.0 <code>released</code> outcome).
80100
*
81101
* <p>This means the message has not been processed and the broker can requeue it and deliver it
82102
* to the same or a different consumer.
83103
*/
84104
void requeue();
105+
106+
/**
107+
* Requeue the message with annotations to combine with the existing message annotations.
108+
*
109+
* <p>This means the message has not been processed and the broker can requeue it and deliver it
110+
* to the same or a different consumer.
111+
*
112+
* <p>Annotation keys must start with the <code>x-opt-</code> prefix.
113+
*
114+
* <p>This maps to the AMQP 1.0 <code>
115+
* modified{delivery-failed = false, undeliverable-here = false}</code> outcome.
116+
*
117+
* @param annotations message annotations to combine with existing ones
118+
* @see <a
119+
* href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-modified">AMQP
120+
* 1.0 <code>modified</code> outcome</a>
121+
*/
122+
void requeue(Map<String, Object> annotations);
85123
}
86124
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.client.amqp.BackOffDelayPolicy;
2525
import com.rabbitmq.client.amqp.Consumer;
2626
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
27+
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.concurrent.*;
@@ -387,6 +388,24 @@ public void discard() {
387388
}
388389
}
389390

391+
@Override
392+
public void discard(Map<String, Object> annotations) {
393+
if (settled.compareAndSet(false, true)) {
394+
try {
395+
annotations = annotations == null ? Collections.emptyMap() : annotations;
396+
checkAnnotations(annotations);
397+
protonExecutor.execute(replenishCreditOperation);
398+
delivery.disposition(DeliveryState.modified(true, true, annotations), true);
399+
unsettledMessageCount.decrementAndGet();
400+
metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
401+
} catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e) {
402+
LOGGER.debug("message discard (modified) failed: {}", e.getMessage());
403+
} catch (ClientException e) {
404+
throw ExceptionUtils.convert(e);
405+
}
406+
}
407+
}
408+
390409
@Override
391410
public void requeue() {
392411
if (settled.compareAndSet(false, true)) {
@@ -402,5 +421,33 @@ public void requeue() {
402421
}
403422
}
404423
}
424+
425+
@Override
426+
public void requeue(Map<String, Object> annotations) {
427+
if (settled.compareAndSet(false, true)) {
428+
try {
429+
annotations = annotations == null ? Collections.emptyMap() : annotations;
430+
checkAnnotations(annotations);
431+
protonExecutor.execute(replenishCreditOperation);
432+
delivery.disposition(DeliveryState.modified(false, false, annotations), true);
433+
unsettledMessageCount.decrementAndGet();
434+
metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
435+
} catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e) {
436+
LOGGER.debug("message requeue (modified) failed: {}", e.getMessage());
437+
} catch (ClientException e) {
438+
throw ExceptionUtils.convert(e);
439+
}
440+
}
441+
}
442+
}
443+
444+
static void checkAnnotations(Map<String, Object> annotations) {
445+
annotations.forEach(
446+
(k, v) -> {
447+
if (!k.startsWith("x-opt-")) {
448+
throw new IllegalArgumentException(
449+
"Message annotation keys must start with 'x-opt-': " + k);
450+
}
451+
});
405452
}
406453
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import static com.rabbitmq.client.amqp.impl.AmqpConsumer.checkAnnotations;
21+
import static java.util.Map.of;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
public class AmqpConsumerTest {
27+
28+
@Test
29+
void checkAnnotationsOK() {
30+
checkAnnotations(of());
31+
checkAnnotations(of("x-opt-foo", "bar"));
32+
checkAnnotations(of("x-opt-foo-1", "bar1", "x-opt-foo-2", "bar2"));
33+
}
34+
35+
@Test
36+
void checkAnnotationsKO() {
37+
assertThatThrownBy(() -> checkAnnotations(of("foo", "bar")))
38+
.isInstanceOf(IllegalArgumentException.class);
39+
assertThatThrownBy(() -> checkAnnotations(of("x-opt-foo", "bar1", "foo", "bar2")))
40+
.isInstanceOf(IllegalArgumentException.class);
41+
}
42+
}

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.assertj.core.api.Assertions.fail;
2121

2222
import com.rabbitmq.client.amqp.Management;
23+
import com.rabbitmq.client.amqp.Message;
2324
import java.time.Duration;
2425
import java.util.List;
2526
import java.util.concurrent.CountDownLatch;
@@ -44,6 +45,10 @@ static SyncAssert assertThat(TestUtils.Sync sync) {
4445
return new SyncAssert(sync);
4546
}
4647

48+
static MessageAssert assertThat(Message message) {
49+
return new MessageAssert(message);
50+
}
51+
4752
static ConnectionAssert assertThat(AmqpConnection connection) {
4853
return new ConnectionAssert(connection);
4954
}
@@ -208,6 +213,51 @@ private QueueInfoAssert flag(String label, boolean expected, boolean actual) {
208213
}
209214
}
210215

216+
static class MessageAssert extends AbstractObjectAssert<MessageAssert, Message> {
217+
218+
private MessageAssert(Message message) {
219+
super(message, MessageAssert.class);
220+
}
221+
222+
MessageAssert hasId(Object id) {
223+
isNotNull();
224+
if (!actual.messageId().equals(id)) {
225+
fail("Message ID should be '%s' but is '%s'", id, actual.messageId());
226+
}
227+
return this;
228+
}
229+
230+
MessageAssert hasAnnotation(String key) {
231+
isNotNull();
232+
if (!actual.hasAnnotation(key)) {
233+
fail("Message should have annotation '%s' but does not", key);
234+
}
235+
return this;
236+
}
237+
238+
MessageAssert hasAnnotation(String key, Object value) {
239+
if (key == null || value == null) {
240+
throw new IllegalArgumentException();
241+
}
242+
isNotNull();
243+
hasAnnotation(key);
244+
if (!value.equals(this.actual.annotation(key))) {
245+
fail(
246+
"Message should have annotation '%s = %s' but has '%s = %s'",
247+
key, value, key, this.actual.annotation(key));
248+
}
249+
return this;
250+
}
251+
252+
MessageAssert doesNotHaveAnnotation(String key) {
253+
isNotNull();
254+
if (actual.hasAnnotation(key)) {
255+
fail("Message should not have annotation '%s' but has it", key);
256+
}
257+
return this;
258+
}
259+
}
260+
211261
static class ConnectionAssert extends AbstractObjectAssert<ConnectionAssert, AmqpConnection> {
212262

213263
private ConnectionAssert(AmqpConnection connection) {
@@ -219,7 +269,7 @@ ConnectionAssert hasNodename(String nodename) {
219269
isNotNull();
220270
if (!actual.connectionNodename().equals(nodename)) {
221271
fail(
222-
"Connection should on node '%s' but is on node '%s'",
272+
"Connection should be on node '%s' but is on node '%s'",
223273
nodename, actual.connectionNodename());
224274
}
225275
return this;

0 commit comments

Comments
 (0)