From 78b631b8671f8309a7d63f75725ce7287fa741ed Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 17 Oct 2025 12:27:48 +0200 Subject: [PATCH 1/3] feat(nuke): add option for nuke of stateful sets by name --- cmd/beekeeper/cmd/nuke.go | 31 +++++++++++++++++++----- pkg/k8s/pod/client.go | 10 ++++---- pkg/nuker/nuker.go | 50 +++++++++++++++++++++++++++++++++------ 3 files changed, 73 insertions(+), 18 deletions(-) diff --git a/cmd/beekeeper/cmd/nuke.go b/cmd/beekeeper/cmd/nuke.go index 0e17f89d..718d5ffd 100644 --- a/cmd/beekeeper/cmd/nuke.go +++ b/cmd/beekeeper/cmd/nuke.go @@ -14,16 +14,23 @@ const ( optionNameRestartArgs = "restart-args" optionNameUseRandomNeighboorhood = "use-random-neighborhood" optionNameDeploymentType = "deployment-type" + optionNameStatefulSets = "stateful-sets" beeLabelSelector = "app.kubernetes.io/name=bee" ) func (c *command) initNukeCmd() (err error) { cmd := &cobra.Command{ - Use: "nuke", - Short: "Clears databases and restarts Bee.", - Example: `beekeeper nuke --cluster-name=default --restart-args="bee,start,--config=.bee.yaml"`, + Use: "nuke", + Short: "Clears databases and restarts Bee.", + Example: `beekeeper nuke --cluster-name=default --restart-args="bee,start,--config=.bee.yaml" +beekeeper nuke --namespace=my-namespace --stateful-sets="bootnode-0,bootnode-1" --restart-args="bee,start,--config=.bee.yaml" +beekeeper nuke --namespace=my-namespace --restart-args="bee,start,--config=.bee.yaml" --label-selector="custom-label=bee-node"`, Long: `Executes a database nuke operation across Bee nodes in a Kubernetes cluster, forcing each node to resynchronize all data on next startup. - This command provides StatefulSet update and rollback procedures to maintain cluster stability during the nuke process, ensuring safe and coordinated resets of node state.`, +This command provides StatefulSet update and rollback procedures to maintain cluster stability during the nuke process, ensuring safe and coordinated resets of node state. + +The command supports two modes: +- Default mode: Uses NodeProvider to find Bee nodes (requires ingress/services) +- StatefulSet names mode: Directly targets specific StatefulSets by name (useful for bootnodes)`, RunE: func(cmd *cobra.Command, args []string) (err error) { return c.withTimeoutHandler(cmd, func(ctx context.Context) error { if !c.globalConfig.GetBool(optionNameEnableK8S) { @@ -42,8 +49,19 @@ func (c *command) initNukeCmd() (err error) { UseRandomNeighborhood: c.globalConfig.GetBool(optionNameUseRandomNeighboorhood), }) - if err := nukerClient.Run(ctx, c.globalConfig.GetStringSlice(optionNameRestartArgs)); err != nil { - return fmt.Errorf("running nuke command: %w", err) + statefulSetNames := c.globalConfig.GetStringSlice(optionNameStatefulSets) + restartArgs := c.globalConfig.GetStringSlice(optionNameRestartArgs) + if len(statefulSetNames) > 0 { + namespace := nodeClient.Namespace() + if err := nukerClient.NukeByStatefulSets(ctx, namespace, statefulSetNames, restartArgs); err != nil { + return fmt.Errorf("running nuke command with StatefulSet names: %w", err) + } + c.log.Infof("successfully nuked StatefulSets: %v", statefulSetNames) + } else { + if err := nukerClient.Run(ctx, restartArgs); err != nil { + return fmt.Errorf("running nuke command: %w", err) + } + c.log.Info("successfully nuked Bee nodes in the cluster") } return nil @@ -60,6 +78,7 @@ func (c *command) initNukeCmd() (err error) { cmd.Flags().StringSlice(optionNameRestartArgs, []string{"bee", "start", "--config=.bee.yaml"}, "Command to run in the Bee cluster, e.g. 'db,nuke,--config=.bee.yaml'") cmd.Flags().Bool(optionNameUseRandomNeighboorhood, false, "Use random neighborhood for Bee nodes (default: false)") cmd.Flags().String(optionNameDeploymentType, string(node.DeploymentTypeBeekeeper), "Indicates how the cluster was deployed: 'beekeeper' or 'helm'.") + cmd.Flags().StringSlice(optionNameStatefulSets, nil, "List of StatefulSet names to target for nuke (e.g., 'bootnode-0,bootnode-1'). When provided, uses direct StatefulSet targeting instead of NodeProvider.") c.root.AddCommand(cmd) diff --git a/pkg/k8s/pod/client.go b/pkg/k8s/pod/client.go index aa4203a5..ae07294d 100644 --- a/pkg/k8s/pod/client.go +++ b/pkg/k8s/pod/client.go @@ -208,7 +208,7 @@ func (s PodRecreationState) String() string { // WaitForPodRecreationAndCompletion waits for a pod to go through the complete lifecycle: // DELETED -> ADDED -> RUNNING -> COMPLETED func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespace, podName string) error { - c.log.Infof("waiting for pod %s to complete recreation and execution lifecycle", podName) + c.log.Debugf("waiting for pod %s to complete recreation and execution lifecycle", podName) defer c.log.Debugf("watch for pod %s in namespace %s done", podName, namespace) watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ @@ -224,7 +224,7 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac // Initialize state machine currentState := WaitingForDeletion - c.log.Infof("starting pod recreation lifecycle watch for %s, initial state: %s", podName, currentState) + c.log.Debugf("starting pod recreation lifecycle watch for %s, initial state: %s", podName, currentState) for { select { @@ -235,12 +235,12 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac } if newState != currentState { - c.log.Infof("pod %s transitioning from %s to %s", podName, currentState, newState) + c.log.Debugf("pod %s transitioning from %s to %s", podName, currentState, newState) currentState = newState } if currentState == Completed { - c.log.Infof("pod %s container completed successfully", podName) + c.log.Debugf("pod %s container completed successfully", podName) return nil } @@ -311,7 +311,7 @@ func (c *Client) WaitForRunning(ctx context.Context, namespace, podName string) } if pod.Status.Phase == v1.PodRunning { - c.log.Infof("pod %s has started and is in phase Running.", podName) + c.log.Debugf("pod %s has started and is in phase Running.", podName) return true, nil } diff --git a/pkg/nuker/nuker.go b/pkg/nuker/nuker.go index b030236d..e7032f2c 100644 --- a/pkg/nuker/nuker.go +++ b/pkg/nuker/nuker.go @@ -75,7 +75,7 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) { } // 2. Iterate through each StatefulSet and apply the update and rollback procedure concurrently using errgroup. - c.log.Debugf("found %d stateful sets to update", len(statefulSetsMap)) + c.log.Infof("found %d stateful sets to update", len(statefulSetsMap)) for name, ss := range statefulSetsMap { // Skip StatefulSets with 0 replicas @@ -105,6 +105,42 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) { return nil } +// NukeByStatefulSets sends a nuke command to the specified StatefulSets by name +func (c *Client) NukeByStatefulSets(ctx context.Context, namespace string, statefulSetNames []string, restartArgs []string) error { + c.log.Info("starting Bee cluster nuke by StatefulSet names") + + if len(statefulSetNames) == 0 { + return errors.New("stateful set names cannot be empty") + } + + statefulSetsMap := make(map[string]*v1.StatefulSet) + + for _, name := range statefulSetNames { + statefulSet, err := c.k8sClient.StatefulSet.Get(ctx, name, namespace) + if err != nil { + return fmt.Errorf("failed to get stateful set %s: %w", name, err) + } + statefulSetsMap[name] = statefulSet + } + + c.log.Infof("found %d stateful sets to update", len(statefulSetsMap)) + + for name, ss := range statefulSetsMap { + if ss.Spec.Replicas == nil || *ss.Spec.Replicas == 0 { + c.log.Infof("skipping stateful set %s: no replicas", name) + continue + } + + c.log.Debugf("updating stateful set %s, with args: %v", name, restartArgs) + if err := c.updateAndRollbackStatefulSet(ctx, namespace, ss, restartArgs); err != nil { + return fmt.Errorf("failed to update stateful set %s: %w", name, err) + } + c.log.Infof("successfully updated stateful set %s", name) + } + + return nil +} + func (c *Client) findStatefulSets(ctx context.Context, nodes node.NodeList, namespace string) (map[string]*v1.StatefulSet, error) { statefulSetsMap := make(map[string]*v1.StatefulSet) @@ -133,7 +169,7 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str // Ensure rollback happens regardless of success or failure defer func() { - c.log.Infof("rolling back stateful set %s to its original configuration", ss.Name) + c.log.Debugf("rolling back stateful set %s to its original configuration", ss.Name) if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { // Fetch the latest version of the StatefulSet before updating. latestSS, err := c.k8sClient.StatefulSet.Get(ctx, ss.Name, namespace) @@ -154,12 +190,12 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str } // Sequentially delete each pod again to trigger the rollback. - c.log.Infof("deleting pods in stateful set %s to trigger rollback", ss.Name) + c.log.Debugf("deleting pods in stateful set %s to trigger rollback", ss.Name) if err := c.recreatePodsAndWait(ctx, namespace, ss, c.k8sClient.Pods.WaitForRunning); err != nil { c.log.Errorf("failed during pod rollback for %s: %v", ss.Name, err) return } - c.log.Infof("all pods for %s have been rolled back and are ready", ss.Name) + c.log.Debugf("all pods for %s have been rolled back and are ready", ss.Name) }() // 2. Modify the StatefulSet for the update task using a retry loop. @@ -184,11 +220,11 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str } // 3. Sequentially delete each pod and wait for it to be recreated and complete the task. - c.log.Infof("deleting pods in stateful set %s to trigger update task", ss.Name) + c.log.Debugf("deleting pods in stateful set %s to trigger update task", ss.Name) if err := c.recreatePodsAndWait(ctx, namespace, ss, c.k8sClient.Pods.WaitForPodRecreationAndCompletion); err != nil { return fmt.Errorf("failed during pod update task for %s: %w", ss.Name, err) } - c.log.Infof("all pods for %s completed the update task", ss.Name) + c.log.Debugf("all pods for %s completed the update task", ss.Name) return nil } @@ -217,7 +253,7 @@ func (c *Client) recreatePodsAndWait(ctx context.Context, namespace string, ss * if err := waitFunc(ctx, namespace, podName); err != nil { return fmt.Errorf("failed to wait for pod %s: %w", podName, err) } - c.log.Infof("pod %s is ready", podName) + c.log.Debugf("pod %s is ready", podName) return nil }) } From c975dc14bcb55a8cd325990065560e72669475b4 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Tue, 21 Oct 2025 16:33:03 +0200 Subject: [PATCH 2/3] feat(nuke): enable random neighborhood for multiple replicas --- pkg/nuker/nuker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/nuker/nuker.go b/pkg/nuker/nuker.go index e7032f2c..29b5962f 100644 --- a/pkg/nuker/nuker.go +++ b/pkg/nuker/nuker.go @@ -85,7 +85,7 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) { } if neighborhoodArgProvider.UsesRandomNeighborhood() && *ss.Spec.Replicas != 1 { - return errors.New("random neighborhood provider requires exactly one pod (replica) in the StatefulSet") + c.log.Warningf("stateful set %s has %d replicas, but random neighborhood is enabled; all pods will receive the same neighborhood value", name, *ss.Spec.Replicas) } podNames := getPodNames(ss) From 460f8891bed1cf867fa07c580498c02e6b5e3f2e Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Tue, 21 Oct 2025 16:52:52 +0200 Subject: [PATCH 3/3] fix(nuke): avoid panic when statefulset not found --- cmd/beekeeper/cmd/nuke.go | 4 ++-- pkg/k8s/statefulset/client.go | 3 --- pkg/nuker/nuker.go | 15 +++++++++++++++ 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/cmd/beekeeper/cmd/nuke.go b/cmd/beekeeper/cmd/nuke.go index 718d5ffd..f5446c42 100644 --- a/cmd/beekeeper/cmd/nuke.go +++ b/cmd/beekeeper/cmd/nuke.go @@ -56,14 +56,14 @@ The command supports two modes: if err := nukerClient.NukeByStatefulSets(ctx, namespace, statefulSetNames, restartArgs); err != nil { return fmt.Errorf("running nuke command with StatefulSet names: %w", err) } - c.log.Infof("successfully nuked StatefulSets: %v", statefulSetNames) } else { if err := nukerClient.Run(ctx, restartArgs); err != nil { return fmt.Errorf("running nuke command: %w", err) } - c.log.Info("successfully nuked Bee nodes in the cluster") } + c.log.Infof("nuke command completed") + return nil }) }, diff --git a/pkg/k8s/statefulset/client.go b/pkg/k8s/statefulset/client.go index cc931add..caf11b47 100644 --- a/pkg/k8s/statefulset/client.go +++ b/pkg/k8s/statefulset/client.go @@ -136,9 +136,6 @@ func (c *Client) StatefulSets(ctx context.Context, namespace, labelSelector stri func (c *Client) Get(ctx context.Context, name, namespace string) (*appsv1.StatefulSet, error) { statefulSet, err := c.clientset.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { - if errors.IsNotFound(err) { - return nil, nil - } return nil, fmt.Errorf("getting statefulset %s in namespace %s: %w", name, namespace, err) } return statefulSet, nil diff --git a/pkg/nuker/nuker.go b/pkg/nuker/nuker.go index 29b5962f..5019e6fb 100644 --- a/pkg/nuker/nuker.go +++ b/pkg/nuker/nuker.go @@ -11,6 +11,7 @@ import ( "github.com/ethersphere/beekeeper/pkg/node" "golang.org/x/sync/errgroup" v1 "k8s.io/api/apps/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/util/retry" ) @@ -77,6 +78,8 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) { // 2. Iterate through each StatefulSet and apply the update and rollback procedure concurrently using errgroup. c.log.Infof("found %d stateful sets to update", len(statefulSetsMap)) + count := 0 + for name, ss := range statefulSetsMap { // Skip StatefulSets with 0 replicas if ss.Spec.Replicas == nil || *ss.Spec.Replicas == 0 { @@ -99,9 +102,12 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) { if err := c.updateAndRollbackStatefulSet(ctx, namespace, ss, args); err != nil { return fmt.Errorf("failed to update stateful set %s: %w", name, err) } + count++ c.log.Infof("successfully updated stateful set %s", name) } + c.log.Infof("nuked %d stateful sets", count) + return nil } @@ -118,6 +124,10 @@ func (c *Client) NukeByStatefulSets(ctx context.Context, namespace string, state for _, name := range statefulSetNames { statefulSet, err := c.k8sClient.StatefulSet.Get(ctx, name, namespace) if err != nil { + if k8serrors.IsNotFound(err) { + c.log.Warningf("stateful set %s not found, skipping", name) + continue + } return fmt.Errorf("failed to get stateful set %s: %w", name, err) } statefulSetsMap[name] = statefulSet @@ -125,6 +135,8 @@ func (c *Client) NukeByStatefulSets(ctx context.Context, namespace string, state c.log.Infof("found %d stateful sets to update", len(statefulSetsMap)) + count := 0 + for name, ss := range statefulSetsMap { if ss.Spec.Replicas == nil || *ss.Spec.Replicas == 0 { c.log.Infof("skipping stateful set %s: no replicas", name) @@ -135,9 +147,12 @@ func (c *Client) NukeByStatefulSets(ctx context.Context, namespace string, state if err := c.updateAndRollbackStatefulSet(ctx, namespace, ss, restartArgs); err != nil { return fmt.Errorf("failed to update stateful set %s: %w", name, err) } + count++ c.log.Infof("successfully updated stateful set %s", name) } + c.log.Infof("nuked %d stateful sets", count) + return nil }