Skip to content

Commit bf5f815

Browse files
committed
Implement queue purge
Fixes #136
1 parent 44fd22c commit bf5f815

File tree

3 files changed

+60
-5
lines changed

3 files changed

+60
-5
lines changed

src/main/java/com/rabbitmq/client/amqp/Management.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public interface Management extends AutoCloseable {
9494
*/
9595
UnbindSpecification unbind();
9696

97+
void queuePurge(String queue);
98+
9799
/** Close the management instance and release its resources. */
98100
@Override
99101
void close();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class AmqpManagement implements Management {
6868
private static final int CODE_200 = 200;
6969
private static final int CODE_201 = 201;
7070
private static final int CODE_204 = 204;
71+
private static final int CODE_400 = 400;
7172
private static final int CODE_404 = 404;
7273
private static final int CODE_409 = 409;
7374

@@ -173,6 +174,14 @@ public UnbindSpecification unbind() {
173174
return new AmqpBindingManagement.AmqpUnbindSpecification(this);
174175
}
175176

177+
@Override
178+
public void queuePurge(String queue) {
179+
Map<String, Object> responseBody = delete(queueLocation(queue) + "/messages", CODE_200);
180+
if (!responseBody.containsKey("message_count")) {
181+
throw new AmqpException("Response body should contain message_count");
182+
}
183+
}
184+
176185
void setToken(String token) {
177186
if (!this.connection.setTokenSupported()) {
178187
throw new UnsupportedOperationException("Token renewal requires at least RabbitMQ 4.1.0");
@@ -477,18 +486,23 @@ private static void checkResponse(
477486
if (!requestId.equals(response.correlationId())) {
478487
throw new AmqpException("Unexpected correlation ID");
479488
}
480-
int responseCode = request.mapResponse().code();
489+
int responseCode = request.response().code();
490+
String explanation =
491+
request.response().body() instanceof String ? (String) request.response().body() : null;
481492
if (IntStream.of(expectedResponseCodes).noneMatch(c -> c == responseCode)) {
482-
if (responseCode == CODE_404) {
483-
throw new AmqpException.AmqpEntityDoesNotExistException("Entity does not exist");
493+
if (responseCode == CODE_404
494+
|| (responseCode == CODE_400 && queueDoesNotExist(explanation))) {
495+
explanation = explanation == null ? "Entity does not exist" : explanation;
496+
throw new AmqpException.AmqpEntityDoesNotExistException(explanation);
484497
} else {
485498
String message =
486499
String.format(
487-
"Unexpected response code: %d instead of %s",
500+
"Unexpected response code: %d instead of %s%s",
488501
responseCode,
489502
IntStream.of(expectedResponseCodes)
490503
.mapToObj(String::valueOf)
491-
.collect(Collectors.joining(", ")));
504+
.collect(Collectors.joining(", ")),
505+
explanation != null ? " (message: '" + explanation : "')");
492506
try {
493507
LOGGER.info(
494508
"Management request failed: '{}'. Response body: '{}'",
@@ -502,6 +516,11 @@ private static void checkResponse(
502516
}
503517
}
504518

519+
private static boolean queueDoesNotExist(String explanation) {
520+
return explanation != null
521+
&& (explanation.contains("no queue '") && explanation.contains("in vhost '"));
522+
}
523+
505524
void bind(Map<String, Object> body) {
506525
declare(body, "/bindings", POST, CODE_204);
507526
}
@@ -637,6 +656,10 @@ private <K, V> Response<Map<K, V>> mapResponse() {
637656
return (Response<Map<K, V>>) this.response.get();
638657
}
639658

659+
private Response<?> response() {
660+
return this.response.get();
661+
}
662+
640663
@SuppressWarnings("unchecked")
641664
private <K, V> Map<K, V> responseBodyAsMap() throws ClientException {
642665
return (Map<K, V>) this.responseMessage.get().body();

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,36 @@ void publishedMessageShouldBeRejectedWhenQueueLimitIsReached(TestInfo info) {
707707
}
708708
}
709709

710+
@Test
711+
void queuePurgeShouldRemoveAllMessages(TestInfo info) {
712+
Management management = connection.management();
713+
String q = TestUtils.name(info);
714+
management.queue(q).exclusive(true).declare();
715+
int messageCount = 100;
716+
Sync publishSync = sync(messageCount);
717+
Publisher.Callback callback =
718+
context -> {
719+
if (context.status() == Publisher.Status.ACCEPTED) {
720+
publishSync.down();
721+
}
722+
};
723+
Publisher publisher = connection.publisherBuilder().queue(q).build();
724+
IntStream.range(0, messageCount)
725+
.forEach(ignored -> publisher.publish(publisher.message(), callback));
726+
assertThat(publishSync).completes();
727+
assertThat(management.queueInfo(q)).hasMessageCount(messageCount);
728+
management.queuePurge(q);
729+
assertThat(management.queueInfo(q)).isEmpty();
730+
}
731+
732+
@Test
733+
void queuePurgeOnNonExistingQueueShouldThrowException(TestInfo info) {
734+
String q = TestUtils.name(info);
735+
assertThatThrownBy(() -> connection.management().queuePurge(q))
736+
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class)
737+
.hasMessageContaining(q);
738+
}
739+
710740
private static String uuid() {
711741
return UUID.randomUUID().toString();
712742
}

0 commit comments

Comments
 (0)