Skip to content

Commit 3fed2ac

Browse files
committed
Fix some bug of flowcontrol plugin
Signed-off-by: hanbingleixue <[email protected]>
1 parent 0d7a205 commit 3fed2ac

File tree

14 files changed

+1145
-21
lines changed

14 files changed

+1145
-21
lines changed

.github/actions/common/plugin-change-check/action.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,6 @@ runs:
966966
# ==========xds service is needed to test?==========
967967
if [ ${{ env.sermantAgentCoreXdsServiceChanged }} == 'true' -o \
968968
${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' -o \
969-
${{ env.sermantFlowcontrolChanged }} == 'true'];then
969+
${{ env.sermantFlowcontrolChanged }} == 'true' ];then
970970
echo "enableXdsFlowControl=true" >> $GITHUB_ENV
971971
fi

sermant-integration-tests/xds-service-test/xds-service-integration-test/src/test/java/io/sermant/xds/service/flowcontrol/XdsFlowControlTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,15 @@ public void testRetry() {
129129

130130
// Test meet the matching rules, retry will not be triggered
131131
result = doGet(buildGateWayErrorUrl(HttpClientType.HTTP_CLIENT, "v1"));
132-
Assertions.assertEquals("3", result.getData());
133-
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP2, "v1"));
134-
Assertions.assertEquals("3", result.getData());
132+
Assertions.assertEquals("2", result.getData());
135133
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP2, "v1"));
136134
Assertions.assertEquals("4", result.getData());
135+
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP2, "v1"));
136+
Assertions.assertEquals("3", result.getData());
137137
result = doGet(buildGateWayErrorUrl(HttpClientType.HTTP_URL_CONNECTION, "v1"));
138-
Assertions.assertEquals("4", result.getData());
139-
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP3, "v1"));
140138
Assertions.assertEquals("5", result.getData());
139+
result = doGet(buildGateWayErrorUrl(HttpClientType.OK_HTTP3, "v1"));
140+
Assertions.assertTrue("4".equals(result.getData()) || "6".equals(result.getData()));
141141
resetRequestCount();
142142

143143
// Test the retry be triggered

sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,16 +245,15 @@ protected Optional<ServiceInstance> chooseServiceInstanceForXds() {
245245
if (CollectionUtils.isEmpty(serviceInstanceSet)) {
246246
return Optional.empty();
247247
}
248+
removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet);
248249
if (RetryContext.INSTANCE.isPolicyNeedRetry()) {
249250
removeRetriedServiceInstance(serviceInstanceSet);
250251
}
251-
removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet);
252252
return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(serviceInstanceSet, scenarioInfo));
253253
}
254254

255255
private void removeRetriedServiceInstance(Set<ServiceInstance> serviceInstanceSet) {
256256
RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy();
257-
retryPolicy.retryMark();
258257
Set<Object> retriedInstance = retryPolicy.getAllRetriedInstance();
259258
Set<ServiceInstance> allInstance = new HashSet<>(serviceInstanceSet);
260259
for (Object retryInstance : retriedInstance) {
@@ -271,7 +270,9 @@ private ServiceInstance chooseServiceInstanceByLoadBalancer(Set<ServiceInstance>
271270
FlowControlScenario scenarioInfo) {
272271
XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory.getLoadBalancer(scenarioInfo.getServiceName(),
273272
scenarioInfo.getClusterName());
274-
return loadBalancer.selectInstance(new ArrayList<>(instanceSet));
273+
ServiceInstance serviceInstance = loadBalancer.selectInstance(new ArrayList<>(instanceSet));
274+
RetryContext.INSTANCE.updateRetriedServiceInstance(serviceInstance);
275+
return serviceInstance;
275276
}
276277

277278
private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set<ServiceInstance> instanceSet) {

sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/DispatcherServletInterceptor.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.sermant.core.utils.LogUtils;
2323
import io.sermant.core.utils.ReflectUtils;
2424
import io.sermant.flowcontrol.common.config.CommonConst;
25-
import io.sermant.flowcontrol.common.config.ConfigConst;
2625
import io.sermant.flowcontrol.common.entity.FlowControlResult;
2726
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
2827
import io.sermant.flowcontrol.common.entity.HttpRequestEntity;
@@ -57,8 +56,6 @@ public class DispatcherServletInterceptor extends InterceptorSupporter {
5756

5857
private Function<Object, String> getRequestUri;
5958

60-
private Function<Object, String> getPathInfo;
61-
6259
private Function<Object, String> getMethod;
6360

6461
private Function<Object, Enumeration<String>> getHeaderNames;
@@ -89,11 +86,9 @@ private Optional<HttpRequestEntity> convertToHttpEntity(Object request) {
8986
if (request == null) {
9087
return Optional.empty();
9188
}
92-
String uri = getRequestUri.apply(request);
9389
return Optional.of(new HttpRequestEntity.Builder()
9490
.setRequestType(RequestType.SERVER)
95-
.setPathInfo(getPathInfo.apply(request))
96-
.setServletPath(uri)
91+
.setApiPath(getRequestUri.apply(request))
9792
.setHeaders(getHeaders(request))
9893
.setMethod(getMethod.apply(request))
9994
.setServiceName(getHeader.apply(request, ConfigConst.FLOW_REMOTE_SERVICE_NAME_HEADER_KEY))
@@ -232,7 +227,6 @@ private void initFunction() {
232227
boolean canLoadLowVersion = canLoadLowVersion();
233228
if (canLoadLowVersion) {
234229
getRequestUri = obj -> ((HttpServletRequest) obj).getRequestURI();
235-
getPathInfo = obj -> ((HttpServletRequest) obj).getPathInfo();
236230
getMethod = obj -> ((HttpServletRequest) obj).getMethod();
237231
getHeaderNames = obj -> ((HttpServletRequest) obj).getHeaderNames();
238232
getHeader = (obj, key) -> ((HttpServletRequest) obj).getHeader(key);
@@ -246,7 +240,6 @@ private void initFunction() {
246240
setStatus = (obj, code) -> ((HttpServletResponse) obj).setStatus(code);
247241
} else {
248242
getRequestUri = this::getRequestUri;
249-
getPathInfo = this::getPathInfo;
250243
getMethod = this::getMethod;
251244
getHeaderNames = this::getHeaderNames;
252245
getHeader = this::getHeader;

sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientDeclarer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616

1717
package io.sermant.flowcontrol.retry.client;
1818

19-
import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
2019
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
2120
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
2221
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
22+
import io.sermant.flowcontrol.AbstractXdsDeclarer;
2323

2424
/**
2525
* For OKHTTP requests, obtain the instance list from the registry to block them
2626
*
2727
* @author zhp
2828
* @since 2024-12-20
2929
*/
30-
public class OkHttp3ClientDeclarer extends AbstractPluginDeclarer {
30+
public class OkHttp3ClientDeclarer extends AbstractXdsDeclarer {
3131
/**
3232
* The fully qualified name of the enhanced okhttp request
3333
*/

sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainDeclarer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616

1717
package io.sermant.flowcontrol.retry.client;
1818

19-
import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
2019
import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
2120
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
2221
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
22+
import io.sermant.flowcontrol.AbstractXdsDeclarer;
2323

2424
/**
2525
* For OKHTTP requests, modify the URL of request
2626
*
2727
* @author zhp
2828
* @since 2024-12-20
2929
*/
30-
public class OkHttpClientInterceptorChainDeclarer extends AbstractPluginDeclarer {
30+
public class OkHttpClientInterceptorChainDeclarer extends AbstractXdsDeclarer {
3131
private static final String ENHANCE_CLASSES =
3232
"com.squareup.okhttp.Call$ApplicationInterceptorChain";
3333

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright (C) 2025-2025 Sermant Authors. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.sermant.flowcontrol.retry.client;
18+
19+
20+
import io.sermant.core.plugin.agent.entity.ExecuteContext;
21+
import io.sermant.core.plugin.config.PluginConfigManager;
22+
import io.sermant.core.plugin.service.PluginServiceManager;
23+
import io.sermant.core.service.ServiceManager;
24+
import io.sermant.core.service.xds.XdsCoreService;
25+
import io.sermant.core.service.xds.XdsFlowControlService;
26+
import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers;
27+
import io.sermant.flowcontrol.common.config.FlowControlConfig;
28+
import io.sermant.flowcontrol.common.entity.FlowControlResponse;
29+
import io.sermant.flowcontrol.common.entity.FlowControlResult;
30+
import io.sermant.flowcontrol.common.entity.FlowControlScenario;
31+
import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil;
32+
import io.sermant.flowcontrol.common.xds.circuit.XdsCircuitBreakerManager;
33+
import io.sermant.flowcontrol.inject.ErrorCloseableHttpResponse;
34+
import io.sermant.flowcontrol.service.rest4j.XdsHttpFlowControlService;
35+
36+
import org.apache.http.HttpStatus;
37+
import org.apache.http.client.methods.HttpGet;
38+
import org.junit.After;
39+
import org.junit.Assert;
40+
import org.junit.Before;
41+
import org.junit.Test;
42+
import org.mockito.MockedStatic;
43+
import org.mockito.Mockito;
44+
45+
import java.lang.reflect.Method;
46+
import java.net.URI;
47+
import java.util.Optional;
48+
import java.util.concurrent.atomic.AtomicBoolean;
49+
50+
import static org.mockito.ArgumentMatchers.any;
51+
import static org.mockito.Mockito.doAnswer;
52+
53+
/**
54+
* HttpClient4xIntercepto test
55+
*
56+
* @author zhp
57+
* @since 2025-04-11
58+
*/
59+
public class HttpClient4xInterceptorTest {
60+
private MockedStatic<PluginConfigManager> pluginConfigManagerMockedStatic;
61+
62+
private MockedStatic<PluginServiceManager> pluginServiceManagerMockedStatic;
63+
64+
private MockedStatic<ServiceManager> serviceManagerMockedStatic;
65+
66+
private XdsHttpFlowControlService xdsHttpFlowControlService;
67+
68+
private HttpGet httpRequest;
69+
70+
private Method method = this.getClass().getMethod("getResult", null);
71+
72+
private XdsFlowControlService xdsFlowControlService;
73+
74+
public HttpClient4xInterceptorTest() throws NoSuchMethodException {
75+
}
76+
77+
@After
78+
public void tearDown() {
79+
pluginConfigManagerMockedStatic.close();
80+
pluginServiceManagerMockedStatic.close();
81+
serviceManagerMockedStatic.close();
82+
}
83+
84+
/**
85+
* preinitialization
86+
*
87+
* @throws Exception initialization failure thrown
88+
*/
89+
@Before
90+
public void before() throws Exception {
91+
pluginConfigManagerMockedStatic = Mockito
92+
.mockStatic(PluginConfigManager.class);
93+
pluginConfigManagerMockedStatic.when(() -> PluginConfigManager.getPluginConfig(FlowControlConfig.class))
94+
.thenReturn(new FlowControlConfig());
95+
pluginServiceManagerMockedStatic = Mockito.mockStatic(PluginServiceManager.class);
96+
xdsHttpFlowControlService = Mockito.mock(XdsHttpFlowControlService.class);
97+
pluginServiceManagerMockedStatic.when(() -> PluginServiceManager.getPluginService(XdsHttpFlowControlService.class))
98+
.thenReturn(xdsHttpFlowControlService);
99+
serviceManagerMockedStatic = Mockito.mockStatic(ServiceManager.class);
100+
XdsCoreService xdsCoreService = Mockito.mock(XdsCoreService.class);
101+
serviceManagerMockedStatic.when(() -> ServiceManager.getService(XdsCoreService.class)).thenReturn(xdsCoreService);
102+
xdsFlowControlService = Mockito.mock(XdsFlowControlService.class);
103+
Mockito.when(xdsCoreService.getXdsFlowControlService()).thenReturn(xdsFlowControlService);
104+
method = Mockito.mock(Method.class);
105+
httpRequest = new HttpGet();
106+
}
107+
108+
@Test
109+
public void test() throws NoSuchMethodException {
110+
HttpClient4xInterceptor interceptor = new HttpClient4xInterceptor();
111+
// test parameter type is incorrect
112+
ExecuteContext executeContext = buildErrorContext();
113+
interceptor.doBefore(executeContext);
114+
Assert.assertNull(executeContext.getResult());
115+
116+
// test request URL does not meet the requirements
117+
executeContext = buildContext();
118+
String resultMsg = "success";
119+
int code = HttpStatus.SC_BAD_REQUEST;
120+
AtomicBoolean triggeredFlag = new AtomicBoolean(true);
121+
doAnswer(invocation -> {
122+
if (triggeredFlag.get()) {
123+
Object[] args = invocation.getArguments();
124+
FlowControlResult flowControlResult = (FlowControlResult) args[1];
125+
flowControlResult.setSkip(true);
126+
flowControlResult.setResponse(new FlowControlResponse(resultMsg, code));
127+
}
128+
return null;
129+
}).when(xdsHttpFlowControlService).onBefore(any(), any());
130+
interceptor.doBefore(executeContext);
131+
Assert.assertNull(executeContext.getResult());
132+
133+
// test triggered flow control rules, abort the request
134+
httpRequest.setURI(URI.create("http://provider:8080/path"));
135+
interceptor.doBefore(executeContext);
136+
Object result = executeContext.getResult();
137+
Assert.assertTrue(result instanceof ErrorCloseableHttpResponse);
138+
ErrorCloseableHttpResponse response = (ErrorCloseableHttpResponse) result;
139+
Assert.assertEquals(code, response.getStatusLine().getStatusCode());
140+
Assert.assertEquals(resultMsg, response.getStatusLine().getReasonPhrase());
141+
142+
// test trigger circuit breaker rules
143+
triggeredFlag.set(false);
144+
XdsRequestCircuitBreakers xdsRequestCircuitBreakers = new XdsRequestCircuitBreakers();
145+
Mockito.when(xdsFlowControlService.getRequestCircuitBreakers(any(), any()))
146+
.thenReturn(Optional.of(xdsRequestCircuitBreakers));
147+
xdsRequestCircuitBreakers.setMaxRequests(1);
148+
FlowControlScenario scenario = new FlowControlScenario();
149+
scenario.setClusterName("test");
150+
scenario.setServiceName("provider");
151+
XdsCircuitBreakerManager.incrementActiveRequests(scenario.getServiceName(), scenario.getClusterName());
152+
XdsThreadLocalUtil.setScenarioInfo(scenario);
153+
interceptor.doBefore(executeContext);
154+
result = executeContext.getResult();
155+
Assert.assertTrue(result instanceof ErrorCloseableHttpResponse);
156+
response = (ErrorCloseableHttpResponse) result;
157+
Assert.assertEquals(HttpStatus.SC_INTERNAL_SERVER_ERROR, response.getStatusLine().getStatusCode());
158+
Assert.assertEquals("CircuitBreaker has forced open and deny any requests",
159+
response.getStatusLine().getReasonPhrase());
160+
}
161+
162+
private ExecuteContext buildContext() throws NoSuchMethodException {
163+
httpRequest.setURI(URI.create("http://127.0.0.1:8080/path"));
164+
return ExecuteContext.forMemberMethod(new HttpClient4xInterceptorTest(), method, new Object[]{null, httpRequest}, null, null);
165+
}
166+
167+
private ExecuteContext buildErrorContext() throws NoSuchMethodException {
168+
return ExecuteContext.forMemberMethod(new HttpClient4xInterceptorTest(), method, new Object[]{null, ""}, null, null);
169+
}
170+
171+
public String getResult(){
172+
return "success";
173+
}
174+
}

0 commit comments

Comments
 (0)