From 7e4c4c0b0fe8a34b489cc865c4d892d10359d805 Mon Sep 17 00:00:00 2001
From: jmjm15x
Date: Thu, 9 Oct 2025 09:32:14 -0700
Subject: [PATCH 1/4] fix: implement upgrade sequencing with StatefulSet readiness validation

Fixes upgrade sequence issues and prevents premature component updates

Key changes:
- Add isUpgrade() detection based on StatefulSet existence
- Implement getControllersInOrder() for scenario-based sequencing
- Add isComponentReady() with endpoint, generation, rollout, and replica checks
- Detect and log corrupted state (BE without FE) with recovery attempt

Signed-off-by: jmjm15x
---
 pkg/controllers/controllers.go                  | 170 ++++++-
 .../starrockscluster_controller.go              |  63 ++-
 .../starrockscluster_controller_test.go         | 445 +++++++++++++++++-
 .../be/be_controller_reconcile_test.go          |  11 +-
 .../cn/cn_controller_reconcile_test.go          |  11 +-
 .../fe/fe_controller_reconcile_test.go          |  11 +-
 .../feproxy/feproxy_controller_test.go          |  16 +-
 7 files changed, 680 insertions(+), 47 deletions(-)

diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go
index db75ba02..bb97ea26 100644
--- a/pkg/controllers/controllers.go
+++ b/pkg/controllers/controllers.go
@@ -3,13 +3,16 @@ package controllers

 import (
 	"context"

+	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"

 	srapi "github.com/StarRocks/starrocks-kubernetes-operator/pkg/apis/starrocks/v1"
+	srservice "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/templates/service"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
@@ -17,19 +20,25 @@ import (
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/feproxy"
 )

+const (
+	componentTypeFE = "fe"
+	componentTypeBE = "be"
+	componentTypeCN = "cn"
+)
+
 func SetupClusterReconciler(mgr ctrl.Manager) error {
 	feController := fe.New(mgr.GetClient(), mgr.GetEventRecorderFor)
 	beController := be.New(mgr.GetClient(), mgr.GetEventRecorderFor)
 	cnController := cn.New(mgr.GetClient(), mgr.GetEventRecorderFor)
 	feProxyController := feproxy.New(mgr.GetClient(), mgr.GetEventRecorderFor)
-	subcs := []subcontrollers.ClusterSubController{
-		feController, beController, cnController, feProxyController,
-	}

 	reconciler := &StarRocksClusterReconciler{
-		Client:   mgr.GetClient(),
-		Recorder: mgr.GetEventRecorderFor("starrockscluster-controller"),
-		Scs:      subcs,
+		Client:            mgr.GetClient(),
+		Recorder:          mgr.GetEventRecorderFor("starrockscluster-controller"),
+		FeController:      feController,
+		BeController:      beController,
+		CnController:      cnController,
+		FeProxyController: feProxyController,
 	}

 	if err := reconciler.SetupWithManager(mgr); err != nil {
@@ -38,6 +47,52 @@ func SetupClusterReconciler(mgr ctrl.Manager) error {
 	return nil
 }

+// getControllersInOrder returns controllers in the appropriate order based on deployment scenario
+func getControllersInOrder(
+	isUpgradeScenario bool,
+	fe, be, cn, feproxy subcontrollers.ClusterSubController,
+) []subcontrollers.ClusterSubController {
+	if isUpgradeScenario {
+		return []subcontrollers.ClusterSubController{be, cn, fe, feproxy}
+	}
+
+	// default order
+	return []subcontrollers.ClusterSubController{fe, be, cn, feproxy}
+}
+
+// isUpgrade determines if the current reconciliation is an upgrade scenario.
+func isUpgrade(ctx context.Context, kubeClient client.Client, cluster *srapi.StarRocksCluster) bool {
+	logger := logr.FromContextOrDiscard(ctx)
+
+	// Check FE first (always required in StarRocks)
+	feSts := &appsv1.StatefulSet{}
+	feExists := kubeClient.Get(ctx, types.NamespacedName{
+		Namespace: cluster.Namespace,
+		Name:      cluster.Name + "-fe",
+	}, feSts) == nil
+
+	beSts := &appsv1.StatefulSet{}
+	beExists := kubeClient.Get(ctx, types.NamespacedName{
+		Namespace: cluster.Namespace,
+		Name:      cluster.Name + "-be",
+	}, beSts) == nil
+
+	// Corrupted state safeguard: BE exists but FE doesn't (invalid configuration).
+	// Treat as initial deployment so FE is reconciled first.
+	// Rationale: FE is a prerequisite for BE/CN; prioritizing FE allows recovery without misordering.
+	if beExists && !feExists {
+		logger.Info("WARNING: BE StatefulSet exists without FE - treating as initial deployment to recreate FE first")
+		return false
+	}
+
+	if feExists {
+		return true
+	}
+
+	// No StatefulSets found - this is initial deployment
+	return false
+}
+
 // SetupWithManager sets up the controller with the Manager.
 func (r *StarRocksClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	// cannot add Owns(&v2.HorizontalPodAutoscaler{}), because if a kubernetes version is lower than 1.23,
@@ -82,6 +137,109 @@ func SetupWarehouseReconciler(mgr ctrl.Manager, namespace string) error {
 	return nil
 }

+// isComponentReady checks if a component is ready by verifying:
+// 1. Its service endpoints have ready addresses (pods are healthy)
+// 2. Its StatefulSet rollout is complete (spec change observed, currentRevision == updateRevision, all replicas ready)
+func isComponentReady(ctx context.Context, k8sClient client.Client, cluster *srapi.StarRocksCluster, componentType string) bool {
+	logger := logr.FromContextOrDiscard(ctx)
+
+	var serviceName string
+	var statefulSetName string
+
+	switch componentType {
+	case componentTypeFE:
+		if cluster.Spec.StarRocksFeSpec == nil {
+			return true // Component not configured, consider it ready
+		}
+		serviceName = srservice.ExternalServiceName(cluster.Name, (*srapi.StarRocksFeSpec)(nil))
+		statefulSetName = cluster.Name + "-fe"
+	case componentTypeBE:
+		if cluster.Spec.StarRocksBeSpec == nil {
+			return true
+		}
+		serviceName = srservice.ExternalServiceName(cluster.Name, (*srapi.StarRocksBeSpec)(nil))
+		statefulSetName = cluster.Name + "-be"
+	case componentTypeCN:
+		if cluster.Spec.StarRocksCnSpec == nil {
+			return true
+		}
+		serviceName = srservice.ExternalServiceName(cluster.Name, (*srapi.StarRocksCnSpec)(nil))
+		statefulSetName = cluster.Name + "-cn"
+	default:
+		return true
+	}
+
+	// Check 1: Service endpoints must have ready addresses
+	endpoints := corev1.Endpoints{}
+	if err := k8sClient.Get(ctx, types.NamespacedName{
+		Namespace: cluster.Namespace,
+		Name:      serviceName,
+	}, &endpoints); err != nil {
+		logger.V(5).Info("get component service endpoints failed", "component", componentType, "serviceName", serviceName, "error", err)
+		return false
+	}
+
+	hasReadyEndpoints := false
+	for _, sub := range endpoints.Subsets {
+		if len(sub.Addresses) > 0 {
+			hasReadyEndpoints = true
+			break
+		}
+	}
+
+	if !hasReadyEndpoints {
+		logger.Info("component not ready: no ready endpoints", "component", componentType, "serviceName", serviceName)
+		return false
+	}
+
+	// Check 2: StatefulSet rollout must be complete (currentRevision == updateRevision)
+	sts := &appsv1.StatefulSet{}
+	if err := k8sClient.Get(ctx, types.NamespacedName{
+		Namespace: cluster.Namespace,
+		Name:      statefulSetName,
+	}, sts); err != nil {
logger.V(5).Info("get component StatefulSet failed", "component", componentType, "statefulSetName", statefulSetName, "error", err) + return false + } + + // Check if StatefulSet controller has observed our latest spec change + if sts.Generation != sts.Status.ObservedGeneration { + logger.Info("component not ready: StatefulSet spec change not yet observed", + "component", componentType, + "statefulSetName", statefulSetName, + "generation", sts.Generation, + "observedGeneration", sts.Status.ObservedGeneration) + return false + } + + // Check if rollout is complete + if sts.Status.CurrentRevision != sts.Status.UpdateRevision { + logger.Info("component not ready: StatefulSet rollout in progress", + "component", componentType, + "statefulSetName", statefulSetName, + "currentRevision", sts.Status.CurrentRevision, + "updateRevision", sts.Status.UpdateRevision) + return false + } + + // Check if all replicas are ready + if sts.Status.ReadyReplicas != *sts.Spec.Replicas { + logger.Info("component not ready: waiting for replicas", + "component", componentType, + "statefulSetName", statefulSetName, + "readyReplicas", sts.Status.ReadyReplicas, + "desiredReplicas", *sts.Spec.Replicas) + return false + } + + logger.Info("component is ready", + "component", componentType, + "serviceName", serviceName, + "readyAddresses", len(endpoints.Subsets[0].Addresses), + "revision", sts.Status.CurrentRevision) + return true +} + // SetupWithManager sets up the controller with the Manager. func (r *StarRocksWarehouseReconciler) SetupWithManager(mgr ctrl.Manager) error { // cannot add Owns(&v2.HorizontalPodAutoscaler{}), because if a kubernetes version is lower than 1.23, diff --git a/pkg/controllers/starrockscluster_controller.go b/pkg/controllers/starrockscluster_controller.go index 6f9cb364..f0fd4b8c 100644 --- a/pkg/controllers/starrockscluster_controller.go +++ b/pkg/controllers/starrockscluster_controller.go @@ -40,8 +40,11 @@ import ( // StarRocksClusterReconciler reconciles a StarRocksCluster object type StarRocksClusterReconciler struct { client.Client - Recorder record.EventRecorder - Scs []subcontrollers.ClusterSubController + Recorder record.EventRecorder + FeController subcontrollers.ClusterSubController + BeController subcontrollers.ClusterSubController + CnController subcontrollers.ClusterSubController + FeProxyController subcontrollers.ClusterSubController } // +kubebuilder:rbac:groups=starrocks.com,resources=starrocksclusters,verbs=get;list;watch;create;update;patch;delete @@ -62,6 +65,8 @@ type StarRocksClusterReconciler struct { // move the current state of the cluster closer to the desired state. 
// For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.1/pkg/reconcile +// +//nolint:gocyclo,funlen // Complexity is inherent to orchestrating multiple controllers with upgrade sequencing func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.Log.WithName("StarRocksClusterReconciler").WithValues("name", req.Name, "namespace", req.Namespace) ctx = logr.NewContext(ctx, logger) @@ -86,10 +91,32 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, nil } + // Determine if this is an upgrade scenario and get controllers in appropriate order + isUpgradeScenario := isUpgrade(ctx, r.Client, src) + controllers := getControllersInOrder(isUpgradeScenario, r.FeController, r.BeController, r.CnController, r.FeProxyController) + // subControllers reconcile for create or update component. - for _, rc := range r.Scs { + for _, rc := range controllers { kvs := []interface{}{"subController", rc.GetControllerName()} - logger.Info("sub controller sync spec", kvs...) + controllerName := rc.GetControllerName() + + // During upgrades, check BE and CN are ready BEFORE syncing FE + if isUpgradeScenario && controllerName == "feController" { + // Check BE readiness if BE exists in spec + if src.Spec.StarRocksBeSpec != nil && !isComponentReady(ctx, r.Client, src, "be") { + logger.Info("upgrade: waiting for BE rollout to complete before updating FE", + "controller", controllerName) + return ctrl.Result{}, nil + } + // Check CN readiness if CN exists in spec + if src.Spec.StarRocksCnSpec != nil && !isComponentReady(ctx, r.Client, src, "cn") { + logger.Info("upgrade: waiting for CN rollout to complete before updating FE", + "controller", controllerName) + return ctrl.Result{}, nil + } + } + + // Sync the controller (create or update resources) if err = rc.SyncCluster(ctx, src); err != nil { logger.Error(err, "sub controller reconciles spec failed", kvs...) handleSyncClusterError(src, rc, err) @@ -98,9 +125,35 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req } return requeueIfError(err) } + + // After syncing, check if we need to wait for this component to be ready before proceeding + // Initial deployment: Wait for FE to be ready before creating BE/CN + // Upgrade: Wait for BE and CN to be ready before updating FE + if !isUpgradeScenario && controllerName == "feController" { + if src.Spec.StarRocksFeSpec != nil && !isComponentReady(ctx, r.Client, src, "fe") { + logger.Info("initial deployment: waiting for FE to be ready before creating BE/CN", "controller", controllerName) + return ctrl.Result{}, nil + } + } + + if isUpgradeScenario && (controllerName == "beController" || controllerName == "cnController") { + componentType := "" + if controllerName == "beController" { + componentType = "be" + } else { + componentType = "cn" + } + + if !isComponentReady(ctx, r.Client, src, componentType) { + logger.Info("upgrade: waiting for component rollout to complete before proceeding", + "controller", controllerName, + "component", componentType) + return ctrl.Result{}, nil + } + } } - for _, rc := range r.Scs { + for _, rc := range controllers { kvs := []interface{}{"subController", rc.GetControllerName()} logger.Info("sub controller update status", kvs...) 
if err = rc.UpdateClusterStatus(ctx, src); err != nil { diff --git a/pkg/controllers/starrockscluster_controller_test.go b/pkg/controllers/starrockscluster_controller_test.go index ea5cab8b..32a573b5 100644 --- a/pkg/controllers/starrockscluster_controller_test.go +++ b/pkg/controllers/starrockscluster_controller_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,7 +33,6 @@ import ( srapi "github.com/StarRocks/starrocks-kubernetes-operator/pkg/apis/starrocks/v1" rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake" - "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe" @@ -43,12 +43,10 @@ func newStarRocksClusterController(objects ...runtime.Object) *StarRocksClusterR srcController := &StarRocksClusterReconciler{} srcController.Recorder = record.NewFakeRecorder(10) srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...) - srcController.Scs = []subcontrollers.ClusterSubController{ - fe.New(srcController.Client, fake.GetEventRecorderFor(nil)), - be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)), - cn.New(srcController.Client, fake.GetEventRecorderFor(nil)), - feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)), - } + srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil)) + srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)) + srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil)) + srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)) return srcController } @@ -106,3 +104,436 @@ func TestReconcileConstructFeResource(t *testing.T) { require.NoError(t, err) require.Equal(t, reconcile.Result{}, res) } + +// Helper to create a basic cluster for upgrade tests +func newTestCluster(phase srapi.Phase, image string) *srapi.StarRocksCluster { + return &srapi.StarRocksCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: srapi.StarRocksClusterSpec{ + StarRocksFeSpec: &srapi.StarRocksFeSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: image, + }, + }, + }, + }, + Status: srapi.StarRocksClusterStatus{ + Phase: phase, + }, + } +} + +// Helper to create a StatefulSet with a specific image for upgrade tests +func newTestStatefulSet(name string) *appsv1.StatefulSet { + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Image: "starrocks/fe:3.1.0"}, + }, + }, + }, + }, + } +} + +// TestIsUpgrade tests the main upgrade detection logic +func TestIsUpgrade(t *testing.T) { + ctx := context.Background() + + t.Run("not running cluster returns false", func(t *testing.T) { + cluster := newTestCluster("", "starrocks/fe:3.1.0") + client := fake.NewFakeClient(srapi.Scheme) + + result := 
isUpgrade(ctx, client, cluster) + require.False(t, result) + }) + + // Test 1: No StatefulSets, empty phase - initial deployment + t.Run("empty phase with no statefulsets is initial deployment", func(t *testing.T) { + cluster := newTestCluster("", "starrocks/fe:3.1.0") + client := fake.NewFakeClient(srapi.Scheme) + + result := isUpgrade(ctx, client, cluster) + require.False(t, result, "empty phase should be initial deployment") + }) + + // Test 2: Reconciling phase with StatefulSets - still initial deployment + t.Run("reconciling phase is initial deployment even with statefulsets", func(t *testing.T) { + cluster := newTestCluster(srapi.ClusterReconciling, "starrocks/fe:3.1.0") + sts := newTestStatefulSet("test-cluster-fe") + client := fake.NewFakeClient(srapi.Scheme, sts) + + result := isUpgrade(ctx, client, cluster) + require.False(t, result, "reconciling phase should be initial deployment") + }) + + // Test 3: Running phase with FE StatefulSet - upgrade + t.Run("running cluster with FE statefulset is upgrade", func(t *testing.T) { + cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.2.0") + sts := newTestStatefulSet("test-cluster-fe") + client := fake.NewFakeClient(srapi.Scheme, sts) + + result := isUpgrade(ctx, client, cluster) + require.True(t, result, "running cluster with StatefulSet should be upgrade") + }) + + // Test 4: BE StatefulSet present but FE missing (corrupted state) - treated as initial deployment (FE must be created first) + t.Run("be statefulset without fe treated as initial deployment", func(t *testing.T) { + cluster := newTestCluster(srapi.ClusterRunning, "starrocks/be:3.2.0") + sts := newTestStatefulSet("test-cluster-be") + client := fake.NewFakeClient(srapi.Scheme, sts) + + result := isUpgrade(ctx, client, cluster) + require.False(t, result, "BE without FE should not trigger upgrade ordering; FE must be reconciled first") + }) + + // Test 5: Running phase with both FE and BE StatefulSets - upgrade + t.Run("running cluster with multiple statefulsets is upgrade", func(t *testing.T) { + cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.2.0") + feSts := newTestStatefulSet("test-cluster-fe") + beSts := newTestStatefulSet("test-cluster-be") + client := fake.NewFakeClient(srapi.Scheme, feSts, beSts) + + result := isUpgrade(ctx, client, cluster) + require.True(t, result, "running cluster with multiple StatefulSets should be upgrade") + }) + + // Test 6: Running phase without StatefulSets - defensive check catches corruption + t.Run("running cluster without statefulsets is treated as initial deployment", func(t *testing.T) { + cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.1.0") + client := fake.NewFakeClient(srapi.Scheme) // No StatefulSets + + result := isUpgrade(ctx, client, cluster) + require.False(t, result, "running phase without StatefulSets should be treated as initial deployment (status corruption)") + }) + + // Test 7: Failed phase with StatefulSets - initial deployment + t.Run("failed phase is initial deployment even with statefulsets", func(t *testing.T) { + cluster := newTestCluster(srapi.ClusterFailed, "starrocks/fe:3.1.0") + sts := newTestStatefulSet("test-cluster-fe") + client := fake.NewFakeClient(srapi.Scheme, sts) + + result := isUpgrade(ctx, client, cluster) + require.False(t, result, "failed phase should be initial deployment") + }) + + // Test 8: Running phase with same image - still upgrade (generic detection) + t.Run("running cluster with same image is still upgrade scenario", func(t *testing.T) { + cluster 
:= newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.1.0")
+		sts := newTestStatefulSet("test-cluster-fe")
+		sts.Spec.Template.Spec.Containers[0].Image = "starrocks/fe:3.1.0" // Same image
+		client := fake.NewFakeClient(srapi.Scheme, sts)
+
+		result := isUpgrade(ctx, client, cluster)
+		require.True(t, result, "running cluster uses upgrade ordering even if no image change (config/volume changes)")
+	})
+}
+
+// TestGetControllersInOrder tests controller ordering based on deployment scenario
+func TestGetControllersInOrder(t *testing.T) {
+	ctx := context.Background()
+	feCtrl := fe.New(nil, nil)
+	beCtrl := be.New(nil, nil)
+	cnCtrl := cn.New(nil, nil)
+	feProxyCtrl := feproxy.New(nil, nil)
+
+	t.Run("initial deployment uses FE-first order", func(t *testing.T) {
+		cluster := newTestCluster("", "starrocks/fe:3.1.0")
+		client := fake.NewFakeClient(srapi.Scheme)
+
+		isUpgradeScenario := isUpgrade(ctx, client, cluster)
+		controllers := getControllersInOrder(isUpgradeScenario, feCtrl, beCtrl, cnCtrl, feProxyCtrl)
+
+		// Check FE is first
+		require.Equal(t, "feController", controllers[0].GetControllerName())
+		// Verify order: FE -> BE -> CN -> FeProxy
+		require.Equal(t, "beController", controllers[1].GetControllerName())
+		require.Equal(t, "cnController", controllers[2].GetControllerName())
+		require.Equal(t, "feProxyController", controllers[3].GetControllerName())
+	})
+
+	t.Run("upgrade uses BE-first order", func(t *testing.T) {
+		cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.2.0")
+		sts := newTestStatefulSet("test-cluster-fe")
+		client := fake.NewFakeClient(srapi.Scheme, sts)
+
+		isUpgradeScenario := isUpgrade(ctx, client, cluster)
+		controllers := getControllersInOrder(isUpgradeScenario, feCtrl, beCtrl, cnCtrl, feProxyCtrl)
+
+		// Check BE is first
+		require.Equal(t, "beController", controllers[0].GetControllerName())
+		// Verify order: BE -> CN -> FE -> FeProxy (StarRocks upgrade procedure)
+		require.Equal(t, "cnController", controllers[1].GetControllerName())
+		require.Equal(t, "feController", controllers[2].GetControllerName())
+		require.Equal(t, "feProxyController", controllers[3].GetControllerName())
+	})
+}
+
+// TestIsComponentReady tests component readiness detection
+func TestIsComponentReady(t *testing.T) {
+	ctx := context.Background()
+	cluster := &srapi.StarRocksCluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-cluster",
+			Namespace: "default",
+		},
+		Spec: srapi.StarRocksClusterSpec{
+			StarRocksFeSpec: &srapi.StarRocksFeSpec{},
+			StarRocksBeSpec: &srapi.StarRocksBeSpec{},
+		},
+	}
+
+	t.Run("component not ready when endpoints not found", func(t *testing.T) {
+		client := fake.NewFakeClient(srapi.Scheme)
+		result := isComponentReady(ctx, client, cluster, "fe")
+		require.False(t, result)
+	})
+
+	t.Run("component not ready when no addresses", func(t *testing.T) {
+		endpoints := &corev1.Endpoints{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test-cluster-fe-service",
+				Namespace: "default",
+			},
+			Subsets: []corev1.EndpointSubset{
+				{
+					Addresses: []corev1.EndpointAddress{},
+				},
+			},
+		}
+		client := fake.NewFakeClient(srapi.Scheme, endpoints)
+		result := isComponentReady(ctx, client, cluster, "fe")
+		require.False(t, result)
+	})
+
+	t.Run("component ready when endpoints have addresses", func(t *testing.T) {
+		replicas := int32(1)
+		endpoints := &corev1.Endpoints{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test-cluster-fe-service",
+				Namespace: "default",
+			},
+			Subsets: []corev1.EndpointSubset{
+				{
+					Addresses: []corev1.EndpointAddress{
+						{IP: "10.0.0.1"},
+					},
+				},
+			},
+		}
+		// A fully rolled-out StatefulSet is required too; ready endpoints alone are not enough.
+		sts := &appsv1.StatefulSet{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:       "test-cluster-fe",
+				Namespace:  "default",
+				Generation: 1,
+			},
+			Spec: appsv1.StatefulSetSpec{
+				Replicas: &replicas,
+			},
+			Status: appsv1.StatefulSetStatus{
+				ObservedGeneration: 1,
+				CurrentRevision:    "test-cluster-fe-12345",
+				UpdateRevision:     "test-cluster-fe-12345",
+				ReadyReplicas:      1,
+			},
+		}
+		client := fake.NewFakeClient(srapi.Scheme, endpoints, sts)
+		result := 
isComponentReady(ctx, client, cluster, "fe") + require.True(t, result) + }) + + t.Run("component ready when spec is nil", func(t *testing.T) { + clusterNoCN := &srapi.StarRocksCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: srapi.StarRocksClusterSpec{ + StarRocksFeSpec: &srapi.StarRocksFeSpec{}, + }, + } + client := fake.NewFakeClient(srapi.Scheme) + result := isComponentReady(ctx, client, clusterNoCN, "cn") + require.True(t, result, "CN should be considered ready when not configured") + }) + + t.Run("component not ready when StatefulSet not found", func(t *testing.T) { + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe-service", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: "10.0.0.1"}}, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, endpoints) + result := isComponentReady(ctx, client, cluster, "fe") + require.False(t, result, "component not ready when StatefulSet doesn't exist") + }) + + t.Run("component not ready when ObservedGeneration lags behind Generation", func(t *testing.T) { + replicas := int32(3) + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe-service", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: "10.0.0.1"}}, + }, + }, + } + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe", + Namespace: "default", + Generation: 5, // Spec changed + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 4, // Controller hasn't observed the change yet + CurrentRevision: "test-cluster-fe-12345", + UpdateRevision: "test-cluster-fe-12345", + ReadyReplicas: 3, + }, + } + client := fake.NewFakeClient(srapi.Scheme, endpoints, sts) + result := isComponentReady(ctx, client, cluster, "fe") + require.False(t, result, "component not ready when StatefulSet spec change not yet observed") + }) + + t.Run("component not ready when rollout in progress", func(t *testing.T) { + replicas := int32(3) + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-be-service", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: "10.0.0.2"}}, + }, + }, + } + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-be", + Namespace: "default", + Generation: 3, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 3, // Observed + CurrentRevision: "test-cluster-be-12345", // Old revision + UpdateRevision: "test-cluster-be-67890", // New revision - rollout in progress + ReadyReplicas: 3, + }, + } + client := fake.NewFakeClient(srapi.Scheme, endpoints, sts) + result := isComponentReady(ctx, client, cluster, "be") + require.False(t, result, "component not ready when StatefulSet rollout in progress") + }) + + t.Run("component not ready when replicas not all ready", func(t *testing.T) { + replicas := int32(3) + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe-service", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: "10.0.0.1"}}, + }, + }, + } + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe", + Namespace: "default", + 
Generation: 2, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 2, + CurrentRevision: "test-cluster-fe-12345", + UpdateRevision: "test-cluster-fe-12345", + ReadyReplicas: 2, // Only 2 out of 3 ready + }, + } + client := fake.NewFakeClient(srapi.Scheme, endpoints, sts) + result := isComponentReady(ctx, client, cluster, "fe") + require.False(t, result, "component not ready when not all replicas are ready") + }) + + t.Run("component ready when all checks pass", func(t *testing.T) { + replicas := int32(3) + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe-service", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: "10.0.0.1"}, {IP: "10.0.0.2"}, {IP: "10.0.0.3"}}, + }, + }, + } + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe", + Namespace: "default", + Generation: 5, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 5, // Generation matches + CurrentRevision: "test-cluster-fe-12345", // Rollout complete + UpdateRevision: "test-cluster-fe-12345", // Same revision + ReadyReplicas: 3, // All replicas ready + }, + } + client := fake.NewFakeClient(srapi.Scheme, endpoints, sts) + result := isComponentReady(ctx, client, cluster, "fe") + require.True(t, result, "component ready when all 4 checks pass") + }) + + t.Run("component ready for BE with all checks passing", func(t *testing.T) { + replicas := int32(2) + endpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-be-service", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: "10.0.0.4"}, {IP: "10.0.0.5"}}, + }, + }, + } + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-be", + Namespace: "default", + Generation: 10, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 10, + CurrentRevision: "test-cluster-be-abc123", + UpdateRevision: "test-cluster-be-abc123", + ReadyReplicas: 2, + }, + } + client := fake.NewFakeClient(srapi.Scheme, endpoints, sts) + result := isComponentReady(ctx, client, cluster, "be") + require.True(t, result, "BE component ready when all checks pass") + }) +} diff --git a/pkg/subcontrollers/be/be_controller_reconcile_test.go b/pkg/subcontrollers/be/be_controller_reconcile_test.go index 14d9468c..981f6f8a 100644 --- a/pkg/subcontrollers/be/be_controller_reconcile_test.go +++ b/pkg/subcontrollers/be/be_controller_reconcile_test.go @@ -18,7 +18,6 @@ import ( rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/controllers" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake" - "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe" @@ -29,12 +28,10 @@ func newStarRocksClusterController(objects ...runtime.Object) *controllers.StarR srcController := &controllers.StarRocksClusterReconciler{} srcController.Recorder = record.NewFakeRecorder(10) srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...) 
- srcController.Scs = []subcontrollers.ClusterSubController{ - fe.New(srcController.Client, fake.GetEventRecorderFor(nil)), - be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)), - cn.New(srcController.Client, fake.GetEventRecorderFor(nil)), - feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)), - } + srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil)) + srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)) + srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil)) + srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)) return srcController } diff --git a/pkg/subcontrollers/cn/cn_controller_reconcile_test.go b/pkg/subcontrollers/cn/cn_controller_reconcile_test.go index a7ad0962..1821daaf 100644 --- a/pkg/subcontrollers/cn/cn_controller_reconcile_test.go +++ b/pkg/subcontrollers/cn/cn_controller_reconcile_test.go @@ -19,7 +19,6 @@ import ( rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/controllers" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake" - "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe" @@ -30,12 +29,10 @@ func newStarRocksClusterController(objects ...runtime.Object) *controllers.StarR srcController := &controllers.StarRocksClusterReconciler{} srcController.Recorder = record.NewFakeRecorder(10) srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...) 
- srcController.Scs = []subcontrollers.ClusterSubController{ - fe.New(srcController.Client, fake.GetEventRecorderFor(nil)), - be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)), - cn.New(srcController.Client, fake.GetEventRecorderFor(nil)), - feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)), - } + srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil)) + srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)) + srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil)) + srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)) return srcController } diff --git a/pkg/subcontrollers/fe/fe_controller_reconcile_test.go b/pkg/subcontrollers/fe/fe_controller_reconcile_test.go index 239322d7..a9fb71ff 100644 --- a/pkg/subcontrollers/fe/fe_controller_reconcile_test.go +++ b/pkg/subcontrollers/fe/fe_controller_reconcile_test.go @@ -18,7 +18,6 @@ import ( rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/controllers" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake" - "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn" "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe" @@ -29,12 +28,10 @@ func newStarRocksClusterController(objects ...runtime.Object) *controllers.StarR srcController := &controllers.StarRocksClusterReconciler{} srcController.Recorder = record.NewFakeRecorder(10) srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...) 
-	srcController.Scs = []subcontrollers.ClusterSubController{
-		fe.New(srcController.Client, fake.GetEventRecorderFor(nil)),
-		be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)),
-		cn.New(srcController.Client, fake.GetEventRecorderFor(nil)),
-		feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)),
-	}
+	srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil))
+	srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder))
+	srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil))
+	srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil))
 	return srcController
 }

diff --git a/pkg/subcontrollers/feproxy/feproxy_controller_test.go b/pkg/subcontrollers/feproxy/feproxy_controller_test.go
index a9dc4996..b0c81493 100644
--- a/pkg/subcontrollers/feproxy/feproxy_controller_test.go
+++ b/pkg/subcontrollers/feproxy/feproxy_controller_test.go
@@ -18,24 +18,25 @@ import (
 	rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/common/resource_utils"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/controllers"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake"
-	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
 	"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/feproxy"
 )

-func newStarRocksClusterController(objects ...runtime.Object) *controllers.StarRocksClusterReconciler {
+func TestMain(m *testing.M) {
 	srapi.Register()
+	m.Run()
+}
+
+func newStarRocksClusterController(objects ...runtime.Object) *controllers.StarRocksClusterReconciler {
 	srcController := &controllers.StarRocksClusterReconciler{}
 	srcController.Recorder = record.NewFakeRecorder(10)
 	srcController.Client = fake.NewFakeClient(srapi.Scheme, objects...)
-	srcController.Scs = []subcontrollers.ClusterSubController{
-		fe.New(srcController.Client, fake.GetEventRecorderFor(nil)),
-		be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder)),
-		cn.New(srcController.Client, fake.GetEventRecorderFor(nil)),
-		feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil)),
-	}
+	srcController.FeController = fe.New(srcController.Client, fake.GetEventRecorderFor(nil))
+	srcController.BeController = be.New(srcController.Client, fake.GetEventRecorderFor(srcController.Recorder))
+	srcController.CnController = cn.New(srcController.Client, fake.GetEventRecorderFor(nil))
+	srcController.FeProxyController = feproxy.New(srcController.Client, fake.GetEventRecorderFor(nil))
 	return srcController
 }

From abc305e150b922926ccd08143c687e90aab28235 Mon Sep 17 00:00:00 2001
From: jmjm15x
Date: Wed, 15 Oct 2025 22:53:14 -0700
Subject: [PATCH 2/4] Limit upgrade detection to image version changes only

Previously, the mere existence of a StatefulSet triggered BE-first
ordering. Now only an actual image change triggers upgrade ordering,
so the upgrade path is no longer taken for every spec change.
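The shape of the ordering decision in Reconcile is unchanged; only the
isUpgrade() predicate is narrowed. As a rough sketch (helper names as
they appear in this series, not the literal Reconcile body):

    // Upgrade ordering (BE -> CN -> FE -> FeProxy) is chosen only when
    // a deployed component image differs from the spec; any other
    // change keeps the default FE-first ordering.
    isUpgradeScenario := isUpgrade(ctx, r.Client, src)
    controllers := getControllersInOrder(isUpgradeScenario,
        r.FeController, r.BeController, r.CnController, r.FeProxyController)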
Remove the redundant checks in the reconcile method Signed-off-by: jmjm15x --- pkg/controllers/controllers.go | 63 +++- .../starrockscluster_controller.go | 19 +- .../starrockscluster_controller_test.go | 291 +++++++++++++++++- 3 files changed, 338 insertions(+), 35 deletions(-) diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index bb97ea26..49da59d3 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -61,6 +61,7 @@ func getControllersInOrder( } // isUpgrade determines if the current reconciliation is an upgrade scenario. +// Returns true only if StatefulSets exist AND there are image changes detected. func isUpgrade(ctx context.Context, kubeClient client.Client, cluster *srapi.StarRocksCluster) bool { logger := logr.FromContextOrDiscard(ctx) @@ -85,14 +86,70 @@ func isUpgrade(ctx context.Context, kubeClient client.Client, cluster *srapi.Sta return false } - if feExists { - return true + if !feExists { + return false + } + + return checkForImageChanges(ctx, kubeClient, cluster) +} + +// checkForImageChanges compares the desired StarRocks component images +// against the currently deployed StatefulSet images. +// Returns true if any component image differs. +func checkForImageChanges(ctx context.Context, kubeClient client.Client, cluster *srapi.StarRocksCluster) bool { + if cluster.Spec.StarRocksFeSpec != nil { + desiredImage := cluster.Spec.StarRocksFeSpec.Image + currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-fe") + if currentImage != "" && desiredImage != currentImage { + return true + } + } + + if cluster.Spec.StarRocksBeSpec != nil { + desiredImage := cluster.Spec.StarRocksBeSpec.Image + currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-be") + if currentImage != "" && desiredImage != currentImage { + return true + } + } + + if cluster.Spec.StarRocksCnSpec != nil { + desiredImage := cluster.Spec.StarRocksCnSpec.Image + currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-cn") + if currentImage != "" && desiredImage != currentImage { + return true + } } - // No StatefulSets found - this is initial deployment return false } +// getCurrentImageFromStatefulSet returns the container image used in a StatefulSet. +// Returns an empty string if the StatefulSet is missing or has no containers. +func getCurrentImageFromStatefulSet(ctx context.Context, kubeClient client.Client, namespace, name string) string { + var sts appsv1.StatefulSet + if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &sts); err != nil { + // StatefulSet does not exist + return "" + } + + containers := sts.Spec.Template.Spec.Containers + if len(containers) == 0 { + return "" + } + + // Prefer a named match for known StarRocks components + for _, c := range containers { + switch c.Name { + case "fe", "be", "cn": + return c.Image + } + } + + // Fallback for backward compatibility + return containers[0].Image +} + // SetupWithManager sets up the controller with the Manager. 
func (r *StarRocksClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { // cannot add Owns(&v2.HorizontalPodAutoscaler{}), because if a kubernetes version is lower than 1.23, diff --git a/pkg/controllers/starrockscluster_controller.go b/pkg/controllers/starrockscluster_controller.go index f0fd4b8c..d37e9e53 100644 --- a/pkg/controllers/starrockscluster_controller.go +++ b/pkg/controllers/starrockscluster_controller.go @@ -66,7 +66,7 @@ type StarRocksClusterReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.1/pkg/reconcile // -//nolint:gocyclo,funlen // Complexity is inherent to orchestrating multiple controllers with upgrade sequencing +//nolint:gocyclo // Complexity is inherent to orchestrating multiple controllers with upgrade sequencing func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.Log.WithName("StarRocksClusterReconciler").WithValues("name", req.Name, "namespace", req.Namespace) ctx = logr.NewContext(ctx, logger) @@ -128,29 +128,12 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req // After syncing, check if we need to wait for this component to be ready before proceeding // Initial deployment: Wait for FE to be ready before creating BE/CN - // Upgrade: Wait for BE and CN to be ready before updating FE if !isUpgradeScenario && controllerName == "feController" { if src.Spec.StarRocksFeSpec != nil && !isComponentReady(ctx, r.Client, src, "fe") { logger.Info("initial deployment: waiting for FE to be ready before creating BE/CN", "controller", controllerName) return ctrl.Result{}, nil } } - - if isUpgradeScenario && (controllerName == "beController" || controllerName == "cnController") { - componentType := "" - if controllerName == "beController" { - componentType = "be" - } else { - componentType = "cn" - } - - if !isComponentReady(ctx, r.Client, src, componentType) { - logger.Info("upgrade: waiting for component rollout to complete before proceeding", - "controller", controllerName, - "component", componentType) - return ctrl.Result{}, nil - } - } } for _, rc := range controllers { diff --git a/pkg/controllers/starrockscluster_controller_test.go b/pkg/controllers/starrockscluster_controller_test.go index 32a573b5..d7ca1695 100644 --- a/pkg/controllers/starrockscluster_controller_test.go +++ b/pkg/controllers/starrockscluster_controller_test.go @@ -158,7 +158,6 @@ func TestIsUpgrade(t *testing.T) { require.False(t, result) }) - // Test 1: No StatefulSets, empty phase - initial deployment t.Run("empty phase with no statefulsets is initial deployment", func(t *testing.T) { cluster := newTestCluster("", "starrocks/fe:3.1.0") client := fake.NewFakeClient(srapi.Scheme) @@ -167,7 +166,6 @@ func TestIsUpgrade(t *testing.T) { require.False(t, result, "empty phase should be initial deployment") }) - // Test 2: Reconciling phase with StatefulSets - still initial deployment t.Run("reconciling phase is initial deployment even with statefulsets", func(t *testing.T) { cluster := newTestCluster(srapi.ClusterReconciling, "starrocks/fe:3.1.0") sts := newTestStatefulSet("test-cluster-fe") @@ -177,17 +175,16 @@ func TestIsUpgrade(t *testing.T) { require.False(t, result, "reconciling phase should be initial deployment") }) - // Test 3: Running phase with FE StatefulSet - upgrade - t.Run("running cluster with FE statefulset is upgrade", func(t *testing.T) { + t.Run("fe statefulset with image change triggers 
upgrade", func(t *testing.T) { cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.2.0") sts := newTestStatefulSet("test-cluster-fe") + sts.Spec.Template.Spec.Containers[0].Image = "starrocks/fe:3.1.0" // Different image client := fake.NewFakeClient(srapi.Scheme, sts) result := isUpgrade(ctx, client, cluster) - require.True(t, result, "running cluster with StatefulSet should be upgrade") + require.True(t, result, "FE StatefulSet with image change should trigger upgrade") }) - // Test 4: BE StatefulSet present but FE missing (corrupted state) - treated as initial deployment (FE must be created first) t.Run("be statefulset without fe treated as initial deployment", func(t *testing.T) { cluster := newTestCluster(srapi.ClusterRunning, "starrocks/be:3.2.0") sts := newTestStatefulSet("test-cluster-be") @@ -197,18 +194,18 @@ func TestIsUpgrade(t *testing.T) { require.False(t, result, "BE without FE should not trigger upgrade ordering; FE must be reconciled first") }) - // Test 5: Running phase with both FE and BE StatefulSets - upgrade - t.Run("running cluster with multiple statefulsets is upgrade", func(t *testing.T) { + t.Run("multiple statefulsets with image change triggers upgrade", func(t *testing.T) { cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.2.0") feSts := newTestStatefulSet("test-cluster-fe") + feSts.Spec.Template.Spec.Containers[0].Image = "starrocks/fe:3.1.0" // Different beSts := newTestStatefulSet("test-cluster-be") + beSts.Spec.Template.Spec.Containers[0].Image = "starrocks/be:3.1.0" client := fake.NewFakeClient(srapi.Scheme, feSts, beSts) result := isUpgrade(ctx, client, cluster) - require.True(t, result, "running cluster with multiple StatefulSets should be upgrade") + require.True(t, result, "multiple StatefulSets with image change should trigger upgrade") }) - // Test 6: Running phase without StatefulSets - defensive check catches corruption t.Run("running cluster without statefulsets is treated as initial deployment", func(t *testing.T) { cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.1.0") client := fake.NewFakeClient(srapi.Scheme) // No StatefulSets @@ -217,7 +214,6 @@ func TestIsUpgrade(t *testing.T) { require.False(t, result, "running phase without StatefulSets should be treated as initial deployment (status corruption)") }) - // Test 7: Failed phase with StatefulSets - initial deployment t.Run("failed phase is initial deployment even with statefulsets", func(t *testing.T) { cluster := newTestCluster(srapi.ClusterFailed, "starrocks/fe:3.1.0") sts := newTestStatefulSet("test-cluster-fe") @@ -227,15 +223,282 @@ func TestIsUpgrade(t *testing.T) { require.False(t, result, "failed phase should be initial deployment") }) - // Test 8: Running phase with same image - still upgrade (generic detection) - t.Run("running cluster with same image is still upgrade scenario", func(t *testing.T) { + t.Run("statefulset with same image does not trigger upgrade", func(t *testing.T) { cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.1.0") sts := newTestStatefulSet("test-cluster-fe") sts.Spec.Template.Spec.Containers[0].Image = "starrocks/fe:3.1.0" // Same image client := fake.NewFakeClient(srapi.Scheme, sts) result := isUpgrade(ctx, client, cluster) - require.True(t, result, "running cluster uses upgrade ordering even if no image change (config/volume changes)") + require.False(t, result, "same image version should use initial deployment ordering (no upgrade needed)") + }) + + t.Run("be image change triggers upgrade", func(t 
*testing.T) { + cluster := &srapi.StarRocksCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: srapi.StarRocksClusterSpec{ + StarRocksFeSpec: &srapi.StarRocksFeSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: "starrocks/fe:3.1.0", + }, + }, + }, + StarRocksBeSpec: &srapi.StarRocksBeSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: "starrocks/be:3.2.0", // New version + }, + }, + }, + }, + } + feSts := newTestStatefulSet("test-cluster-fe") + feSts.Spec.Template.Spec.Containers[0].Image = "starrocks/fe:3.1.0" // Same + beSts := newTestStatefulSet("test-cluster-be") + beSts.Spec.Template.Spec.Containers[0].Image = "starrocks/be:3.1.0" // Old version + client := fake.NewFakeClient(srapi.Scheme, feSts, beSts) + + result := isUpgrade(ctx, client, cluster) + require.True(t, result, "BE image change should trigger upgrade") + }) + + t.Run("downgrade treated as upgrade for safety", func(t *testing.T) { + cluster := newTestCluster(srapi.ClusterRunning, "starrocks/fe:3.0.0") // Downgrade to 3.0.0 + sts := newTestStatefulSet("test-cluster-fe") + sts.Spec.Template.Spec.Containers[0].Image = "starrocks/fe:3.1.0" // Currently on 3.1.0 + client := fake.NewFakeClient(srapi.Scheme, sts) + + result := isUpgrade(ctx, client, cluster) + require.True(t, result, "downgrade should use upgrade ordering") + }) +} + +// TestGetCurrentImageFromStatefulSet tests the image retrieval logic from StatefulSets +func TestGetCurrentImageFromStatefulSet(t *testing.T) { + ctx := context.Background() + + t.Run("returns empty string when StatefulSet not found", func(t *testing.T) { + client := fake.NewFakeClient(srapi.Scheme) + result := getCurrentImageFromStatefulSet(ctx, client, "default", "missing-sts") + require.Equal(t, "", result) + }) + + t.Run("returns empty string when StatefulSet has no containers", func(t *testing.T) { + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + Namespace: "default", + }, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{}, + }, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, sts) + result := getCurrentImageFromStatefulSet(ctx, client, "default", "test-sts") + require.Equal(t, "", result) + }) + + t.Run("returns image from fe container by name", func(t *testing.T) { + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-fe", + Namespace: "default", + }, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "fe", Image: "starrocks/fe:3.1.0"}, + }, + }, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, sts) + result := getCurrentImageFromStatefulSet(ctx, client, "default", "test-fe") + require.Equal(t, "starrocks/fe:3.1.0", result) + }) + + t.Run("returns image from be container by name", func(t *testing.T) { + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-be", + Namespace: "default", + }, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "be", Image: "starrocks/be:3.1.0"}, + }, + }, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, sts) + result := getCurrentImageFromStatefulSet(ctx, client, "default", "test-be") + require.Equal(t, "starrocks/be:3.1.0", result) + 
}) + + t.Run("returns image from cn container by name", func(t *testing.T) { + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cn", + Namespace: "default", + }, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "cn", Image: "starrocks/cn:3.1.0"}, + }, + }, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, sts) + result := getCurrentImageFromStatefulSet(ctx, client, "default", "test-cn") + require.Equal(t, "starrocks/cn:3.1.0", result) + }) +} + +// TestCheckForImageChanges tests the image comparison logic +func TestCheckForImageChanges(t *testing.T) { + ctx := context.Background() + + t.Run("no changes when all images match", func(t *testing.T) { + cluster := &srapi.StarRocksCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: srapi.StarRocksClusterSpec{ + StarRocksFeSpec: &srapi.StarRocksFeSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: "starrocks/fe:3.1.0", + }, + }, + }, + StarRocksBeSpec: &srapi.StarRocksBeSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: "starrocks/be:3.1.0", + }, + }, + }, + StarRocksCnSpec: &srapi.StarRocksCnSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: "starrocks/cn:3.1.0", + }, + }, + }, + }, + } + feSts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-fe", Namespace: "default"}, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "fe", Image: "starrocks/fe:3.1.0"}}, + }, + }, + }, + } + beSts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-be", Namespace: "default"}, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "be", Image: "starrocks/be:3.1.0"}}, + }, + }, + }, + } + cnSts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-cn", Namespace: "default"}, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "cn", Image: "starrocks/cn:3.1.0"}}, + }, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, feSts, beSts, cnSts) + result := checkForImageChanges(ctx, client, cluster) + require.False(t, result, "no changes when all images match") + }) + + t.Run("detects FE image change", func(t *testing.T) { + cluster := &srapi.StarRocksCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: srapi.StarRocksClusterSpec{ + StarRocksFeSpec: &srapi.StarRocksFeSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: "starrocks/fe:3.2.0", + }, + }, + }, + }, + } + feSts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-fe", Namespace: "default"}, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "fe", Image: "starrocks/fe:3.1.0"}}, + }, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, feSts) + result := checkForImageChanges(ctx, client, cluster) + require.True(t, result, "should detect FE image change") + }) + + t.Run("detects BE image change", func(t *testing.T) { 
+ cluster := &srapi.StarRocksCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: srapi.StarRocksClusterSpec{ + StarRocksBeSpec: &srapi.StarRocksBeSpec{ + StarRocksComponentSpec: srapi.StarRocksComponentSpec{ + StarRocksLoadSpec: srapi.StarRocksLoadSpec{ + Image: "starrocks/be:3.2.0", + }, + }, + }, + }, + } + beSts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-be", Namespace: "default"}, + Spec: appsv1.StatefulSetSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "be", Image: "starrocks/be:3.1.0"}}, + }, + }, + }, + } + client := fake.NewFakeClient(srapi.Scheme, beSts) + result := checkForImageChanges(ctx, client, cluster) + require.True(t, result, "should detect BE image change") }) } From 09148b2679cea03502be74e8f2d0f878389243f7 Mon Sep 17 00:00:00 2001 From: yandongxiao Date: Thu, 16 Oct 2025 10:17:11 -0700 Subject: [PATCH 3/4] [Enhancement] Support arrow_flight_port (#708) * [Enhancement] Support arrow_flight_port Signed-off-by: yandongxiao * [BugFix] fix failed test cases and add test cases for arrow flight Signed-off-by: yandongxiao --------- Signed-off-by: yandongxiao --- pkg/common/resource_utils/configmap.go | 9 ++- pkg/common/resource_utils/service.go | 38 +++++---- pkg/controllers/controllers_test.go | 2 - .../starrockswarehouse_controller_test.go | 8 +- pkg/k8sutils/k8sutils_test.go | 4 +- pkg/subcontrollers/be/be_controller.go | 27 ++++--- pkg/subcontrollers/be/be_controller_test.go | 70 +++++++++++++++- pkg/subcontrollers/cn/cn_controller.go | 42 +++++++--- pkg/subcontrollers/cn/cn_controller_test.go | 80 +++++++++++++++++-- pkg/subcontrollers/fe/fe_controller_test.go | 4 +- 10 files changed, 229 insertions(+), 55 deletions(-) diff --git a/pkg/common/resource_utils/configmap.go b/pkg/common/resource_utils/configmap.go index c9851b4d..898a3b97 100644 --- a/pkg/common/resource_utils/configmap.go +++ b/pkg/common/resource_utils/configmap.go @@ -22,10 +22,11 @@ import ( // the fe ports key const ( - HTTP_PORT = "http_port" - RPC_PORT = "rpc_port" - QUERY_PORT = "query_port" - EDIT_LOG_PORT = "edit_log_port" + HTTP_PORT = "http_port" + RPC_PORT = "rpc_port" + QUERY_PORT = "query_port" + EDIT_LOG_PORT = "edit_log_port" + ARROW_FLIGHT_PORT = "arrow_flight_port" ) // the cn or be ports key diff --git a/pkg/common/resource_utils/service.go b/pkg/common/resource_utils/service.go index f0ed3f4c..95e0f06f 100644 --- a/pkg/common/resource_utils/service.go +++ b/pkg/common/resource_utils/service.go @@ -26,20 +26,23 @@ import ( ) const ( - FeHTTPPortName = "http" - FeRPCPortName = "rpc" - FeQueryPortName = "query" - FeEditLogPortName = "edit-log" - - BePortName = "be" - BeWebserverPortName = "webserver" - BeHeartbeatPortName = "heartbeat" - BeBrpcPortName = "brpc" - - CnThriftPortName = "thrift" - CnWebserverPortName = "webserver" - CnHeartbeatPortName = "heartbeat" - CnBrpcPortName = "brpc" + FeHTTPPortName = "http" + FeRPCPortName = "rpc" + FeQueryPortName = "query" + FeEditLogPortName = "edit-log" + FEArrowFlightPortName = "arrow-flight" + + BePortName = "be" + BeWebserverPortName = "webserver" + BeHeartbeatPortName = "heartbeat" + BeBrpcPortName = "brpc" + BEArrowFlightPortName = "arrow-flight" + + CnThriftPortName = "thrift" + CnWebserverPortName = "webserver" + CnHeartbeatPortName = "heartbeat" + CnBrpcPortName = "brpc" + CnArrowFlightPortName = "arrow-flight" ) // HashService service hash components @@ -149,6 +152,13 @@ func getFeServicePorts(config 
---
 pkg/common/resource_utils/configmap.go | 9 ++-
 pkg/common/resource_utils/service.go | 38 +++++----
 pkg/controllers/controllers_test.go | 2 -
 .../starrockswarehouse_controller_test.go | 8 +-
 pkg/k8sutils/k8sutils_test.go | 4 +-
 pkg/subcontrollers/be/be_controller.go | 27 ++++---
 pkg/subcontrollers/be/be_controller_test.go | 70 +++++++++++++++-
 pkg/subcontrollers/cn/cn_controller.go | 42 +++++++---
 pkg/subcontrollers/cn/cn_controller_test.go | 80 +++++++++++++++++--
 pkg/subcontrollers/fe/fe_controller_test.go | 4 +-
 10 files changed, 229 insertions(+), 55 deletions(-)

diff --git a/pkg/common/resource_utils/configmap.go b/pkg/common/resource_utils/configmap.go
index c9851b4d..898a3b97 100644
--- a/pkg/common/resource_utils/configmap.go
+++ b/pkg/common/resource_utils/configmap.go
@@ -22,10 +22,11 @@ import (

 // the fe ports key
 const (
- HTTP_PORT = "http_port"
- RPC_PORT = "rpc_port"
- QUERY_PORT = "query_port"
- EDIT_LOG_PORT = "edit_log_port"
+ HTTP_PORT = "http_port"
+ RPC_PORT = "rpc_port"
+ QUERY_PORT = "query_port"
+ EDIT_LOG_PORT = "edit_log_port"
+ ARROW_FLIGHT_PORT = "arrow_flight_port"
 )

 // the cn or be ports key
diff --git a/pkg/common/resource_utils/service.go b/pkg/common/resource_utils/service.go
index f0ed3f4c..95e0f06f 100644
--- a/pkg/common/resource_utils/service.go
+++ b/pkg/common/resource_utils/service.go
@@ -26,20 +26,23 @@ import (
 )

 const (
- FeHTTPPortName = "http"
- FeRPCPortName = "rpc"
- FeQueryPortName = "query"
- FeEditLogPortName = "edit-log"
-
- BePortName = "be"
- BeWebserverPortName = "webserver"
- BeHeartbeatPortName = "heartbeat"
- BeBrpcPortName = "brpc"
-
- CnThriftPortName = "thrift"
- CnWebserverPortName = "webserver"
- CnHeartbeatPortName = "heartbeat"
- CnBrpcPortName = "brpc"
+ FeHTTPPortName = "http"
+ FeRPCPortName = "rpc"
+ FeQueryPortName = "query"
+ FeEditLogPortName = "edit-log"
+ FEArrowFlightPortName = "arrow-flight"
+
+ BePortName = "be"
+ BeWebserverPortName = "webserver"
+ BeHeartbeatPortName = "heartbeat"
+ BeBrpcPortName = "brpc"
+ BEArrowFlightPortName = "arrow-flight"
+
+ CnThriftPortName = "thrift"
+ CnWebserverPortName = "webserver"
+ CnHeartbeatPortName = "heartbeat"
+ CnBrpcPortName = "brpc"
+ CnArrowFlightPortName = "arrow-flight"
 )

 // HashService service hash components
@@ -149,6 +152,13 @@ func getFeServicePorts(config map[string]interface{}, service *srapi.StarRocksSe
 Port: editPort, ContainerPort: editPort, Name: FeEditLogPortName,
 }))
+ arrowFlightPort := GetPort(config, ARROW_FLIGHT_PORT)
+ if arrowFlightPort != 0 {
+ srPorts = append(srPorts, mergePort(service, srapi.StarRocksServicePort{
+ Port: arrowFlightPort, ContainerPort: arrowFlightPort, Name: FEArrowFlightPortName,
+ }))
+ }
+
 return srPorts
 }

diff --git a/pkg/controllers/controllers_test.go b/pkg/controllers/controllers_test.go
index d8759e51..31672663 100644
--- a/pkg/controllers/controllers_test.go
+++ b/pkg/controllers/controllers_test.go
@@ -91,8 +91,6 @@ func TestSetupWarehouseReconciler(t *testing.T) {
 },
 }

- v1.Register()
-
 for _, tt := range tests {
 t.Run(tt.name, func(t *testing.T) {
 if err := SetupWarehouseReconciler(tt.args.mgr, tt.args.namespace); (err != nil) != tt.wantErr {
diff --git a/pkg/controllers/starrockswarehouse_controller_test.go b/pkg/controllers/starrockswarehouse_controller_test.go
index 2ee069fe..29ecab4a 100644
--- a/pkg/controllers/starrockswarehouse_controller_test.go
+++ b/pkg/controllers/starrockswarehouse_controller_test.go
@@ -2,6 +2,7 @@ package controllers

 import (
 "context"
+ "os"
 "reflect"
 "testing"

@@ -18,8 +19,9 @@ import (
 "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
 )

-func TestMain(_ *testing.M) {
+func TestMain(m *testing.M) {
 v1.Register()
+ os.Exit(m.Run())
 }

 func newStarRocksWarehouseController(objects ...runtime.Object) *StarRocksWarehouseReconciler {
@@ -124,7 +126,7 @@ func TestStarRocksWarehouseReconciler_Reconcile(t *testing.T) {
 reconciler: newStarRocksWarehouseController(
 &v1.StarRocksWarehouse{
 ObjectMeta: metav1.ObjectMeta{
- Name: "test",
+ Name: "wh",
 Namespace: "test",
 },
 Spec: v1.StarRocksWarehouseSpec{
@@ -189,7 +191,7 @@
 reconciler: newStarRocksWarehouseController(
 &v1.StarRocksWarehouse{
 ObjectMeta: metav1.ObjectMeta{
- Name: "test",
+ Name: "wh",
 Namespace: "test",
 },
 Spec: v1.StarRocksWarehouseSpec{
diff --git a/pkg/k8sutils/k8sutils_test.go b/pkg/k8sutils/k8sutils_test.go
index 5773771a..dcb0ba39 100644
--- a/pkg/k8sutils/k8sutils_test.go
+++ b/pkg/k8sutils/k8sutils_test.go
@@ -16,6 +16,7 @@ package k8sutils_test

 import (
 "context"
+ "os"
 "testing"

 "github.com/stretchr/testify/require"
@@ -32,8 +33,9 @@ import (
 "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/fake"
 )

-func TestMain(_ *testing.M) {
+func TestMain(m *testing.M) {
 srapi.Register()
+ os.Exit(m.Run())
 }

 func Test_getValueFromConfigmap(t *testing.T) {
diff --git a/pkg/subcontrollers/be/be_controller.go b/pkg/subcontrollers/be/be_controller.go
index 98015280..eb03132b 100644
--- a/pkg/subcontrollers/be/be_controller.go
+++ b/pkg/subcontrollers/be/be_controller.go
@@ -97,23 +97,23 @@ func (be *BeController) SyncCluster(ctx context.Context, src *srapi.StarRocksClu
 }
 logger.V(log.DebugLevel).Info("get be/fe config to resolve ports", "ConfigMapInfo", beSpec.ConfigMapInfo)
- config, err := be.GetBeConfig(ctx, beSpec, src.Namespace)
+ beConfig, err := be.GetBeConfig(ctx, beSpec, src.Namespace)
 if err != nil {
 logger.Error(err, "get be config failed", "ConfigMapInfo", beSpec.ConfigMapInfo)
 return err
 }

 // add query port from fe config.
- config[rutils.QUERY_PORT] = strconv.FormatInt(int64(rutils.GetPort(feConfig, rutils.QUERY_PORT)), 10)
+ beConfig[rutils.QUERY_PORT] = strconv.FormatInt(int64(rutils.GetPort(feConfig, rutils.QUERY_PORT)), 10)

 // generate new be external service.
 defaultLabels := load.Labels(src.Name, beSpec)
 externalsvc := rutils.BuildExternalService(object.NewFromCluster(src),
- beSpec, config, load.Selector(src.Name, beSpec), defaultLabels)
- // generate internal fe service, update the status of cn on src.
- internalService := be.generateInternalService(src, &externalsvc, config, defaultLabels)
+ beSpec, beConfig, load.Selector(src.Name, beSpec), defaultLabels)
+ // generate the internal (headless) search service for be.
+ internalService := GenerateInternalService(src, &externalsvc, beConfig, defaultLabels)
 // create be statefulset
- podTemplateSpec, err := be.buildPodTemplate(src, config)
+ podTemplateSpec, err := be.buildPodTemplate(src, beConfig)
 if err != nil {
 logger.Error(err, "build pod template failed")
 return err
@@ -182,18 +182,28 @@ func (be *BeController) UpdateClusterStatus(ctx context.Context, src *srapi.Star
 return nil
 }

-func (be *BeController) generateInternalService(src *srapi.StarRocksCluster,
- externalService *corev1.Service, config map[string]interface{}, labels map[string]string) *corev1.Service {
+// GenerateInternalService builds the BE headless search service, including the optional arrow-flight port.
+func GenerateInternalService(src *srapi.StarRocksCluster,
+ externalService *corev1.Service, beConfig map[string]interface{}, labels map[string]string) *corev1.Service {
 spec := src.Spec.StarRocksBeSpec
 searchServiceName := service.SearchServiceName(src.Name, spec)
 searchSvc := service.MakeSearchService(searchServiceName, externalService, []corev1.ServicePort{
 {
 Name: "heartbeat",
- Port: rutils.GetPort(config, rutils.HEARTBEAT_SERVICE_PORT),
- TargetPort: intstr.FromInt(int(rutils.GetPort(config, rutils.HEARTBEAT_SERVICE_PORT))),
+ Port: rutils.GetPort(beConfig, rutils.HEARTBEAT_SERVICE_PORT),
+ TargetPort: intstr.FromInt(int(rutils.GetPort(beConfig, rutils.HEARTBEAT_SERVICE_PORT))),
 },
 }, labels)
+ arrowFlightPort := rutils.GetPort(beConfig, rutils.ARROW_FLIGHT_PORT)
+ if arrowFlightPort != 0 {
+ searchSvc.Spec.Ports = append(searchSvc.Spec.Ports, corev1.ServicePort{
+ Name: rutils.BEArrowFlightPortName,
+ Port: arrowFlightPort,
+ TargetPort: intstr.FromInt(int(arrowFlightPort)),
+ })
+ }
+
 return searchSvc
 }
diff --git a/pkg/subcontrollers/be/be_controller_test.go b/pkg/subcontrollers/be/be_controller_test.go
index 6961804f..2583a89c 100644
--- a/pkg/subcontrollers/be/be_controller_test.go
+++ b/pkg/subcontrollers/be/be_controller_test.go
@@ -16,10 +16,12 @@ package be_test

 import (
 "context"
+ "os"
 "reflect"
 "testing"
 "time"

+ "github.com/stretchr/testify/assert"
 "github.com/stretchr/testify/require"
 appsv1 "k8s.io/api/apps/v1"
 corev1 "k8s.io/api/core/v1"
@@ -27,6 +29,7 @@
 "k8s.io/apimachinery/pkg/api/resource"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/types"
+ "k8s.io/apimachinery/pkg/util/intstr"
 "sigs.k8s.io/controller-runtime/pkg/client"

 srapi "github.com/StarRocks/starrocks-kubernetes-operator/pkg/apis/starrocks/v1"
@@ -37,8 +40,9 @@ import (
 "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
 )

-func TestMain(_ *testing.M) {
+func TestMain(m *testing.M) {
 srapi.Register()
+ os.Exit(m.Run())
 }

 func Test_ClearResources(t *testing.T) {
@@ -328,3 +332,67 @@ func TestBeController_GetBeConfig(t *testing.T) {
 })
 }
 }
+
+func TestGenerateInternalService(t *testing.T) {
+ type args struct {
+ cluster *srapi.StarRocksCluster
+ externalService *corev1.Service
+ beConfig map[string]interface{}
+ labels map[string]string
+ }
+ tests := []struct {
+ name string
+ args args
+ want *corev1.Service
+ }{
+ {
+ name: "test1",
+ args: args{
+ cluster: &srapi.StarRocksCluster{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-cluster",
+ },
+ 
Spec: srapi.StarRocksClusterSpec{ + StarRocksBeSpec: &srapi.StarRocksBeSpec{}, + }, + }, + externalService: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-be-service", + Namespace: "default", + Labels: map[string]string{"l1": "l1"}, + Annotations: map[string]string{"a": "a"}, + }, + }, + beConfig: map[string]interface{}{ + "heartbeat_service_port": "1234", + rutils.ARROW_FLIGHT_PORT: "5678", + }, + labels: map[string]string{ + "l2": "l2", + }, + }, + want: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-be-search", + Namespace: "default", + Labels: map[string]string{"l2": "l2"}, + Annotations: nil, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + {Name: "heartbeat", Port: 1234, TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 1234}}, + {Name: "arrow-flight", Port: 5678, TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 5678}}, + }, + ClusterIP: "None", + PublishNotReadyAddresses: true, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, be.GenerateInternalService(tt.args.cluster, tt.args.externalService, tt.args.beConfig, tt.args.labels), "generateInternalService(%v, %v, %v, %v)", tt.args.cluster, tt.args.externalService, tt.args.beConfig, tt.args.labels) + }) + } +} diff --git a/pkg/subcontrollers/cn/cn_controller.go b/pkg/subcontrollers/cn/cn_controller.go index ed8b07a5..c39af7a6 100644 --- a/pkg/subcontrollers/cn/cn_controller.go +++ b/pkg/subcontrollers/cn/cn_controller.go @@ -160,7 +160,7 @@ func (cc *CnController) SyncCnSpec(ctx context.Context, object object.StarRocksO } logger.V(log.DebugLevel).Info("get cn config to resolve ports", "ConfigMapInfo", cnSpec.ConfigMapInfo) - config, err := cc.GetCnConfig(ctx, cnSpec, object.Namespace) + cnConfig, err := cc.GetCnConfig(ctx, cnSpec, object.Namespace) if err != nil { return err } @@ -169,11 +169,11 @@ func (cc *CnController) SyncCnSpec(ctx context.Context, object object.StarRocksO if err != nil { return err } - config[rutils.QUERY_PORT] = strconv.FormatInt(int64(rutils.GetPort(feconfig, rutils.QUERY_PORT)), 10) - config[rutils.HTTP_PORT] = strconv.FormatInt(int64(rutils.GetPort(feconfig, rutils.HTTP_PORT)), 10) + cnConfig[rutils.QUERY_PORT] = strconv.FormatInt(int64(rutils.GetPort(feconfig, rutils.QUERY_PORT)), 10) + cnConfig[rutils.HTTP_PORT] = strconv.FormatInt(int64(rutils.GetPort(feconfig, rutils.HTTP_PORT)), 10) // build and deploy statefulset - podTemplateSpec, err := cc.buildPodTemplate(ctx, object, cnSpec, config) + podTemplateSpec, err := cc.buildPodTemplate(ctx, object, cnSpec, cnConfig) if err != nil { logger.Error(err, "build pod template failed") return err @@ -186,16 +186,9 @@ func (cc *CnController) SyncCnSpec(ctx context.Context, object object.StarRocksO // build and deploy service defaultLabels := load.Labels(object.SubResourcePrefixName, cnSpec) - externalsvc := rutils.BuildExternalService(object, cnSpec, config, + externalsvc := rutils.BuildExternalService(object, cnSpec, cnConfig, load.Selector(object.SubResourcePrefixName, cnSpec), defaultLabels) - searchServiceName := service.SearchServiceName(object.SubResourcePrefixName, cnSpec) - internalService := service.MakeSearchService(searchServiceName, &externalsvc, []corev1.ServicePort{ - { - Name: "heartbeat", - Port: rutils.GetPort(config, rutils.HEARTBEAT_SERVICE_PORT), - TargetPort: intstr.FromInt(int(rutils.GetPort(config, rutils.HEARTBEAT_SERVICE_PORT))), - }, - }, defaultLabels) + internalService := 
generateInternalService(object, cnSpec, &externalsvc, cnConfig, defaultLabels) if err := k8sutils.ApplyService(ctx, cc.k8sClient, &externalsvc, rutils.ServiceDeepEqual); err != nil { logger.Error(err, "sync CN external service failed") @@ -623,3 +616,26 @@ func (cc *CnController) SyncComputeNodesInFE(ctx context.Context, object object. return nil } + +func generateInternalService(object object.StarRocksObject, cnSpec *srapi.StarRocksCnSpec, + externalService *corev1.Service, cnConfig map[string]interface{}, labels map[string]string) *corev1.Service { + searchServiceName := service.SearchServiceName(object.SubResourcePrefixName, cnSpec) + searchSvc := service.MakeSearchService(searchServiceName, externalService, []corev1.ServicePort{ + { + Name: "heartbeat", + Port: rutils.GetPort(cnConfig, rutils.HEARTBEAT_SERVICE_PORT), + TargetPort: intstr.FromInt(int(rutils.GetPort(cnConfig, rutils.HEARTBEAT_SERVICE_PORT))), + }, + }, labels) + + arrowFlightPort := rutils.GetPort(cnConfig, rutils.ARROW_FLIGHT_PORT) + if arrowFlightPort != 0 { + searchSvc.Spec.Ports = append(searchSvc.Spec.Ports, corev1.ServicePort{ + Name: rutils.CnArrowFlightPortName, + Port: arrowFlightPort, + TargetPort: intstr.FromInt(int(arrowFlightPort)), + }) + } + + return searchSvc +} diff --git a/pkg/subcontrollers/cn/cn_controller_test.go b/pkg/subcontrollers/cn/cn_controller_test.go index e9a49acd..627163e0 100644 --- a/pkg/subcontrollers/cn/cn_controller_test.go +++ b/pkg/subcontrollers/cn/cn_controller_test.go @@ -19,6 +19,7 @@ import ( "database/sql" "errors" "fmt" + "os" "reflect" "testing" @@ -32,6 +33,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -43,8 +45,9 @@ import ( "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/templates/service" ) -func TestMain(_ *testing.M) { +func TestMain(m *testing.M) { srapi.Register() + os.Exit(m.Run()) } func Test_ClearResources(t *testing.T) { @@ -204,7 +207,7 @@ func Test_SyncWarehouse(t *testing.T) { warehouse := &srapi.StarRocksWarehouse{ ObjectMeta: metav1.ObjectMeta{ - Name: "test", + Name: "wh1", Namespace: "default", }, Spec: srapi.StarRocksWarehouseSpec{ @@ -236,8 +239,6 @@ func Test_SyncWarehouse(t *testing.T) { }}, } - srapi.Register() - cc := New(fake.NewFakeClient(srapi.Scheme, src, feConfigMap, warehouse, ep), fake.GetEventRecorderFor(nil)) cc.addEnvForWarehouse = true @@ -258,7 +259,7 @@ func Test_SyncWarehouse(t *testing.T) { }, &externalService), ) - require.Equal(t, "test-warehouse-cn-service", externalService.Name) + require.Equal(t, "wh1-warehouse-cn-service", externalService.Name) require.NoError(t, cc.k8sClient.Get(context.Background(), types.NamespacedName{ @@ -266,7 +267,7 @@ func Test_SyncWarehouse(t *testing.T) { Namespace: "default"}, &searchService), ) - require.Equal(t, "test-warehouse-cn-search", searchService.Name) + require.Equal(t, "wh1-warehouse-cn-search", searchService.Name) require.NoError(t, cc.k8sClient.Get(context.Background(), types.NamespacedName{ @@ -274,7 +275,7 @@ func Test_SyncWarehouse(t *testing.T) { Namespace: "default"}, &sts), ) - require.Equal(t, "test-warehouse-cn", sts.Name) + require.Equal(t, "wh1-warehouse-cn", sts.Name) } func TestCnController_UpdateStatus(t *testing.T) { @@ -895,3 +896,68 @@ func TestCnController_SyncComputeNodesInFE(t *testing.T) { }) } } + +func TestGenerateInternalService(t 
*testing.T) {
+ type args struct {
+ object object.StarRocksObject
+ cnSpec *srapi.StarRocksCnSpec
+ externalService *corev1.Service
+ cnConfig map[string]interface{}
+ labels map[string]string
+ }
+ tests := []struct {
+ name string
+ args args
+ want *corev1.Service
+ }{
+ {
+ name: "test1",
+ args: args{
+ object: object.NewFromCluster(
+ &srapi.StarRocksCluster{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-cluster",
+ },
+ },
+ ),
+ cnSpec: &srapi.StarRocksCnSpec{},
+ externalService: &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-cluster-cn-service",
+ Namespace: "default",
+ Labels: map[string]string{"l1": "l1"},
+ Annotations: map[string]string{"a": "a"},
+ },
+ },
+ cnConfig: map[string]interface{}{
+ "heartbeat_service_port": "1234",
+ rutils.ARROW_FLIGHT_PORT: "5678",
+ },
+ labels: map[string]string{
+ "l2": "l2",
+ },
+ },
+ want: &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-cluster-cn-search",
+ Namespace: "default",
+ Labels: map[string]string{"l2": "l2"},
+ Annotations: nil,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {Name: "heartbeat", Port: 1234, TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 1234}},
+ {Name: "arrow-flight", Port: 5678, TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 5678}},
+ },
+ ClusterIP: "None",
+ PublishNotReadyAddresses: true,
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert.Equalf(t, tt.want, generateInternalService(tt.args.object, tt.args.cnSpec, tt.args.externalService, tt.args.cnConfig, tt.args.labels), "generateInternalService(%v, %v, %v, %v, %v)", tt.args.object, tt.args.cnSpec, tt.args.externalService, tt.args.cnConfig, tt.args.labels)
+ })
+ }
+}
diff --git a/pkg/subcontrollers/fe/fe_controller_test.go b/pkg/subcontrollers/fe/fe_controller_test.go
index c14f7d2f..7632927b 100644
--- a/pkg/subcontrollers/fe/fe_controller_test.go
+++ b/pkg/subcontrollers/fe/fe_controller_test.go
@@ -19,6 +19,7 @@ package fe_test

 import (
 "context"
 "fmt"
+ "os"
 "reflect"
 "testing"
 "time"
@@ -41,8 +42,9 @@ import (
 "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
 )

-func TestMain(_ *testing.M) {
+func TestMain(m *testing.M) {
 srapi.Register()
+ os.Exit(m.Run())
 }

 func TestFeController_updateStatus(_ *testing.T) {

From 1e441b612eedf8bccb54476d9ba23c7c3b975468 Mon Sep 17 00:00:00 2001
From: jmjm15x
Date: Wed, 22 Oct 2025 13:19:49 -0700
Subject: [PATCH 4/4] fix: repair unit tests broken by the upgrade-sequencing refactor

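Among the repairs, isComponentReady() no longer resolves service names
from typed-nil spec pointers such as (*srapi.StarRocksFeSpec)(nil). A
typed nil satisfies an interface parameter, so the cast compiles, but
the callee either panics or silently falls back to nil-receiver
defaults (depending on how its methods guard the receiver), and any
user-configured service name is lost. Passing cluster.Spec.StarRocksFeSpec
and the BE/CN equivalents resolves names the same way the
sub-controllers do. A self-contained sketch of the pitfall, using
hypothetical stand-in types rather than the operator's real API:

package main

import "fmt"

// SpecInterface and FeSpec are minimal stand-ins for the operator's
// component spec types; suffix resolution is simplified.
type SpecInterface interface {
	ExternalServiceSuffix() string
}

type FeSpec struct {
	ServiceName string
}

func (s *FeSpec) ExternalServiceSuffix() string {
	if s == nil || s.ServiceName == "" {
		return "-fe-service" // nil receiver: only the default is reachable
	}
	return "-" + s.ServiceName
}

func externalServiceName(cluster string, spec SpecInterface) string {
	return cluster + spec.ExternalServiceSuffix()
}

func main() {
	spec := &FeSpec{ServiceName: "custom-fe-svc"}
	// Typed nil compiles, but the configured name is ignored.
	fmt.Println(externalServiceName("demo", (*FeSpec)(nil))) // demo-fe-service
	// The real spec honors the user's configuration.
	fmt.Println(externalServiceName("demo", spec)) // demo-custom-fe-svc
}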
Signed-off-by: jmjm15x
---
 pkg/controllers/controllers.go | 6 +--
 .../starrockscluster_controller.go | 10 ++---
 .../starrockscluster_controller_test.go | 43 +++++++++++++-----
 .../be/be_controller_reconcile_test.go | 20 +++++++-
 .../cn/cn_controller_reconcile_test.go | 20 +++++++-
 5 files changed, 76 insertions(+), 23 deletions(-)

diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go
index 49da59d3..a2c30286 100644
--- a/pkg/controllers/controllers.go
+++ b/pkg/controllers/controllers.go
@@ -208,19 +208,19 @@ func isComponentReady(ctx context.Context, k8sClient client.Client, cluster *sra
 if cluster.Spec.StarRocksFeSpec == nil {
 return true // Component not configured, consider it ready
 }
- serviceName = rutils.ExternalServiceName(cluster.Name, (*srapi.StarRocksFeSpec)(nil))
+ serviceName = rutils.ExternalServiceName(cluster.Name, cluster.Spec.StarRocksFeSpec)
 statefulSetName = cluster.Name + "-fe"
 case componentTypeBE:
 if cluster.Spec.StarRocksBeSpec == nil {
 return true
 }
- serviceName = rutils.ExternalServiceName(cluster.Name, (*srapi.StarRocksBeSpec)(nil))
+ serviceName = rutils.ExternalServiceName(cluster.Name, cluster.Spec.StarRocksBeSpec)
 statefulSetName = cluster.Name + "-be"
 case componentTypeCN:
 if cluster.Spec.StarRocksCnSpec == nil {
 return true
 }
- serviceName = rutils.ExternalServiceName(cluster.Name, (*srapi.StarRocksCnSpec)(nil))
+ serviceName = rutils.ExternalServiceName(cluster.Name, cluster.Spec.StarRocksCnSpec)
 statefulSetName = cluster.Name + "-cn"
 default:
 return true
diff --git a/pkg/controllers/starrockscluster_controller.go b/pkg/controllers/starrockscluster_controller.go
index d37e9e53..6104527b 100644
--- a/pkg/controllers/starrockscluster_controller.go
+++ b/pkg/controllers/starrockscluster_controller.go
@@ -101,15 +101,15 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
 controllerName := rc.GetControllerName()

 // During upgrades, check BE and CN are ready BEFORE syncing FE
- if isUpgradeScenario && controllerName == "feController" {
+ if isUpgradeScenario && controllerName == r.FeController.GetControllerName() {
 // Check BE readiness if BE exists in spec
- if src.Spec.StarRocksBeSpec != nil && !isComponentReady(ctx, r.Client, src, "be") {
+ if src.Spec.StarRocksBeSpec != nil && !isComponentReady(ctx, r.Client, src, componentTypeBE) {
 logger.Info("upgrade: waiting for BE rollout to complete before updating FE",
 "controller", controllerName)
 return ctrl.Result{}, nil
 }

 // Check CN readiness if CN exists in spec
- if src.Spec.StarRocksCnSpec != nil && !isComponentReady(ctx, r.Client, src, "cn") {
+ if src.Spec.StarRocksCnSpec != nil && !isComponentReady(ctx, r.Client, src, componentTypeCN) {
 logger.Info("upgrade: waiting for CN rollout to complete before updating FE",
 "controller", controllerName)
 return ctrl.Result{}, nil
@@ -128,8 +128,8 @@ func (r *StarRocksClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req
 // After syncing, check if we need to wait for this component to be ready before proceeding
 // Initial deployment: Wait for FE to be ready before creating BE/CN
- if !isUpgradeScenario && controllerName == "feController" {
- if src.Spec.StarRocksFeSpec != nil && !isComponentReady(ctx, r.Client, src, "fe") {
+ if !isUpgradeScenario && controllerName == r.FeController.GetControllerName() {
+ if src.Spec.StarRocksFeSpec != nil && !isComponentReady(ctx, r.Client, src, componentTypeFE) {
 logger.Info("initial deployment: waiting for FE to be ready before creating BE/CN", "controller", controllerName)
 return ctrl.Result{}, nil
 }
diff --git a/pkg/controllers/starrockscluster_controller_test.go b/pkg/controllers/starrockscluster_controller_test.go
index d7ca1695..3e21ce21 100644
--- a/pkg/controllers/starrockscluster_controller_test.go
+++ b/pkg/controllers/starrockscluster_controller_test.go
@@ -505,10 +505,11 @@ func TestCheckForImageChanges(t *testing.T) {

 // TestGetControllersInOrder tests controller ordering based on deployment scenario
 func TestGetControllersInOrder(t *testing.T) {
 ctx := context.Background()
- feCtrl := fe.New(nil, nil)
- beCtrl := be.New(nil, nil)
- cnCtrl := cn.New(nil, nil)
- feProxyCtrl := feproxy.New(nil, nil)
+ client := fake.NewFakeClient(srapi.Scheme)
+ feCtrl := fe.New(client, fake.GetEventRecorderFor(nil))
+ beCtrl := be.New(client, fake.GetEventRecorderFor(nil))
+ cnCtrl := cn.New(client, fake.GetEventRecorderFor(nil))
+ feProxyCtrl := feproxy.New(client, fake.GetEventRecorderFor(nil))

 t.Run("initial deployment uses FE-first order", func(t *testing.T) {
 cluster := 
newTestCluster("", "starrocks/fe:3.1.0") @@ -518,11 +519,11 @@ func TestGetControllersInOrder(t *testing.T) { controllers := getControllersInOrder(isUpgradeScenario, feCtrl, beCtrl, cnCtrl, feProxyCtrl) // Check FE is first - require.Equal(t, "fe", controllers[0].GetControllerName()) + require.Equal(t, "feController", controllers[0].GetControllerName()) // Verify order: FE -> BE -> CN -> FeProxy - require.Equal(t, "be", controllers[1].GetControllerName()) - require.Equal(t, "cn", controllers[2].GetControllerName()) - require.Equal(t, "feproxy", controllers[3].GetControllerName()) + require.Equal(t, "beController", controllers[1].GetControllerName()) + require.Equal(t, "cnController", controllers[2].GetControllerName()) + require.Equal(t, "feProxyController", controllers[3].GetControllerName()) }) t.Run("upgrade uses BE-first order", func(t *testing.T) { @@ -534,11 +535,11 @@ func TestGetControllersInOrder(t *testing.T) { controllers := getControllersInOrder(isUpgradeScenario, feCtrl, beCtrl, cnCtrl, feProxyCtrl) // Check BE is first - require.Equal(t, "be", controllers[0].GetControllerName()) + require.Equal(t, "beController", controllers[0].GetControllerName()) // Verify order: BE -> CN -> FE -> FeProxy (StarRocks upgrade procedure) - require.Equal(t, "cn", controllers[1].GetControllerName()) - require.Equal(t, "fe", controllers[2].GetControllerName()) - require.Equal(t, "feproxy", controllers[3].GetControllerName()) + require.Equal(t, "cnController", controllers[1].GetControllerName()) + require.Equal(t, "feController", controllers[2].GetControllerName()) + require.Equal(t, "feProxyController", controllers[3].GetControllerName()) }) } @@ -593,7 +594,23 @@ func TestIsComponentReady(t *testing.T) { }, }, } - client := fake.NewFakeClient(srapi.Scheme, endpoints) + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster-fe", + Namespace: "default", + Generation: 1, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: rutils.GetInt32Pointer(1), + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 1, + CurrentRevision: "rev1", + UpdateRevision: "rev1", + ReadyReplicas: 1, + }, + } + client := fake.NewFakeClient(srapi.Scheme, endpoints, sts) result := isComponentReady(ctx, client, cluster, "fe") require.True(t, result) }) diff --git a/pkg/subcontrollers/be/be_controller_reconcile_test.go b/pkg/subcontrollers/be/be_controller_reconcile_test.go index 981f6f8a..bf8f6544 100644 --- a/pkg/subcontrollers/be/be_controller_reconcile_test.go +++ b/pkg/subcontrollers/be/be_controller_reconcile_test.go @@ -96,6 +96,24 @@ func TestStarRocksClusterReconciler_BeResourceCreate(t *testing.T) { }}, } + // mock the fe statefulset is ready (rollout complete) + feStatefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "starrockscluster-sample-fe", + Namespace: "default", + Generation: 1, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: rutils.GetInt32Pointer(3), + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 1, + CurrentRevision: "rev1", + UpdateRevision: "rev1", + ReadyReplicas: 3, + }, + } + // mock the fe configMap feConfigMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -118,7 +136,7 @@ func TestStarRocksClusterReconciler_BeResourceCreate(t *testing.T) { }, } - r := newStarRocksClusterController(src, ep, feConfigMap, beConfigMap) + r := newStarRocksClusterController(src, ep, feStatefulSet, feConfigMap, beConfigMap) res, err := r.Reconcile(context.Background(), reconcile.Request{ NamespacedName: types.NamespacedName{ diff --git 
a/pkg/subcontrollers/cn/cn_controller_reconcile_test.go b/pkg/subcontrollers/cn/cn_controller_reconcile_test.go index 1821daaf..78f63ae5 100644 --- a/pkg/subcontrollers/cn/cn_controller_reconcile_test.go +++ b/pkg/subcontrollers/cn/cn_controller_reconcile_test.go @@ -169,6 +169,24 @@ func TestStarRocksClusterReconciler_CnResourceCreate(t *testing.T) { }}, } + // mock the fe statefulset is ready (rollout complete) + feStatefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "starrockscluster-sample-fe", + Namespace: "default", + Generation: 1, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: rutils.GetInt32Pointer(3), + }, + Status: appsv1.StatefulSetStatus{ + ObservedGeneration: 1, + CurrentRevision: "rev1", + UpdateRevision: "rev1", + ReadyReplicas: 3, + }, + } + // mock the fe configMap feConfigMap := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -191,7 +209,7 @@ func TestStarRocksClusterReconciler_CnResourceCreate(t *testing.T) { }, } - r := newStarRocksClusterController(src, ep, feConfigMap, cnConfigMap) + r := newStarRocksClusterController(src, ep, feStatefulSet, feConfigMap, cnConfigMap) res, err := r.Reconcile(context.Background(), reconcile.Request{ NamespacedName: types.NamespacedName{