Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions cmd/beekeeper/cmd/nuke.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,23 @@ const (
optionNameRestartArgs = "restart-args"
optionNameUseRandomNeighboorhood = "use-random-neighborhood"
optionNameDeploymentType = "deployment-type"
optionNameStatefulSets = "stateful-sets"
beeLabelSelector = "app.kubernetes.io/name=bee"
)

func (c *command) initNukeCmd() (err error) {
cmd := &cobra.Command{
Use: "nuke",
Short: "Clears databases and restarts Bee.",
Example: `beekeeper nuke --cluster-name=default --restart-args="bee,start,--config=.bee.yaml"`,
Use: "nuke",
Short: "Clears databases and restarts Bee.",
Example: `beekeeper nuke --cluster-name=default --restart-args="bee,start,--config=.bee.yaml"
beekeeper nuke --namespace=my-namespace --stateful-sets="bootnode-0,bootnode-1" --restart-args="bee,start,--config=.bee.yaml"
beekeeper nuke --namespace=my-namespace --restart-args="bee,start,--config=.bee.yaml" --label-selector="custom-label=bee-node"`,
Long: `Executes a database nuke operation across Bee nodes in a Kubernetes cluster, forcing each node to resynchronize all data on next startup.
This command provides StatefulSet update and rollback procedures to maintain cluster stability during the nuke process, ensuring safe and coordinated resets of node state.`,
This command provides StatefulSet update and rollback procedures to maintain cluster stability during the nuke process, ensuring safe and coordinated resets of node state.

The command supports two modes:
- Default mode: Uses NodeProvider to find Bee nodes (requires ingress/services)
- StatefulSet names mode: Directly targets specific StatefulSets by name (useful for bootnodes)`,
RunE: func(cmd *cobra.Command, args []string) (err error) {
return c.withTimeoutHandler(cmd, func(ctx context.Context) error {
if !c.globalConfig.GetBool(optionNameEnableK8S) {
Expand All @@ -42,10 +49,21 @@ func (c *command) initNukeCmd() (err error) {
UseRandomNeighborhood: c.globalConfig.GetBool(optionNameUseRandomNeighboorhood),
})

if err := nukerClient.Run(ctx, c.globalConfig.GetStringSlice(optionNameRestartArgs)); err != nil {
return fmt.Errorf("running nuke command: %w", err)
statefulSetNames := c.globalConfig.GetStringSlice(optionNameStatefulSets)
restartArgs := c.globalConfig.GetStringSlice(optionNameRestartArgs)
if len(statefulSetNames) > 0 {
namespace := nodeClient.Namespace()
if err := nukerClient.NukeByStatefulSets(ctx, namespace, statefulSetNames, restartArgs); err != nil {
return fmt.Errorf("running nuke command with StatefulSet names: %w", err)
}
} else {
if err := nukerClient.Run(ctx, restartArgs); err != nil {
return fmt.Errorf("running nuke command: %w", err)
}
}

c.log.Infof("nuke command completed")

return nil
})
},
Expand All @@ -60,6 +78,7 @@ func (c *command) initNukeCmd() (err error) {
cmd.Flags().StringSlice(optionNameRestartArgs, []string{"bee", "start", "--config=.bee.yaml"}, "Command to run in the Bee cluster, e.g. 'db,nuke,--config=.bee.yaml'")
cmd.Flags().Bool(optionNameUseRandomNeighboorhood, false, "Use random neighborhood for Bee nodes (default: false)")
cmd.Flags().String(optionNameDeploymentType, string(node.DeploymentTypeBeekeeper), "Indicates how the cluster was deployed: 'beekeeper' or 'helm'.")
cmd.Flags().StringSlice(optionNameStatefulSets, nil, "List of StatefulSet names to target for nuke (e.g., 'bootnode-0,bootnode-1'). When provided, uses direct StatefulSet targeting instead of NodeProvider.")

c.root.AddCommand(cmd)

Expand Down
10 changes: 5 additions & 5 deletions pkg/k8s/pod/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (s PodRecreationState) String() string {
// WaitForPodRecreationAndCompletion waits for a pod to go through the complete lifecycle:
// DELETED -> ADDED -> RUNNING -> COMPLETED
func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespace, podName string) error {
c.log.Infof("waiting for pod %s to complete recreation and execution lifecycle", podName)
c.log.Debugf("waiting for pod %s to complete recreation and execution lifecycle", podName)
defer c.log.Debugf("watch for pod %s in namespace %s done", podName, namespace)

watcher, err := c.clientset.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
Expand All @@ -224,7 +224,7 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac

// Initialize state machine
currentState := WaitingForDeletion
c.log.Infof("starting pod recreation lifecycle watch for %s, initial state: %s", podName, currentState)
c.log.Debugf("starting pod recreation lifecycle watch for %s, initial state: %s", podName, currentState)

for {
select {
Expand All @@ -235,12 +235,12 @@ func (c *Client) WaitForPodRecreationAndCompletion(ctx context.Context, namespac
}

if newState != currentState {
c.log.Infof("pod %s transitioning from %s to %s", podName, currentState, newState)
c.log.Debugf("pod %s transitioning from %s to %s", podName, currentState, newState)
currentState = newState
}

if currentState == Completed {
c.log.Infof("pod %s container completed successfully", podName)
c.log.Debugf("pod %s container completed successfully", podName)
return nil
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func (c *Client) WaitForRunning(ctx context.Context, namespace, podName string)
}

if pod.Status.Phase == v1.PodRunning {
c.log.Infof("pod %s has started and is in phase Running.", podName)
c.log.Debugf("pod %s has started and is in phase Running.", podName)
return true, nil
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/k8s/statefulset/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ func (c *Client) StatefulSets(ctx context.Context, namespace, labelSelector stri
func (c *Client) Get(ctx context.Context, name, namespace string) (*appsv1.StatefulSet, error) {
statefulSet, err := c.clientset.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil, nil
}
return nil, fmt.Errorf("getting statefulset %s in namespace %s: %w", name, namespace, err)
}
return statefulSet, nil
Expand Down
67 changes: 59 additions & 8 deletions pkg/nuker/nuker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethersphere/beekeeper/pkg/node"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/retry"
)

Expand Down Expand Up @@ -75,7 +76,9 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) {
}

// 2. Iterate through each StatefulSet and apply the update and rollback procedure concurrently using errgroup.
c.log.Debugf("found %d stateful sets to update", len(statefulSetsMap))
c.log.Infof("found %d stateful sets to update", len(statefulSetsMap))

count := 0

for name, ss := range statefulSetsMap {
// Skip StatefulSets with 0 replicas
Expand All @@ -85,7 +88,7 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) {
}

if neighborhoodArgProvider.UsesRandomNeighborhood() && *ss.Spec.Replicas != 1 {
return errors.New("random neighborhood provider requires exactly one pod (replica) in the StatefulSet")
c.log.Warningf("stateful set %s has %d replicas, but random neighborhood is enabled; all pods will receive the same neighborhood value", name, *ss.Spec.Replicas)
}

podNames := getPodNames(ss)
Expand All @@ -99,9 +102,57 @@ func (c *Client) Run(ctx context.Context, restartArgs []string) (err error) {
if err := c.updateAndRollbackStatefulSet(ctx, namespace, ss, args); err != nil {
return fmt.Errorf("failed to update stateful set %s: %w", name, err)
}
count++
c.log.Infof("successfully updated stateful set %s", name)
}

c.log.Infof("nuked %d stateful sets", count)

return nil
}

// NukeByStatefulSets sends a nuke command to the specified StatefulSets by name
func (c *Client) NukeByStatefulSets(ctx context.Context, namespace string, statefulSetNames []string, restartArgs []string) error {
c.log.Info("starting Bee cluster nuke by StatefulSet names")

if len(statefulSetNames) == 0 {
return errors.New("stateful set names cannot be empty")
}

statefulSetsMap := make(map[string]*v1.StatefulSet)

for _, name := range statefulSetNames {
statefulSet, err := c.k8sClient.StatefulSet.Get(ctx, name, namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a non-existent StatefulSet name is passed, the command panics:
./dist/beekeeper nuke --namespace=local --stateful-sets="non-existent-bootnode" --restart-args="bee,start,--config=.bee.yaml"

In this case, the returned statefulSet is nil, which causes the panic.

Copy link
Member Author

@gacevicljubisa gacevicljubisa Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I will log a warning and skip it.

if err != nil {
if k8serrors.IsNotFound(err) {
c.log.Warningf("stateful set %s not found, skipping", name)
continue
}
return fmt.Errorf("failed to get stateful set %s: %w", name, err)
}
statefulSetsMap[name] = statefulSet
}

c.log.Infof("found %d stateful sets to update", len(statefulSetsMap))

count := 0

for name, ss := range statefulSetsMap {
if ss.Spec.Replicas == nil || *ss.Spec.Replicas == 0 {
c.log.Infof("skipping stateful set %s: no replicas", name)
continue
}

c.log.Debugf("updating stateful set %s, with args: %v", name, restartArgs)
if err := c.updateAndRollbackStatefulSet(ctx, namespace, ss, restartArgs); err != nil {
return fmt.Errorf("failed to update stateful set %s: %w", name, err)
}
count++
c.log.Infof("successfully updated stateful set %s", name)
}

c.log.Infof("nuked %d stateful sets", count)

return nil
}

Expand Down Expand Up @@ -133,7 +184,7 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str

// Ensure rollback happens regardless of success or failure
defer func() {
c.log.Infof("rolling back stateful set %s to its original configuration", ss.Name)
c.log.Debugf("rolling back stateful set %s to its original configuration", ss.Name)
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Fetch the latest version of the StatefulSet before updating.
latestSS, err := c.k8sClient.StatefulSet.Get(ctx, ss.Name, namespace)
Expand All @@ -154,12 +205,12 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str
}

// Sequentially delete each pod again to trigger the rollback.
c.log.Infof("deleting pods in stateful set %s to trigger rollback", ss.Name)
c.log.Debugf("deleting pods in stateful set %s to trigger rollback", ss.Name)
if err := c.recreatePodsAndWait(ctx, namespace, ss, c.k8sClient.Pods.WaitForRunning); err != nil {
c.log.Errorf("failed during pod rollback for %s: %v", ss.Name, err)
return
}
c.log.Infof("all pods for %s have been rolled back and are ready", ss.Name)
c.log.Debugf("all pods for %s have been rolled back and are ready", ss.Name)
}()

// 2. Modify the StatefulSet for the update task using a retry loop.
Expand All @@ -184,11 +235,11 @@ func (c *Client) updateAndRollbackStatefulSet(ctx context.Context, namespace str
}

// 3. Sequentially delete each pod and wait for it to be recreated and complete the task.
c.log.Infof("deleting pods in stateful set %s to trigger update task", ss.Name)
c.log.Debugf("deleting pods in stateful set %s to trigger update task", ss.Name)
if err := c.recreatePodsAndWait(ctx, namespace, ss, c.k8sClient.Pods.WaitForPodRecreationAndCompletion); err != nil {
return fmt.Errorf("failed during pod update task for %s: %w", ss.Name, err)
}
c.log.Infof("all pods for %s completed the update task", ss.Name)
c.log.Debugf("all pods for %s completed the update task", ss.Name)

return nil
}
Expand Down Expand Up @@ -217,7 +268,7 @@ func (c *Client) recreatePodsAndWait(ctx context.Context, namespace string, ss *
if err := waitFunc(ctx, namespace, podName); err != nil {
return fmt.Errorf("failed to wait for pod %s: %w", podName, err)
}
c.log.Infof("pod %s is ready", podName)
c.log.Debugf("pod %s is ready", podName)
return nil
})
}
Expand Down