Skip to content

Commit c4e2cee

Browse files
authored
Use RabbitTestContainer for AMQP integration tests
Signed-off-by: Jiandong Ma <[email protected]>
1 parent aa392ac commit c4e2cee

12 files changed

+164
-62
lines changed

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/channel/ChannelTests-context.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212
<int-amqp:publish-subscribe-channel id="channel" />
1313

14-
<rabbit:connection-factory id="rabbitConnectionFactory" host="localhost" />
14+
<rabbit:connection-factory id="rabbitConnectionFactory"
15+
port="#{T(org.springframework.integration.amqp.support.RabbitTestContainer).amqpPort()}" />
1516

1617
<int-amqp:channel id="withEP" extract-payload="true" message-converter="jackson" />
1718

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/channel/ChannelTests.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
package org.springframework.integration.amqp.channel;
1818

19+
import java.io.IOException;
1920
import java.util.Collection;
21+
import java.util.List;
2022
import java.util.Set;
2123
import java.util.concurrent.CyclicBarrier;
2224
import java.util.concurrent.TimeUnit;
2325
import java.util.concurrent.locks.Lock;
2426

25-
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.AfterAll;
28+
import org.junit.jupiter.api.BeforeAll;
2629
import org.junit.jupiter.api.Test;
2730

2831
import org.springframework.amqp.core.AmqpTemplate;
@@ -31,8 +34,6 @@
3134
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3235
import org.springframework.amqp.rabbit.core.RabbitAdmin;
3336
import org.springframework.amqp.rabbit.core.RabbitTemplate;
34-
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
35-
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
3637
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer;
3738
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
3839
import org.springframework.amqp.support.converter.MessageConversionException;
@@ -41,6 +42,7 @@
4142
import org.springframework.beans.factory.annotation.Autowired;
4243
import org.springframework.integration.amqp.config.AmqpChannelFactoryBean;
4344
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
45+
import org.springframework.integration.amqp.support.RabbitTestContainer;
4446
import org.springframework.integration.support.MessageBuilder;
4547
import org.springframework.integration.test.util.TestUtils;
4648
import org.springframework.messaging.Message;
@@ -64,9 +66,28 @@
6466
*
6567
*/
6668
@SpringJUnitConfig
67-
@RabbitAvailable(queues = {"pollableWithEP", "withEP", "testConvertFail"})
6869
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
69-
public class ChannelTests {
70+
public class ChannelTests implements RabbitTestContainer {
71+
72+
static final String QUEUE_POLLABLE_WITH_EP = "pollableWithEP";
73+
74+
static final String QUEUE_WITH_EP = "withEP";
75+
76+
static final String QUEUE_CONVERT_FAIL = "testConvertFail";
77+
78+
@BeforeAll
79+
static void initQueue() throws IOException, InterruptedException {
80+
for (String queue : List.of(QUEUE_POLLABLE_WITH_EP, QUEUE_WITH_EP, QUEUE_CONVERT_FAIL)) {
81+
RABBITMQ.execInContainer("rabbitmqadmin", "declare", "queue", "name=" + queue);
82+
}
83+
}
84+
85+
@AfterAll
86+
static void deleteQueue() throws IOException, InterruptedException {
87+
for (String queue : List.of(QUEUE_POLLABLE_WITH_EP, QUEUE_WITH_EP, QUEUE_CONVERT_FAIL)) {
88+
RABBITMQ.execInContainer("rabbitmqadmin", "delete", "queue", "name=" + queue);
89+
}
90+
}
7091

7192
@Autowired
7293
private PublishSubscribeAmqpChannel channel;
@@ -92,12 +113,6 @@ public class ChannelTests {
92113
@Autowired
93114
private AmqpHeaderMapper mapperOut;
94115

95-
@AfterEach
96-
public void tearDown() {
97-
RabbitAvailableCondition.getBrokerRunning().deleteExchanges("si.fanout.foo", "si.fanout.channel", "si.fanout.pubSubWithEP");
98-
RabbitAvailableCondition.getBrokerRunning().removeTestQueues();
99-
}
100-
101116
@Test
102117
public void pubSubLostConnectionTest() throws Exception {
103118
final CyclicBarrier latch = new CyclicBarrier(2);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/OutboundGatewayIntegrationTests-context.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
4242

43-
<rabbit:connection-factory id="connectionFactory" host="localhost" />
43+
<rabbit:connection-factory id="connectionFactory"
44+
port="#{T(org.springframework.integration.amqp.support.RabbitTestContainer).amqpPort()}" />
4445

4546
</beans:beans>

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/OutboundGatewayIntegrationTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
import org.junit.jupiter.api.Test;
2020

21-
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
2221
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.integration.amqp.support.RabbitTestContainer;
2323
import org.springframework.messaging.Message;
2424
import org.springframework.messaging.MessageChannel;
2525
import org.springframework.messaging.PollableChannel;
@@ -36,9 +36,8 @@
3636
* @since 2.1
3737
*/
3838
@SpringJUnitConfig
39-
@RabbitAvailable
4039
@DirtiesContext
41-
public class OutboundGatewayIntegrationTests {
40+
public class OutboundGatewayIntegrationTests implements RabbitTestContainer {
4241

4342
@Autowired
4443
private MessageChannel toRabbit;

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.integration.amqp.dsl;
1818

19+
import java.io.IOException;
1920
import java.util.Collections;
2021
import java.util.HashMap;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.concurrent.atomic.AtomicReference;
2325

2426
import com.rabbitmq.stream.ConsumerBuilder;
2527
import com.rabbitmq.stream.Environment;
2628
import org.junit.jupiter.api.AfterAll;
29+
import org.junit.jupiter.api.BeforeAll;
2730
import org.junit.jupiter.api.Test;
2831

2932
import org.springframework.amqp.core.AmqpTemplate;
@@ -38,8 +41,6 @@
3841
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
3942
import org.springframework.amqp.rabbit.core.RabbitAdmin;
4043
import org.springframework.amqp.rabbit.core.RabbitTemplate;
41-
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
42-
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
4344
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
4445
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
4546
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
@@ -59,6 +60,7 @@
5960
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
6061
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
6162
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
63+
import org.springframework.integration.amqp.support.RabbitTestContainer;
6264
import org.springframework.integration.annotation.Publisher;
6365
import org.springframework.integration.channel.QueueChannel;
6466
import org.springframework.integration.config.EnableIntegration;
@@ -91,11 +93,30 @@
9193
* @since 5.0
9294
*/
9395
@SpringJUnitConfig
94-
@RabbitAvailable(queues = {"amqpOutboundInput", "amqpReplyChannel", "asyncReplies",
95-
"defaultReplyTo", "si.dsl.test", "si.dsl.exception.test.dlq",
96-
"si.dsl.conv.exception.test.dlq", "testTemplateChannelTransacted", "publisherQueue"})
9796
@DirtiesContext
98-
public class AmqpTests {
97+
public class AmqpTests implements RabbitTestContainer {
98+
99+
static final String QUEUE_AMQP_OUTBOND_INPUT = "amqpOutboundInput";
100+
101+
static final String QUEUE_AMQP_REPLY_CHANNEL = "amqpReplyChannel";
102+
103+
static final String QUEUE_ASYNC_REPLIES = "asyncReplies";
104+
105+
static final String QUEUE_DEFAULT_REPLY_TO = "defaultReplyTo";
106+
107+
static final String QUEUE_DSL_TEST = "si.dsl.test";
108+
109+
static final String QUEUE_DSL_EXCEPTION_TEST_DLQ = "si.dsl.exception.test.dlq";
110+
111+
static final String QUEUE_DSL_CONV_EXCEPTION_TEST_DLQ = "si.dsl.conv.exception.test.dlq";
112+
113+
static final String QUEUE_TEMPLATE_CHANNEL_TRANSACTED = "testTemplateChannelTransacted";
114+
115+
static final String QUEUE_PUBLISHER = "publisherQueue";
116+
117+
static final List<String> QUEUE_NAMES = List.of(QUEUE_AMQP_OUTBOND_INPUT, QUEUE_AMQP_REPLY_CHANNEL,
118+
QUEUE_ASYNC_REPLIES, QUEUE_DEFAULT_REPLY_TO, QUEUE_DSL_TEST, QUEUE_DSL_EXCEPTION_TEST_DLQ,
119+
QUEUE_DSL_CONV_EXCEPTION_TEST_DLQ, QUEUE_TEMPLATE_CHANNEL_TRANSACTED, QUEUE_PUBLISHER);
99120

100121
@Autowired
101122
private ConnectionFactory rabbitConnectionFactory;
@@ -124,11 +145,22 @@ public class AmqpTests {
124145
@Autowired
125146
private Lifecycle asyncOutboundGateway;
126147

148+
@BeforeAll
149+
static void initQueue() throws IOException, InterruptedException {
150+
for (String queue : QUEUE_NAMES) {
151+
RABBITMQ.execInContainer("rabbitmqadmin", "declare", "queue", "name=" + queue);
152+
}
153+
}
154+
127155
@AfterAll
128-
public static void tearDown(ConfigurableApplicationContext context) {
156+
static void tearDown(ConfigurableApplicationContext context) throws IOException, InterruptedException {
129157
context.stop(); // prevent queues from being redeclared after deletion
130-
RabbitAvailableCondition.getBrokerRunning().removeTestQueues("si.dsl.exception.test",
131-
"si.dsl.conv.exception.test");
158+
for (String queue : QUEUE_NAMES) {
159+
RABBITMQ.execInContainer("rabbitmqadmin", "delete", "queue", "name=" + queue);
160+
}
161+
for (String additionalQueue : List.of("si.dsl.exception.test", "si.dsl.conv.exception.test")) {
162+
RABBITMQ.execInContainer("rabbitmqadmin", "delete", "queue", "name=" + additionalQueue);
163+
}
132164
}
133165

134166
@Test
@@ -308,7 +340,7 @@ public static class ContextConfiguration {
308340

309341
@Bean
310342
public ConnectionFactory rabbitConnectionFactory() {
311-
return new CachingConnectionFactory("localhost");
343+
return new CachingConnectionFactory(RabbitTestContainer.amqpPort());
312344
}
313345

314346
@Bean

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/ManualAckTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@
2525
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
2626
import org.springframework.amqp.rabbit.core.RabbitAdmin;
2727
import org.springframework.amqp.rabbit.core.RabbitTemplate;
28-
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
2928
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
3029
import org.springframework.amqp.support.AmqpHeaders;
3130
import org.springframework.beans.factory.BeanFactory;
3231
import org.springframework.beans.factory.annotation.Autowired;
3332
import org.springframework.context.annotation.Bean;
3433
import org.springframework.context.annotation.ComponentScan;
3534
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.integration.amqp.support.RabbitTestContainer;
3636
import org.springframework.integration.annotation.MessageEndpoint;
3737
import org.springframework.integration.annotation.ServiceActivator;
3838
import org.springframework.integration.channel.QueueChannel;
@@ -53,10 +53,9 @@
5353
* @since 4.0
5454
*
5555
*/
56-
@RabbitAvailable
5756
@SpringJUnitConfig
5857
@DirtiesContext
59-
public class ManualAckTests {
58+
public class ManualAckTests implements RabbitTestContainer {
6059

6160
@Autowired
6261
private MessageChannel foo;
@@ -120,7 +119,7 @@ public QueueChannel bar() {
120119
@Bean
121120
public CachingConnectionFactory connectionFactory() {
122121
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
123-
connectionFactory.setHost("localhost");
122+
connectionFactory.setPort(RabbitTestContainer.amqpPort());
124123
return connectionFactory;
125124
}
126125

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests-context.xml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,12 @@
111111
</bean>
112112

113113
<rabbit:connection-factory id="connectionFactory"
114-
host="localhost" publisher-returns="true" confirm-type="CORRELATED" />
114+
port="#{T(org.springframework.integration.amqp.support.RabbitTestContainer).amqpPort()}"
115+
publisher-returns="true" confirm-type="CORRELATED" />
115116

116117
<rabbit:connection-factory id="multiSendFactory"
117-
host="localhost" publisher-returns="true" confirm-type="SIMPLE" />
118+
port="#{T(org.springframework.integration.amqp.support.RabbitTestContainer).amqpPort()}"
119+
publisher-returns="true" confirm-type="SIMPLE" />
118120

119121
<rabbit:admin connection-factory="connectionFactory" />
120122

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@
3030
import org.springframework.amqp.rabbit.connection.CorrelationData;
3131
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
3232
import org.springframework.amqp.rabbit.core.RabbitTemplate;
33-
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
3433
import org.springframework.amqp.support.AmqpHeaders;
3534
import org.springframework.amqp.utils.test.TestUtils;
3635
import org.springframework.beans.factory.BeanFactory;
3736
import org.springframework.beans.factory.annotation.Autowired;
3837
import org.springframework.beans.factory.annotation.Qualifier;
3938
import org.springframework.integration.amqp.support.NackedAmqpMessageException;
39+
import org.springframework.integration.amqp.support.RabbitTestContainer;
4040
import org.springframework.integration.amqp.support.ReturnedAmqpMessageException;
4141
import org.springframework.integration.channel.QueueChannel;
4242
import org.springframework.integration.mapping.support.JsonHeaders;
@@ -69,9 +69,8 @@
6969
*
7070
*/
7171
@SpringJUnitConfig
72-
@RabbitAvailable
7372
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
74-
public class AmqpOutboundEndpointTests {
73+
public class AmqpOutboundEndpointTests implements RabbitTestContainer {
7574

7675
@Autowired
7776
BeanFactory beanFactory;

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests2.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.integration.amqp.outbound;
1818

19+
import java.io.IOException;
1920
import java.util.Collections;
2021
import java.util.concurrent.TimeUnit;
2122

23+
import org.junit.jupiter.api.AfterAll;
24+
import org.junit.jupiter.api.BeforeAll;
2225
import org.junit.jupiter.api.Test;
2326

2427
import org.springframework.amqp.AmqpException;
@@ -30,13 +33,12 @@
3033
import org.springframework.amqp.rabbit.connection.CorrelationData;
3134
import org.springframework.amqp.rabbit.core.RabbitAdmin;
3235
import org.springframework.amqp.rabbit.core.RabbitTemplate;
33-
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
34-
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
3536
import org.springframework.amqp.support.AmqpHeaders;
3637
import org.springframework.beans.factory.annotation.Autowired;
3738
import org.springframework.context.annotation.Bean;
3839
import org.springframework.context.annotation.Configuration;
3940
import org.springframework.integration.amqp.dsl.Amqp;
41+
import org.springframework.integration.amqp.support.RabbitTestContainer;
4042
import org.springframework.integration.config.EnableIntegration;
4143
import org.springframework.integration.dsl.IntegrationFlow;
4244
import org.springframework.integration.support.MessageBuilder;
@@ -56,14 +58,25 @@
5658
*
5759
*/
5860
@SpringJUnitConfig
59-
@RabbitAvailable(queues = "testConfirmOk")
6061
@DirtiesContext
61-
public class AmqpOutboundEndpointTests2 {
62+
public class AmqpOutboundEndpointTests2 implements RabbitTestContainer {
63+
64+
static final String QUEUE_TEST_CONFIRM_OK = "testConfirmOk";
65+
66+
@BeforeAll
67+
static void initQueue() throws IOException, InterruptedException {
68+
RABBITMQ.execInContainer("rabbitmqadmin", "declare", "queue", "name=" + QUEUE_TEST_CONFIRM_OK);
69+
}
70+
71+
@AfterAll
72+
static void deleteQueue() throws IOException, InterruptedException {
73+
RABBITMQ.execInContainer("rabbitmqadmin", "delete", "queue", "name=" + QUEUE_TEST_CONFIRM_OK);
74+
}
6275

6376
@Test
6477
void testConfirmOk(@Autowired IntegrationFlow flow, @Autowired RabbitTemplate template) {
65-
flow.getInputChannel().send(new GenericMessage<>("test", Collections.singletonMap("rk", "testConfirmOk")));
66-
assertThat(template.receive("testConfirmOk")).isNotNull();
78+
flow.getInputChannel().send(new GenericMessage<>("test", Collections.singletonMap("rk", QUEUE_TEST_CONFIRM_OK)));
79+
assertThat(template.receive(QUEUE_TEST_CONFIRM_OK)).isNotNull();
6780
}
6881

6982
@Test
@@ -127,8 +140,7 @@ public IntegrationFlow flow2(RabbitTemplate template) {
127140

128141
@Bean
129142
public CachingConnectionFactory cf() {
130-
CachingConnectionFactory ccf = new CachingConnectionFactory(
131-
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
143+
CachingConnectionFactory ccf = new CachingConnectionFactory(RabbitTestContainer.amqpPort());
132144
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
133145
ccf.setPublisherReturns(true);
134146
return ccf;

0 commit comments

Comments
 (0)