|
13 | 13 |
|
14 | 14 | package io.dapr.it.resiliency;
|
15 | 15 |
|
| 16 | +import com.github.tomakehurst.wiremock.junit5.WireMockTest; |
| 17 | +import eu.rekawek.toxiproxy.Proxy; |
| 18 | +import eu.rekawek.toxiproxy.ToxiproxyClient; |
| 19 | +import eu.rekawek.toxiproxy.model.ToxicDirection; |
| 20 | +import eu.rekawek.toxiproxy.model.toxic.Latency; |
| 21 | +import eu.rekawek.toxiproxy.model.toxic.Timeout; |
16 | 22 | import io.dapr.client.DaprClient;
|
17 | 23 | import io.dapr.client.DaprClientBuilder;
|
18 |
| -import io.dapr.client.DaprClientImpl; |
19 | 24 | import io.dapr.client.resiliency.ResiliencyOptions;
|
20 |
| -import io.dapr.it.BaseIT; |
21 |
| -import io.dapr.it.DaprRun; |
22 |
| -import io.dapr.it.ToxiProxyRun; |
23 |
| -import org.junit.jupiter.api.AfterAll; |
| 25 | +import io.dapr.config.Properties; |
| 26 | +import io.dapr.exceptions.DaprException; |
| 27 | +import io.dapr.testcontainers.DaprContainer; |
| 28 | +import io.dapr.testcontainers.DaprLogLevel; |
| 29 | +import org.junit.jupiter.api.Assertions; |
24 | 30 | import org.junit.jupiter.api.BeforeAll;
|
| 31 | +import org.junit.jupiter.api.BeforeEach; |
| 32 | +import org.junit.jupiter.api.DisplayName; |
| 33 | +import org.junit.jupiter.api.Tag; |
| 34 | +import org.junit.jupiter.api.Tags; |
25 | 35 | import org.junit.jupiter.api.Test;
|
26 |
| - |
27 |
| -import java.nio.charset.StandardCharsets; |
| 36 | +import org.slf4j.LoggerFactory; |
| 37 | +import org.testcontainers.containers.Network; |
| 38 | +import org.testcontainers.containers.ToxiproxyContainer; |
| 39 | +import org.testcontainers.containers.output.Slf4jLogConsumer; |
| 40 | +import org.testcontainers.junit.jupiter.Container; |
| 41 | +import org.testcontainers.junit.jupiter.Testcontainers; |
| 42 | +import org.testcontainers.shaded.org.awaitility.Awaitility; |
| 43 | +import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException; |
| 44 | + |
| 45 | +import java.io.IOException; |
28 | 46 | import java.time.Duration;
|
29 |
| -import java.util.Base64; |
30 |
| -import java.util.UUID; |
31 |
| -import java.util.concurrent.atomic.AtomicInteger; |
32 |
| - |
33 |
| -import static org.junit.jupiter.api.Assertions.assertEquals; |
34 |
| -import static org.junit.jupiter.api.Assertions.assertTrue; |
35 |
| - |
36 |
| -/** |
37 |
| - * Test SDK resiliency. |
38 |
| - */ |
39 |
| -public class SdkResiliencyIT extends BaseIT { |
40 |
| - |
41 |
| - private static final int NUM_ITERATIONS = 35; |
42 |
| - |
43 |
| - private static final Duration TIMEOUT = Duration.ofMillis(100); |
44 |
| - |
45 |
| - private static final Duration LATENCY = TIMEOUT.dividedBy(3); |
46 |
| - |
47 |
| - private static final Duration JITTER = TIMEOUT.multipliedBy(3); |
| 47 | +import java.util.List; |
| 48 | + |
| 49 | +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; |
| 50 | +import static com.github.tomakehurst.wiremock.client.WireMock.any; |
| 51 | +import static com.github.tomakehurst.wiremock.client.WireMock.configureFor; |
| 52 | +import static com.github.tomakehurst.wiremock.client.WireMock.get; |
| 53 | +import static com.github.tomakehurst.wiremock.client.WireMock.post; |
| 54 | +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; |
| 55 | +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; |
| 56 | +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; |
| 57 | +import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; |
| 58 | +import static com.github.tomakehurst.wiremock.client.WireMock.verify; |
| 59 | +import static io.dapr.it.resiliency.SdkResiliencyIT.WIREMOCK_PORT; |
| 60 | +import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG; |
| 61 | +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; |
| 62 | +import static org.junit.jupiter.api.Assertions.assertThrows; |
| 63 | + |
| 64 | +@Testcontainers |
| 65 | +@WireMockTest(httpPort = WIREMOCK_PORT) |
| 66 | +@Tags({@Tag("testcontainers"), @Tag("resiliency")}) |
| 67 | +public class SdkResiliencyIT { |
| 68 | + |
| 69 | + public static final int WIREMOCK_PORT = 8888; |
| 70 | + private static final Network NETWORK = Network.newNetwork(); |
| 71 | + private static final String STATE_STORE_NAME = "kvstore"; |
| 72 | + private static final int INFINITE_RETRY = -1; |
| 73 | + |
| 74 | + @Container |
| 75 | + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG) |
| 76 | + .withAppName("dapr-app") |
| 77 | + .withAppPort(WIREMOCK_PORT) |
| 78 | + .withDaprLogLevel(DaprLogLevel.DEBUG) |
| 79 | + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("dapr-logs"))) |
| 80 | + .withAppHealthCheckPath("/actuator/health") |
| 81 | + .withAppChannelAddress("host.testcontainers.internal") |
| 82 | + .withNetworkAliases(List.of("dapr")) |
| 83 | + .withNetwork(NETWORK); |
| 84 | + |
| 85 | + @Container |
| 86 | + private static final ToxiproxyContainer TOXIPROXY = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0") |
| 87 | + .withNetwork(NETWORK); |
| 88 | + |
| 89 | + private static Proxy proxy; |
| 90 | + |
| 91 | + private void configStub() { |
| 92 | + stubFor(any(urlMatching("/actuator/health")) |
| 93 | + .willReturn(aResponse().withBody("[]").withStatus(200))); |
| 94 | + |
| 95 | + stubFor(any(urlMatching("/dapr/subscribe")) |
| 96 | + .willReturn(aResponse().withBody("[]").withStatus(200))); |
| 97 | + |
| 98 | + stubFor(get(urlMatching("/dapr/config")) |
| 99 | + .willReturn(aResponse().withBody("[]").withStatus(200))); |
| 100 | + |
| 101 | + // create a stub for simulating dapr sidecar with timeout of 1000 ms |
| 102 | + var ms1000 = 1000; |
| 103 | + stubFor(post(urlEqualTo("/dapr.proto.runtime.v1.Dapr/SaveState")) |
| 104 | + .willReturn(aResponse().withStatus(204).withFixedDelay(ms1000))); |
| 105 | + |
| 106 | + stubFor(any(urlMatching("/([a-z1-9]*)")) |
| 107 | + .willReturn(aResponse().withBody("[]").withStatus(200))); |
| 108 | + |
| 109 | + configureFor("localhost", WIREMOCK_PORT); |
| 110 | + } |
48 | 111 |
|
49 |
| - private static final int MAX_RETRIES = -1; // Infinity |
| 112 | + @BeforeAll |
| 113 | + static void configure() throws IOException { |
| 114 | + ToxiproxyClient toxiproxyClient = new ToxiproxyClient(TOXIPROXY.getHost(), TOXIPROXY.getControlPort()); |
| 115 | + proxy = |
| 116 | + toxiproxyClient.createProxy("dapr", "0.0.0.0:8666", "dapr:3500"); |
| 117 | + } |
50 | 118 |
|
51 |
| - private static DaprClient daprClient; |
| 119 | + @BeforeEach |
| 120 | + public void setDaprProperties() { |
| 121 | + configStub(); |
| 122 | + org.testcontainers.Testcontainers.exposeHostPorts(WIREMOCK_PORT); |
| 123 | + } |
52 | 124 |
|
53 |
| - private static ToxiProxyRun toxiProxyRun; |
| 125 | + @Test |
| 126 | + @DisplayName("should throw exception when the configured timeout exceeding waitForSidecar's timeout") |
| 127 | + public void testSidecarWithoutTimeout() { |
| 128 | + // arrange |
| 129 | + var ms3000 = 3000; |
| 130 | + var ms2000 = 2000; |
| 131 | + |
| 132 | + // act |
| 133 | + Assertions.assertThrows(RuntimeException.class, () -> { |
| 134 | + try (DaprClient client = createDaprClientBuilder().build()) { |
| 135 | + Timeout timeout = proxy.toxics().timeout("timeout", ToxicDirection.DOWNSTREAM, ms3000); |
| 136 | + client.waitForSidecar(ms2000).block(); |
| 137 | + timeout.remove(); |
| 138 | + } |
| 139 | + }); |
| 140 | + } |
54 | 141 |
|
55 |
| - private static DaprClient daprToxiClient; |
| 142 | + @Test |
| 143 | + @DisplayName("should fail when resiliency options has 900ms and the latency is 950ms") |
| 144 | + public void shouldFailDueToLatencyExceedingConfiguration() throws Exception { |
| 145 | + // arrange |
| 146 | + Duration ms900 = Duration.ofMillis(900); |
| 147 | + Duration ms950 = Duration.ofMillis(950); |
| 148 | + Latency latency = proxy.toxics().latency("latency", ToxicDirection.DOWNSTREAM, ms950.toMillis()); |
| 149 | + |
| 150 | + DaprClient client = |
| 151 | + createDaprClientBuilder().withResiliencyOptions(new ResiliencyOptions().setTimeout(ms900)) |
| 152 | + .build(); |
56 | 153 |
|
57 |
| - private static DaprClient daprResilientClient; |
| 154 | + // act |
| 155 | + String errorMessage = assertThrows(DaprException.class, () -> { |
| 156 | + client.saveState(STATE_STORE_NAME, "users", "[]").block(); |
| 157 | + }).getMessage(); |
58 | 158 |
|
59 |
| - private static DaprClient daprRetriesOnceClient; |
| 159 | + // assert |
| 160 | + assertThat(errorMessage).contains("DEADLINE_EXCEEDED"); |
60 | 161 |
|
61 |
| - private final String randomStateKeyPrefix = UUID.randomUUID().toString(); |
| 162 | + // clean up |
| 163 | + latency.remove(); |
| 164 | + client.close(); |
| 165 | + } |
62 | 166 |
|
63 |
| - @BeforeAll |
64 |
| - public static void init() throws Exception { |
65 |
| - DaprRun daprRun = startDaprApp(SdkResiliencyIT.class.getSimpleName(), 5000); |
66 |
| - daprClient = daprRun.newDaprClientBuilder().build(); |
67 |
| - daprClient.waitForSidecar(8000).block(); |
| 167 | + @Test |
| 168 | + @DisplayName("should fail when resiliency's options has infinite retry with time 900ms and the latency is 950ms") |
| 169 | + public void shouldFailDueToLatencyExceedingConfigurationWithInfiniteRetry() throws Exception { |
| 170 | + Duration ms900 = Duration.ofMillis(900); |
| 171 | + Duration ms950 = Duration.ofMillis(950); |
68 | 172 |
|
69 |
| - toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER); |
70 |
| - toxiProxyRun.start(); |
| 173 | + Latency latency = proxy.toxics().latency("latency-infinite-retry", ToxicDirection.DOWNSTREAM, ms950.toMillis()); |
71 | 174 |
|
72 |
| - daprToxiClient = toxiProxyRun.newDaprClientBuilder() |
73 |
| - .withResiliencyOptions( |
74 |
| - new ResiliencyOptions().setTimeout(TIMEOUT)) |
75 |
| - .build(); |
76 |
| - daprResilientClient = toxiProxyRun.newDaprClientBuilder() |
77 |
| - .withResiliencyOptions( |
78 |
| - new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(MAX_RETRIES)) |
79 |
| - .build(); |
80 |
| - daprRetriesOnceClient = toxiProxyRun.newDaprClientBuilder() |
81 |
| - .withResiliencyOptions( |
82 |
| - new ResiliencyOptions().setTimeout(TIMEOUT).setMaxRetries(1)) |
| 175 | + DaprClient client = |
| 176 | + createDaprClientBuilder().withResiliencyOptions(new ResiliencyOptions().setTimeout(ms900).setMaxRetries( |
| 177 | + INFINITE_RETRY)) |
83 | 178 | .build();
|
84 | 179 |
|
85 |
| - assertTrue(daprClient instanceof DaprClientImpl); |
86 |
| - assertTrue(daprToxiClient instanceof DaprClientImpl); |
87 |
| - assertTrue(daprResilientClient instanceof DaprClientImpl); |
88 |
| - assertTrue(daprRetriesOnceClient instanceof DaprClientImpl); |
89 |
| - } |
90 |
| - |
91 |
| - @AfterAll |
92 |
| - public static void tearDown() throws Exception { |
93 |
| - if (daprClient != null) { |
94 |
| - daprClient.close(); |
95 |
| - } |
96 |
| - if (daprToxiClient != null) { |
97 |
| - daprToxiClient.close(); |
98 |
| - } |
99 |
| - if (daprResilientClient != null) { |
100 |
| - daprResilientClient.close(); |
101 |
| - } |
102 |
| - if (daprRetriesOnceClient != null) { |
103 |
| - daprRetriesOnceClient.close(); |
104 |
| - } |
105 |
| - if (toxiProxyRun != null) { |
106 |
| - toxiProxyRun.stop(); |
107 |
| - } |
| 180 | + Assertions.assertThrows(ConditionTimeoutException.class, () -> { |
| 181 | + Awaitility.await("10 seconds because the retry should be infinite") |
| 182 | + .atMost(Duration.ofSeconds(10)) |
| 183 | + .until(() -> { |
| 184 | + boolean finished = true; |
| 185 | + client.saveState(STATE_STORE_NAME, "users", "[]").block(); |
| 186 | + return finished; |
| 187 | + }); |
| 188 | + }); |
| 189 | + |
| 190 | + // clean up |
| 191 | + latency.remove(); |
| 192 | + client.close(); |
108 | 193 | }
|
109 | 194 |
|
110 | 195 | @Test
|
111 |
| - public void retryAndTimeout() { |
112 |
| - AtomicInteger toxiClientErrorCount = new AtomicInteger(); |
113 |
| - AtomicInteger retryOnceClientErrorCount = new AtomicInteger(); |
114 |
| - |
115 |
| - while (true){ |
116 |
| - for (int i = 0; i < NUM_ITERATIONS; i++) { |
117 |
| - String key = randomStateKeyPrefix + "_" + i; |
118 |
| - String value = Base64.getEncoder().encodeToString(key.getBytes(StandardCharsets.UTF_8)); |
119 |
| - try { |
120 |
| - daprToxiClient.saveState(STATE_STORE_NAME, key, value).block(); |
121 |
| - } catch (Exception e) { |
122 |
| - // This call should fail sometimes. So, we count. |
123 |
| - toxiClientErrorCount.incrementAndGet(); |
124 |
| - } |
125 |
| - try { |
126 |
| - daprRetriesOnceClient.saveState(STATE_STORE_NAME, key, value).block(); |
127 |
| - } catch (Exception e) { |
128 |
| - // This call should fail sometimes. So, we count. |
129 |
| - retryOnceClientErrorCount.incrementAndGet(); |
130 |
| - } |
131 |
| - |
132 |
| - // We retry forever so that the call below should always work. |
133 |
| - daprResilientClient.saveState(STATE_STORE_NAME, key, value).block(); |
134 |
| - // Makes sure the value was actually saved. |
135 |
| - String savedValue = daprClient.getState(STATE_STORE_NAME, key, String.class).block().getValue(); |
136 |
| - assertEquals(value, savedValue); |
137 |
| - } |
138 |
| - |
139 |
| - // We should have at least one success per client, otherwise retry. |
140 |
| - if(toxiClientErrorCount.get() < NUM_ITERATIONS && retryOnceClientErrorCount.get() < NUM_ITERATIONS){ |
141 |
| - // This assertion makes sure that toxicity is on |
142 |
| - assertTrue(toxiClientErrorCount.get() > 0, "Toxi client error count is 0"); |
143 |
| - assertTrue(retryOnceClientErrorCount.get() > 0, "Retry once client error count is 0"); |
144 |
| - // A client without retries should have more errors than a client with one retry. |
| 196 | + public void shouldFailDueToLatencyExceedingConfigurationWithOnceRetry() throws Exception { |
| 197 | + |
| 198 | + // arrange |
| 199 | + Duration ms900 = Duration.ofMillis(900); |
| 200 | + DaprClient client = |
| 201 | + new DaprClientBuilder().withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + WIREMOCK_PORT) |
| 202 | + .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + WIREMOCK_PORT) |
| 203 | + .withResiliencyOptions(new ResiliencyOptions().setTimeout(ms900) |
| 204 | + .setMaxRetries(1)) |
| 205 | + .build(); |
145 | 206 |
|
146 |
| - String failureMessage = formatFailureMessage(toxiClientErrorCount, retryOnceClientErrorCount); |
147 |
| - assertTrue(toxiClientErrorCount.get() > retryOnceClientErrorCount.get(), failureMessage); |
148 |
| - break; |
149 |
| - } |
150 |
| - toxiClientErrorCount.set(0); |
151 |
| - retryOnceClientErrorCount.set(0); |
| 207 | + try { |
| 208 | + client.saveState(STATE_STORE_NAME, "users", "[]").block(); |
| 209 | + } catch (Exception ignored) { |
152 | 210 | }
|
| 211 | + |
| 212 | + // assert |
| 213 | + verify(2, postRequestedFor(urlEqualTo("/dapr.proto.runtime.v1.Dapr/SaveState"))); |
| 214 | + |
| 215 | + // clean up |
| 216 | + client.close(); |
153 | 217 | }
|
154 | 218 |
|
155 |
| - private static String formatFailureMessage( |
156 |
| - AtomicInteger toxiClientErrorCount, |
157 |
| - AtomicInteger retryOnceClientErrorCount |
158 |
| - ) { |
159 |
| - return String.format( |
160 |
| - "Toxi client error count: %d, Retry once client error count: %d", |
161 |
| - toxiClientErrorCount.get(), |
162 |
| - retryOnceClientErrorCount.get() |
163 |
| - ); |
| 219 | + private static DaprClientBuilder createDaprClientBuilder() { |
| 220 | + return new DaprClientBuilder() |
| 221 | + .withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + TOXIPROXY.getMappedPort(8666)) |
| 222 | + .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + TOXIPROXY.getMappedPort(8666)); |
164 | 223 | }
|
| 224 | + |
165 | 225 | }
|
0 commit comments