Skip to content

Add SAC test against cluster #772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

class StreamProducerBuilder implements ProducerBuilder {

static final boolean DEFAULT_DYNAMIC_BATCH = true;
// Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));
static final boolean DEFAULT_DYNAMIC_BATCH =
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));

private final StreamEnvironment environment;

Expand Down Expand Up @@ -201,7 +201,7 @@ public Producer build() {

if (this.routingConfiguration == null && this.superStream != null) {
throw new IllegalArgumentException(
"A routing configuration must specified when a super stream is set");
"A routing configuration must be specified when a super stream is set");
}

if (this.stream != null) {
Expand Down
48 changes: 48 additions & 0 deletions src/test/java/com/rabbitmq/stream/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,30 @@ static List<ConnectionInfo> toConnectionInfoList(String json) {
return GSON.fromJson(json, new TypeToken<List<ConnectionInfo>>() {}.getType());
}

public static List<SubscriptionInfo> listGroupConsumers(String stream, String reference) {
ProcessState process =
rabbitmqStreams(
format(
"list_stream_group_consumers -q --stream %s --reference %s "
+ "--formatter table subscription_id,state",
stream, reference));

List<SubscriptionInfo> itemList = Collections.emptyList();
String content = process.output();
String[] lines = content.split(System.lineSeparator());
if (lines.length > 1) {
itemList = new ArrayList<>(lines.length - 1);
for (int i = 1; i < lines.length; i++) {
String line = lines[i];
String[] fields = line.split("\t");
String id = fields[0];
String state = fields[1].replace("\"", "");
itemList.add(new SubscriptionInfo(Integer.parseInt(id), state));
}
}
return itemList;
}

public static void restartStream(String stream) {
rabbitmqStreams(" restart_stream " + stream);
}
Expand Down Expand Up @@ -420,6 +444,30 @@ public String toString() {
}
}

public static final class SubscriptionInfo {

private final int id;
private final String state;

public SubscriptionInfo(int id, String state) {
this.id = id;
this.state = state;
}

public int id() {
return this.id;
}

public String state() {
return this.state;
}

@Override
public String toString() {
return "SubscriptionInfo{id='" + id + '\'' + ", state='" + state + '\'' + '}';
}
}

public static class ProcessState {

private final InputStreamPumpState inputState;
Expand Down
197 changes: 181 additions & 16 deletions src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@

import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS;
import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_1_2;
import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel;
import static com.rabbitmq.stream.impl.TestUtils.sync;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static com.rabbitmq.stream.impl.ThreadUtils.threadFactory;
import static com.rabbitmq.stream.impl.Tuples.pair;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.stream;

import ch.qos.logback.classic.Level;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.RateLimiter;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster;
import com.rabbitmq.stream.impl.TestUtils.Sync;
import com.rabbitmq.stream.impl.Tuples.Pair;
Expand All @@ -41,15 +45,20 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
syncs = consumers.stream().map(c -> c.waitForNewMessages(100)).collect(toList());
syncs.forEach(s -> assertThat(s).completes());

nodes.forEach(
n -> {
LOGGER.info("Restarting node {}...", n);
Cli.restartNode(n);
LOGGER.info("Restarted node {}.", n);
});
LOGGER.info("Rebalancing...");
Cli.rebalance();
LOGGER.info("Rebalancing over.");
restartCluster();

Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis());

Expand Down Expand Up @@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@BrokerVersionAtLeast(RABBITMQ_4_1_2)
void sacWithClusterRestart(boolean superStream) throws Exception {
environment =
environmentBuilder
.uris(URIS)
.netty()
.bootstrapCustomizer(
b -> {
b.option(
ChannelOption.CONNECT_TIMEOUT_MILLIS,
(int) BACK_OFF_DELAY_POLICY.delay(0).toMillis());
})
.environmentBuilder()
.maxConsumersByConnection(1)
.build();

int consumerCount = 3;
AtomicLong lastOffset = new AtomicLong(0);
String app = "app-name";
String s = TestUtils.streamName(testInfo);
ProducerState pState = null;
List<ConsumerState> consumers = Collections.emptyList();
try {
StreamCreator sCreator = environment.streamCreator().stream(s);
if (superStream) {
sCreator = sCreator.superStream().partitions(1).creator();
}
sCreator.create();

pState = new ProducerState(s, true, superStream, environment);
pState.start();

Map<Integer, Boolean> consumerStatus = new ConcurrentHashMap<>();
consumers =
IntStream.range(0, consumerCount)
.mapToObj(
i ->
new ConsumerState(
s,
environment,
b -> {
b.singleActiveConsumer()
.name(app)
.noTrackingStrategy()
.consumerUpdateListener(
ctx -> {
consumerStatus.put(i, ctx.isActive());
return OffsetSpecification.offset(lastOffset.get());
});
if (superStream) {
b.superStream(s);
} else {
b.stream(s);
}
},
(ctx, m) -> lastOffset.set(ctx.offset())))
.collect(toList());

Sync sync = pState.waitForNewMessages(100);
assertThat(sync).completes();
sync = consumers.get(0).waitForNewMessages(100);
assertThat(sync).completes();

String streamArg = superStream ? s + "-0" : s;

Callable<Void> checkConsumers =
() -> {
waitAtMost(
() -> {
List<Cli.SubscriptionInfo> subscriptions = Cli.listGroupConsumers(streamArg, app);
LOGGER.info("Group consumers: {}", subscriptions);
return subscriptions.size() == consumerCount
&& subscriptions.stream()
.filter(sub -> sub.state().startsWith("active"))
.count()
== 1
&& subscriptions.stream()
.filter(sub -> sub.state().startsWith("waiting"))
.count()
== 2;
},
() ->
"Group consumers not in expected state: "
+ Cli.listGroupConsumers(streamArg, app));
return null;
};

checkConsumers.call();

restartCluster();

Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis());

sync = pState.waitForNewMessages(100);
assertThat(sync).completes(ASSERTION_TIMEOUT);
int activeIndex =
consumerStatus.entrySet().stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
.findFirst()
.orElseThrow(() -> new IllegalStateException("No active consumer found"));

sync = consumers.get(activeIndex).waitForNewMessages(100);
assertThat(sync).completes(ASSERTION_TIMEOUT);

checkConsumers.call();

} finally {
if (pState != null) {
pState.close();
}
consumers.forEach(ConsumerState::close);
if (superStream) {
environment.deleteSuperStream(s);
} else {
environment.deleteStream(s);
}
}
}

private static class ProducerState implements AutoCloseable {

private static final AtomicLong MSG_ID_SEQ = new AtomicLong(0);

private static final byte[] BODY = "hello".getBytes(StandardCharsets.UTF_8);

private final String stream;
Expand All @@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable {
final AtomicReference<Instant> lastExceptionInstant = new AtomicReference<>();

private ProducerState(String stream, boolean dynamicBatch, Environment environment) {
this(stream, dynamicBatch, false, environment);
}

private ProducerState(
String stream, boolean dynamicBatch, boolean superStream, Environment environment) {
this.stream = stream;
this.producer =
environment.producerBuilder().stream(stream).dynamicBatch(dynamicBatch).build();
ProducerBuilder builder = environment.producerBuilder().dynamicBatch(dynamicBatch);
if (superStream) {
builder.superStream(stream).routing(m -> m.getProperties().getMessageIdAsString());
} else {
builder.stream(stream);
}
this.producer = builder.build();
}

void start() {
Expand All @@ -327,7 +462,14 @@ void start() {
try {
this.limiter.acquire(1);
this.producer.send(
producer.messageBuilder().addData(BODY).build(), confirmationHandler);
producer
.messageBuilder()
.properties()
.messageId(MSG_ID_SEQ.getAndIncrement())
.messageBuilder()
.addData(BODY)
.build(),
confirmationHandler);
} catch (Throwable e) {
this.lastException.set(e);
this.lastExceptionInstant.set(Instant.now());
Expand Down Expand Up @@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable {
final AtomicReference<Runnable> postHandle = new AtomicReference<>(() -> {});

private ConsumerState(String stream, Environment environment) {
this(stream, environment, b -> b.stream(stream), (ctx, m) -> {});
}

private ConsumerState(
String stream,
Environment environment,
java.util.function.Consumer<ConsumerBuilder> customizer,
MessageHandler delegateHandler) {
this.stream = stream;
this.consumer =
environment.consumerBuilder().stream(stream)
ConsumerBuilder builder =
environment
.consumerBuilder()
.offset(OffsetSpecification.first())
.messageHandler(
(ctx, m) -> {
delegateHandler.handle(ctx, m);
receivedCount.incrementAndGet();
postHandle.get().run();
})
.build();
});
customizer.accept(builder);
this.consumer = builder.build();
}

Sync waitForNewMessages(int messageCount) {
Expand All @@ -414,4 +567,16 @@ public void close() {
this.consumer.close();
}
}

private static void restartCluster() {
nodes.forEach(
n -> {
LOGGER.info("Restarting node {}...", n);
Cli.restartNode(n);
LOGGER.info("Restarted node {}.", n);
});
LOGGER.info("Rebalancing...");
Cli.rebalance();
LOGGER.info("Rebalancing over.");
}
}
3 changes: 2 additions & 1 deletion src/test/java/com/rabbitmq/stream/impl/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,8 @@ public enum BrokerVersion {
RABBITMQ_3_11_11("3.11.11"),
RABBITMQ_3_11_14("3.11.14"),
RABBITMQ_3_13_0("3.13.0"),
RABBITMQ_4_0_0("4.0.0");
RABBITMQ_4_0_0("4.0.0"),
RABBITMQ_4_1_2("4.1.2");

final String value;

Expand Down
Loading