18
18
import static com .rabbitmq .stream .impl .LoadBalancerClusterTest .LOAD_BALANCER_ADDRESS ;
19
19
import static com .rabbitmq .stream .impl .TestUtils .newLoggerLevel ;
20
20
import static com .rabbitmq .stream .impl .TestUtils .sync ;
21
+ import static com .rabbitmq .stream .impl .TestUtils .waitAtMost ;
21
22
import static com .rabbitmq .stream .impl .ThreadUtils .threadFactory ;
22
23
import static com .rabbitmq .stream .impl .Tuples .pair ;
23
24
import static java .util .stream .Collectors .toList ;
24
25
import static java .util .stream .IntStream .range ;
25
26
import static org .assertj .core .api .Assertions .assertThat ;
27
+ import static org .assertj .core .api .InstanceOfAssertFactories .stream ;
26
28
27
29
import ch .qos .logback .classic .Level ;
28
30
import com .google .common .collect .Streams ;
41
43
import java .util .LinkedHashMap ;
42
44
import java .util .List ;
43
45
import java .util .Map ;
46
+ import java .util .concurrent .Callable ;
44
47
import java .util .concurrent .ConcurrentHashMap ;
45
48
import java .util .concurrent .Executors ;
46
49
import java .util .concurrent .ScheduledExecutorService ;
@@ -289,7 +292,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
289
292
290
293
@ ParameterizedTest
291
294
@ ValueSource (booleans = {true , false })
292
- void sacWithClusterRestart (boolean superStream ) throws InterruptedException {
295
+ void sacWithClusterRestart (boolean superStream ) throws Exception {
293
296
environment =
294
297
environmentBuilder
295
298
.uris (URIS )
@@ -351,13 +354,31 @@ void sacWithClusterRestart(boolean superStream) throws InterruptedException {
351
354
sync = consumers .get (0 ).waitForNewMessages (100 );
352
355
assertThat (sync ).completes ();
353
356
354
- List <Cli .SubscriptionInfo > subscriptions =
355
- Cli .listGroupConsumers (superStream ? s + "-0" : s , app );
356
- assertThat (subscriptions ).hasSize (consumerCount );
357
- assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("active" )).count ())
358
- .isEqualTo (1 );
359
- assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("waiting" )).count ())
360
- .isEqualTo (2 );
357
+ String streamArg = superStream ? s + "-0" : s ;
358
+
359
+ Callable <Void > checkConsumers =
360
+ () -> {
361
+ waitAtMost (
362
+ () -> {
363
+ List <Cli .SubscriptionInfo > subscriptions = Cli .listGroupConsumers (streamArg , app );
364
+ LOGGER .info ("Group consumers: {}" , subscriptions );
365
+ return subscriptions .size () == consumerCount
366
+ && subscriptions .stream ()
367
+ .filter (sub -> sub .state ().startsWith ("active" ))
368
+ .count ()
369
+ == 1
370
+ && subscriptions .stream ()
371
+ .filter (sub -> sub .state ().startsWith ("waiting" ))
372
+ .count ()
373
+ == 2 ;
374
+ },
375
+ () ->
376
+ "Group consumers not in expected state: "
377
+ + Cli .listGroupConsumers (streamArg , app ));
378
+ return null ;
379
+ };
380
+
381
+ checkConsumers .call ();
361
382
362
383
restartCluster ();
363
384
@@ -375,12 +396,7 @@ void sacWithClusterRestart(boolean superStream) throws InterruptedException {
375
396
sync = consumers .get (activeIndex ).waitForNewMessages (100 );
376
397
assertThat (sync ).completes (ASSERTION_TIMEOUT );
377
398
378
- subscriptions = Cli .listGroupConsumers (superStream ? s + "-0" : s , app );
379
- assertThat (subscriptions ).hasSize (consumerCount );
380
- assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("active" )).count ())
381
- .isEqualTo (1 );
382
- assertThat (subscriptions .stream ().filter (sub -> sub .state ().startsWith ("waiting" )).count ())
383
- .isEqualTo (2 );
399
+ checkConsumers .call ();
384
400
385
401
} finally {
386
402
if (pState != null ) {
0 commit comments