diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 43a57bbc5c..b44290868c 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -28,8 +28,8 @@ class StreamProducerBuilder implements ProducerBuilder { - static final boolean DEFAULT_DYNAMIC_BATCH = true; -// Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); + static final boolean DEFAULT_DYNAMIC_BATCH = + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); private final StreamEnvironment environment; @@ -201,7 +201,7 @@ public Producer build() { if (this.routingConfiguration == null && this.superStream != null) { throw new IllegalArgumentException( - "A routing configuration must specified when a super stream is set"); + "A routing configuration must be specified when a super stream is set"); } if (this.stream != null) { diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index 7097725af4..5c583a461b 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -178,6 +178,30 @@ static List toConnectionInfoList(String json) { return GSON.fromJson(json, new TypeToken>() {}.getType()); } + public static List listGroupConsumers(String stream, String reference) { + ProcessState process = + rabbitmqStreams( + format( + "list_stream_group_consumers -q --stream %s --reference %s " + + "--formatter table subscription_id,state", + stream, reference)); + + List itemList = Collections.emptyList(); + String content = process.output(); + String[] lines = content.split(System.lineSeparator()); + if (lines.length > 1) { + itemList = new ArrayList<>(lines.length - 1); + for (int i = 1; i < lines.length; i++) { + String line = lines[i]; + String[] fields = line.split("\t"); + String id = fields[0]; + String state = fields[1].replace("\"", ""); + itemList.add(new SubscriptionInfo(Integer.parseInt(id), state)); + } + } + return itemList; + } + public static void restartStream(String stream) { rabbitmqStreams(" restart_stream " + stream); } @@ -420,6 +444,30 @@ public String toString() { } } + public static final class SubscriptionInfo { + + private final int id; + private final String state; + + public SubscriptionInfo(int id, String state) { + this.id = id; + this.state = state; + } + + public int id() { + return this.id; + } + + public String state() { + return this.state; + } + + @Override + public String toString() { + return "SubscriptionInfo{id='" + id + '\'' + ", state='" + state + '\'' + '}'; + } + } + public static class ProcessState { private final InputStreamPumpState inputState; diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 24f50ee0cd..e234d723c4 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -16,18 +16,22 @@ import static com.rabbitmq.stream.impl.Assertions.assertThat; import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS; +import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_1_2; import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel; import static com.rabbitmq.stream.impl.TestUtils.sync; +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static com.rabbitmq.stream.impl.ThreadUtils.threadFactory; import static com.rabbitmq.stream.impl.Tuples.pair; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.stream; import ch.qos.logback.classic.Level; import com.google.common.collect.Streams; import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.stream.*; +import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster; import com.rabbitmq.stream.impl.TestUtils.Sync; import com.rabbitmq.stream.impl.Tuples.Pair; @@ -41,15 +45,20 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru syncs = consumers.stream().map(c -> c.waitForNewMessages(100)).collect(toList()); syncs.forEach(s -> assertThat(s).completes()); - nodes.forEach( - n -> { - LOGGER.info("Restarting node {}...", n); - Cli.restartNode(n); - LOGGER.info("Restarted node {}.", n); - }); - LOGGER.info("Rebalancing..."); - Cli.rebalance(); - LOGGER.info("Rebalancing over."); + restartCluster(); Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); @@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @BrokerVersionAtLeast(RABBITMQ_4_1_2) + void sacWithClusterRestart(boolean superStream) throws Exception { + environment = + environmentBuilder + .uris(URIS) + .netty() + .bootstrapCustomizer( + b -> { + b.option( + ChannelOption.CONNECT_TIMEOUT_MILLIS, + (int) BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + }) + .environmentBuilder() + .maxConsumersByConnection(1) + .build(); + + int consumerCount = 3; + AtomicLong lastOffset = new AtomicLong(0); + String app = "app-name"; + String s = TestUtils.streamName(testInfo); + ProducerState pState = null; + List consumers = Collections.emptyList(); + try { + StreamCreator sCreator = environment.streamCreator().stream(s); + if (superStream) { + sCreator = sCreator.superStream().partitions(1).creator(); + } + sCreator.create(); + + pState = new ProducerState(s, true, superStream, environment); + pState.start(); + + Map consumerStatus = new ConcurrentHashMap<>(); + consumers = + IntStream.range(0, consumerCount) + .mapToObj( + i -> + new ConsumerState( + s, + environment, + b -> { + b.singleActiveConsumer() + .name(app) + .noTrackingStrategy() + .consumerUpdateListener( + ctx -> { + consumerStatus.put(i, ctx.isActive()); + return OffsetSpecification.offset(lastOffset.get()); + }); + if (superStream) { + b.superStream(s); + } else { + b.stream(s); + } + }, + (ctx, m) -> lastOffset.set(ctx.offset()))) + .collect(toList()); + + Sync sync = pState.waitForNewMessages(100); + assertThat(sync).completes(); + sync = consumers.get(0).waitForNewMessages(100); + assertThat(sync).completes(); + + String streamArg = superStream ? s + "-0" : s; + + Callable checkConsumers = + () -> { + waitAtMost( + () -> { + List subscriptions = Cli.listGroupConsumers(streamArg, app); + LOGGER.info("Group consumers: {}", subscriptions); + return subscriptions.size() == consumerCount + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("active")) + .count() + == 1 + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("waiting")) + .count() + == 2; + }, + () -> + "Group consumers not in expected state: " + + Cli.listGroupConsumers(streamArg, app)); + return null; + }; + + checkConsumers.call(); + + restartCluster(); + + Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + + sync = pState.waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + int activeIndex = + consumerStatus.entrySet().stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No active consumer found")); + + sync = consumers.get(activeIndex).waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + + checkConsumers.call(); + + } finally { + if (pState != null) { + pState.close(); + } + consumers.forEach(ConsumerState::close); + if (superStream) { + environment.deleteSuperStream(s); + } else { + environment.deleteStream(s); + } + } + } + private static class ProducerState implements AutoCloseable { + private static final AtomicLong MSG_ID_SEQ = new AtomicLong(0); + private static final byte[] BODY = "hello".getBytes(StandardCharsets.UTF_8); private final String stream; @@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable { final AtomicReference lastExceptionInstant = new AtomicReference<>(); private ProducerState(String stream, boolean dynamicBatch, Environment environment) { + this(stream, dynamicBatch, false, environment); + } + + private ProducerState( + String stream, boolean dynamicBatch, boolean superStream, Environment environment) { this.stream = stream; - this.producer = - environment.producerBuilder().stream(stream).dynamicBatch(dynamicBatch).build(); + ProducerBuilder builder = environment.producerBuilder().dynamicBatch(dynamicBatch); + if (superStream) { + builder.superStream(stream).routing(m -> m.getProperties().getMessageIdAsString()); + } else { + builder.stream(stream); + } + this.producer = builder.build(); } void start() { @@ -327,7 +462,14 @@ void start() { try { this.limiter.acquire(1); this.producer.send( - producer.messageBuilder().addData(BODY).build(), confirmationHandler); + producer + .messageBuilder() + .properties() + .messageId(MSG_ID_SEQ.getAndIncrement()) + .messageBuilder() + .addData(BODY) + .build(), + confirmationHandler); } catch (Throwable e) { this.lastException.set(e); this.lastExceptionInstant.set(Instant.now()); @@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable { final AtomicReference postHandle = new AtomicReference<>(() -> {}); private ConsumerState(String stream, Environment environment) { + this(stream, environment, b -> b.stream(stream), (ctx, m) -> {}); + } + + private ConsumerState( + String stream, + Environment environment, + java.util.function.Consumer customizer, + MessageHandler delegateHandler) { this.stream = stream; - this.consumer = - environment.consumerBuilder().stream(stream) + ConsumerBuilder builder = + environment + .consumerBuilder() .offset(OffsetSpecification.first()) .messageHandler( (ctx, m) -> { + delegateHandler.handle(ctx, m); receivedCount.incrementAndGet(); postHandle.get().run(); - }) - .build(); + }); + customizer.accept(builder); + this.consumer = builder.build(); } Sync waitForNewMessages(int messageCount) { @@ -414,4 +567,16 @@ public void close() { this.consumer.close(); } } + + private static void restartCluster() { + nodes.forEach( + n -> { + LOGGER.info("Restarting node {}...", n); + Cli.restartNode(n); + LOGGER.info("Restarted node {}.", n); + }); + LOGGER.info("Rebalancing..."); + Cli.rebalance(); + LOGGER.info("Rebalancing over."); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 60aa2cc2a5..52214838b8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1064,7 +1064,8 @@ public enum BrokerVersion { RABBITMQ_3_11_11("3.11.11"), RABBITMQ_3_11_14("3.11.14"), RABBITMQ_3_13_0("3.13.0"), - RABBITMQ_4_0_0("4.0.0"); + RABBITMQ_4_0_0("4.0.0"), + RABBITMQ_4_1_2("4.1.2"); final String value;