Skip to content
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ public abstract class LoadBalancer {
public static final Attributes.Key<Boolean> IS_PETIOLE_POLICY =
Attributes.Key.create("io.grpc.IS_PETIOLE_POLICY");

/**
* The name of the locality that this EquivalentAddressGroup is in.
*/
public static final Attributes.Key<String> ATTR_LOCALITY_NAME =
Attributes.Key.create("io.grpc.lb.locality");

/**
* A picker that always returns an erring pick.
*
Expand Down
32 changes: 32 additions & 0 deletions api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.util.List;

/**
* Represents a long-valued up down counter metric instrument.
*/
@Internal
public final class LongUpDownCounterMetricInstrument extends PartialMetricInstrument {
public LongUpDownCounterMetricInstrument(int index, String name, String description, String unit,
List<String> requiredLabelKeys,
List<String> optionalLabelKeys,
boolean enableByDefault) {
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
}
}
41 changes: 41 additions & 0 deletions api/src/main/java/io/grpc/MetricInstrumentRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,47 @@ public LongCounterMetricInstrument registerLongCounter(String name,
}
}

/**
* Registers a new Long Up Down Counter metric instrument.
*
* @param name the name of the metric
* @param description a description of the metric
* @param unit the unit of measurement for the metric
* @param requiredLabelKeys a list of required label keys
* @param optionalLabelKeys a list of optional label keys
* @param enableByDefault whether the metric should be enabled by default
* @return the newly created LongUpDownCounterMetricInstrument
* @throws IllegalStateException if a metric with the same name already exists
*/
public LongUpDownCounterMetricInstrument registerLongUpDownCounter(String name,
String description,
String unit,
List<String> requiredLabelKeys,
List<String> optionalLabelKeys,
boolean enableByDefault) {
checkArgument(!Strings.isNullOrEmpty(name), "missing metric name");
checkNotNull(description, "description");
checkNotNull(unit, "unit");
checkNotNull(requiredLabelKeys, "requiredLabelKeys");
checkNotNull(optionalLabelKeys, "optionalLabelKeys");
synchronized (lock) {
if (registeredMetricNames.contains(name)) {
throw new IllegalStateException("Metric with name " + name + " already exists");
}
int index = nextAvailableMetricIndex;
if (index + 1 == metricInstruments.length) {
resizeMetricInstruments();
}
LongUpDownCounterMetricInstrument instrument = new LongUpDownCounterMetricInstrument(
index, name, description, unit, requiredLabelKeys, optionalLabelKeys,
enableByDefault);
metricInstruments[index] = instrument;
registeredMetricNames.add(name);
nextAvailableMetricIndex += 1;
return instrument;
}
}

/**
* Registers a new Double Histogram metric instrument.
*
Expand Down
25 changes: 24 additions & 1 deletion api/src/main/java/io/grpc/MetricRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do
* Adds a value for a long valued counter metric instrument.
*
* @param metricInstrument The counter metric instrument to add the value against.
* @param value The value to add.
* @param value The value to add. MUST be non-negative.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
Expand All @@ -66,6 +66,29 @@ default void addLongCounter(LongCounterMetricInstrument metricInstrument, long v
metricInstrument.getOptionalLabelKeys().size());
}

/**
* Adds a value for a long valued up down counter metric instrument.
*
* @param metricInstrument The counter metric instrument to add the value against.
* @param value The value to add. May be positive, negative or zero.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument,
long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
checkArgument(requiredLabelValues != null
&& requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(),
"Incorrect number of required labels provided. Expected: %s",
metricInstrument.getRequiredLabelKeys().size());
checkArgument(optionalLabelValues != null
&& optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(),
"Incorrect number of optional labels provided. Expected: %s",
metricInstrument.getOptionalLabelKeys().size());
}


/**
* Records a value for a double-precision histogram metric instrument.
*
Expand Down
18 changes: 16 additions & 2 deletions api/src/main/java/io/grpc/MetricSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,26 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do
* Adds a value for a long valued counter metric associated with specified metric instrument.
*
* @param metricInstrument The counter metric instrument identifies metric measure to add.
* @param value The value to record.
* @param value The value to record. MUST be non-negative.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
default void addLongCounter(LongCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues, List<String> optionalLabelValues) {
List<String> requiredLabelValues, List<String> optionalLabelValues) {
}

/**
* Adds a value for a long valued up down counter metric associated with specified metric
* instrument.
*
* @param metricInstrument The counter metric instrument identifies metric measure to add.
* @param value The value to record. May be positive, negative or zero.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ public ListenableFuture<SocketStats> getStats() {
}

/**
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
* more buffered streams.
*/
@Override
Expand Down
71 changes: 69 additions & 2 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver;
import io.grpc.SecurityLevel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
Expand Down Expand Up @@ -160,6 +163,8 @@ protected void handleNotInUse() {
private Status shutdownReason;

private volatile Attributes connectedAddressAttributes;
private final SubchannelMetrics subchannelMetrics;
private final String target;

InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
Expand All @@ -168,7 +173,9 @@ protected void handleNotInUse() {
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
Callback callback, InternalChannelz channelz, CallTracer callsTracer,
ChannelTracer channelTracer, InternalLogId logId,
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) {
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
String target,
MetricRecorder metricRecorder) {
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
Expand All @@ -192,6 +199,8 @@ protected void handleNotInUse() {
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.transportFilters = transportFilters;
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
this.target = target;
this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
}

ChannelLogger getChannelLogger() {
Expand Down Expand Up @@ -593,6 +602,15 @@ public void run() {
pendingTransport = null;
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
gotoNonErrorState(READY);
subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null,
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
}
}
});
Expand All @@ -618,11 +636,27 @@ public void run() {
activeTransport = null;
addressIndex.reset();
gotoNonErrorState(IDLE);
subchannelMetrics.recordDisconnection(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
"Peer Pressure",
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
} else if (pendingTransport == transport) {
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null, null
));
Preconditions.checkState(state.getState() == CONNECTING,
"Expected state is CONNECTING, actual state is %s", state.getState());
addressIndex.increment();
// Continue reconnect if there are still addresses to try.
// Continue reconnecting with remaining addresses.
if (!addressIndex.isValid()) {
pendingTransport = null;
addressIndex.reset();
Expand Down Expand Up @@ -658,6 +692,27 @@ public void run() {
}
});
}

private String extractSecurityLevel(SecurityLevel securityLevel) {
if (securityLevel == null) {
return "none";
}
switch (securityLevel) {
case NONE:
return "none";
case INTEGRITY:
return "integrity_only";
case PRIVACY_AND_INTEGRITY:
return "privacy_and_integrity";
default:
throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel);
}
}

private String getAttributeOrDefault(Attributes attributes, Attributes.Key<String> key) {
String value = attributes.get(key);
return value == null ? "" : value;
}
}

// All methods are called in syncContext
Expand Down Expand Up @@ -817,6 +872,18 @@ private String printShortStatus(Status status) {
return buffer.toString();
}

private OtelMetricsAttributes buildLabelSet(String backendService, String locality,
String disconnectError, String securityLevel) {
return new OtelMetricsAttributes(
target,
backendService,
locality,
disconnectError,
securityLevel
);
}


@VisibleForTesting
static final class TransportLogger extends ChannelLogger {
// Changed just after construction to break a cyclic dependency.
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void exitIdleMode() {
LbHelperImpl lbHelper = new LbHelperImpl();
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
// Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
// may throw. We don't want to confuse our state, even if we will enter panic mode.
// may throw. We don't want to confuse our state, even if we enter panic mode.
this.lbHelper = lbHelper;

channelStateManager.gotoState(CONNECTING);
Expand Down Expand Up @@ -1464,7 +1464,9 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
subchannelTracer,
subchannelLogId,
subchannelLogger,
transportFilters);
transportFilters,
target,
lbHelper.getMetricRecorder());
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
Expand Down Expand Up @@ -1895,7 +1897,8 @@ void onNotInUse(InternalSubchannel is) {
subchannelTracer,
subchannelLogId,
subchannelLogger,
transportFilters);
transportFilters, target,
lbHelper.getMetricRecorder());

channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel started")
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/io/grpc/internal/MetricRecorderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.LongHistogramMetricInstrument;
import io.grpc.LongUpDownCounterMetricInstrument;
import io.grpc.MetricInstrument;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, dou
* Records a long counter value.
*
* @param metricInstrument the {@link LongCounterMetricInstrument} to record.
* @param value the value to record.
* @param value the value to record. Must be non-negative.
* @param requiredLabelValues the required label values for the metric.
* @param optionalLabelValues the optional label values for the metric.
*/
Expand All @@ -103,6 +104,32 @@ public void addLongCounter(LongCounterMetricInstrument metricInstrument, long va
}
}

/**
* Adds a long up down counter value.
*
* @param metricInstrument the {@link io.grpc.LongUpDownCounterMetricInstrument} to record.
* @param value the value to record. May be positive, negative or zero.
* @param requiredLabelValues the required label values for the metric.
* @param optionalLabelValues the optional label values for the metric.
*/
@Override
public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
MetricRecorder.super.addLongUpDownCounter(metricInstrument, value, requiredLabelValues,
optionalLabelValues);
for (MetricSink sink : metricSinks) {
int measuresSize = sink.getMeasuresSize();
if (measuresSize <= metricInstrument.getIndex()) {
// Measures may need updating in two cases:
// 1. When the sink is initially created with an empty list of measures.
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
sink.updateMeasures(registry.getMetricInstruments());
}
sink.addLongUpDownCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues);
}
}

/**
* Records a double histogram value.
*
Expand Down
Loading
Loading