Skip to content

Commit d4e1b69

Browse files
authored
otel: subchannel metrics A94 (#12202)
Implements [A94](https://github.com/grpc/proposal/pull/485/files) except for the exact reason for disconnect_error
1 parent 5a54372 commit d4e1b69

21 files changed

+678
-30
lines changed

api/src/main/java/io/grpc/EquivalentAddressGroup.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public final class EquivalentAddressGroup {
5050
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6138")
5151
public static final Attributes.Key<String> ATTR_AUTHORITY_OVERRIDE =
5252
Attributes.Key.create("io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE");
53+
/**
54+
* The name of the locality that this EquivalentAddressGroup is in.
55+
*/
56+
public static final Attributes.Key<String> ATTR_LOCALITY_NAME =
57+
Attributes.Key.create("io.grpc.EquivalentAddressGroup.LOCALITY");
5358
private final List<SocketAddress> addrs;
5459
private final Attributes attrs;
5560

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2025 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import java.util.List;
20+
21+
/**
22+
* Represents a long-valued up down counter metric instrument.
23+
*/
24+
@Internal
25+
public final class LongUpDownCounterMetricInstrument extends PartialMetricInstrument {
26+
public LongUpDownCounterMetricInstrument(int index, String name, String description, String unit,
27+
List<String> requiredLabelKeys,
28+
List<String> optionalLabelKeys,
29+
boolean enableByDefault) {
30+
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
31+
}
32+
}

api/src/main/java/io/grpc/MetricInstrumentRegistry.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,47 @@ public LongCounterMetricInstrument registerLongCounter(String name,
144144
}
145145
}
146146

147+
/**
148+
* Registers a new Long Up Down Counter metric instrument.
149+
*
150+
* @param name the name of the metric
151+
* @param description a description of the metric
152+
* @param unit the unit of measurement for the metric
153+
* @param requiredLabelKeys a list of required label keys
154+
* @param optionalLabelKeys a list of optional label keys
155+
* @param enableByDefault whether the metric should be enabled by default
156+
* @return the newly created LongUpDownCounterMetricInstrument
157+
* @throws IllegalStateException if a metric with the same name already exists
158+
*/
159+
public LongUpDownCounterMetricInstrument registerLongUpDownCounter(String name,
160+
String description,
161+
String unit,
162+
List<String> requiredLabelKeys,
163+
List<String> optionalLabelKeys,
164+
boolean enableByDefault) {
165+
checkArgument(!Strings.isNullOrEmpty(name), "missing metric name");
166+
checkNotNull(description, "description");
167+
checkNotNull(unit, "unit");
168+
checkNotNull(requiredLabelKeys, "requiredLabelKeys");
169+
checkNotNull(optionalLabelKeys, "optionalLabelKeys");
170+
synchronized (lock) {
171+
if (registeredMetricNames.contains(name)) {
172+
throw new IllegalStateException("Metric with name " + name + " already exists");
173+
}
174+
int index = nextAvailableMetricIndex;
175+
if (index + 1 == metricInstruments.length) {
176+
resizeMetricInstruments();
177+
}
178+
LongUpDownCounterMetricInstrument instrument = new LongUpDownCounterMetricInstrument(
179+
index, name, description, unit, requiredLabelKeys, optionalLabelKeys,
180+
enableByDefault);
181+
metricInstruments[index] = instrument;
182+
registeredMetricNames.add(name);
183+
nextAvailableMetricIndex += 1;
184+
return instrument;
185+
}
186+
}
187+
147188
/**
148189
* Registers a new Double Histogram metric instrument.
149190
*

api/src/main/java/io/grpc/MetricRecorder.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do
5050
* Adds a value for a long valued counter metric instrument.
5151
*
5252
* @param metricInstrument The counter metric instrument to add the value against.
53-
* @param value The value to add.
53+
* @param value The value to add. MUST be non-negative.
5454
* @param requiredLabelValues A list of required label values for the metric.
5555
* @param optionalLabelValues A list of additional, optional label values for the metric.
5656
*/
@@ -66,6 +66,29 @@ default void addLongCounter(LongCounterMetricInstrument metricInstrument, long v
6666
metricInstrument.getOptionalLabelKeys().size());
6767
}
6868

69+
/**
70+
* Adds a value for a long valued up down counter metric instrument.
71+
*
72+
* @param metricInstrument The counter metric instrument to add the value against.
73+
* @param value The value to add. May be positive, negative or zero.
74+
* @param requiredLabelValues A list of required label values for the metric.
75+
* @param optionalLabelValues A list of additional, optional label values for the metric.
76+
*/
77+
default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument,
78+
long value,
79+
List<String> requiredLabelValues,
80+
List<String> optionalLabelValues) {
81+
checkArgument(requiredLabelValues != null
82+
&& requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(),
83+
"Incorrect number of required labels provided. Expected: %s",
84+
metricInstrument.getRequiredLabelKeys().size());
85+
checkArgument(optionalLabelValues != null
86+
&& optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(),
87+
"Incorrect number of optional labels provided. Expected: %s",
88+
metricInstrument.getOptionalLabelKeys().size());
89+
}
90+
91+
6992
/**
7093
* Records a value for a double-precision histogram metric instrument.
7194
*

api/src/main/java/io/grpc/MetricSink.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,26 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do
6565
* Adds a value for a long valued counter metric associated with specified metric instrument.
6666
*
6767
* @param metricInstrument The counter metric instrument identifies metric measure to add.
68-
* @param value The value to record.
68+
* @param value The value to record. MUST be non-negative.
6969
* @param requiredLabelValues A list of required label values for the metric.
7070
* @param optionalLabelValues A list of additional, optional label values for the metric.
7171
*/
7272
default void addLongCounter(LongCounterMetricInstrument metricInstrument, long value,
73-
List<String> requiredLabelValues, List<String> optionalLabelValues) {
73+
List<String> requiredLabelValues, List<String> optionalLabelValues) {
74+
}
75+
76+
/**
77+
* Adds a value for a long valued up down counter metric associated with specified metric
78+
* instrument.
79+
*
80+
* @param metricInstrument The counter metric instrument identifies metric measure to add.
81+
* @param value The value to record. May be positive, negative or zero.
82+
* @param requiredLabelValues A list of required label values for the metric.
83+
* @param optionalLabelValues A list of additional, optional label values for the metric.
84+
*/
85+
default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
86+
List<String> requiredLabelValues,
87+
List<String> optionalLabelValues) {
7488
}
7589

7690
/**

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
import io.grpc.LoadBalancer;
4949
import io.grpc.Metadata;
5050
import io.grpc.MethodDescriptor;
51+
import io.grpc.MetricRecorder;
52+
import io.grpc.NameResolver;
53+
import io.grpc.SecurityLevel;
5154
import io.grpc.Status;
5255
import io.grpc.SynchronizationContext;
5356
import io.grpc.SynchronizationContext.ScheduledHandle;
@@ -160,6 +163,8 @@ protected void handleNotInUse() {
160163
private Status shutdownReason;
161164

162165
private volatile Attributes connectedAddressAttributes;
166+
private final SubchannelMetrics subchannelMetrics;
167+
private final String target;
163168

164169
InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
165170
BackoffPolicy.Provider backoffPolicyProvider,
@@ -168,7 +173,9 @@ protected void handleNotInUse() {
168173
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
169174
Callback callback, InternalChannelz channelz, CallTracer callsTracer,
170175
ChannelTracer channelTracer, InternalLogId logId,
171-
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) {
176+
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
177+
String target,
178+
MetricRecorder metricRecorder) {
172179
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
173180
Preconditions.checkNotNull(addressGroups, "addressGroups");
174181
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
@@ -192,6 +199,8 @@ protected void handleNotInUse() {
192199
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
193200
this.transportFilters = transportFilters;
194201
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
202+
this.target = target;
203+
this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
195204
}
196205

197206
ChannelLogger getChannelLogger() {
@@ -593,6 +602,13 @@ public void run() {
593602
pendingTransport = null;
594603
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
595604
gotoNonErrorState(READY);
605+
subchannelMetrics.recordConnectionAttemptSucceeded(/* target= */ target,
606+
/* backendService= */ getAttributeOrDefault(
607+
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
608+
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
609+
EquivalentAddressGroup.ATTR_LOCALITY_NAME),
610+
/* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
611+
.get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
596612
}
597613
}
598614
});
@@ -618,11 +634,25 @@ public void run() {
618634
activeTransport = null;
619635
addressIndex.reset();
620636
gotoNonErrorState(IDLE);
637+
subchannelMetrics.recordDisconnection(/* target= */ target,
638+
/* backendService= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
639+
NameResolver.ATTR_BACKEND_SERVICE),
640+
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
641+
EquivalentAddressGroup.ATTR_LOCALITY_NAME),
642+
/* disconnectError= */ SubchannelMetrics.DisconnectError.UNKNOWN
643+
.getErrorString(null),
644+
/* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
645+
.get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
621646
} else if (pendingTransport == transport) {
647+
subchannelMetrics.recordConnectionAttemptFailed(/* target= */ target,
648+
/* backendService= */getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
649+
NameResolver.ATTR_BACKEND_SERVICE),
650+
/* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
651+
EquivalentAddressGroup.ATTR_LOCALITY_NAME));
622652
Preconditions.checkState(state.getState() == CONNECTING,
623653
"Expected state is CONNECTING, actual state is %s", state.getState());
624654
addressIndex.increment();
625-
// Continue reconnect if there are still addresses to try.
655+
// Continue to reconnect if there are still addresses to try.
626656
if (!addressIndex.isValid()) {
627657
pendingTransport = null;
628658
addressIndex.reset();
@@ -658,6 +688,27 @@ public void run() {
658688
}
659689
});
660690
}
691+
692+
private String extractSecurityLevel(SecurityLevel securityLevel) {
693+
if (securityLevel == null) {
694+
return "none";
695+
}
696+
switch (securityLevel) {
697+
case NONE:
698+
return "none";
699+
case INTEGRITY:
700+
return "integrity_only";
701+
case PRIVACY_AND_INTEGRITY:
702+
return "privacy_and_integrity";
703+
default:
704+
throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel);
705+
}
706+
}
707+
708+
private String getAttributeOrDefault(Attributes attributes, Attributes.Key<String> key) {
709+
String value = attributes.get(key);
710+
return value == null ? "" : value;
711+
}
661712
}
662713

663714
// All methods are called in syncContext

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ void exitIdleMode() {
415415
LbHelperImpl lbHelper = new LbHelperImpl();
416416
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
417417
// Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
418-
// may throw. We don't want to confuse our state, even if we will enter panic mode.
418+
// may throw. We don't want to confuse our state, even if we enter panic mode.
419419
this.lbHelper = lbHelper;
420420

421421
channelStateManager.gotoState(CONNECTING);
@@ -1464,7 +1464,9 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
14641464
subchannelTracer,
14651465
subchannelLogId,
14661466
subchannelLogger,
1467-
transportFilters);
1467+
transportFilters,
1468+
target,
1469+
lbHelper.getMetricRecorder());
14681470
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
14691471
.setDescription("Child Subchannel created")
14701472
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
@@ -1895,7 +1897,8 @@ void onNotInUse(InternalSubchannel is) {
18951897
subchannelTracer,
18961898
subchannelLogId,
18971899
subchannelLogger,
1898-
transportFilters);
1900+
transportFilters, target,
1901+
lbHelper.getMetricRecorder());
18991902

19001903
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
19011904
.setDescription("Child Subchannel started")

core/src/main/java/io/grpc/internal/MetricRecorderImpl.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.grpc.LongCounterMetricInstrument;
2727
import io.grpc.LongGaugeMetricInstrument;
2828
import io.grpc.LongHistogramMetricInstrument;
29+
import io.grpc.LongUpDownCounterMetricInstrument;
2930
import io.grpc.MetricInstrument;
3031
import io.grpc.MetricInstrumentRegistry;
3132
import io.grpc.MetricRecorder;
@@ -82,7 +83,7 @@ public void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, dou
8283
* Records a long counter value.
8384
*
8485
* @param metricInstrument the {@link LongCounterMetricInstrument} to record.
85-
* @param value the value to record.
86+
* @param value the value to record. Must be non-negative.
8687
* @param requiredLabelValues the required label values for the metric.
8788
* @param optionalLabelValues the optional label values for the metric.
8889
*/
@@ -103,6 +104,32 @@ public void addLongCounter(LongCounterMetricInstrument metricInstrument, long va
103104
}
104105
}
105106

107+
/**
108+
* Adds a long up down counter value.
109+
*
110+
* @param metricInstrument the {@link io.grpc.LongUpDownCounterMetricInstrument} to record.
111+
* @param value the value to record. May be positive, negative or zero.
112+
* @param requiredLabelValues the required label values for the metric.
113+
* @param optionalLabelValues the optional label values for the metric.
114+
*/
115+
@Override
116+
public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
117+
List<String> requiredLabelValues,
118+
List<String> optionalLabelValues) {
119+
MetricRecorder.super.addLongUpDownCounter(metricInstrument, value, requiredLabelValues,
120+
optionalLabelValues);
121+
for (MetricSink sink : metricSinks) {
122+
int measuresSize = sink.getMeasuresSize();
123+
if (measuresSize <= metricInstrument.getIndex()) {
124+
// Measures may need updating in two cases:
125+
// 1. When the sink is initially created with an empty list of measures.
126+
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
127+
sink.updateMeasures(registry.getMetricInstruments());
128+
}
129+
sink.addLongUpDownCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues);
130+
}
131+
}
132+
106133
/**
107134
* Records a double histogram value.
108135
*

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
9292
return Status.FAILED_PRECONDITION.withDescription("Already shut down");
9393
}
9494

95-
// Cache whether or not this is a petiole policy, which is based off of an address attribute
95+
// Check whether this is a petiole policy, which is based off of an address attribute
9696
Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
9797
this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
9898

0 commit comments

Comments
 (0)