Skip to content

Commit d5d6b5e

Browse files
committed
Fix test
1 parent de24bc0 commit d5d6b5e

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

src/test/java/com/rabbitmq/stream/Cli.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,7 @@ public static void setEnv(String parameter, String value) {
260260
}
261261

262262
public static String rabbitmqctlCommand() {
263-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
264-
if (rabbitmqCtl == null) {
265-
rabbitmqCtl = DOCKER_PREFIX + "rabbitmq";
266-
}
263+
String rabbitmqCtl = rabbitmqctlBin();
267264
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
268265
String containerId = rabbitmqCtl.split(":")[1];
269266
return "docker exec " + containerId + " rabbitmqctl";
@@ -272,6 +269,14 @@ public static String rabbitmqctlCommand() {
272269
}
273270
}
274271

272+
private static String rabbitmqctlBin() {
273+
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
274+
if (rabbitmqCtl == null) {
275+
rabbitmqCtl = DOCKER_PREFIX + "rabbitmq";
276+
}
277+
return rabbitmqCtl;
278+
}
279+
275280
private static String rabbitmqStreamsCommand() {
276281
String rabbitmqctl = rabbitmqctlCommand();
277282
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
@@ -330,11 +335,7 @@ private static void clearResourceAlarm(String source) {
330335
}
331336

332337
public static boolean isOnDocker() {
333-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
334-
if (rabbitmqCtl == null) {
335-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
336-
}
337-
return rabbitmqCtl.startsWith(DOCKER_PREFIX);
338+
return rabbitmqctlBin().startsWith(DOCKER_PREFIX);
338339
}
339340

340341
public static List<String> nodes() {

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,14 @@
5353
import org.junit.jupiter.api.extension.ExtendWith;
5454
import org.junit.jupiter.params.ParameterizedTest;
5555
import org.junit.jupiter.params.provider.MethodSource;
56+
import org.slf4j.Logger;
57+
import org.slf4j.LoggerFactory;
5658

5759
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
5860
public class StreamConsumerTest {
5961

62+
private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerTest.class);
63+
6064
static final Duration RECOVERY_DELAY = Duration.ofSeconds(2);
6165
static final Duration TOPOLOGY_DELAY = Duration.ofSeconds(2);
6266
static volatile Duration recoveryInitialDelay;
@@ -1105,7 +1109,15 @@ private DispatchingMessageHandler(MessageHandler delegate, ThreadFactory tf) {
11051109

11061110
@Override
11071111
public void handle(Context context, Message message) {
1108-
this.queue.add(new ContextMessageWrapper(context, message));
1112+
try {
1113+
boolean inserted =
1114+
this.queue.offer(new ContextMessageWrapper(context, message), 10, TimeUnit.SECONDS);
1115+
if (!inserted) {
1116+
LOGGER.warn("Failed to insert message into queue");
1117+
}
1118+
} catch (InterruptedException e) {
1119+
Thread.currentThread().interrupt();
1120+
}
11091121
}
11101122

11111123
@Override

0 commit comments

Comments
 (0)