diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java index 9bae8ac196a..6054c3ed0c6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java @@ -112,7 +112,8 @@ public HeartbeatMonitoringService( regionNames), leaderHeartbeatTimeStamps, followerHeartbeatTimeStamps, - serverConfig.getClusterName()); + serverConfig.getClusterName(), + localRegionName); this.heartbeatMonitoringServiceStats = heartbeatMonitoringServiceStats; this.customizedViewRepositoryFuture = customizedViewRepositoryFuture; this.nodeId = Utils.getHelixNodeIdentifier(serverConfig.getListenerHostname(), serverConfig.getListenerPort()); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java index 4e90aa70ecd..48469370063 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStats.java @@ -30,6 +30,7 @@ public class HeartbeatVersionedStats extends AbstractVeniceAggVersionedStats heartbeatOtelStatsMap; private final Map recordLevelDelayOtelStatsMap; private final String clusterName; + private final String localRegionName; // Time supplier for testability: defaults to System.currentTimeMillis() private Supplier currentTimeSupplier = System::currentTimeMillis; @@ -41,11 +42,13 @@ public HeartbeatVersionedStats( StatsSupplier reporterSupplier, Map leaderMonitors, Map followerMonitors, - String clusterName) { + String clusterName, + String localRegionName) { super(metricsRepository, metadataRepository, statsInitiator, reporterSupplier, true); this.leaderMonitors = leaderMonitors; this.followerMonitors = followerMonitors; this.clusterName = clusterName; + this.localRegionName = localRegionName; this.heartbeatOtelStatsMap = new VeniceConcurrentHashMap<>(); this.recordLevelDelayOtelStatsMap = new VeniceConcurrentHashMap<>(); } @@ -228,7 +231,10 @@ private HeartbeatOtelStats getOrCreateHeartbeatOtelStats(String storeName) { }); } - /** Same pattern as {@link #getOrCreateHeartbeatOtelStats}. */ + /** + * Same pattern as {@link #getOrCreateHeartbeatOtelStats}. Additionally looks up the store to + * determine SLO classification dimensions (write compute status, chunking status). + */ private RecordLevelDelayOtelStats getOrCreateRecordLevelDelayOtelStats(String storeName) { RecordLevelDelayOtelStats existing = recordLevelDelayOtelStatsMap.get(storeName); if (existing != null) { @@ -236,8 +242,17 @@ private RecordLevelDelayOtelStats getOrCreateRecordLevelDelayOtelStats(String st } int currentVersion = getCurrentVersion(storeName); int futureVersion = getFutureVersion(storeName); + Store store = metadataRepository.getStore(storeName); + boolean partialUpdateEnabled = store != null && store.isWriteComputationEnabled(); + boolean chunkingEnabled = store != null && store.isChunkingEnabled(); return recordLevelDelayOtelStatsMap.computeIfAbsent(storeName, key -> { - RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats(getMetricsRepository(), storeName, clusterName); + RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats( + getMetricsRepository(), + storeName, + clusterName, + localRegionName, + partialUpdateEnabled, + chunkingEnabled); stats.updateVersionInfo(currentVersion, futureVersion); return stats; }); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntity.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntity.java index ad7bd287ad4..8febbe31421 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntity.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntity.java @@ -1,10 +1,13 @@ package com.linkedin.davinci.stats.ingestion.heartbeat; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE; import static com.linkedin.venice.utils.Utils.setOf; @@ -28,9 +31,12 @@ public enum RecordLevelDelayOtelMetricEntity implements ModuleMetricEntityInterf VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_REGION_NAME, + VENICE_REGION_LOCALITY, VENICE_VERSION_ROLE, VENICE_REPLICA_TYPE, - VENICE_REPLICA_STATE) + VENICE_REPLICA_STATE, + VENICE_STORE_WRITE_TYPE, + VENICE_CHUNKING_STATUS) ); private final MetricEntity metricEntity; diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStats.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStats.java index 7d53f099d68..152e3082be4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStats.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStats.java @@ -11,7 +11,10 @@ import com.linkedin.venice.stats.VeniceOpenTelemetryMetricsRepository; import com.linkedin.venice.stats.dimensions.ReplicaState; import com.linkedin.venice.stats.dimensions.ReplicaType; +import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus; import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions; +import com.linkedin.venice.stats.dimensions.VeniceRegionLocality; +import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType; import com.linkedin.venice.stats.metrics.MetricEntityStateThreeEnums; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.tehuti.metrics.MetricsRepository; @@ -29,6 +32,9 @@ public class RecordLevelDelayOtelStats { private final VeniceOpenTelemetryMetricsRepository otelRepository; private final Map baseDimensionsMap; + /** Local region name for computing region locality (LOCAL vs REMOTE). */ + private final String localRegionName; + /** * Per-region metric entity states, keyed by region name. Grows lazily via {@code computeIfAbsent} and is bounded * by the number of distinct regions in the deployment. Entries are not evicted individually. @@ -38,8 +44,15 @@ public class RecordLevelDelayOtelStats { // Version info cache for classifying versions as CURRENT/FUTURE/BACKUP private volatile VersionInfo versionInfo = new VersionInfo(NON_EXISTING_VERSION, NON_EXISTING_VERSION); - public RecordLevelDelayOtelStats(MetricsRepository metricsRepository, String storeName, String clusterName) { + public RecordLevelDelayOtelStats( + MetricsRepository metricsRepository, + String storeName, + String clusterName, + String localRegionName, + boolean partialUpdateEnabled, + boolean chunkingEnabled) { this.metricsByRegion = new VeniceConcurrentHashMap<>(); + this.localRegionName = localRegionName; OpenTelemetryMetricsSetup.OpenTelemetryMetricsSetupInfo otelSetup = OpenTelemetryMetricsSetup.builder(metricsRepository) @@ -49,7 +62,14 @@ public RecordLevelDelayOtelStats(MetricsRepository metricsRepository, String sto this.emitOtelMetrics = otelSetup.emitOpenTelemetryMetrics(); this.otelRepository = otelSetup.getOtelRepository(); - this.baseDimensionsMap = otelSetup.getBaseDimensionsMap(); + + // Start with base dimensions (store name, cluster name) and add SLO classification dimensions + this.baseDimensionsMap = new HashMap<>(otelSetup.getBaseDimensionsMap()); + VeniceStoreWriteType writeType = + partialUpdateEnabled ? VeniceStoreWriteType.PARTIAL_UPDATE : VeniceStoreWriteType.REGULAR_PUT; + this.baseDimensionsMap.put(VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE, writeType.getDimensionValue()); + VeniceChunkingStatus chunkStatus = chunkingEnabled ? VeniceChunkingStatus.CHUNKED : VeniceChunkingStatus.UNCHUNKED; + this.baseDimensionsMap.put(VeniceMetricsDimensions.VENICE_CHUNKING_STATUS, chunkStatus.getDimensionValue()); } /** @@ -97,13 +117,18 @@ public void recordRecordDelayOtelMetrics( } /** - * Gets or creates a metric entity state for a specific region. + * Gets or creates a metric entity state for a specific region. Region locality (LOCAL vs REMOTE) + * is computed once per region at creation time by comparing the record's source region against + * this server's local region. */ private MetricEntityStateThreeEnums getOrCreateMetricState(String region) { return metricsByRegion.computeIfAbsent(region, r -> { - // Add region to base dimensions + // Add region name and locality to base dimensions Map regionBaseDimensions = new HashMap<>(baseDimensionsMap); regionBaseDimensions.put(VeniceMetricsDimensions.VENICE_REGION_NAME, r); + VeniceRegionLocality locality = + r.equals(localRegionName) ? VeniceRegionLocality.LOCAL : VeniceRegionLocality.REMOTE; + regionBaseDimensions.put(VeniceMetricsDimensions.VENICE_REGION_LOCALITY, locality.getDimensionValue()); return MetricEntityStateThreeEnums.create( INGESTION_RECORD_DELAY.getMetricEntity(), diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStatsTest.java index 2794b1d6c23..b09df97a5aa 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStatsTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatVersionedStatsTest.java @@ -3,11 +3,14 @@ import static com.linkedin.davinci.stats.ServerMetricEntity.SERVER_METRIC_ENTITIES; import static com.linkedin.davinci.stats.ingestion.heartbeat.HeartbeatOtelMetricEntity.INGESTION_HEARTBEAT_DELAY; import static com.linkedin.davinci.stats.ingestion.heartbeat.RecordLevelDelayOtelMetricEntity.INGESTION_RECORD_DELAY; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE; import static com.linkedin.venice.utils.OpenTelemetryDataTestUtils.validateExponentialHistogramPointData; import static org.mockito.Mockito.mock; @@ -28,6 +31,9 @@ import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.stats.dimensions.ReplicaState; import com.linkedin.venice.stats.dimensions.ReplicaType; +import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus; +import com.linkedin.venice.stats.dimensions.VeniceRegionLocality; +import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import io.opentelemetry.api.common.Attributes; @@ -92,7 +98,11 @@ public void setUp() { versions.add(futureVersion); when(mockStore.getVersions()).thenReturn(versions); + when(mockStore.isWriteComputationEnabled()).thenReturn(false); + when(mockStore.isChunkingEnabled()).thenReturn(false); + when(mockMetadataRepository.getStoreOrThrow(STORE_NAME)).thenReturn(mockStore); + when(mockMetadataRepository.getStore(STORE_NAME)).thenReturn(mockStore); when(mockMetadataRepository.hasStore(STORE_NAME)).thenReturn(true); when(mockMetadataRepository.getAllStores()).thenReturn(Collections.singletonList(mockStore)); @@ -117,7 +127,8 @@ public void setUp() { reporterSupplier, leaderMonitors, followerMonitors, - CLUSTER_NAME); + CLUSTER_NAME, + REGION); } @AfterMethod @@ -431,6 +442,24 @@ public void testRecordFollowerRecordLag(boolean isReadyToServe) { isReadyToServe ? 0.0 : 450.0); } + private Attributes buildRecordLevelAttributes(ReplicaType replicaType, ReplicaState replicaState) { + return Attributes.builder() + .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) + .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) + .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REGION) + .put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.LOCAL.getDimensionValue()) + .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue()) + .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), replicaType.getDimensionValue()) + .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), replicaState.getDimensionValue()) + .put( + VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), + VeniceStoreWriteType.REGULAR_PUT.getDimensionValue()) + .put( + VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), + VeniceChunkingStatus.UNCHUNKED.getDimensionValue()) + .build(); + } + private void validateRecordOtelHistogram( ReplicaType replicaType, ReplicaState replicaState, @@ -444,7 +473,7 @@ private void validateRecordOtelHistogram( expectedMax, expectedCount, expectedSum, - buildAttributes(replicaType, replicaState), + buildRecordLevelAttributes(replicaType, replicaState), INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(), TEST_PREFIX); } @@ -832,24 +861,8 @@ public void testVersionRoleTaggingInRecordLevelDelayMetrics() { // Record for current version heartbeatVersionedStats.recordLeaderRecordLag(STORE_NAME, CURRENT_VERSION, REGION, FIXED_CURRENT_TIME - 100); - // Verify CURRENT role tagging - Attributes currentAttributes = Attributes.builder() - .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) - .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) - .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REGION) - .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue()) - .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue()) - .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue()) - .build(); - validateExponentialHistogramPointData( - inMemoryMetricReader, - 100.0, - 100.0, - 1, - 100.0, - currentAttributes, - INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(), - TEST_PREFIX); + // Verify CURRENT role tagging (includes new SLO dimensions) + validateRecordOtelHistogram(ReplicaType.LEADER, ReplicaState.READY_TO_SERVE, 100.0, 100.0, 1, 100.0); // Record for future version heartbeatVersionedStats.recordLeaderRecordLag(STORE_NAME, FUTURE_VERSION, REGION, FIXED_CURRENT_TIME - 200); @@ -859,9 +872,16 @@ public void testVersionRoleTaggingInRecordLevelDelayMetrics() { .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REGION) + .put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.LOCAL.getDimensionValue()) .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.FUTURE.getDimensionValue()) .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue()) .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue()) + .put( + VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), + VeniceStoreWriteType.REGULAR_PUT.getDimensionValue()) + .put( + VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), + VeniceChunkingStatus.UNCHUNKED.getDimensionValue()) .build(); validateExponentialHistogramPointData( inMemoryMetricReader, diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntityTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntityTest.java index 7af4b8c6ae5..91b9ec1b47b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntityTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelMetricEntityTest.java @@ -1,10 +1,13 @@ package com.linkedin.davinci.stats.ingestion.heartbeat; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE; import static com.linkedin.venice.utils.Utils.setOf; @@ -36,9 +39,12 @@ private static Map ex VENICE_STORE_NAME, VENICE_CLUSTER_NAME, VENICE_REGION_NAME, + VENICE_REGION_LOCALITY, VENICE_VERSION_ROLE, VENICE_REPLICA_TYPE, - VENICE_REPLICA_STATE))); + VENICE_REPLICA_STATE, + VENICE_STORE_WRITE_TYPE, + VENICE_CHUNKING_STATUS))); return map; } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStatsTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStatsTest.java index d38f3c70ce1..64229cf82b7 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStatsTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/RecordLevelDelayOtelStatsTest.java @@ -3,11 +3,14 @@ import static com.linkedin.davinci.stats.ServerMetricEntity.SERVER_METRIC_ENTITIES; import static com.linkedin.davinci.stats.ingestion.heartbeat.RecordLevelDelayOtelMetricEntity.INGESTION_RECORD_DELAY; import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_STATE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REPLICA_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE; import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_VERSION_ROLE; import static com.linkedin.venice.utils.OpenTelemetryDataTestUtils.validateExponentialHistogramPointData; import static org.testng.Assert.assertEquals; @@ -20,6 +23,9 @@ import com.linkedin.venice.stats.VeniceMetricsRepository; import com.linkedin.venice.stats.dimensions.ReplicaState; import com.linkedin.venice.stats.dimensions.ReplicaType; +import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus; +import com.linkedin.venice.stats.dimensions.VeniceRegionLocality; +import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.tehuti.metrics.MetricsRepository; @@ -31,11 +37,17 @@ public class RecordLevelDelayOtelStatsTest { private static final String STORE_NAME = "test_store"; private static final String CLUSTER_NAME = "test_cluster"; - private static final String REGION_US_WEST = "us-west"; + private static final String LOCAL_REGION = "us-west"; + private static final String REMOTE_REGION = "us-east"; + private static final String REGION_US_WEST = LOCAL_REGION; private static final int CURRENT_VERSION = 2; private static final int FUTURE_VERSION = 3; private static final String TEST_PREFIX = "test_prefix"; + // Default SLO dimensions for most tests: non-WC, non-chunked + private static final boolean DEFAULT_PARTIAL_UPDATE = false; + private static final boolean DEFAULT_CHUNKING_ENABLED = false; + private InMemoryMetricReader inMemoryMetricReader; private VeniceMetricsRepository metricsRepository; private RecordLevelDelayOtelStats recordLevelDelayOtelStats; @@ -49,7 +61,13 @@ public void setUp() { .setEmitOtelMetrics(true) .setOtelAdditionalMetricsReader(inMemoryMetricReader) .build()); - recordLevelDelayOtelStats = new RecordLevelDelayOtelStats(metricsRepository, STORE_NAME, CLUSTER_NAME); + recordLevelDelayOtelStats = new RecordLevelDelayOtelStats( + metricsRepository, + STORE_NAME, + CLUSTER_NAME, + LOCAL_REGION, + DEFAULT_PARTIAL_UPDATE, + DEFAULT_CHUNKING_ENABLED); } @AfterMethod @@ -73,8 +91,13 @@ public void testConstructorWithOtelDisabled() { .setEmitOtelMetrics(false) .setOtelAdditionalMetricsReader(inMemoryMetricReader) .build())) { - RecordLevelDelayOtelStats stats = - new RecordLevelDelayOtelStats(disabledMetricsRepository, STORE_NAME, CLUSTER_NAME); + RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats( + disabledMetricsRepository, + STORE_NAME, + CLUSTER_NAME, + LOCAL_REGION, + DEFAULT_PARTIAL_UPDATE, + DEFAULT_CHUNKING_ENABLED); // Verify OTel metrics are disabled assertFalse(stats.emitOtelMetrics(), "OTel metrics should be disabled"); @@ -85,7 +108,13 @@ public void testConstructorWithOtelDisabled() { public void testConstructorWithNonVeniceMetricsRepository() { // Create with regular MetricsRepository (not VeniceOpenTelemetryMetricsRepository) MetricsRepository regularRepository = new MetricsRepository(); - RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats(regularRepository, STORE_NAME, CLUSTER_NAME); + RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats( + regularRepository, + STORE_NAME, + CLUSTER_NAME, + LOCAL_REGION, + DEFAULT_PARTIAL_UPDATE, + DEFAULT_CHUNKING_ENABLED); // Verify OTel metrics are disabled (default for non-Venice repository) assertFalse(stats.emitOtelMetrics(), "OTel metrics should be disabled for non-Venice repository"); @@ -199,8 +228,13 @@ public void testRecordRecordDelayOtelMetricsWhenOtelDisabled() { new VeniceMetricsConfig.Builder().setMetricEntities(SERVER_METRIC_ENTITIES) .setEmitOtelMetrics(false) .build())) { - RecordLevelDelayOtelStats stats = - new RecordLevelDelayOtelStats(disabledMetricsRepository, STORE_NAME, CLUSTER_NAME); + RecordLevelDelayOtelStats stats = new RecordLevelDelayOtelStats( + disabledMetricsRepository, + STORE_NAME, + CLUSTER_NAME, + LOCAL_REGION, + DEFAULT_PARTIAL_UPDATE, + DEFAULT_CHUNKING_ENABLED); stats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); // Record metrics - should be no-op @@ -237,7 +271,8 @@ private void validateRecordMetric( } /** - * Helper method to validate record histogram metrics with explicit min/max/sum + * Helper method to validate record histogram metrics with explicit min/max/sum. + * Uses the default SLO dimensions (non-WC, non-chunked, local region). */ private void validateRecordMetric( String region, @@ -248,13 +283,23 @@ private void validateRecordMetric( double expectedMax, double expectedSum, long expectedCount) { + VeniceRegionLocality locality = + region.equals(LOCAL_REGION) ? VeniceRegionLocality.LOCAL : VeniceRegionLocality.REMOTE; + VeniceStoreWriteType wcStatus = + DEFAULT_PARTIAL_UPDATE ? VeniceStoreWriteType.PARTIAL_UPDATE : VeniceStoreWriteType.REGULAR_PUT; + VeniceChunkingStatus chunkStatus = + DEFAULT_CHUNKING_ENABLED ? VeniceChunkingStatus.CHUNKED : VeniceChunkingStatus.UNCHUNKED; + Attributes expectedAttributes = Attributes.builder() .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), region) + .put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), locality.getDimensionValue()) .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), versionRole.getDimensionValue()) .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), replicaType.getDimensionValue()) .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), replicaState.getDimensionValue()) + .put(VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), wcStatus.getDimensionValue()) + .put(VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), chunkStatus.getDimensionValue()) .build(); validateExponentialHistogramPointData( @@ -402,14 +447,21 @@ public void testMultipleRegionsHaveIndependentMetrics() { 100.0, 1); - // Validate us-east independently + // Validate us-east independently (REMOTE since it differs from LOCAL_REGION) + VeniceStoreWriteType wcStatus = + DEFAULT_PARTIAL_UPDATE ? VeniceStoreWriteType.PARTIAL_UPDATE : VeniceStoreWriteType.REGULAR_PUT; + VeniceChunkingStatus chunkStatus = + DEFAULT_CHUNKING_ENABLED ? VeniceChunkingStatus.CHUNKED : VeniceChunkingStatus.UNCHUNKED; Attributes eastAttributes = Attributes.builder() .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), regionEast) + .put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.REMOTE.getDimensionValue()) .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue()) .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue()) .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue()) + .put(VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), wcStatus.getDimensionValue()) + .put(VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), chunkStatus.getDimensionValue()) .build(); validateExponentialHistogramPointData( inMemoryMetricReader, @@ -463,4 +515,151 @@ public void testCloseAndReuse() { 300.0, 2); } + + // ================================================================================== + // Tests for SLO classification dimensions: region locality, write compute, chunking + // ================================================================================== + + /** + * Verifies that local region records are tagged with locality=LOCAL and remote region + * records are tagged with locality=REMOTE. + */ + @Test + public void testRegionLocalityDimension() { + recordLevelDelayOtelStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + // Record from local region + recordLevelDelayOtelStats.recordRecordDelayOtelMetrics( + CURRENT_VERSION, + LOCAL_REGION, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 100L); + + // Record from remote region + recordLevelDelayOtelStats.recordRecordDelayOtelMetrics( + CURRENT_VERSION, + REMOTE_REGION, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 200L); + + // Validate local — uses validateRecordMetric which checks LOCAL for LOCAL_REGION + validateRecordMetric(LOCAL_REGION, VersionRole.CURRENT, ReplicaType.LEADER, ReplicaState.READY_TO_SERVE, 100.0, 1); + + // Validate remote — explicit attributes check + Attributes remoteAttributes = Attributes.builder() + .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), STORE_NAME) + .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) + .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), REMOTE_REGION) + .put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.REMOTE.getDimensionValue()) + .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue()) + .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue()) + .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue()) + .put( + VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), + VeniceStoreWriteType.REGULAR_PUT.getDimensionValue()) + .put( + VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), + VeniceChunkingStatus.UNCHUNKED.getDimensionValue()) + .build(); + validateExponentialHistogramPointData( + inMemoryMetricReader, + 200.0, + 200.0, + 1, + 200.0, + remoteAttributes, + INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(), + TEST_PREFIX); + } + + /** + * Verifies that write-compute-enabled stores emit write_compute_status=write_compute_enabled. + */ + @Test + public void testWriteComputeEnabledDimension() { + RecordLevelDelayOtelStats wcStats = new RecordLevelDelayOtelStats( + metricsRepository, + "wc_store", + CLUSTER_NAME, + LOCAL_REGION, + true, // writeComputeEnabled + false); + wcStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + wcStats.recordRecordDelayOtelMetrics( + CURRENT_VERSION, + LOCAL_REGION, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 150L); + + Attributes expectedAttributes = Attributes.builder() + .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), "wc_store") + .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) + .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), LOCAL_REGION) + .put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.LOCAL.getDimensionValue()) + .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue()) + .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue()) + .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue()) + .put( + VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), + VeniceStoreWriteType.PARTIAL_UPDATE.getDimensionValue()) + .put( + VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), + VeniceChunkingStatus.UNCHUNKED.getDimensionValue()) + .build(); + + validateExponentialHistogramPointData( + inMemoryMetricReader, + 150.0, + 150.0, + 1, + 150.0, + expectedAttributes, + INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(), + TEST_PREFIX); + } + + /** + * Verifies that chunking-enabled stores emit chunking_status=chunked. + */ + @Test + public void testChunkingEnabledDimension() { + RecordLevelDelayOtelStats chunkedStats = + new RecordLevelDelayOtelStats(metricsRepository, "chunked_store", CLUSTER_NAME, LOCAL_REGION, false, true); // chunkingEnabled + chunkedStats.updateVersionInfo(CURRENT_VERSION, FUTURE_VERSION); + + chunkedStats.recordRecordDelayOtelMetrics( + CURRENT_VERSION, + LOCAL_REGION, + ReplicaType.LEADER, + ReplicaState.READY_TO_SERVE, + 300L); + + Attributes expectedAttributes = Attributes.builder() + .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), "chunked_store") + .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) + .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), LOCAL_REGION) + .put(VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), VeniceRegionLocality.LOCAL.getDimensionValue()) + .put(VENICE_VERSION_ROLE.getDimensionNameInDefaultFormat(), VersionRole.CURRENT.getDimensionValue()) + .put(VENICE_REPLICA_TYPE.getDimensionNameInDefaultFormat(), ReplicaType.LEADER.getDimensionValue()) + .put(VENICE_REPLICA_STATE.getDimensionNameInDefaultFormat(), ReplicaState.READY_TO_SERVE.getDimensionValue()) + .put( + VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), + VeniceStoreWriteType.REGULAR_PUT.getDimensionValue()) + .put(VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), VeniceChunkingStatus.CHUNKED.getDimensionValue()) + .build(); + + validateExponentialHistogramPointData( + inMemoryMetricReader, + 300.0, + 300.0, + 1, + 300.0, + expectedAttributes, + INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(), + TEST_PREFIX); + } } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java index ee0cbd01247..4166c72e87f 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java @@ -153,7 +153,10 @@ public enum VeniceMetricsDimensions { VENICE_CONNECTION_SOURCE("venice.connection.source"), /** {@link VeniceDrainerType} Drainer type: sorted or unsorted. */ - VENICE_DRAINER_TYPE("venice.drainer.type"); + VENICE_DRAINER_TYPE("venice.drainer.type"), + + /** {@link VeniceStoreWriteType} Store write type: regular_put or partial_update. */ + VENICE_STORE_WRITE_TYPE("venice.store.write_type"); private final String[] dimensionName = new String[VeniceOpenTelemetryMetricNamingFormat.SIZE]; diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceStoreWriteType.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceStoreWriteType.java new file mode 100644 index 00000000000..2bfebcd4219 --- /dev/null +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceStoreWriteType.java @@ -0,0 +1,17 @@ +package com.linkedin.venice.stats.dimensions; + +/** + * Dimension to classify a store's write type for SLO tier classification. + * Partial update (write compute) stores have higher latency due to read-merge-write on leaders. + */ +public enum VeniceStoreWriteType implements VeniceDimensionInterface { + /** Store uses full PUT (regular writes). */ + REGULAR_PUT, + /** Store uses partial update (write compute). */ + PARTIAL_UPDATE; + + @Override + public VeniceMetricsDimensions getDimensionName() { + return VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE; + } +} diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceStoreWriteTypeTest.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceStoreWriteTypeTest.java new file mode 100644 index 00000000000..e5217303e95 --- /dev/null +++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceStoreWriteTypeTest.java @@ -0,0 +1,20 @@ +package com.linkedin.venice.stats.dimensions; + +import com.linkedin.venice.utils.CollectionUtils; +import java.util.Map; +import org.testng.annotations.Test; + + +public class VeniceStoreWriteTypeTest { + @Test + public void testDimensionInterface() { + Map expectedValues = CollectionUtils.mapBuilder() + .put(VeniceStoreWriteType.REGULAR_PUT, "regular_put") + .put(VeniceStoreWriteType.PARTIAL_UPDATE, "partial_update") + .build(); + new VeniceDimensionTestFixture<>( + VeniceStoreWriteType.class, + VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE, + expectedValues).assertAll(); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java index 0a3381a09c6..716bb492415 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/NearlineE2ELatencyTest.java @@ -1,7 +1,16 @@ package com.linkedin.venice.endToEnd; +import static com.linkedin.davinci.stats.ingestion.heartbeat.RecordLevelDelayOtelMetricEntity.INGESTION_RECORD_DELAY; +import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVICE_METRIC_PREFIX; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CHUNKING_STATUS; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_LOCALITY; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_REGION_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME; +import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_WRITE_TYPE; import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducer; import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord; +import static com.linkedin.venice.utils.OpenTelemetryDataTestUtils.validateExponentialHistogramPointDataAtLeast; import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA; import com.linkedin.davinci.stats.IngestionStats; @@ -17,12 +26,19 @@ import com.linkedin.venice.integration.utils.PubSubBrokerWrapper; import com.linkedin.venice.integration.utils.VeniceClusterWrapper; import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper; +import com.linkedin.venice.integration.utils.VeniceServerWrapper; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pubsub.PubSubPositionTypeRegistry; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.server.VeniceServer; +import com.linkedin.venice.stats.VeniceMetricsRepository; +import com.linkedin.venice.stats.dimensions.VeniceChunkingStatus; +import com.linkedin.venice.stats.dimensions.VeniceRegionLocality; +import com.linkedin.venice.stats.dimensions.VeniceStoreWriteType; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.Utils; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.tehuti.metrics.MetricsRepository; import java.util.AbstractMap; import java.util.Arrays; @@ -164,4 +180,150 @@ public void testEndToEndNearlineMetric() { Assert.assertTrue(producerToLocalBroker.get()); Assert.assertTrue(producerToLocalBrokerLatencies.stream().anyMatch(v -> v > 0)); } + + /** + * Validates that the record-level delay OTel metric (ingestion.replication.record.delay) includes + * the SLO classification dimensions: region locality, partial update status, and chunking status. + * + *

Uses the same hybrid store setup as {@link #testEndToEndNearlineMetric()}: non-WC, non-chunked, + * local-region ingestion. After streaming records and waiting for ingestion, verifies that at least + * one server emitted the histogram with the expected dimension values. + */ + @Test(timeOut = TEST_TIMEOUT) + public void testRecordLevelDelaySloDimensions() { + String storeName = "test-slo-dims"; + String parentControllerUrls = parentController.getControllerUrl(); + try (ControllerClient parentControllerCli = new ControllerClient(CLUSTER_NAME, parentControllerUrls); + ControllerClient dc0Client = + new ControllerClient(CLUSTER_NAME, childDatacenters.get(0).getControllerConnectString()); + ControllerClient dc1Client = + new ControllerClient(CLUSTER_NAME, childDatacenters.get(1).getControllerConnectString())) { + List dcControllerClientList = Arrays.asList(dc0Client, dc1Client); + TestUtils.createAndVerifyStoreInAllRegions(storeName, parentControllerCli, dcControllerClientList); + Assert.assertFalse( + parentControllerCli + .updateStore( + storeName, + new UpdateStoreQueryParams().setHybridRewindSeconds(10) + .setHybridOffsetLagThreshold(5) + .setNativeReplicationEnabled(true) + .setPartitionCount(1)) + .isError()); + TestUtils.verifyDCConfigNativeAndActiveRepl(storeName, true, false, dc0Client, dc1Client); + VersionCreationResponse versionCreationResponse = parentControllerCli.requestTopicForWrites( + storeName, + 1024, + Version.PushType.BATCH, + Version.guidBasedDummyPushId(), + true, + false, + false, + Optional.empty(), + Optional.empty(), + Optional.empty(), + false, + -1); + Assert.assertFalse(versionCreationResponse.isError()); + PubSubProducerAdapterFactory pubSubProducerAdapterFactory = + childDatacenters.get(0).getKafkaBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory(); + List pubSubBrokerWrappers = + childDatacenters.stream().map(VeniceMultiClusterWrapper::getKafkaBrokerWrapper).collect(Collectors.toList()); + Map additionalConfigs = PubSubBrokerWrapper.getBrokerDetailsForClients(pubSubBrokerWrappers); + TestUtils.writeBatchData( + versionCreationResponse, + STRING_SCHEMA.toString(), + STRING_SCHEMA.toString(), + IntStream.range(0, 10).mapToObj(i -> new AbstractMap.SimpleEntry<>(String.valueOf(i), String.valueOf(i))), + HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID, + pubSubProducerAdapterFactory, + additionalConfigs, + pubSubPositionTypeRegistry); + TestUtils.waitForNonDeterministicPushCompletion( + versionCreationResponse.getKafkaTopic(), + parentControllerCli, + 60, + TimeUnit.SECONDS); + } + + VeniceClusterWrapper cluster0 = childDatacenters.get(0).getClusters().get(CLUSTER_NAME); + SystemProducer dc0Producer = getSamzaProducer(cluster0, storeName, Version.PushType.STREAM); + + for (int i = 10; i < 20; i++) { + sendStreamingRecord(dc0Producer, storeName, i); + } + + // Wait for streaming data to be consumed + try (AvroGenericStoreClient client = ClientFactory.getAndStartGenericAvroClient( + ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(cluster0.getRandomRouterURL()))) { + TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> { + try { + Object value = client.get("19").get(); + Assert.assertNotNull(value, "Last streaming record not yet available"); + } catch (Exception e) { + throw new VeniceException(e); + } + }); + } + + // Wait for the heartbeat reporter thread to emit record-level delay metrics (reports every 60s, + // but in tests the initial report fires shortly after ingestion starts) + String dc0Region = childDatacenters.get(0).getRegionName(); + TestUtils.waitForNonDeterministicAssertion(90, TimeUnit.SECONDS, true, true, () -> { + boolean foundMetric = false; + for (VeniceServerWrapper sw: cluster0.getVeniceServers()) { + InMemoryMetricReader reader = getOtelReader(sw); + if (reader == null) { + continue; + } + + // Build expected attributes for: local region, non-WC, non-chunked + Attributes expectedAttributes = Attributes.builder() + .put(VENICE_STORE_NAME.getDimensionNameInDefaultFormat(), storeName) + .put(VENICE_CLUSTER_NAME.getDimensionNameInDefaultFormat(), CLUSTER_NAME) + .put(VENICE_REGION_NAME.getDimensionNameInDefaultFormat(), dc0Region) + .put( + VENICE_REGION_LOCALITY.getDimensionNameInDefaultFormat(), + VeniceRegionLocality.LOCAL.getDimensionValue()) + .put( + VENICE_STORE_WRITE_TYPE.getDimensionNameInDefaultFormat(), + VeniceStoreWriteType.REGULAR_PUT.getDimensionValue()) + .put( + VENICE_CHUNKING_STATUS.getDimensionNameInDefaultFormat(), + VeniceChunkingStatus.UNCHUNKED.getDimensionValue()) + .build(); + + try { + // Validate at least 1 data point with the expected SLO dimensions + validateExponentialHistogramPointDataAtLeast( + reader, + 1, + expectedAttributes, + INGESTION_RECORD_DELAY.getMetricEntity().getMetricName(), + SERVICE_METRIC_PREFIX); + foundMetric = true; + LOGGER.info( + "Server {} emitted record-level delay metric with SLO dimensions for store {}", + sw.getAddressForLogging(), + storeName); + } catch (AssertionError e) { + // This server may not be the leader for this partition — try next server + LOGGER.debug( + "Server {} did not emit record-level delay metric for store {}: {}", + sw.getAddressForLogging(), + storeName, + e.getMessage()); + } + } + Assert.assertTrue(foundMetric, "No server emitted record-level delay metric with expected SLO dimensions"); + }); + } + + private static InMemoryMetricReader getOtelReader(VeniceServerWrapper server) { + MetricsRepository metricsRepo = server.getMetricsRepository(); + if (metricsRepo instanceof VeniceMetricsRepository) { + return (InMemoryMetricReader) ((VeniceMetricsRepository) metricsRepo).getVeniceMetricsConfig() + .getOtelAdditionalMetricsReader(); + } + return null; + } }