16
16
17
17
package com .rabbitmq .client .test .functional ;
18
18
19
+ import static com .rabbitmq .client .test .TestUtils .waitAtMost ;
19
20
import static org .junit .jupiter .api .Assertions .assertNotNull ;
20
21
import static org .junit .jupiter .api .Assertions .assertNull ;
21
22
import static org .junit .jupiter .api .Assertions .fail ;
22
23
23
24
import java .io .IOException ;
25
+ import java .util .ArrayList ;
26
+ import java .util .List ;
24
27
import java .util .concurrent .TimeoutException ;
25
28
26
29
import com .rabbitmq .client .AMQP ;
30
+ import com .rabbitmq .client .Channel ;
27
31
import com .rabbitmq .client .GetResponse ;
28
32
import com .rabbitmq .client .QueueingConsumer ;
29
33
@@ -63,40 +67,47 @@ protected void deleteExchangeAndQueue(Binding binding) throws IOException {
63
67
}
64
68
65
69
protected void doAutoDelete (boolean durable , int queues ) throws IOException , TimeoutException {
66
- String [] queueNames = null ;
70
+ List < String > queueNames = new ArrayList <>() ;
67
71
Binding binding = Binding .randomBinding ();
68
72
channel .exchangeDeclare (binding .x , "direct" , durable , true , null );
69
73
channel .queueDeclare (binding .q , durable , false , true , null );
70
74
channel .queueBind (binding .q , binding .x , binding .k );
71
75
if (queues > 1 ) {
72
76
int j = queues - 1 ;
73
- queueNames = new String [j ];
74
77
for (int i = 0 ; i < j ; i ++) {
75
- queueNames [ i ] = randomString ();
76
- channel .queueDeclare (queueNames [ i ] , durable , false , false , null );
77
- channel .queueBind (queueNames [ i ] , binding .x , binding .k );
78
- channel .basicConsume (queueNames [ i ] , true , new QueueingConsumer (channel ));
78
+ queueNames . add ( randomString () );
79
+ channel .queueDeclare (queueNames . get ( i ) , durable , false , false , null );
80
+ channel .queueBind (queueNames . get ( i ) , binding .x , binding .k );
81
+ channel .basicConsume (queueNames . get ( i ) , true , new QueueingConsumer (channel ));
79
82
}
80
83
}
81
84
subscribeSendUnsubscribe (binding );
82
85
if (durable ) {
83
86
restart ();
84
87
}
85
- if (queues > 1 && queueNames != null ) {
86
- for (String s : queueNames ) {
87
- channel .basicConsume (s , true , new QueueingConsumer (channel ));
88
- Binding tmp = new Binding (s , binding .x , binding .k );
88
+ if (queues > 1 ) {
89
+ for (String q : queueNames ) {
90
+ channel .basicConsume (q , true , new QueueingConsumer (channel ));
91
+ Binding tmp = new Binding (q , binding .x , binding .k );
89
92
sendUnroutable (tmp );
90
93
}
91
94
}
95
+ waitAtMost (() -> {
96
+ Channel ch = connection .createChannel ();
97
+ try {
98
+ ch .queueDeclarePassive (binding .q );
99
+ } catch (IOException e ) {
100
+ return true ;
101
+ }
102
+ return false ;
103
+ });
92
104
channel .queueDeclare (binding .q , durable , true , true , null );
93
105
// if (queues == 1): Because the exchange does not exist, this
94
106
// bind should fail
95
107
try {
96
108
channel .queueBind (binding .q , binding .x , binding .k );
97
109
sendRoutable (binding );
98
- }
99
- catch (IOException e ) {
110
+ } catch (IOException e ) {
100
111
checkShutdownSignal (AMQP .NOT_FOUND , e );
101
112
channel = null ;
102
113
return ;
@@ -106,7 +117,7 @@ protected void doAutoDelete(boolean durable, int queues) throws IOException, Tim
106
117
fail ("Queue bind should have failed" );
107
118
}
108
119
// Do some cleanup
109
- if (queues > 1 && queueNames != null ) {
120
+ if (queues > 1 ) {
110
121
for (String q : queueNames ) {
111
122
channel .queueDelete (q );
112
123
}
0 commit comments