1616
1717import static com .rabbitmq .stream .impl .Assertions .assertThat ;
1818import static com .rabbitmq .stream .impl .LoadBalancerClusterTest .LOAD_BALANCER_ADDRESS ;
19+ import static com .rabbitmq .stream .impl .TestUtils .BrokerVersion .RABBITMQ_4_1_2 ;
1920import static com .rabbitmq .stream .impl .TestUtils .newLoggerLevel ;
2021import static com .rabbitmq .stream .impl .TestUtils .sync ;
22+ import static com .rabbitmq .stream .impl .TestUtils .waitAtMost ;
2123import static com .rabbitmq .stream .impl .ThreadUtils .threadFactory ;
2224import static com .rabbitmq .stream .impl .Tuples .pair ;
2325import static java .util .stream .Collectors .toList ;
2426import static java .util .stream .IntStream .range ;
2527import static org .assertj .core .api .Assertions .assertThat ;
28+ import static org .assertj .core .api .InstanceOfAssertFactories .stream ;
2629
2730import ch .qos .logback .classic .Level ;
2831import com .google .common .collect .Streams ;
2932import com .google .common .util .concurrent .RateLimiter ;
3033import com .rabbitmq .stream .*;
34+ import com .rabbitmq .stream .impl .TestUtils .BrokerVersionAtLeast ;
3135import com .rabbitmq .stream .impl .TestUtils .DisabledIfNotCluster ;
3236import com .rabbitmq .stream .impl .TestUtils .Sync ;
3337import com .rabbitmq .stream .impl .Tuples .Pair ;
4145import java .util .LinkedHashMap ;
4246import java .util .List ;
4347import java .util .Map ;
48+ import java .util .concurrent .Callable ;
49+ import java .util .concurrent .ConcurrentHashMap ;
4450import java .util .concurrent .Executors ;
4551import java .util .concurrent .ScheduledExecutorService ;
4652import java .util .concurrent .ThreadFactory ;
4753import java .util .concurrent .atomic .AtomicBoolean ;
4854import java .util .concurrent .atomic .AtomicInteger ;
55+ import java .util .concurrent .atomic .AtomicLong ;
4956import java .util .concurrent .atomic .AtomicReference ;
57+ import java .util .stream .IntStream ;
5058import org .junit .jupiter .api .*;
5159import org .junit .jupiter .params .ParameterizedTest ;
5260import org .junit .jupiter .params .provider .CsvSource ;
61+ import org .junit .jupiter .params .provider .ValueSource ;
5362import org .slf4j .Logger ;
5463import org .slf4j .LoggerFactory ;
5564
@@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
201210 syncs = consumers .stream ().map (c -> c .waitForNewMessages (100 )).collect (toList ());
202211 syncs .forEach (s -> assertThat (s ).completes ());
203212
204- nodes .forEach (
205- n -> {
206- LOGGER .info ("Restarting node {}..." , n );
207- Cli .restartNode (n );
208- LOGGER .info ("Restarted node {}." , n );
209- });
210- LOGGER .info ("Rebalancing..." );
211- Cli .rebalance ();
212- LOGGER .info ("Rebalancing over." );
213+ restartCluster ();
213214
214215 Thread .sleep (BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
215216
@@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
291292 }
292293 }
293294
295+ @ ParameterizedTest
296+ @ ValueSource (booleans = {true , false })
297+ @ BrokerVersionAtLeast (RABBITMQ_4_1_2 )
298+ void sacWithClusterRestart (boolean superStream ) throws Exception {
299+ environment =
300+ environmentBuilder
301+ .uris (URIS )
302+ .netty ()
303+ .bootstrapCustomizer (
304+ b -> {
305+ b .option (
306+ ChannelOption .CONNECT_TIMEOUT_MILLIS ,
307+ (int ) BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
308+ })
309+ .environmentBuilder ()
310+ .maxConsumersByConnection (1 )
311+ .build ();
312+
313+ int consumerCount = 3 ;
314+ AtomicLong lastOffset = new AtomicLong (0 );
315+ String app = "app-name" ;
316+ String s = TestUtils .streamName (testInfo );
317+ ProducerState pState = null ;
318+ List <ConsumerState > consumers = Collections .emptyList ();
319+ try {
320+ StreamCreator sCreator = environment .streamCreator ().stream (s );
321+ if (superStream ) {
322+ sCreator = sCreator .superStream ().partitions (1 ).creator ();
323+ }
324+ sCreator .create ();
325+
326+ pState = new ProducerState (s , true , superStream , environment );
327+ pState .start ();
328+
329+ Map <Integer , Boolean > consumerStatus = new ConcurrentHashMap <>();
330+ consumers =
331+ IntStream .range (0 , consumerCount )
332+ .mapToObj (
333+ i ->
334+ new ConsumerState (
335+ s ,
336+ environment ,
337+ b -> {
338+ b .singleActiveConsumer ()
339+ .name (app )
340+ .noTrackingStrategy ()
341+ .consumerUpdateListener (
342+ ctx -> {
343+ consumerStatus .put (i , ctx .isActive ());
344+ return OffsetSpecification .offset (lastOffset .get ());
345+ });
346+ if (superStream ) {
347+ b .superStream (s );
348+ } else {
349+ b .stream (s );
350+ }
351+ },
352+ (ctx , m ) -> lastOffset .set (ctx .offset ())))
353+ .collect (toList ());
354+
355+ Sync sync = pState .waitForNewMessages (100 );
356+ assertThat (sync ).completes ();
357+ sync = consumers .get (0 ).waitForNewMessages (100 );
358+ assertThat (sync ).completes ();
359+
360+ String streamArg = superStream ? s + "-0" : s ;
361+
362+ Callable <Void > checkConsumers =
363+ () -> {
364+ waitAtMost (
365+ () -> {
366+ List <Cli .SubscriptionInfo > subscriptions = Cli .listGroupConsumers (streamArg , app );
367+ LOGGER .info ("Group consumers: {}" , subscriptions );
368+ return subscriptions .size () == consumerCount
369+ && subscriptions .stream ()
370+ .filter (sub -> sub .state ().startsWith ("active" ))
371+ .count ()
372+ == 1
373+ && subscriptions .stream ()
374+ .filter (sub -> sub .state ().startsWith ("waiting" ))
375+ .count ()
376+ == 2 ;
377+ },
378+ () ->
379+ "Group consumers not in expected state: "
380+ + Cli .listGroupConsumers (streamArg , app ));
381+ return null ;
382+ };
383+
384+ checkConsumers .call ();
385+
386+ restartCluster ();
387+
388+ Thread .sleep (BACK_OFF_DELAY_POLICY .delay (0 ).toMillis ());
389+
390+ sync = pState .waitForNewMessages (100 );
391+ assertThat (sync ).completes (ASSERTION_TIMEOUT );
392+ int activeIndex =
393+ consumerStatus .entrySet ().stream ()
394+ .filter (Map .Entry ::getValue )
395+ .map (Map .Entry ::getKey )
396+ .findFirst ()
397+ .orElseThrow (() -> new IllegalStateException ("No active consumer found" ));
398+
399+ sync = consumers .get (activeIndex ).waitForNewMessages (100 );
400+ assertThat (sync ).completes (ASSERTION_TIMEOUT );
401+
402+ checkConsumers .call ();
403+
404+ } finally {
405+ if (pState != null ) {
406+ pState .close ();
407+ }
408+ consumers .forEach (ConsumerState ::close );
409+ if (superStream ) {
410+ environment .deleteSuperStream (s );
411+ } else {
412+ environment .deleteStream (s );
413+ }
414+ }
415+ }
416+
294417 private static class ProducerState implements AutoCloseable {
295418
419+ private static final AtomicLong MSG_ID_SEQ = new AtomicLong (0 );
420+
296421 private static final byte [] BODY = "hello" .getBytes (StandardCharsets .UTF_8 );
297422
298423 private final String stream ;
@@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable {
306431 final AtomicReference <Instant > lastExceptionInstant = new AtomicReference <>();
307432
308433 private ProducerState (String stream , boolean dynamicBatch , Environment environment ) {
434+ this (stream , dynamicBatch , false , environment );
435+ }
436+
437+ private ProducerState (
438+ String stream , boolean dynamicBatch , boolean superStream , Environment environment ) {
309439 this .stream = stream ;
310- this .producer =
311- environment .producerBuilder ().stream (stream ).dynamicBatch (dynamicBatch ).build ();
440+ ProducerBuilder builder = environment .producerBuilder ().dynamicBatch (dynamicBatch );
441+ if (superStream ) {
442+ builder .superStream (stream ).routing (m -> m .getProperties ().getMessageIdAsString ());
443+ } else {
444+ builder .stream (stream );
445+ }
446+ this .producer = builder .build ();
312447 }
313448
314449 void start () {
@@ -327,7 +462,14 @@ void start() {
327462 try {
328463 this .limiter .acquire (1 );
329464 this .producer .send (
330- producer .messageBuilder ().addData (BODY ).build (), confirmationHandler );
465+ producer
466+ .messageBuilder ()
467+ .properties ()
468+ .messageId (MSG_ID_SEQ .getAndIncrement ())
469+ .messageBuilder ()
470+ .addData (BODY )
471+ .build (),
472+ confirmationHandler );
331473 } catch (Throwable e ) {
332474 this .lastException .set (e );
333475 this .lastExceptionInstant .set (Instant .now ());
@@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable {
380522 final AtomicReference <Runnable > postHandle = new AtomicReference <>(() -> {});
381523
382524 private ConsumerState (String stream , Environment environment ) {
525+ this (stream , environment , b -> b .stream (stream ), (ctx , m ) -> {});
526+ }
527+
528+ private ConsumerState (
529+ String stream ,
530+ Environment environment ,
531+ java .util .function .Consumer <ConsumerBuilder > customizer ,
532+ MessageHandler delegateHandler ) {
383533 this .stream = stream ;
384- this .consumer =
385- environment .consumerBuilder ().stream (stream )
534+ ConsumerBuilder builder =
535+ environment
536+ .consumerBuilder ()
386537 .offset (OffsetSpecification .first ())
387538 .messageHandler (
388539 (ctx , m ) -> {
540+ delegateHandler .handle (ctx , m );
389541 receivedCount .incrementAndGet ();
390542 postHandle .get ().run ();
391- })
392- .build ();
543+ });
544+ customizer .accept (builder );
545+ this .consumer = builder .build ();
393546 }
394547
395548 Sync waitForNewMessages (int messageCount ) {
@@ -414,4 +567,16 @@ public void close() {
414567 this .consumer .close ();
415568 }
416569 }
570+
571+ private static void restartCluster () {
572+ nodes .forEach (
573+ n -> {
574+ LOGGER .info ("Restarting node {}..." , n );
575+ Cli .restartNode (n );
576+ LOGGER .info ("Restarted node {}." , n );
577+ });
578+ LOGGER .info ("Rebalancing..." );
579+ Cli .rebalance ();
580+ LOGGER .info ("Rebalancing over." );
581+ }
417582}
0 commit comments