From 301ec41bcb74934ed8429918121c95a03b7a5af4 Mon Sep 17 00:00:00 2001 From: ehila Date: Fri, 21 Nov 2025 00:13:33 -0500 Subject: [PATCH] fix: add arbiter to control plane mcp sync during upgrades we need the arbiter mcp nodes to be counted in the same calculation for upgrading control plane nodes so an arbiter node does not update at the same time as a control plane node causing a quorum loss in etcd Signed-off-by: ehila --- pkg/controller/node/node_controller.go | 161 ++++++++++++++++++-- pkg/controller/node/node_controller_test.go | 71 +++++++++ 2 files changed, 223 insertions(+), 9 deletions(-) diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go index b0cfa0eb6e..2015383d24 100644 --- a/pkg/controller/node/node_controller.go +++ b/pkg/controller/node/node_controller.go @@ -1130,6 +1130,11 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error { pool := machineconfigpool.DeepCopy() everything := metav1.LabelSelector{} + // If arbiter pool, requeue master pool update and only sync status + if pool.Name == ctrlcommon.MachineConfigPoolArbiter { + return ctrl.handleArbiterPoolEvent(pool) + } + if reflect.DeepEqual(pool.Spec.NodeSelector, &everything) { ctrl.eventRecorder.Eventf(pool, corev1.EventTypeWarning, "SelectingAll", "This machineconfigpool is selecting all nodes. 
A non-empty selector is required.") return nil @@ -1183,7 +1188,50 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error { return err } - if err := ctrl.setClusterConfigAnnotation(nodes); err != nil { + cc, err := ctrl.ccLister.Get(ctrlcommon.ControllerConfigName) + if err != nil { + return fmt.Errorf("error getting controllerconfig %q, error: %w", ctrlcommon.ControllerConfigName, err) + } + controlPlaneTopology := cc.Spec.Infra.Status.ControlPlaneTopology + + // For master pool in HighlyAvailableArbiterMode, coordinate with arbiter pool + var arbiterPool *mcfgv1.MachineConfigPool + var arbiterNodes []*corev1.Node + var arbiterMosc *mcfgv1.MachineOSConfig + var arbiterMosb *mcfgv1.MachineOSBuild + var arbiterLayered bool + if pool.Name == ctrlcommon.MachineConfigPoolMaster && controlPlaneTopology == configv1.HighlyAvailableArbiterMode { + arbiterObj, err := ctrl.mcpLister.Get(ctrlcommon.MachineConfigPoolArbiter) + if err != nil { + return fmt.Errorf("error getting arbiter pool %q, error: %w", ctrlcommon.MachineConfigPoolArbiter, err) + } + if arbiterObj.Spec.Configuration.Name != "" && arbiterObj.DeletionTimestamp == nil && !arbiterObj.Spec.Paused { + arbiterPool = arbiterObj.DeepCopy() + arbiterNodes, err = ctrl.getNodesForPool(arbiterPool) + if err != nil { + return fmt.Errorf("error getting nodes for arbiter pool %q, error: %w", ctrlcommon.MachineConfigPoolArbiter, err) + } + arbiterMosc, arbiterMosb, arbiterLayered, err = ctrl.getConfigAndBuildAndLayeredStatus(arbiterPool) + if err != nil { + return fmt.Errorf("error getting config and build for arbiter pool %q, error: %w", ctrlcommon.MachineConfigPoolArbiter, err) + } + combinedNodes := append([]*corev1.Node{}, nodes...) + combinedNodes = append(combinedNodes, arbiterNodes...) 
+ combinedMax, err := maxUnavailable(pool, combinedNodes) + if err != nil { + return fmt.Errorf("error getting max unavailable count for pool %q, error: %w", pool.Name, err) + } + arbiterUnavailable := len(getUnavailableMachines(arbiterNodes, arbiterPool)) + // Adjust maxunavail to account for arbiter unavailable nodes + // This ensures we don't exceed the combined maxUnavailable across both pools + maxunavail = combinedMax - arbiterUnavailable + if maxunavail < 0 { + maxunavail = 0 + } + } + } + + if err := ctrl.setClusterConfigAnnotation(nodes, controlPlaneTopology); err != nil { return fmt.Errorf("error setting clusterConfig Annotation for node in pool %q, error: %w", pool.Name, err) } // Taint all the nodes in the node pool, irrespective of their upgrade status. @@ -1214,6 +1262,7 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error { } } candidates, capacity := getAllCandidateMachines(layered, mosc, mosb, pool, nodes, maxunavail) + masterTargeted := 0 if len(candidates) > 0 { zones := make(map[string]bool) for _, candidate := range candidates { @@ -1230,8 +1279,107 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error { } return err } + masterTargeted = len(candidates) ctrlcommon.UpdateStateMetric(ctrlcommon.MCCSubControllerState, "machine-config-controller-node", "Sync Machine Config Pool", pool.Name) } + + // If coordinating with arbiter pool, also handle arbiter node updates + if arbiterPool != nil && len(arbiterNodes) > 0 { + // Set cluster config annotation for arbiter nodes + if err := ctrl.setClusterConfigAnnotation(arbiterNodes, controlPlaneTopology); err != nil { + return fmt.Errorf("error setting clusterConfig Annotation for node in pool %q, error: %w", arbiterPool.Name, err) + } + + // Handle taints for arbiter nodes + for _, node := range arbiterNodes { + hasInProgressTaint := checkIfNodeHasInProgressTaint(node) + lns := ctrlcommon.NewLayeredNodeState(node) + if (!arbiterLayered && 
lns.IsDesiredMachineConfigEqualToPool(arbiterPool) && !lns.AreImageAnnotationsPresentOnNode()) || (arbiterLayered && lns.IsDesiredEqualToBuild(arbiterMosc, arbiterMosb)) { + if hasInProgressTaint { + if err := ctrl.removeUpdateInProgressTaint(ctx, node.Name); err != nil { + err = fmt.Errorf("failed removing %s taint for node %s: %w", constants.NodeUpdateInProgressTaint.Key, node.Name, err) + klog.Error(err) + } + } + } else { + if !hasInProgressTaint { + if err := ctrl.setUpdateInProgressTaint(ctx, node.Name); err != nil { + err = fmt.Errorf("failed applying %s taint for node %s: %w", constants.NodeUpdateInProgressTaint.Key, node.Name, err) + klog.Error(err) + } + } + } + } + + // Calculate remaining capacity for arbiter after master updates + masterUnavailable := len(getUnavailableMachines(nodes, pool)) + arbiterUnavailable := len(getUnavailableMachines(arbiterNodes, arbiterPool)) + combinedNodes := append([]*corev1.Node{}, nodes...) + combinedNodes = append(combinedNodes, arbiterNodes...) 
+ combinedMax, err := maxUnavailable(pool, combinedNodes) + if err == nil { + remainingCapacity := combinedMax - masterUnavailable - masterTargeted - arbiterUnavailable + if remainingCapacity < 0 { + remainingCapacity = 0 + } + arbiterMaxUnavail := arbiterUnavailable + remainingCapacity + if arbiterMaxUnavail < 0 { + arbiterMaxUnavail = 0 + } + + arbiterCandidates, arbiterCapacity := getAllCandidateMachines(arbiterLayered, arbiterMosc, arbiterMosb, arbiterPool, arbiterNodes, arbiterMaxUnavail) + if len(arbiterCandidates) > 0 { + zones := make(map[string]bool) + for _, candidate := range arbiterCandidates { + if zone, ok := candidate.Labels[zoneLabel]; ok { + zones[zone] = true + } + } + ctrl.logPool(arbiterPool, "%d candidate nodes in %d zones for update, capacity: %d", len(arbiterCandidates), len(zones), arbiterCapacity) + if err := ctrl.updateCandidateMachines(arbiterLayered, arbiterMosc, arbiterMosb, arbiterPool, arbiterCandidates, arbiterCapacity); err != nil { + if syncErr := ctrl.syncStatusOnly(arbiterPool); syncErr != nil { + errs := kubeErrs.NewAggregate([]error{syncErr, err}) + return fmt.Errorf("error setting annotations for pool %q, sync error: %w", arbiterPool.Name, errs) + } + return err + } + ctrlcommon.UpdateStateMetric(ctrlcommon.MCCSubControllerState, "machine-config-controller-node", "Sync Machine Config Pool", arbiterPool.Name) + } + + // Sync status for arbiter pool + if err := ctrl.syncStatusOnly(arbiterPool); err != nil { + return err + } + } + } + + return ctrl.syncStatusOnly(pool) +} + +func (ctrl *Controller) handleArbiterPoolEvent(pool *mcfgv1.MachineConfigPool) error { + masterPool, err := ctrl.mcpLister.Get(ctrlcommon.MachineConfigPoolMaster) + if err == nil { + ctrl.enqueue(masterPool) + } else if !errors.IsNotFound(err) { + return err + } + // Still sync status for arbiter pool + if pool.DeletionTimestamp != nil || pool.Spec.Paused { + return ctrl.syncStatusOnly(pool) + } + mosc, mosb, layered, err := 
ctrl.getConfigAndBuildAndLayeredStatus(pool) + if err != nil { + return fmt.Errorf("could not get config and build: %w", err) + } + if layered { + _, canApplyUpdates, err := ctrl.canLayeredContinue(mosc, mosb) + if err != nil { + return err + } + if !canApplyUpdates { + return ctrl.syncStatusOnly(pool) + } + } return ctrl.syncStatusOnly(pool) } @@ -1277,17 +1425,12 @@ func (ctrl *Controller) getNodesForPool(pool *mcfgv1.MachineConfigPool) ([]*core // setClusterConfigAnnotation reads cluster configs set into controllerConfig // and add/updates required annotation to node such as ControlPlaneTopology // from infrastructure object. -func (ctrl *Controller) setClusterConfigAnnotation(nodes []*corev1.Node) error { - cc, err := ctrl.ccLister.Get(ctrlcommon.ControllerConfigName) - if err != nil { - return err - } - +func (ctrl *Controller) setClusterConfigAnnotation(nodes []*corev1.Node, controlPlaneTopology configv1.TopologyMode) error { for _, node := range nodes { - if node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] != string(cc.Spec.Infra.Status.ControlPlaneTopology) { + if node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] != string(controlPlaneTopology) { oldAnn := node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] _, err := internal.UpdateNodeRetry(ctrl.kubeClient.CoreV1().Nodes(), ctrl.nodeLister, node.Name, func(node *corev1.Node) { - node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] = string(cc.Spec.Infra.Status.ControlPlaneTopology) + node.Annotations[daemonconsts.ClusterControlPlaneTopologyAnnotationKey] = string(controlPlaneTopology) }) if err != nil { return err diff --git a/pkg/controller/node/node_controller_test.go b/pkg/controller/node/node_controller_test.go index f9b42152f5..c0537d9370 100644 --- a/pkg/controller/node/node_controller_test.go +++ b/pkg/controller/node/node_controller_test.go @@ -1772,3 +1772,74 @@ func filterLastTransitionTime(obj runtime.Object) 
runtime.Object { } return o } + +func TestArbiterPoolCoordination(t *testing.T) { + t.Parallel() + f := newFixture(t) + + // Create controller config with HighlyAvailableArbiterMode + cc := newControllerConfig(ctrlcommon.ControllerConfigName, configv1.HighlyAvailableArbiterMode) + f.ccLister = append(f.ccLister, cc) + f.objects = append(f.objects, cc) + + // Create master pool with new config + masterPool := helpers.NewMachineConfigPool(ctrlcommon.MachineConfigPoolMaster, nil, helpers.MasterSelector, machineConfigV1) + masterPool.Spec.Configuration.Name = machineConfigV2 + f.mcpLister = append(f.mcpLister, masterPool) + f.objects = append(f.objects, masterPool) + + // Create arbiter pool with new config + arbiterSelector := metav1.AddLabelToSelector(&metav1.LabelSelector{}, "node-role.kubernetes.io/arbiter", "") + arbiterPool := helpers.NewMachineConfigPool(ctrlcommon.MachineConfigPoolArbiter, nil, arbiterSelector, machineConfigV1) + arbiterPool.Spec.Configuration.Name = machineConfigV2 + f.mcpLister = append(f.mcpLister, arbiterPool) + f.objects = append(f.objects, arbiterPool) + + // Create master node with correct label format + masterNode := helpers.NewNodeWithReady("master-node-0", machineConfigV1, machineConfigV1, corev1.ConditionTrue) + masterNode.Labels = map[string]string{ + "node-role/master": "", + } + f.nodeLister = append(f.nodeLister, masterNode) + f.kubeobjects = append(f.kubeobjects, masterNode) + + // Create arbiter node + arbiterNode := helpers.NewNodeWithReady("arbiter-node-0", machineConfigV1, machineConfigV1, corev1.ConditionTrue) + arbiterNode.Labels = map[string]string{ + "node-role.kubernetes.io/arbiter": "", + } + f.nodeLister = append(f.nodeLister, arbiterNode) + f.kubeobjects = append(f.kubeobjects, arbiterNode) + + // Test: When master pool syncs in arbiter mode, it should coordinate both pools + // Expect status updates for both pools (arbiter first, then master) + f.expectUpdateMachineConfigPoolStatus(arbiterPool) + 
f.expectUpdateMachineConfigPoolStatus(masterPool) + + // Sync master pool - this should coordinate both pools + c := f.newController() + err := c.syncHandler(ctrlcommon.MachineConfigPoolMaster) + require.NoError(t, err) + + // Verify that both pools had their status updated + actions := filterInformerActions(f.client.Actions()) + statusUpdates := 0 + for _, action := range actions { + if action.Matches("update", "machineconfigpools") && action.GetSubresource() == "status" { + statusUpdates++ + } + } + // Should have status updates for both master and arbiter pools + assert.GreaterOrEqual(t, statusUpdates, 2, "Expected at least 2 status updates (master and arbiter pools)") + + // Verify that both nodes were patched (for desired config) + k8sActions := filterInformerActions(f.kubeclient.Actions()) + nodePatches := 0 + for _, action := range k8sActions { + if action.Matches("patch", "nodes") { + nodePatches++ + } + } + // Should have patches for both master and arbiter nodes + assert.GreaterOrEqual(t, nodePatches, 2, "Expected at least 2 node patches (master and arbiter nodes)") +}