Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 101 additions & 6 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"context"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -22,14 +23,14 @@ func SetupClusterReconciler(mgr ctrl.Manager) error {
beController := be.New(mgr.GetClient(), mgr.GetEventRecorderFor)
cnController := cn.New(mgr.GetClient(), mgr.GetEventRecorderFor)
feProxyController := feproxy.New(mgr.GetClient(), mgr.GetEventRecorderFor)
subcs := []subcontrollers.ClusterSubController{
feController, beController, cnController, feProxyController,
}

reconciler := &StarRocksClusterReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("starrockscluster-controller"),
Scs: subcs,
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("starrockscluster-controller"),
FeController: feController,
BeController: beController,
CnController: cnController,
FeProxyController: feProxyController,
}

if err := reconciler.SetupWithManager(mgr); err != nil {
Expand All @@ -38,6 +39,100 @@ func SetupClusterReconciler(mgr ctrl.Manager) error {
return nil
}

// getControllersInOrder returns controllers in the appropriate order based on deployment scenario
func getControllersInOrder(
ctx context.Context,
client client.Client,
cluster *srapi.StarRocksCluster,
fe, be, cn, feproxy subcontrollers.ClusterSubController,
) []subcontrollers.ClusterSubController {
logger := logr.FromContextOrDiscard(ctx)

// Auto-detect upgrade scenario by checking if this is an image change
if isUpgrade(ctx, client, cluster) {
logger.Info("upgrade detected: using BE-first ordering")
// upgrade order: BE/CN -> FE
return []subcontrollers.ClusterSubController{be, cn, fe, feproxy}
}

logger.Info("initial deployment detected: using FE-first ordering")
// initial deployment (default) order: FE -> BE -> CN -> FeProxy
return []subcontrollers.ClusterSubController{fe, be, cn, feproxy}
}

// isUpgrade detects if this reconciliation is due to an upgrade scenario
// An upgrade is detected when:
// 1. The cluster status shows it's already running
// 2. The desired image versions differ from currently deployed versions
func isUpgrade(ctx context.Context, kubeClient client.Client, cluster *srapi.StarRocksCluster) bool {
logger := logr.FromContextOrDiscard(ctx)

// Check if cluster is already running by looking at status
if cluster.Status.Phase != srapi.ClusterRunning {
logger.Info("cluster not in running state, assuming initial deployment", "currentPhase", cluster.Status.Phase)
return false
}

// Compare desired images with current images by checking StatefulSets
hasImageChanges := checkForImageChanges(ctx, kubeClient, cluster)
if hasImageChanges {
logger.Info("image changes detected in running cluster, this is an upgrade")
} else {
logger.Info("no image changes detected")
}

return hasImageChanges
}

// checkForImageChanges compares desired spec images with currently deployed StatefulSet images
func checkForImageChanges(ctx context.Context, kubeClient client.Client, cluster *srapi.StarRocksCluster) bool {
// Check FE image changes
if cluster.Spec.StarRocksFeSpec != nil {
desiredImage := cluster.Spec.StarRocksFeSpec.Image
currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-fe")
if currentImage != "" && desiredImage != currentImage {
return true
}
}

// Check BE image changes
if cluster.Spec.StarRocksBeSpec != nil {
desiredImage := cluster.Spec.StarRocksBeSpec.Image
currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-be")
if currentImage != "" && desiredImage != currentImage {
return true
}
}

// Check CN image changes
if cluster.Spec.StarRocksCnSpec != nil {
desiredImage := cluster.Spec.StarRocksCnSpec.Image
currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-cn")
if currentImage != "" && desiredImage != currentImage {
return true
}
}

return false
}

// getCurrentImageFromStatefulSet gets the current image from a deployed StatefulSet
func getCurrentImageFromStatefulSet(ctx context.Context, kubeClient client.Client, namespace, name string) string {
var st appsv1.StatefulSet
err := kubeClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, &st)
if err != nil {
// StatefulSet doesn't exist yet, this is initial deployment
return ""
}

// Get image from first container (StarRocks container)
if len(st.Spec.Template.Spec.Containers) > 0 {
return st.Spec.Template.Spec.Containers[0].Image
}

return ""
}

// SetupWithManager sets up the controller with the Manager.
func (r *StarRocksClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
// cannot add Owns(&v2.HorizontalPodAutoscaler{}), because if a kubernetes version is lower than 1.23,
Expand Down
14 changes: 10 additions & 4 deletions pkg/controllers/starrockscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ import (
// StarRocksClusterReconciler reconciles a StarRocksCluster object
type StarRocksClusterReconciler struct {
client.Client
Recorder record.EventRecorder
Scs []subcontrollers.ClusterSubController
Recorder record.EventRecorder
FeController subcontrollers.ClusterSubController
BeController subcontrollers.ClusterSubController
CnController subcontrollers.ClusterSubController
FeProxyController subcontrollers.ClusterSubController
}

// +kubebuilder:rbac:groups=starrocks.com,resources=starrocksclusters,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -86,8 +89,11 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, nil
}

// Get controllers in appropriate order based on deployment scenario
controllers := getControllersInOrder(ctx, r.Client, src, r.FeController, r.BeController, r.CnController, r.FeProxyController)

// subControllers reconcile for create or update component.
for _, rc := range r.Scs {
for _, rc := range controllers {
kvs := []interface{}{"subController", rc.GetControllerName()}
logger.Info("sub controller sync spec", kvs...)
if err = rc.SyncCluster(ctx, src); err != nil {
Expand All @@ -100,7 +106,7 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
}
}

for _, rc := range r.Scs {
for _, rc := range controllers {
kvs := []interface{}{"subController", rc.GetControllerName()}
logger.Info("sub controller update status", kvs...)
if err = rc.UpdateClusterStatus(ctx, src); err != nil {
Expand Down
149 changes: 142 additions & 7 deletions pkg/controllers/starrockscluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -32,7 +33,6 @@ import (
srapi "github.com/StarRocks/starrocks-kubernetes-operator/pkg/apis/starrocks/v1"
rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
Expand All @@ -43,12 +43,10 @@ func newStarRocksClusterController(objects ...runtime.Object) *StarRocksClusterR
srcController := &StarRocksClusterReconciler{}
srcController.Recorder = record.NewFakeRecorder(10)
srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...)
srcController.Scs = []subcontrollers.ClusterSubController{
fe.New(srcController.Client, fake.GetEventRecorderFor(nil)),
be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)),
cn.New(srcController.Client, fake.GetEventRecorderFor(nil)),
feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)),
}
srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil))
srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder))
srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil))
srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil))
return srcController
}

Expand Down Expand Up @@ -106,3 +104,140 @@ func TestReconcileConstructFeResource(t *testing.T) {
require.NoError(t, err)
require.Equal(t, reconcile.Result{}, res)
}

// Helper to create a basic cluster for upgrade tests
func newTestCluster(phase srapi.Phase, image string) *srapi.StarRocksCluster {
return &srapi.StarRocksCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test-cluster",
Namespace: "default",
},
Spec: srapi.StarRocksClusterSpec{
StarRocksFeSpec: &srapi.StarRocksFeSpec{
StarRocksComponentSpec: srapi.StarRocksComponentSpec{
StarRocksLoadSpec: srapi.StarRocksLoadSpec{
Image: image,
},
},
},
},
Status: srapi.StarRocksClusterStatus{
Phase: phase,
},
}
}

// Helper to create a StatefulSet with a specific image for upgrade tests
func newTestStatefulSet(name string) *appsv1.StatefulSet {
return &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
},
Spec: appsv1.StatefulSetSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Image: "starrocks/fe:3.1.0"},
},
},
},
},
}
}

// TestIsUpgrade tests the main upgrade detection logic
func TestIsUpgrade(t *testing.T) {
ctx := context.Background()

t.Run("not running cluster returns false", func(t *testing.T) {
cluster := newTestCluster("", "starrocks/fe:3.1.0")
client := fake.NewFakeClient(srapi.Scheme)

result := isUpgrade(ctx, client, cluster)
require.False(t, result)
})

t.Run("running cluster with no statefulset returns false", func(t *testing.T) {
cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.1.0")
client := fake.NewFakeClient(srapi.Scheme)

result := isUpgrade(ctx, client, cluster)
require.False(t, result)
})

t.Run("running cluster with same image returns false", func(t *testing.T) {
cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.1.0")
sts := newTestStatefulSet("test-cluster-fe")
client := fake.NewFakeClient(srapi.Scheme, sts)

result := isUpgrade(ctx, client, cluster)
require.False(t, result)
})

t.Run("running cluster with different image returns true", func(t *testing.T) {
cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.2.0")
sts := newTestStatefulSet("test-cluster-fe")
client := fake.NewFakeClient(srapi.Scheme, sts)

result := isUpgrade(ctx, client, cluster)
require.True(t, result)
})
}

// TestGetCurrentImageFromStatefulSet tests image retrieval from StatefulSets
func TestGetCurrentImageFromStatefulSet(t *testing.T) {
ctx := context.Background()

t.Run("missing statefulset returns empty", func(t *testing.T) {
client := fake.NewFakeClient(srapi.Scheme)
result := getCurrentImageFromStatefulSet(ctx, client, "default", "test-fe")
require.Equal(t, "", result)
})

t.Run("existing statefulset returns image", func(t *testing.T) {
sts := newTestStatefulSet("test-fe")
client := fake.NewFakeClient(srapi.Scheme, sts)

result := getCurrentImageFromStatefulSet(ctx, client, "default", "test-fe")
require.Equal(t, "starrocks/fe:3.1.0", result)
})
}

// TestGetControllersInOrder tests controller ordering based on deployment scenario
func TestGetControllersInOrder(t *testing.T) {
ctx := context.Background()
feCtrl := fe.New(nil, nil)
beCtrl := be.New(nil, nil)
cnCtrl := cn.New(nil, nil)
feProxyCtrl := feproxy.New(nil, nil)

t.Run("initial deployment uses FE-first order", func(t *testing.T) {
cluster := newTestCluster("", "starrocks/fe:3.1.0")
client := fake.NewFakeClient(srapi.Scheme)

controllers := getControllersInOrder(ctx, client, cluster, feCtrl, beCtrl, cnCtrl, feProxyCtrl)

// Check FE is first
require.Equal(t, "fe", controllers[0].GetControllerName())
// Verify order: FE -> BE -> CN -> FeProxy
require.Equal(t, "be", controllers[1].GetControllerName())
require.Equal(t, "cn", controllers[2].GetControllerName())
require.Equal(t, "feproxy", controllers[3].GetControllerName())
})

t.Run("upgrade uses BE-first order", func(t *testing.T) {
cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.2.0")
sts := newTestStatefulSet("test-cluster-fe")
client := fake.NewFakeClient(srapi.Scheme, sts)

controllers := getControllersInOrder(ctx, client, cluster, feCtrl, beCtrl, cnCtrl, feProxyCtrl)

// Check BE is first
require.Equal(t, "be", controllers[0].GetControllerName())
// Verify order: BE -> CN -> FE -> FeProxy (StarRocks upgrade procedure)
require.Equal(t, "cn", controllers[1].GetControllerName())
require.Equal(t, "fe", controllers[2].GetControllerName())
require.Equal(t, "feproxy", controllers[3].GetControllerName())
})
}
11 changes: 4 additions & 7 deletions pkg/subcontrollers/be/be_controller_reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/controllers"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
Expand All @@ -29,12 +28,10 @@ func newStarRocksClusterController(objects ...runtime.Object) *controllers.StarR
srcController := &controllers.StarRocksClusterReconciler{}
srcController.Recorder = record.NewFakeRecorder(10)
srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...)
srcController.Scs = []subcontrollers.ClusterSubController{
fe.New(srcController.Client, fake.GetEventRecorderFor(nil)),
be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)),
cn.New(srcController.Client, fake.GetEventRecorderFor(nil)),
feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)),
}
srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil))
srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder))
srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil))
srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil))
return srcController
}

Expand Down
11 changes: 4 additions & 7 deletions pkg/subcontrollers/cn/cn_controller_reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/controllers"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
Expand All @@ -30,12 +29,10 @@ func newStarRocksClusterController(objects ...runtime.Object) *controllers.StarR
srcController := &controllers.StarRocksClusterReconciler{}
srcController.Recorder = record.NewFakeRecorder(10)
srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...)
srcController.Scs = []subcontrollers.ClusterSubController{
fe.New(srcController.Client, fake.GetEventRecorderFor(nil)),
be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)),
cn.New(srcController.Client, fake.GetEventRecorderFor(nil)),
feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)),
}
srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil))
srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder))
srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil))
srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil))
return srcController
}

Expand Down
Loading