|
23 | 23 | import java.util.ArrayList;
|
24 | 24 | import java.util.List;
|
25 | 25 | import java.util.concurrent.CountDownLatch;
|
26 |
| -import java.util.concurrent.Executor; |
27 | 26 | import java.util.concurrent.Executors;
|
28 | 27 | import java.util.concurrent.TimeUnit;
|
29 | 28 |
|
30 |
| -import org.apache.commons.logging.Log; |
31 |
| -import org.apache.commons.logging.LogFactory; |
32 | 29 | import org.junit.Rule;
|
33 | 30 | import org.junit.Test;
|
34 | 31 |
|
|
40 | 37 | import org.springframework.integration.redis.util.RedisLockRegistry;
|
41 | 38 | import org.springframework.integration.support.leader.LockRegistryLeaderInitiator;
|
42 | 39 | import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
|
| 40 | +import org.springframework.integration.test.support.LongRunningIntegrationTest; |
43 | 41 | import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
44 |
| -import org.springframework.util.ReflectionUtils; |
45 | 42 |
|
46 | 43 | /**
|
47 | 44 | * @author Artem Bilan
|
|
52 | 49 | */
|
53 | 50 | public class RedisLockRegistryLeaderInitiatorTests extends RedisAvailableTests {
|
54 | 51 |
|
55 |
| - private static final Log logger = LogFactory.getLog(RedisLockRegistryLeaderInitiatorTests.class); |
| 52 | + @Rule |
| 53 | + public LongRunningIntegrationTest longTests = new LongRunningIntegrationTest(); |
56 | 54 |
|
57 | 55 | @Rule
|
58 | 56 | public Log4j2LevelAdjuster adjuster =
|
@@ -105,63 +103,37 @@ public void testDistributedLeaderElection() throws Exception {
|
105 | 103 | CountDownLatch acquireLockFailed1 = new CountDownLatch(1);
|
106 | 104 | CountDownLatch acquireLockFailed2 = new CountDownLatch(1);
|
107 | 105 |
|
108 |
| - Executor latchesExecutor = Executors.newCachedThreadPool(); |
109 |
| - |
110 |
| - initiator1.setLeaderEventPublisher(new CountingPublisher(granted1, revoked1, acquireLockFailed1) { |
| 106 | + initiator1.setLeaderEventPublisher(new CountingPublisher(granted1, revoked1, acquireLockFailed1)); |
111 | 107 |
|
112 |
| - @Override |
113 |
| - public void publishOnRevoked(Object source, Context context, String role) { |
114 |
| - latchesExecutor.execute(() -> { |
115 |
| - // It's difficult to see round-robin election, so block one initiator until the second is elected. |
116 |
| - try { |
117 |
| - assertThat(granted2.await(10, TimeUnit.SECONDS), is(true)); |
118 |
| - } |
119 |
| - catch (InterruptedException e) { |
120 |
| - ReflectionUtils.rethrowRuntimeException(e); |
121 |
| - } |
122 |
| - super.publishOnRevoked(source, context, role); |
123 |
| - }); |
| 108 | + initiator2.setLeaderEventPublisher(new CountingPublisher(granted2, revoked2, acquireLockFailed2)); |
124 | 109 |
|
125 |
| - } |
126 |
| - |
127 |
| - }); |
128 |
| - |
129 |
| - initiator2.setLeaderEventPublisher(new CountingPublisher(granted2, revoked2, acquireLockFailed2) { |
130 |
| - |
131 |
| - @Override |
132 |
| - public void publishOnRevoked(Object source, Context context, String role) { |
133 |
| - latchesExecutor.execute(() -> { |
134 |
| - try { |
135 |
| - // It's difficult to see round-robin election, so block one initiator until the second is elected. |
136 |
| - assertThat(granted1.await(10, TimeUnit.SECONDS), is(true)); |
137 |
| - } |
138 |
| - catch (InterruptedException e) { |
139 |
| - ReflectionUtils.rethrowRuntimeException(e); |
140 |
| - } |
141 |
| - super.publishOnRevoked(source, context, role); |
142 |
| - }); |
143 |
| - } |
144 |
| - |
145 |
| - }); |
| 110 | + // It's hard to see round-robin election, so let's make the yielding initiator to sleep long before restarting |
| 111 | + initiator1.setBusyWaitMillis(5000); |
146 | 112 |
|
147 | 113 | initiator1.getContext().yield();
|
148 | 114 |
|
149 | 115 | assertThat(revoked1.await(10, TimeUnit.SECONDS), is(true));
|
| 116 | + assertThat(granted2.await(10, TimeUnit.SECONDS), is(true)); |
150 | 117 |
|
151 | 118 | assertThat(initiator2.getContext().isLeader(), is(true));
|
152 | 119 | assertThat(initiator1.getContext().isLeader(), is(false));
|
153 | 120 |
|
| 121 | + initiator1.setBusyWaitMillis(LockRegistryLeaderInitiator.DEFAULT_BUSY_WAIT_TIME); |
| 122 | + initiator2.setBusyWaitMillis(5000); |
| 123 | + |
154 | 124 | initiator2.getContext().yield();
|
155 | 125 |
|
156 | 126 | assertThat(revoked2.await(10, TimeUnit.SECONDS), is(true));
|
| 127 | + assertThat(granted1.await(10, TimeUnit.SECONDS), is(true)); |
157 | 128 |
|
158 | 129 | assertThat(initiator1.getContext().isLeader(), is(true));
|
159 | 130 | assertThat(initiator2.getContext().isLeader(), is(false));
|
160 | 131 |
|
161 | 132 | initiator2.stop();
|
162 | 133 |
|
163 | 134 | CountDownLatch revoked11 = new CountDownLatch(1);
|
164 |
| - initiator1.setLeaderEventPublisher(new CountingPublisher(new CountDownLatch(1), revoked11, new CountDownLatch(1))); |
| 135 | + initiator1.setLeaderEventPublisher(new CountingPublisher(new CountDownLatch(1), revoked11, |
| 136 | + new CountDownLatch(1))); |
165 | 137 |
|
166 | 138 | initiator1.getContext().yield();
|
167 | 139 |
|
|
0 commit comments