Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public HeartbeatMonitoringService(
regionNames),
leaderHeartbeatTimeStamps,
followerHeartbeatTimeStamps,
serverConfig.getClusterName());
serverConfig.getClusterName(),
localRegionName);
this.heartbeatMonitoringServiceStats = heartbeatMonitoringServiceStats;
this.customizedViewRepositoryFuture = customizedViewRepositoryFuture;
this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats<Hea
private final Map<String, HeartbeatOtelStats> heartbeatOtelStatsMap;
private final Map<String, RecordLevelDelayOtelStats> recordLevelDelayOtelStatsMap;
private final String clusterName;
private final String localRegionName;

// Time supplier for testability: defaults to System.currentTimeMillis()
private Supplier<Long> currentTimeSupplier = System::currentTimeMillis;
Expand All @@ -41,11 +42,13 @@ public HeartbeatVersionedStats(
StatsSupplier<HeartbeatStatReporter> reporterSupplier,
Map<HeartbeatKey, IngestionTimestampEntry> leaderMonitors,
Map<HeartbeatKey, IngestionTimestampEntry> followerMonitors,
String clusterName) {
String clusterName,
String localRegionName) {
super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true);
this.leaderMonitors = leaderMonitors;
this.followerMonitors = followerMonitors;
this.clusterName = clusterName;
this.localRegionName = localRegionName;
this.heartbeatOtelStatsMap = new VeniceConcurrentHashMap<>();
this.recordLevelDelayOtelStatsMap = new VeniceConcurrentHashMap<>();
}
Expand Down Expand Up @@ -228,16 +231,28 @@ private HeartbeatOtelStats getOrCreateHeartbeatOtelStats(String storeName) {
});
}

/** Same pattern as {@link #getOrCreateHeartbeatOtelStats}. */
/**
* Same pattern as {@link #getOrCreateHeartbeatOtelStats}. Additionally looks up the store to
* determine SLO classification dimensions (write compute status, chunking status).
*/
private RecordLevelDelayOtelStats getOrCreateRecordLevelDelayOtelStats(String storeName) {
RecordLevelDelayOtelStats existing = recordLevelDelayOtelStatsMap.get(storeName);
if (existing != null) {
return existing;
}
int currentVersion = getCurrentVersion(storeName);
int futureVersion = getFutureVersion(storeName);
Store store = metadataRepository.getStore(storeName);
boolean partialUpdateEnabled = store != null && store.isWriteComputationEnabled();
boolean chunkingEnabled = store != null && store.isChunkingEnabled();
Comment on lines +245 to +247
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunkingEnabled for the SLO dimension is derived from store.isChunkingEnabled(), but the PR description states the source should be version.isChunkingEnabled(). Since chunking is a version-level attribute (and can diverge from the store-level flag for existing versions), this may misclassify the metric for CURRENT/FUTURE roles. Consider deriving chunking status from the relevant Version object(s) (e.g., current/future version) or otherwise align the implementation/description so the emitted dimension reflects the intended semantics.

Copilot uses AI. Check for mistakes.
return recordLevelDelayOtelStatsMap.computeIfAbsent(storeName, key -> {
RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats(getMetricsRepository(), storeName, clusterName);
RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats(
getMetricsRepository(),
storeName,
clusterName,
localRegionName,
partialUpdateEnabled,
chunkingEnabled);
stats.updateVersionInfo(currentVersion, futureVersion);
return stats;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE;
import static com.linkedin.venice.utils.Utils.setOf;

Expand All @@ -28,9 +31,12 @@ public enum RecordLevelDelayOtelMetricEntity implements ModuleMetricEntityInterf
VENICE_STORE_NAME,
VENICE_CLUSTER_NAME,
VENICE_REGION_NAME,
VENICE_REGION_LOCALITY,
VENICE_VERSION_ROLE,
VENICE_REPLICA_TYPE,
VENICE_REPLICA_STATE)
VENICE_REPLICA_STATE,
VENICE_STORE_WRITE_TYPE,
VENICE_CHUNKING_STATUS)
);

private final MetricEntity metricEntity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import com.linkedin.venice.stats.VeniceOpenTelemetryMetricsRepository;
import com.linkedin.venice.stats.dimensions.ReplicaState;
import com.linkedin.venice.stats.dimensions.ReplicaType;
import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus;
import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;
import com.linkedin.venice.stats.dimensions.VeniceRegionLocality;
import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType;
import com.linkedin.venice.stats.metrics.MetricEntityStateThreeEnums;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
Expand All @@ -29,6 +32,9 @@ public class RecordLevelDelayOtelStats {
private final VeniceOpenTelemetryMetricsRepository otelRepository;
private final Map<VeniceMetricsDimensions, String> baseDimensionsMap;

/** Local region name for computing region locality (LOCAL vs REMOTE). */
private final String localRegionName;

/**
* Per-region metric entity states, keyed by region name. Grows lazily via {@code computeIfAbsent} and is bounded
* by the number of distinct regions in the deployment. Entries are not evicted individually.
Expand All @@ -38,8 +44,15 @@ public class RecordLevelDelayOtelStats {
// Version info cache for classifying versions as CURRENT/FUTURE/BACKUP
private volatile VersionInfo versionInfo = new VersionInfo(NON_EXISTING_VERSION, NON_EXISTING_VERSION);

public RecordLevelDelayOtelStats(MetricsRepository metricsRepository, String storeName, String clusterName) {
public RecordLevelDelayOtelStats(
MetricsRepository metricsRepository,
String storeName,
String clusterName,
String localRegionName,
boolean partialUpdateEnabled,
boolean chunkingEnabled) {
this.metricsByRegion = new VeniceConcurrentHashMap<>();
this.localRegionName = localRegionName;

OpenTelemetryMetricsSetup.OpenTelemetryMetricsSetupInfo otelSetup =
OpenTelemetryMetricsSetup.builder(metricsRepository)
Expand All @@ -49,7 +62,14 @@ public RecordLevelDelayOtelStats(MetricsRepository metricsRepository, String sto

this.emitOtelMetrics = otelSetup.emitOpenTelemetryMetrics();
this.otelRepository = otelSetup.getOtelRepository();
this.baseDimensionsMap = otelSetup.getBaseDimensionsMap();

// Start with base dimensions (store name, cluster name) and add SLO classification dimensions
this.baseDimensionsMap = new HashMap<>(otelSetup.getBaseDimensionsMap());
VeniceStoreWriteType writeType =
partialUpdateEnabled ? VeniceStoreWriteType.PARTIAL_UPDATE : VeniceStoreWriteType.REGULAR_PUT;
this.baseDimensionsMap.put(VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE, writeType.getDimensionValue());
VeniceChunkingStatus chunkStatus = chunkingEnabled ? VeniceChunkingStatus.CHUNKED : VeniceChunkingStatus.UNCHUNKED;
this.baseDimensionsMap.put(VeniceMetricsDimensions.VENICE_CHUNKING_STATUS, chunkStatus.getDimensionValue());
}

/**
Expand Down Expand Up @@ -97,13 +117,18 @@ public void recordRecordDelayOtelMetrics(
}

/**
* Gets or creates a metric entity state for a specific region.
* Gets or creates a metric entity state for a specific region. Region locality (LOCAL vs REMOTE)
* is computed once per region at creation time by comparing the record's source region against
* this server's local region.
*/
private MetricEntityStateThreeEnums<VersionRole, ReplicaType, ReplicaState> getOrCreateMetricState(String region) {
return metricsByRegion.computeIfAbsent(region, r -> {
// Add region to base dimensions
// Add region name and locality to base dimensions
Map<VeniceMetricsDimensions, String> regionBaseDimensions = new HashMap<>(baseDimensionsMap);
regionBaseDimensions.put(VeniceMetricsDimensions.VENICE_REGION_NAME, r);
VeniceRegionLocality locality =
r.equals(localRegionName) ? VeniceRegionLocality.LOCAL : VeniceRegionLocality.REMOTE;
regionBaseDimensions.put(VeniceMetricsDimensions.VENICE_REGION_LOCALITY, locality.getDimensionValue());

return MetricEntityStateThreeEnums.create(
INGESTION_RECORD_DELAY.getMetricEntity(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import static com.linkedin.davinci.stats.ServerMetricEntity.SERVER_METRIC_ENTITIES;
import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatOtelMetricEntity.INGESTION_HEARTBEAT_DELAY;
import static com.linkedin.davinci.stats.ingestion.heartbeat.RecordLevelDelayOtelMetricEntity.INGESTION_RECORD_DELAY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE;
import static com.linkedin.venice.utils.OpenTelemetryDataTestUtils.validateExponentialHistogramPointData;
import static org.mockito.Mockito.mock;
Expand All @@ -28,6 +31,9 @@
import com.linkedin.venice.stats.VeniceMetricsRepository;
import com.linkedin.venice.stats.dimensions.ReplicaState;
import com.linkedin.venice.stats.dimensions.ReplicaType;
import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus;
import com.linkedin.venice.stats.dimensions.VeniceRegionLocality;
import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.opentelemetry.api.common.Attributes;
Expand Down Expand Up @@ -92,7 +98,11 @@ public void setUp() {
versions.add(futureVersion);
when(mockStore.getVersions()).thenReturn(versions);

when(mockStore.isWriteComputationEnabled()).thenReturn(false);
when(mockStore.isChunkingEnabled()).thenReturn(false);

when(mockMetadataRepository.getStoreOrThrow(STORE_NAME)).thenReturn(mockStore);
when(mockMetadataRepository.getStore(STORE_NAME)).thenReturn(mockStore);
when(mockMetadataRepository.hasStore(STORE_NAME)).thenReturn(true);
when(mockMetadataRepository.getAllStores()).thenReturn(Collections.singletonList(mockStore));

Expand All @@ -117,7 +127,8 @@ public void setUp() {
reporterSupplier,
leaderMonitors,
followerMonitors,
CLUSTER_NAME);
CLUSTER_NAME,
REGION);
}

@AfterMethod
Expand Down Expand Up @@ -431,6 +442,24 @@ public void testRecordFollowerRecordLag(boolean isReadyToServe) {
isReadyToServe ? 0.0 : 450.0);
}

private Attributes buildRecordLevelAttributes(ReplicaType replicaType, ReplicaState replicaState) {
return Attributes.builder()
.put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME)
.put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME)
.put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REGION)
.put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.LOCAL.getDimensionValue())
.put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue())
.put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), replicaType.getDimensionValue())
.put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), replicaState.getDimensionValue())
.put(
VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(),
VeniceStoreWriteType.REGULAR_PUT.getDimensionValue())
.put(
VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(),
VeniceChunkingStatus.UNCHUNKED.getDimensionValue())
.build();
}

private void validateRecordOtelHistogram(
ReplicaType replicaType,
ReplicaState replicaState,
Expand All @@ -444,7 +473,7 @@ private void validateRecordOtelHistogram(
expectedMax,
expectedCount,
expectedSum,
buildAttributes(replicaType, replicaState),
buildRecordLevelAttributes(replicaType, replicaState),
INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(),
TEST_PREFIX);
}
Expand Down Expand Up @@ -832,24 +861,8 @@ public void testVersionRoleTaggingInRecordLevelDelayMetrics() {
// Record for current version
heartbeatVersionedStats.recordLeaderRecordLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 100);

// Verify CURRENT role tagging
Attributes currentAttributes = Attributes.builder()
.put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME)
.put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME)
.put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REGION)
.put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue())
.put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue())
.put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue())
.build();
validateExponentialHistogramPointData(
inMemoryMetricReader,
100.0,
100.0,
1,
100.0,
currentAttributes,
INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(),
TEST_PREFIX);
// Verify CURRENT role tagging (includes new SLO dimensions)
validateRecordOtelHistogram(ReplicaType.LEADER, ReplicaState.READY_TO_SERVE, 100.0, 100.0, 1, 100.0);

// Record for future version
heartbeatVersionedStats.recordLeaderRecordLag(STORE_NAME, FUTURE_VERSION, REGION, FIXED_CURRENT_TIME - 200);
Expand All @@ -859,9 +872,16 @@ public void testVersionRoleTaggingInRecordLevelDelayMetrics() {
.put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME)
.put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME)
.put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REGION)
.put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.LOCAL.getDimensionValue())
.put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.FUTURE.getDimensionValue())
.put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue())
.put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue())
.put(
VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(),
VeniceStoreWriteType.REGULAR_PUT.getDimensionValue())
.put(
VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(),
VeniceChunkingStatus.UNCHUNKED.getDimensionValue())
.build();
validateExponentialHistogramPointData(
inMemoryMetricReader,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.linkedin.davinci.stats.ingestion.heartbeat;

import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE;
import static com.linkedin.venice.utils.Utils.setOf;

Expand Down Expand Up @@ -36,9 +39,12 @@ private static Map<RecordLevelDelayOtelMetricEntity, MetricEntityExpectation> ex
VENICE_STORE_NAME,
VENICE_CLUSTER_NAME,
VENICE_REGION_NAME,
VENICE_REGION_LOCALITY,
VENICE_VERSION_ROLE,
VENICE_REPLICA_TYPE,
VENICE_REPLICA_STATE)));
VENICE_REPLICA_STATE,
VENICE_STORE_WRITE_TYPE,
VENICE_CHUNKING_STATUS)));
return map;
}
}
Loading
Loading