Skip to content

Commit 6fc9752

Browse files
committed
unwrap Forwarding subchannel
1 parent c1b4da9 commit 6fc9752

File tree

10 files changed

+214
-55
lines changed

10 files changed

+214
-55
lines changed

api/src/main/java/io/grpc/LoadBalancer.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,28 @@ public static PickResult withSubchannel(Subchannel subchannel) {
661661
return withSubchannel(subchannel, null);
662662
}
663663

664+
/**
665+
* Equivalent to {@code PickResult.withSubchannel(subchannel, this.getStreamTracerFactory())},
666+
* but retains the authority override if it exists.
667+
*
668+
* @since 1.80.0
669+
*/
670+
public PickResult withSubchannelReplacement(Subchannel subchannel) {
671+
return new PickResult(checkNotNull(subchannel, "subchannel"), streamTracerFactory,
672+
status, drop, authorityOverride);
673+
}
674+
675+
/**
676+
* Equivalent to {@code PickResult.withSubchannel(this.getSubchannel(), streamTracerFactory)},
677+
* but retains the authority override if it exists.
678+
*
679+
* @since 1.80.0
680+
*/
681+
public PickResult withStreamTracerFactory(
682+
@Nullable ClientStreamTracer.Factory streamTracerFactory) {
683+
return new PickResult(subchannel, streamTracerFactory, status, drop, authorityOverride);
684+
}
685+
664686
/**
665687
* A decision to report a connectivity error to the RPC. If the RPC is {@link
666688
* CallOptions#withWaitForReady wait-for-ready}, it will stay buffered. Otherwise, it will fail

api/src/test/java/io/grpc/LoadBalancerTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,26 @@ public void pickResult_withSubchannelAndTracer() {
6464
assertThat(result.isDrop()).isFalse();
6565
}
6666

67+
@Test
68+
public void pickResult_withSubchannelReplacement() {
69+
PickResult result = PickResult.withSubchannel(subchannel, tracerFactory)
70+
.withSubchannelReplacement(subchannel2);
71+
assertThat(result.getSubchannel()).isSameInstanceAs(subchannel2);
72+
assertThat(result.getStatus()).isSameInstanceAs(Status.OK);
73+
assertThat(result.getStreamTracerFactory()).isSameInstanceAs(tracerFactory);
74+
assertThat(result.isDrop()).isFalse();
75+
}
76+
77+
@Test
78+
public void pickResult_withStreamTracerFactory() {
79+
PickResult result = PickResult.withSubchannel(subchannel)
80+
.withStreamTracerFactory(tracerFactory);
81+
assertThat(result.getSubchannel()).isSameInstanceAs(subchannel);
82+
assertThat(result.getStatus()).isSameInstanceAs(Status.OK);
83+
assertThat(result.getStreamTracerFactory()).isSameInstanceAs(tracerFactory);
84+
assertThat(result.isDrop()).isFalse();
85+
}
86+
6787
@Test
6888
public void pickResult_withNoResult() {
6989
PickResult result = PickResult.withNoResult();

services/src/main/java/io/grpc/protobuf/services/HealthCheckingLoadBalancerFactory.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,30 @@ void setHealthCheckedService(@Nullable String service) {
144144
public String toString() {
145145
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
146146
}
147+
148+
@Override
149+
public void updateBalancingState(
150+
io.grpc.ConnectivityState newState, LoadBalancer.SubchannelPicker newPicker) {
151+
delegate().updateBalancingState(newState, new HealthCheckPicker(newPicker));
152+
}
153+
154+
private final class HealthCheckPicker extends LoadBalancer.SubchannelPicker {
155+
private final LoadBalancer.SubchannelPicker delegate;
156+
157+
HealthCheckPicker(LoadBalancer.SubchannelPicker delegate) {
158+
this.delegate = delegate;
159+
}
160+
161+
@Override
162+
public LoadBalancer.PickResult pickSubchannel(LoadBalancer.PickSubchannelArgs args) {
163+
LoadBalancer.PickResult result = delegate.pickSubchannel(args);
164+
LoadBalancer.Subchannel subchannel = result.getSubchannel();
165+
if (subchannel instanceof SubchannelImpl) {
166+
return result.withSubchannelReplacement(((SubchannelImpl) subchannel).delegate);
167+
}
168+
return result;
169+
}
170+
}
147171
}
148172

149173
@VisibleForTesting

util/src/main/java/io/grpc/util/HealthProducerHelper.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.google.common.annotations.VisibleForTesting;
2424
import io.grpc.Attributes;
25+
import io.grpc.ConnectivityState;
2526
import io.grpc.ConnectivityStateInfo;
2627
import io.grpc.Internal;
2728
import io.grpc.LoadBalancer;
@@ -84,6 +85,31 @@ protected LoadBalancer.Helper delegate() {
8485
return delegate;
8586
}
8687

88+
@Override
89+
public void updateBalancingState(
90+
ConnectivityState newState, LoadBalancer.SubchannelPicker newPicker) {
91+
delegate.updateBalancingState(newState, new HealthProducerPicker(newPicker));
92+
}
93+
94+
private static final class HealthProducerPicker extends LoadBalancer.SubchannelPicker {
95+
private final LoadBalancer.SubchannelPicker delegate;
96+
97+
HealthProducerPicker(LoadBalancer.SubchannelPicker delegate) {
98+
this.delegate = delegate;
99+
}
100+
101+
@Override
102+
public LoadBalancer.PickResult pickSubchannel(LoadBalancer.PickSubchannelArgs args) {
103+
LoadBalancer.PickResult result = delegate.pickSubchannel(args);
104+
LoadBalancer.Subchannel subchannel = result.getSubchannel();
105+
if (subchannel instanceof HealthProducerSubchannel) {
106+
return result.withSubchannelReplacement(
107+
((HealthProducerSubchannel) subchannel).delegate());
108+
}
109+
return result;
110+
}
111+
}
112+
87113
// The parent subchannel in the health check producer LB chain. It duplicates subchannel state to
88114
// both the state listener and health listener.
89115
@VisibleForTesting

util/src/main/java/io/grpc/util/OutlierDetectionLoadBalancer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,14 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
442442

443443
Subchannel subchannel = pickResult.getSubchannel();
444444
if (subchannel != null) {
445-
return PickResult.withSubchannel(subchannel, new ResultCountingClientStreamTracerFactory(
446-
subchannel.getAttributes().get(ENDPOINT_TRACKER_KEY),
447-
pickResult.getStreamTracerFactory()));
445+
EndpointTracker tracker = subchannel.getAttributes().get(ENDPOINT_TRACKER_KEY);
446+
if (subchannel instanceof OutlierDetectionSubchannel) {
447+
subchannel = ((OutlierDetectionSubchannel) subchannel).delegate();
448+
}
449+
return pickResult.withSubchannelReplacement(subchannel)
450+
.withStreamTracerFactory(new ResultCountingClientStreamTracerFactory(
451+
tracker,
452+
pickResult.getStreamTracerFactory()));
448453
}
449454

450455
return pickResult;

util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ public void delegatePick() throws Exception {
408408
// Make sure that we can pick the single READY subchannel.
409409
SubchannelPicker picker = pickerCaptor.getAllValues().get(2);
410410
PickResult pickResult = picker.pickSubchannel(mock(PickSubchannelArgs.class));
411-
Subchannel s = ((OutlierDetectionSubchannel) pickResult.getSubchannel()).delegate();
411+
Subchannel s = pickResult.getSubchannel();
412412
if (s instanceof HealthProducerHelper.HealthProducerSubchannel) {
413413
s = ((HealthProducerHelper.HealthProducerSubchannel) s).delegate();
414414
}

xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -252,42 +252,55 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
252252
args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
253253
final Subchannel subchannel = delegate().createSubchannel(args);
254254

255-
return new ForwardingSubchannel() {
256-
@Override
257-
public void start(SubchannelStateListener listener) {
258-
delegate().start(new SubchannelStateListener() {
259-
@Override
260-
public void onSubchannelState(ConnectivityStateInfo newState) {
261-
// Do nothing if LB has been shutdown
262-
if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
263-
// Get locality based on the connected address attributes
264-
ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
265-
subchannel.getConnectedAddressAttributes());
266-
ClusterLocality oldClusterLocality = localityAtomicReference
267-
.getAndSet(updatedClusterLocality);
268-
oldClusterLocality.release();
255+
return new ClusterImplSubchannel(subchannel, localityAtomicReference);
256+
}
257+
258+
private final class ClusterImplSubchannel extends ForwardingSubchannel {
259+
private final Subchannel delegate;
260+
private final AtomicReference<ClusterLocality> localityAtomicReference;
261+
262+
private ClusterImplSubchannel(
263+
Subchannel delegate, AtomicReference<ClusterLocality> localityAtomicReference) {
264+
this.delegate = delegate;
265+
this.localityAtomicReference = localityAtomicReference;
266+
}
267+
268+
@Override
269+
public void start(SubchannelStateListener listener) {
270+
delegate().start(
271+
new SubchannelStateListener() {
272+
@Override
273+
public void onSubchannelState(ConnectivityStateInfo newState) {
274+
// Do nothing if LB has been shutdown
275+
if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
276+
// Get locality based on the connected address attributes
277+
ClusterLocality updatedClusterLocality =
278+
createClusterLocalityFromAttributes(
279+
delegate.getConnectedAddressAttributes());
280+
ClusterLocality oldClusterLocality =
281+
localityAtomicReference.getAndSet(updatedClusterLocality);
282+
oldClusterLocality.release();
283+
}
284+
listener.onSubchannelState(newState);
269285
}
270-
listener.onSubchannelState(newState);
271-
}
272-
});
273-
}
286+
});
287+
}
274288

275-
@Override
276-
public void shutdown() {
277-
localityAtomicReference.get().release();
278-
delegate().shutdown();
279-
}
289+
@Override
290+
public void shutdown() {
291+
localityAtomicReference.get().release();
292+
delegate().shutdown();
293+
}
280294

281-
@Override
282-
public void updateAddresses(List<EquivalentAddressGroup> addresses) {
283-
delegate().updateAddresses(withAdditionalAttributes(addresses));
284-
}
295+
@Override
296+
public void updateAddresses(List<EquivalentAddressGroup> addresses) {
297+
delegate().updateAddresses(withAdditionalAttributes(addresses));
298+
}
285299

286-
@Override
287-
protected Subchannel delegate() {
288-
return subchannel;
289-
}
290-
};
300+
@Override
301+
protected Subchannel delegate() {
302+
return delegate;
303+
}
291304
}
292305

293306
private List<EquivalentAddressGroup> withAdditionalAttributes(
@@ -411,6 +424,13 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
411424
}
412425
}
413426
PickResult result = delegate.pickSubchannel(args);
427+
Subchannel subchannel = result.getSubchannel();
428+
if (subchannel != null) {
429+
if (subchannel instanceof ClusterImplLbHelper.ClusterImplSubchannel) {
430+
subchannel = ((ClusterImplLbHelper.ClusterImplSubchannel) subchannel).delegate;
431+
result = result.withSubchannelReplacement(subchannel);
432+
}
433+
}
414434
if (result.getStatus().isOk() && result.getSubchannel() != null) {
415435
if (enableCircuitBreaking) {
416436
if (inFlights.get() >= maxConcurrentRequests) {
@@ -437,8 +457,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
437457
stats, inFlights, result.getStreamTracerFactory());
438458
ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
439459
.newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
440-
result = PickResult.withSubchannel(result.getSubchannel(),
441-
orcaTracerFactory);
460+
result = result.withStreamTracerFactory(orcaTracerFactory);
442461
}
443462
}
444463
if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null

xds/src/main/java/io/grpc/xds/WeightedRoundRobinLoadBalancer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -508,12 +508,15 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
508508
if (subchannel == null) {
509509
return pickResult;
510510
}
511+
512+
subchannel = ((WrrSubchannel) subchannel).delegate();
511513
if (!enableOobLoadReport) {
512-
return PickResult.withSubchannel(subchannel,
513-
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
514-
reportListeners.get(pick)));
514+
return pickResult.withSubchannelReplacement(subchannel)
515+
.withStreamTracerFactory(
516+
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
517+
reportListeners.get(pick)));
515518
} else {
516-
return PickResult.withSubchannel(subchannel);
519+
return pickResult.withSubchannelReplacement(subchannel);
517520
}
518521
}
519522

xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@
3636
import io.grpc.ChannelLogger;
3737
import io.grpc.ChannelLogger.ChannelLogLevel;
3838
import io.grpc.ClientCall;
39+
import io.grpc.ConnectivityState;
3940
import io.grpc.ConnectivityStateInfo;
4041
import io.grpc.ExperimentalApi;
4142
import io.grpc.LoadBalancer;
4243
import io.grpc.LoadBalancer.CreateSubchannelArgs;
4344
import io.grpc.LoadBalancer.Helper;
45+
import io.grpc.LoadBalancer.PickResult;
46+
import io.grpc.LoadBalancer.PickSubchannelArgs;
4447
import io.grpc.LoadBalancer.Subchannel;
48+
import io.grpc.LoadBalancer.SubchannelPicker;
4549
import io.grpc.LoadBalancer.SubchannelStateListener;
4650
import io.grpc.Metadata;
4751
import io.grpc.Status;
@@ -236,6 +240,29 @@ protected Helper delegate() {
236240
return delegate;
237241
}
238242

243+
@Override
244+
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
245+
delegate.updateBalancingState(newState, new OrcaOobPicker(newPicker));
246+
}
247+
248+
private static final class OrcaOobPicker extends SubchannelPicker {
249+
private final SubchannelPicker delegate;
250+
251+
OrcaOobPicker(SubchannelPicker delegate) {
252+
this.delegate = delegate;
253+
}
254+
255+
@Override
256+
public PickResult pickSubchannel(PickSubchannelArgs args) {
257+
PickResult result = delegate.pickSubchannel(args);
258+
Subchannel subchannel = result.getSubchannel();
259+
if (subchannel instanceof SubchannelImpl) {
260+
return result.withSubchannelReplacement(((SubchannelImpl) subchannel).delegate());
261+
}
262+
return result;
263+
}
264+
}
265+
239266
@Override
240267
public Subchannel createSubchannel(CreateSubchannelArgs args) {
241268
syncContext.throwIfNotInThisSynchronizationContext();

0 commit comments

Comments
 (0)