Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ private void rollForwardToTargetVersion(Set<String> regions, Store store, Versio

if (stalledVersionSwapSet.contains(storeName)) {
stalledVersionSwapSet.remove(storeName);
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapSensor(stalledVersionSwapSet.size());
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapMetric(stalledVersionSwapSet.size());
}

// Update parent version status after roll forward, so we don't check this store version again
Expand Down Expand Up @@ -375,7 +375,7 @@ private boolean isPushInTerminalState(
VERSION_SWAP_COMPLETION_STATUSES);
if (didPushCompleteInNonTargetRegions) {
stalledVersionSwapSet.remove(storeName);
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapSensor(stalledVersionSwapSet.size());
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapMetric(stalledVersionSwapSet.size());
}
}

Expand All @@ -390,7 +390,7 @@ private boolean isPushInTerminalState(
targetVersion.getNumber(),
TERMINAL_PUSH_VERSION_STATUSES);
if (didPushCompleteInTargetRegions) {
deferredVersionSwapStats.recordDeferredVersionSwapParentChildStatusMismatchSensor();
deferredVersionSwapStats.recordDeferredVersionSwapParentChildStatusMismatchMetric(clusterName, storeName);
String message =
"Push completed in target regions, parent status is still STARTED. Continuing with deferred swap for store: "
+ storeName + " for version: " + targetVersionNum;
Expand All @@ -412,7 +412,7 @@ private boolean isPushInTerminalState(
VERSION_SWAP_COMPLETION_STATUSES);

if (!didVersionSwapCompleteInNonTargetRegions) {
deferredVersionSwapStats.recordDeferredVersionSwapParentChildStatusMismatchSensor();
deferredVersionSwapStats.recordDeferredVersionSwapParentChildStatusMismatchMetric(clusterName, storeName);
String message =
"Parent status is already ONLINE, but version swap has not happened in the non target regions. "
+ "Continuing with deferred swap for store: " + storeName + " for version: " + targetVersionNum;
Expand Down Expand Up @@ -498,7 +498,8 @@ private void emitMetricIfVersionSwapIsStalled(
String targetRegion,
Store store,
int targetVersionNum,
Version parentVersion) {
Version parentVersion,
String clusterName) {
if (parentVersion.getStatus().equals(ONLINE) || parentVersion.getStatus().equals(ERROR)
|| parentVersion.getStatus().equals(PARTIALLY_ONLINE)) {
return;
Expand All @@ -525,7 +526,7 @@ private void emitMetricIfVersionSwapIsStalled(
+ targetRegion;
logMessageIfNotRedundant(message);
stalledVersionSwapSet.add(store.getName());
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapSensor(stalledVersionSwapSet.size());
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapMetric(stalledVersionSwapSet.size());
}
}

Expand Down Expand Up @@ -567,7 +568,7 @@ private void emitMetricIfVersionSwapIfStalledForSequentialRollout(
+ " and the wait time: " + parentStore.getTargetSwapRegionWaitTime() + " has passed in region " + region;
logMessageIfNotRedundant(message);
stalledVersionSwapSet.add(parentStore.getName());
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapSensor(stalledVersionSwapSet.size());
deferredVersionSwapStats.recordDeferredVersionSwapStalledVersionSwapMetric(stalledVersionSwapSet.size());
}
}

Expand Down Expand Up @@ -759,7 +760,8 @@ private Set<String> getRegionsToRollForward(
+ childStore.getCurrentVersion() + " for store " + parentStore.getName() + " in region "
+ nonTargetRegion;
logMessageIfNotRedundant(message);
deferredVersionSwapStats.recordDeferredVersionSwapChildStatusMismatchSensor();
deferredVersionSwapStats
.recordDeferredVersionSwapChildStatusMismatchMetric(clusterName, parentStore.getName());
} else {
onlineNonTargetRegions.add(nonTargetRegion);
}
Expand Down Expand Up @@ -882,7 +884,7 @@ private void handleFailedRollForward(
});

if (attemptedRetries == MAX_ROLL_FORWARD_RETRY_LIMIT) {
deferredVersionSwapStats.recordDeferredVersionSwapFailedRollForwardSensor();
deferredVersionSwapStats.recordDeferredVersionSwapFailedRollForwardMetric(clusterName, parentStore.getName());
updateStore(clusterName, parentStore.getName(), PARTIALLY_ONLINE, targetVersionNum);
failedRollforwardRetryCountMap.remove(kafkaTopicName);
LOGGER.info(
Expand Down Expand Up @@ -937,8 +939,8 @@ private Runnable getRunnableForDeferredVersionSwap() {
return;
}

try {
for (String cluster: veniceParentHelixAdmin.getClustersLeaderOf()) {
for (String cluster: veniceParentHelixAdmin.getClustersLeaderOf()) {
try {
if (!veniceParentHelixAdmin.isLeaderControllerFor(cluster)) {
continue;
}
Expand Down Expand Up @@ -1003,13 +1005,13 @@ private Runnable getRunnableForDeferredVersionSwap() {
});
}
clusterThreadPoolStats.recordQueuedTasksCount(clusterExecutorService.getQueue().size());
} catch (Exception e) {
LOGGER.warn("Caught exception while performing deferred version swap for cluster: {}", cluster, e);
deferredVersionSwapStats.recordDeferredVersionSwapExceptionMetric(cluster);
} catch (Throwable throwable) {
LOGGER.warn("Caught a throwable while performing deferred version swap for cluster: {}", cluster, throwable);
deferredVersionSwapStats.recordDeferredVersionSwapThrowableMetric(cluster);
}
} catch (Exception e) {
LOGGER.warn("Caught exception while performing deferred version swap", e);
deferredVersionSwapStats.recordDeferredVersionSwapErrorSensor();
} catch (Throwable throwable) {
LOGGER.warn("Caught a throwable while performing deferred version swap", throwable);
deferredVersionSwapStats.recordDeferredVersionSwapThrowableSensor();
}
};
}
Expand Down Expand Up @@ -1192,7 +1194,8 @@ private void performParallelRollForward(
targetRegion,
parentStore,
targetVersionNum,
targetVersion);
targetVersion,
cluster);

if (!didPostVersionSwapValidationsPass(
parentStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public class VeniceController {
TopicCleanupServiceStats.TopicCleanupOtelMetricEntity.class,
VeniceAdminStats.VeniceAdminOtelMetricEntity.class,
AdminConsumptionStats.AdminConsumptionOtelMetricEntity.class,
AddVersionLatencyStats.AddVersionLatencyOtelMetricEntity.class);
AddVersionLatencyStats.AddVersionLatencyOtelMetricEntity.class,
DeferredVersionSwapStats.DeferredVersionSwapOtelMetricEntity.class);

// services
private final VeniceControllerService controllerService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,63 +1,184 @@
package com.linkedin.venice.controller.stats;

import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_CLUSTER_NAME;
import static com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions.VENICE_STORE_NAME;
import static com.linkedin.venice.utils.Utils.setOf;

import com.google.common.collect.ImmutableMap;
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.stats.OpenTelemetryMetricsSetup;
import com.linkedin.venice.stats.VeniceOpenTelemetryMetricsRepository;
import com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions;
import com.linkedin.venice.stats.metrics.MetricEntity;
import com.linkedin.venice.stats.metrics.MetricEntityStateBase;
import com.linkedin.venice.stats.metrics.MetricEntityStateGeneric;
import com.linkedin.venice.stats.metrics.MetricType;
import com.linkedin.venice.stats.metrics.MetricUnit;
import com.linkedin.venice.stats.metrics.ModuleMetricEntityInterface;
import com.linkedin.venice.stats.metrics.TehutiMetricNameEnum;
import io.opentelemetry.api.common.Attributes;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.Count;
import io.tehuti.metrics.stats.Gauge;
import java.util.Collections;
import java.util.Map;
import java.util.Set;


public class DeferredVersionSwapStats extends AbstractVeniceStats {
private final Sensor deferredVersionSwapErrorSensor;
private final Sensor deferredVersionSwapThrowableSensor;
private final Sensor deferredVersionSwapFailedRollForwardSensor;
private final Sensor deferredVersionSwapStalledVersionSwapSensor;
private final Sensor deferredVersionSwapParentChildStatusMismatchSensor;
private final Sensor deferredVersionSwapChildStatusMismatchSensor;
private final static String DEFERRED_VERSION_SWAP_ERROR = "deferred_version_swap_error";
private final static String DEFERRED_VERSION_SWAP_THROWABLE = "deferred_version_swap_throwable";
private final static String DEFERRED_VERSION_SWAP_FAILED_ROLL_FORWARD = "deferred_version_swap_failed_roll_forward";
private static final String DEFERRED_VERSION_SWAP_STALLED_VERSION_SWAP = "deferred_version_swap_stalled_version_swap";
private static final String DEFERRED_VERSION_SWAP_PARENT_CHILD_STATUS_MISMATCH_SENSOR =
"deferred_version_swap_parent_child_status_mismatch";
private static final String DEFERRED_VERSION_SWAP_CHILD_STATUS_MISMATCH_SENSOR =
"deferred_version_swap_child_status_mismatch";
private final MetricEntityStateGeneric deferredVersionSwapThrowableMetric;
private final MetricEntityStateGeneric deferredVersionSwapExceptionMetric;

private final MetricEntityStateGeneric deferredVersionSwapFailedRollForwardMetric;
private final MetricEntityStateBase deferredVersionSwapStalledVersionSwapMetric;
private final MetricEntityStateGeneric deferredVersionSwapParentChildStatusMismatchMetric;
private final MetricEntityStateGeneric deferredVersionSwapChildStatusMismatchMetric;

public DeferredVersionSwapStats(MetricsRepository metricsRepository) {
super(metricsRepository, "DeferredVersionSwap");
deferredVersionSwapErrorSensor = registerSensorIfAbsent(DEFERRED_VERSION_SWAP_ERROR, new Count());
deferredVersionSwapThrowableSensor = registerSensorIfAbsent(DEFERRED_VERSION_SWAP_THROWABLE, new Count());
deferredVersionSwapFailedRollForwardSensor =
registerSensorIfAbsent(DEFERRED_VERSION_SWAP_FAILED_ROLL_FORWARD, new Count());
deferredVersionSwapStalledVersionSwapSensor =
registerSensorIfAbsent(DEFERRED_VERSION_SWAP_STALLED_VERSION_SWAP, new Gauge());
deferredVersionSwapParentChildStatusMismatchSensor =
registerSensorIfAbsent(DEFERRED_VERSION_SWAP_PARENT_CHILD_STATUS_MISMATCH_SENSOR, new Count());
deferredVersionSwapChildStatusMismatchSensor =
registerSensorIfAbsent(DEFERRED_VERSION_SWAP_CHILD_STATUS_MISMATCH_SENSOR, new Count());

OpenTelemetryMetricsSetup.OpenTelemetryMetricsSetupInfo otelData =
OpenTelemetryMetricsSetup.builder(metricsRepository).build();
VeniceOpenTelemetryMetricsRepository otelRepository = otelData.getOtelRepository();
Map<VeniceMetricsDimensions, String> baseDimensionsMap = otelData.getBaseDimensionsMap();
Attributes baseAttributes = otelData.getBaseAttributes();

deferredVersionSwapExceptionMetric = MetricEntityStateGeneric.create(
DeferredVersionSwapOtelMetricEntity.DEFERRED_VERSION_SWAP_PROCESSING_ERROR_COUNT.getMetricEntity(),
otelRepository,
this::registerSensorIfAbsent,
DeferredVersionSwapTehutiMetricNameEnum.DEFERRED_VERSION_SWAP_ERROR,
Collections.singletonList(new Count()),
baseDimensionsMap);

deferredVersionSwapThrowableMetric = MetricEntityStateGeneric.create(
DeferredVersionSwapOtelMetricEntity.DEFERRED_VERSION_SWAP_PROCESSING_ERROR_COUNT.getMetricEntity(),
otelRepository,
this::registerSensorIfAbsent,
DeferredVersionSwapTehutiMetricNameEnum.DEFERRED_VERSION_SWAP_THROWABLE,
Collections.singletonList(new Count()),
baseDimensionsMap);

deferredVersionSwapFailedRollForwardMetric = MetricEntityStateGeneric.create(
DeferredVersionSwapOtelMetricEntity.DEFERRED_VERSION_SWAP_ROLL_FORWARD_FAILURE_COUNT.getMetricEntity(),
otelRepository,
this::registerSensorIfAbsent,
DeferredVersionSwapTehutiMetricNameEnum.DEFERRED_VERSION_SWAP_FAILED_ROLL_FORWARD,
Collections.singletonList(new Count()),
baseDimensionsMap);

deferredVersionSwapStalledVersionSwapMetric = MetricEntityStateBase.create(
DeferredVersionSwapOtelMetricEntity.DEFERRED_VERSION_SWAP_STALLED_COUNT.getMetricEntity(),
otelRepository,
this::registerSensorIfAbsent,
DeferredVersionSwapTehutiMetricNameEnum.DEFERRED_VERSION_SWAP_STALLED_VERSION_SWAP,
Collections.singletonList(new Gauge()),
baseDimensionsMap,
baseAttributes);

deferredVersionSwapParentChildStatusMismatchMetric = MetricEntityStateGeneric.create(
DeferredVersionSwapOtelMetricEntity.DEFERRED_VERSION_SWAP_PARENT_STATUS_MISMATCH_COUNT.getMetricEntity(),
otelRepository,
this::registerSensorIfAbsent,
DeferredVersionSwapTehutiMetricNameEnum.DEFERRED_VERSION_SWAP_PARENT_CHILD_STATUS_MISMATCH,
Collections.singletonList(new Count()),
baseDimensionsMap);

deferredVersionSwapChildStatusMismatchMetric = MetricEntityStateGeneric.create(
DeferredVersionSwapOtelMetricEntity.DEFERRED_VERSION_SWAP_CHILD_STATUS_MISMATCH_COUNT.getMetricEntity(),
otelRepository,
this::registerSensorIfAbsent,
DeferredVersionSwapTehutiMetricNameEnum.DEFERRED_VERSION_SWAP_CHILD_STATUS_MISMATCH,
Collections.singletonList(new Count()),
baseDimensionsMap);
}

public void recordDeferredVersionSwapExceptionMetric(String clusterName) {
deferredVersionSwapExceptionMetric.record(1, clusterDimensions(clusterName));
}

public void recordDeferredVersionSwapErrorSensor() {
deferredVersionSwapErrorSensor.record();
public void recordDeferredVersionSwapThrowableMetric(String clusterName) {
deferredVersionSwapThrowableMetric.record(1, clusterDimensions(clusterName));
}

public void recordDeferredVersionSwapThrowableSensor() {
deferredVersionSwapThrowableSensor.record();
public void recordDeferredVersionSwapFailedRollForwardMetric(String clusterName, String storeName) {
deferredVersionSwapFailedRollForwardMetric.record(1, clusterAndStoreDimensions(clusterName, storeName));
}

public void recordDeferredVersionSwapFailedRollForwardSensor() {
deferredVersionSwapFailedRollForwardSensor.record();
public void recordDeferredVersionSwapStalledVersionSwapMetric(double value) {
deferredVersionSwapStalledVersionSwapMetric.record(value);
}

public void recordDeferredVersionSwapStalledVersionSwapSensor(double value) {
deferredVersionSwapStalledVersionSwapSensor.record(value);
public void recordDeferredVersionSwapParentChildStatusMismatchMetric(String clusterName, String storeName) {
deferredVersionSwapParentChildStatusMismatchMetric.record(1, clusterAndStoreDimensions(clusterName, storeName));
}

public void recordDeferredVersionSwapParentChildStatusMismatchSensor() {
deferredVersionSwapParentChildStatusMismatchSensor.record();
public void recordDeferredVersionSwapChildStatusMismatchMetric(String clusterName, String storeName) {
deferredVersionSwapChildStatusMismatchMetric.record(1, clusterAndStoreDimensions(clusterName, storeName));
}

public void recordDeferredVersionSwapChildStatusMismatchSensor() {
deferredVersionSwapChildStatusMismatchSensor.record();
private static Map<VeniceMetricsDimensions, String> clusterDimensions(String clusterName) {
return Collections.singletonMap(VENICE_CLUSTER_NAME, clusterName);
}

private static Map<VeniceMetricsDimensions, String> clusterAndStoreDimensions(String clusterName, String storeName) {
return ImmutableMap.of(VENICE_CLUSTER_NAME, clusterName, VENICE_STORE_NAME, storeName);
}

enum DeferredVersionSwapTehutiMetricNameEnum implements TehutiMetricNameEnum {
DEFERRED_VERSION_SWAP_ERROR, DEFERRED_VERSION_SWAP_THROWABLE, DEFERRED_VERSION_SWAP_FAILED_ROLL_FORWARD,
DEFERRED_VERSION_SWAP_STALLED_VERSION_SWAP, DEFERRED_VERSION_SWAP_PARENT_CHILD_STATUS_MISMATCH,
DEFERRED_VERSION_SWAP_CHILD_STATUS_MISMATCH
}

public enum DeferredVersionSwapOtelMetricEntity implements ModuleMetricEntityInterface {
/** Count of unexpected failures (both {@link Exception} and {@link Throwable}) in the per-cluster processing loop */
DEFERRED_VERSION_SWAP_PROCESSING_ERROR_COUNT(
"deferred_version_swap.processing_error_count", MetricType.COUNTER, MetricUnit.NUMBER,
"Count of unexpected failures in the deferred version swap processing loop", setOf(VENICE_CLUSTER_NAME)
),
/** Count of deferred version swap roll forward failures */
DEFERRED_VERSION_SWAP_ROLL_FORWARD_FAILURE_COUNT(
"deferred_version_swap.roll_forward.failure_count", MetricType.COUNTER, MetricUnit.NUMBER,
"Count of deferred version swap roll forward failures", setOf(VENICE_CLUSTER_NAME, VENICE_STORE_NAME)
),
/** Gauge of stalled deferred version swaps (global — stalledVersionSwapSet is shared across all clusters) */
DEFERRED_VERSION_SWAP_STALLED_COUNT(
MetricEntity.createWithNoDimensions(
"deferred_version_swap.stalled_count",
MetricType.GAUGE,
MetricUnit.NUMBER,
"Count of stalled deferred version swaps across all clusters")
),
/** Count of deferred version swap parent-child status mismatches */
DEFERRED_VERSION_SWAP_PARENT_STATUS_MISMATCH_COUNT(
"deferred_version_swap.parent_status_mismatch_count", MetricType.COUNTER, MetricUnit.NUMBER,
"Count of deferred version swap parent-child status mismatches", setOf(VENICE_CLUSTER_NAME, VENICE_STORE_NAME)
),
/** Count of deferred version swap child status mismatches */
DEFERRED_VERSION_SWAP_CHILD_STATUS_MISMATCH_COUNT(
"deferred_version_swap.child_status_mismatch_count", MetricType.COUNTER, MetricUnit.NUMBER,
"Count of deferred version swap child status mismatches", setOf(VENICE_CLUSTER_NAME, VENICE_STORE_NAME)
);

private final MetricEntity metricEntity;

DeferredVersionSwapOtelMetricEntity(
String metricName,
MetricType metricType,
MetricUnit unit,
String description,
Set<VeniceMetricsDimensions> dimensionsList) {
this.metricEntity = new MetricEntity(metricName, metricType, unit, description, dimensionsList);
}

DeferredVersionSwapOtelMetricEntity(MetricEntity metricEntity) {
this.metricEntity = metricEntity;
}

@Override
public MetricEntity getMetricEntity() {
return metricEntity;
}
}
}
Loading
Loading