diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
index cf737d2f19b..aa1e33e9eb3 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
@@ -484,6 +484,33 @@ private ConfigKeys() {
 
   public static final String CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS =
       "controller.backup.version.deletion.sleep.ms";
+
+  /**
+   * Min active replicas that backup version replica reduction sets on the ideal state of a backup version
+   * currently running with at most 4 replicas. Defaults to 1.
+   */
+  public static final String CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA =
+      "controller.backup.version.min.active.replica";
+
+  /**
+   * Replica count that backup version replica reduction sets on the ideal state of a backup version currently
+   * running with at most 4 replicas. Defaults to 1.
+   */
+  public static final String CONTROLLER_BACKUP_VERSION_REPLICA_COUNT = "controller.backup.version.replica.count";
+
+  /**
+   * Same as {@link #CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA}, for backup versions currently running with
+   * more than 4 replicas. Defaults to 2.
+   */
+  public static final String CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA_HIGH_RF =
+      "controller.backup.version.min.active.replica.high.rf";
+
+  /**
+   * Same as {@link #CONTROLLER_BACKUP_VERSION_REPLICA_COUNT}, for backup versions currently running with more
+   * than 4 replicas. Defaults to 3.
+   */
+  public static final String CONTROLLER_BACKUP_VERSION_REPLICA_COUNT_HIGH_RF =
+      "controller.backup.version.replica.count.high.rf";
 
   /**
    * The following config is to control whether to enable backup version cleanup based on retention policy or not at cluster level.
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreBackupVersionDeletion.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreBackupVersionDeletion.java
index f28cd291a05..38f8e13373c 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreBackupVersionDeletion.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStoreBackupVersionDeletion.java
@@ -165,7 +165,7 @@ public void testBackupVersionReplicaReduction() throws IOException {
           30,
           TimeUnit.SECONDS,
           () -> veniceHelixAdmin.getIdealState(CLUSTER_NAMES[0], Version.composeKafkaTopic(storeName, 2))
-              .getMinActiveReplicas() == 2);
+              .getMinActiveReplicas() == 1);
     }
   }
 
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java
index be2732dcd9d..40b292e010e 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/StoreBackupVersionCleanupService.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
+import org.apache.helix.model.IdealState;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
@@ -54,7 +55,6 @@
  * to accommodate the delay between Controller and Router.
  */
 public class StoreBackupVersionCleanupService extends AbstractVeniceService {
-  private static final int MIN_REPLICA = 2;
   public static final String TYPE_CURRENT_VERSION = "current_version";
   private static final Logger LOGGER = LogManager.getLogger(StoreBackupVersionCleanupService.class);
   private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
@@ -82,6 +82,10 @@ public class StoreBackupVersionCleanupService extends AbstractVeniceService {
   private final CloseableHttpAsyncClient httpAsyncClient;
   private final long keepAliveDurationMs = TimeUnit.HOURS.toMillis(1);
   private final Time time;
+  private final int backupVersionMinActiveReplica;
+  private final int backupVersionMinActiveReplicaHighRF;
+  private final int backupVersionReplicaCount;
+  private final int backupVersionReplicaCountHighRF;
 
   public StoreBackupVersionCleanupService(
       VeniceHelixAdmin admin,
@@ -102,6 +106,10 @@ protected StoreBackupVersionCleanupService(
     this.sleepInterval = multiClusterConfig.getBackupVersionCleanupSleepMs();
     this.defaultBackupVersionRetentionMs = multiClusterConfig.getBackupVersionDefaultRetentionMs();
     this.time = time;
+    this.backupVersionMinActiveReplica = multiClusterConfig.getBackupVersionMinActiveReplica();
+    this.backupVersionMinActiveReplicaHighRF = multiClusterConfig.getBackupVersionMinActiveReplicaHighRF();
+    this.backupVersionReplicaCount = multiClusterConfig.getBackupVersionReplicaCount();
+    this.backupVersionReplicaCountHighRF = multiClusterConfig.getBackupVersionReplicaCountHighRF();
     this.metricsRepository = metricsRepository;
     allClusters.forEach(clusterName -> {
       clusterNameCleanupStatsMap
@@ -273,17 +281,7 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
           if (version.getNumber() >= currentVersion) {
             continue;
           }
-
-          if (admin.updateIdealState(
-              clusterName,
-              Version.composeKafkaTopic(store.getName(), version.getNumber()),
-              MIN_REPLICA)) {
-            LOGGER.info(
-                "Store {} version {} is updated to ideal state to use {} replicas",
-                store.getName(),
-                version.getNumber(),
-                MIN_REPLICA);
-          }
+          updateReplicaCount(store, clusterName, version);
         }
       }
       return false;
@@ -369,6 +367,42 @@ protected boolean cleanupBackupVersion(Store store, String clusterName) {
     return true;
   }
 
+  /**
+   * Reduces the Helix ideal state of a backup version to the configured backup replica settings.
+   * Resources whose current replica count is above 4 use the high-RF settings; all others use the
+   * regular ones. Does nothing when the ideal state is missing or its replica count is not numeric.
+   */
+  private void updateReplicaCount(Store store, String clusterName, Version version) {
+    String resourceName = Version.composeKafkaTopic(store.getName(), version.getNumber());
+    IdealState idealState = admin.getIdealState(clusterName, resourceName);
+    if (idealState == null) {
+      // Nothing to shrink: the resource has no ideal state in Helix.
+      return;
+    }
+    int replicaCount;
+    try {
+      replicaCount = Integer.parseInt(idealState.getReplicas());
+    } catch (NumberFormatException e) {
+      LOGGER.warn(
+          "Skipping replica reduction for resource {}: unexpected replica count '{}'",
+          resourceName,
+          idealState.getReplicas());
+      return;
+    }
+    // Replica counts above 4 get the high-RF settings; everything else gets the regular ones.
+    boolean highRF = replicaCount > 4;
+    int minActiveReplica = highRF ? backupVersionMinActiveReplicaHighRF : backupVersionMinActiveReplica;
+    int backupReplicaCount = highRF ? backupVersionReplicaCountHighRF : backupVersionReplicaCount;
+    if (admin.updateIdealState(clusterName, resourceName, minActiveReplica, backupReplicaCount)) {
+      LOGGER.info(
+          "Store {} version {} is updated to ideal state to use {} replicas and {} min active replicas",
+          store.getName(),
+          version.getNumber(),
+          backupReplicaCount,
+          minActiveReplica);
+    }
+  }
+
   private class StoreBackupVersionCleanupTask implements Runnable {
     @Override
     public void run() {
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java
index e0bf28d8592..aeee982931f 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java
@@ -35,6 +35,10 @@
 import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_DEFAULT_RETENTION_MS;
 import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS;
 import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_METADATA_FETCH_BASED_CLEANUP_ENABLED;
+import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA;
+import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA_HIGH_RF;
+import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_REPLICA_COUNT;
+import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_REPLICA_COUNT_HIGH_RF;
 import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_REPLICA_REDUCTION_ENABLED;
 import static com.linkedin.venice.ConfigKeys.CONTROLLER_BACKUP_VERSION_RETENTION_BASED_CLEANUP_ENABLED;
 import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER;
@@ -337,6 +341,10 @@ public class VeniceControllerClusterConfig {
   private final long errorPartitionProcessingCycleDelay;
   private final long backupVersionDefaultRetentionMs;
   private final long backupVersionCleanupSleepMs;
+  private final int backupVersionMinActiveReplica;
+  private final int backupVersionReplicaCount;
+  private final int backupVersionMinActiveReplicaHighRF;
+  private final int backupVersionReplicaCountHighRF;
 
   private final boolean backupVersionRetentionBasedCleanupEnabled;
   private final boolean backupVersionMetadataFetchBasedCleanupEnabled;
@@ -1006,6 +1014,10 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
         props.getLong(ERROR_PARTITION_PROCESSING_CYCLE_DELAY, 5 * Time.MS_PER_MINUTE);
     this.backupVersionCleanupSleepMs =
         props.getLong(CONTROLLER_BACKUP_VERSION_DELETION_SLEEP_MS, TimeUnit.MINUTES.toMillis(5));
+    this.backupVersionMinActiveReplica = props.getInt(CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA, 1);
+    this.backupVersionReplicaCount = props.getInt(CONTROLLER_BACKUP_VERSION_REPLICA_COUNT, 1);
+    this.backupVersionMinActiveReplicaHighRF = props.getInt(CONTROLLER_BACKUP_VERSION_MIN_ACTIVE_REPLICA_HIGH_RF, 2);
+    this.backupVersionReplicaCountHighRF = props.getInt(CONTROLLER_BACKUP_VERSION_REPLICA_COUNT_HIGH_RF, 3);
     this.backupVersionDefaultRetentionMs =
         props.getLong(CONTROLLER_BACKUP_VERSION_DEFAULT_RETENTION_MS, TimeUnit.DAYS.toMillis(7)); // 1 week
     this.backupVersionRetentionBasedCleanupEnabled =
@@ -1933,6 +1945,22 @@ public boolean isAutoMaterializeDaVinciPushStatusSystemStoreEnabled() {
     return isAutoMaterializeDaVinciPushStatusSystemStoreEnabled;
   }
 
+  public int getBackupVersionMinActiveReplica() {
+    return backupVersionMinActiveReplica;
+  }
+
+  public int getBackupVersionMinActiveReplicaHighRF() {
+    return backupVersionMinActiveReplicaHighRF;
+  }
+
+  public int getBackupVersionReplicaCount() {
+    return backupVersionReplicaCount;
+  }
+
+  public int getBackupVersionReplicaCountHighRF() {
+    return backupVersionReplicaCountHighRF;
+  }
+
   public String getEmergencySourceRegion() {
     return emergencySourceRegion;
   }
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java
index 1866c7a7685..0215f81e6c4 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerMultiClusterConfig.java
@@ -231,6 +231,22 @@ public long getBackupVersionCleanupSleepMs() {
     return getCommonConfig().getBackupVersionCleanupSleepMs();
   }
 
+  public int getBackupVersionMinActiveReplica() {
+    return getCommonConfig().getBackupVersionMinActiveReplica();
+  }
+
+  public int getBackupVersionMinActiveReplicaHighRF() {
+    return getCommonConfig().getBackupVersionMinActiveReplicaHighRF();
+  }
+
+  public int getBackupVersionReplicaCount() {
+    return getCommonConfig().getBackupVersionReplicaCount();
+  }
+
+  public int getBackupVersionReplicaCountHighRF() {
+    return getCommonConfig().getBackupVersionReplicaCountHighRF();
+  }
+
   public long getDeferredVersionSwapSleepMs() {
     return getCommonConfig().getDeferredVersionSwapSleepMs();
   }
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
index 8e34205c8b8..61b6cbef9dc 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
@@ -5056,6 +5056,29 @@ public void rollbackToBackupVersion(String clusterName, String storeName, String
           return store;
         }
         int previousVersion = store.getCurrentVersion();
+        // The backup version's ideal state may have been shrunk by backup version replica reduction;
+        // bring it back up to the settings of the version being rolled back before it becomes current.
+        String previousResource = Version.composeKafkaTopic(store.getName(), previousVersion);
+        String backupResource = Version.composeKafkaTopic(store.getName(), backupVersion);
+        IdealState previousIdealState = getIdealState(clusterName, previousResource);
+        IdealState backupIdealState = getIdealState(clusterName, backupResource);
+        if (previousIdealState != null && backupIdealState != null) {
+          int replicaCount = Integer.parseInt(previousIdealState.getReplicas());
+          int minActiveReplica = previousIdealState.getMinActiveReplicas();
+          int backupReplicaCount = Integer.parseInt(backupIdealState.getReplicas());
+          int backupMinActiveReplica = backupIdealState.getMinActiveReplicas();
+          if (backupMinActiveReplica < minActiveReplica || backupReplicaCount < replicaCount) {
+            backupIdealState.setReplicas(String.valueOf(replicaCount));
+            backupIdealState.setMinActiveReplicas(minActiveReplica);
+            LOGGER.info(
+                "Updating backup version {} replicas and minActiveReplicas to match previous version {} in store {}",
+                backupVersion,
+                previousVersion,
+                storeName);
+            helixAdminClient.updateIdealState(clusterName, backupResource, backupIdealState);
+          }
+        }
+
         store.setCurrentVersion(backupVersion);
         LOGGER.info(
             "Rolling back current version {} to version {} in store {}. Updating previous version {} status to ERROR",
@@ -7504,12 +7527,15 @@ private void createClusterIfRequired(String clusterName) {
     admin.rebalance(controllerClusterName, clusterName, controllerClusterReplica);
   }
 
-  public boolean updateIdealState(String clusterName, String resourceName, int minReplica) {
+  public boolean updateIdealState(String clusterName, String resourceName, int minReplica, int replicationFactor) {
     IdealState idealState = helixAdminClient.getResourceIdealState(clusterName, resourceName);
-    if (idealState == null || idealState.getMinActiveReplicas() == minReplica) {
+    String replicas = String.valueOf(replicationFactor);
+    if (idealState == null
+        || (idealState.getMinActiveReplicas() == minReplica && replicas.equals(idealState.getReplicas()))) {
       return false;
     }
     idealState.setMinActiveReplicas(minReplica);
+    idealState.setReplicas(replicas);
     helixAdminClient.updateIdealState(clusterName, resourceName, idealState);
     return true;
   }
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java
index 9c74f30a3ba..42eadab4b36 100644
--- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java
+++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestStoreBackupVersionCleanupService.java
@@ -269,7 +269,7 @@ public void testCleanupBackupVersion_OnlyOneBackupVersion() {
     // Should still not clean up the only backup version
     Assert.assertFalse(service.cleanupBackupVersion(storeWithOneBackup, CLUSTER_NAME));
     verify(admin, never()).deleteOldVersionInStore(CLUSTER_NAME, storeWithOneBackup.getName(), 1);
-    verify(admin).updateIdealState(CLUSTER_NAME, Version.composeKafkaTopic(storeWithOneBackup.getName(), 1), 2);
+    verify(admin).updateIdealState(CLUSTER_NAME, Version.composeKafkaTopic(storeWithOneBackup.getName(), 1), 2, 3);
   }
 
   @Test