9 changes: 5 additions & 4 deletions pkg/common/resource_utils/configmap.go
@@ -22,10 +22,11 @@ import (

// the fe ports key
const (
HTTP_PORT = "http_port"
RPC_PORT = "rpc_port"
QUERY_PORT = "query_port"
EDIT_LOG_PORT = "edit_log_port"
HTTP_PORT = "http_port"
RPC_PORT = "rpc_port"
QUERY_PORT = "query_port"
EDIT_LOG_PORT = "edit_log_port"
ARROW_FLIGHT_PORT = "arrow_flight_port"
)

// the cn or be ports key
38 changes: 24 additions & 14 deletions pkg/common/resource_utils/service.go
@@ -26,20 +26,23 @@ import (
)

const (
FeHTTPPortName = "http"
FeRPCPortName = "rpc"
FeQueryPortName = "query"
FeEditLogPortName = "edit-log"

BePortName = "be"
BeWebserverPortName = "webserver"
BeHeartbeatPortName = "heartbeat"
BeBrpcPortName = "brpc"

CnThriftPortName = "thrift"
CnWebserverPortName = "webserver"
CnHeartbeatPortName = "heartbeat"
CnBrpcPortName = "brpc"
FeHTTPPortName = "http"
FeRPCPortName = "rpc"
FeQueryPortName = "query"
FeEditLogPortName = "edit-log"
FEArrowFlightPortName = "arrow-flight"

BePortName = "be"
BeWebserverPortName = "webserver"
BeHeartbeatPortName = "heartbeat"
BeBrpcPortName = "brpc"
BEArrowFlightPortName = "arrow-flight"

CnThriftPortName = "thrift"
CnWebserverPortName = "webserver"
CnHeartbeatPortName = "heartbeat"
CnBrpcPortName = "brpc"
CnArrowFlightPortName = "arrow-flight"
)

// HashService service hash components
@@ -149,6 +152,13 @@ func getFeServicePorts(config map[string]interface{}, service *srapi.StarRocksSe
Port: editPort, ContainerPort: editPort, Name: FeEditLogPortName,
}))

arrowFlightPort := GetPort(config, ARROW_FLIGHT_PORT)
if arrowFlightPort != 0 {
srPorts = append(srPorts, mergePort(service, srapi.StarRocksServicePort{
Port: arrowFlightPort, ContainerPort: arrowFlightPort, Name: FEArrowFlightPortName,
}))
}

return srPorts
}
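
The Arrow Flight port is optional: it is only appended when the key resolves to a non-zero value. A minimal sketch of the lookup, assuming GetPort parses the raw config value and returns 0 when the key is absent (implied by the guard above); "9408" is a hypothetical value:

    // Sketch, assuming the resource_utils package context.
    config := map[string]interface{}{
        ARROW_FLIGHT_PORT: "9408", // omit the key and no arrow-flight port is exposed
    }
    if p := GetPort(config, ARROW_FLIGHT_PORT); p != 0 {
        port := srapi.StarRocksServicePort{Port: p, ContainerPort: p, Name: FEArrowFlightPortName}
        _ = port // appended to the FE service ports in getFeServicePorts
    }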

227 changes: 221 additions & 6 deletions pkg/controllers/controllers.go
@@ -3,33 +3,42 @@ package controllers
import (
"context"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

srapi "github.com/StarRocks/starrocks-kubernetes-operator/pkg/apis/starrocks/v1"
rutils "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/templates/service"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/be"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/cn"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
"github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/feproxy"
)

const (
componentTypeFE = "fe"
componentTypeBE = "be"
componentTypeCN = "cn"
)

func SetupClusterReconciler(mgr ctrl.Manager) error {
feController := fe.New(mgr.GetClient(), mgr.GetEventRecorderFor)
beController := be.New(mgr.GetClient(), mgr.GetEventRecorderFor)
cnController := cn.New(mgr.GetClient(), mgr.GetEventRecorderFor)
feProxyController := feproxy.New(mgr.GetClient(), mgr.GetEventRecorderFor)
subcs := []subcontrollers.ClusterSubController{
feController, beController, cnController, feProxyController,
}

reconciler := &StarRocksClusterReconciler{
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("starrockscluster-controller"),
Scs: subcs,
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor("starrockscluster-controller"),
FeController: feController,
BeController: beController,
CnController: cnController,
FeProxyController: feProxyController,
}

if err := reconciler.SetupWithManager(mgr); err != nil {
@@ -38,6 +47,109 @@ func SetupClusterReconciler(mgr ctrl.Manager) error {
return nil
}

// getControllersInOrder returns the sub-controllers in the order they should be
// reconciled: BE and CN before FE for an upgrade (existing backends roll first),
// and FE first for an initial deployment.
func getControllersInOrder(
isUpgradeScenario bool,
fe, be, cn, feproxy subcontrollers.ClusterSubController,
) []subcontrollers.ClusterSubController {
if isUpgradeScenario {
return []subcontrollers.ClusterSubController{be, cn, fe, feproxy}
}

// default order
return []subcontrollers.ClusterSubController{fe, be, cn, feproxy}
}
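
Presumably this is wired into the cluster reconcile path roughly as follows (a hypothetical call site; this diff does not show where the ordering is consumed, and the SyncCluster method name on the sub-controller interface is assumed, not verified):

    ordered := getControllersInOrder(
        isUpgrade(ctx, r.Client, cluster),
        r.FeController, r.BeController, r.CnController, r.FeProxyController,
    )
    for _, sc := range ordered {
        if err := sc.SyncCluster(ctx, cluster); err != nil {
            return ctrl.Result{}, err
        }
    }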

// isUpgrade determines if the current reconciliation is an upgrade scenario.
// Returns true only if StatefulSets exist AND there are image changes detected.
func isUpgrade(ctx context.Context, kubeClient client.Client, cluster *srapi.StarRocksCluster) bool {
logger := logr.FromContextOrDiscard(ctx)

// Check FE first (always required in StarRocks)
feSts := &appsv1.StatefulSet{}
feExists := kubeClient.Get(ctx, types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name + "-fe",
Collaborator: use load.Name(cluster.Name, cluster.Spec.StarRocksFeSpec)

}, feSts) == nil

beSts := &appsv1.StatefulSet{}
beExists := kubeClient.Get(ctx, types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name + "-be",
Collaborator: use load.Name(cluster.Name, cluster.Spec.StarRocksBeSpec)

}, beSts) == nil
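
A sketch of the reviewer's suggestion, assuming load.Name(clusterName, spec) derives the component's StatefulSet name from its spec (signature inferred from the comments, not verified against pkg/k8sutils/load):

    feSts := &appsv1.StatefulSet{}
    feExists := kubeClient.Get(ctx, types.NamespacedName{
        Namespace: cluster.Namespace,
        Name:      load.Name(cluster.Name, cluster.Spec.StarRocksFeSpec),
    }, feSts) == nil

    beSts := &appsv1.StatefulSet{}
    beExists := kubeClient.Get(ctx, types.NamespacedName{
        Namespace: cluster.Namespace,
        Name:      load.Name(cluster.Name, cluster.Spec.StarRocksBeSpec),
    }, beSts) == nil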

Collaborator: I think the CN check is missing here.

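A sketch of the missing CN lookup, mirroring the FE and BE checks above (the "-cn" suffix matches the naming used in checkForImageChanges below):

    cnSts := &appsv1.StatefulSet{}
    cnExists := kubeClient.Get(ctx, types.NamespacedName{
        Namespace: cluster.Namespace,
        Name:      cluster.Name + "-cn",
    }, cnSts) == nil
    // cnExists would then factor into the upgrade decision alongside feExists and beExists.
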
// Corrupted state safeguard: BE exists but FE doesn't (invalid configuration).
// Treat as initial deployment so FE is reconciled first.
// Rationale: FE is a prerequisite for BE/CN; prioritizing FE allows recovery without misordering.
if beExists && !feExists {
Collaborator: this is duplicated with the following !feExists condition

logger.Info("WARNING: BE StatefulSet exists without FE - treating as initial deployment to recreate FE first")
return false
}

if !feExists {
return false
}
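
One way to fold the duplicated branches together, per the reviewer's comment:

    if !feExists {
        if beExists {
            logger.Info("WARNING: BE StatefulSet exists without FE - treating as initial deployment to recreate FE first")
        }
        return false
    }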

return checkForImageChanges(ctx, kubeClient, cluster)
Collaborator: the code above detects whether the StatefulSets exist, and checkForImageChanges then compares their images. My suggestion: can we merge them together?

1. If the cluster's FE spec exists, check whether its StatefulSet exists, then check the image.

}
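
A sketch of the merge the reviewer proposes: one pass per component that checks the spec, then the StatefulSet and its image in a single step. componentNeedsUpgrade is a hypothetical helper, not part of this PR; it reuses getCurrentImageFromStatefulSet, defined below:

    // componentNeedsUpgrade reports whether a configured component has a live
    // StatefulSet whose image differs from the desired image.
    func componentNeedsUpgrade(ctx context.Context, kubeClient client.Client,
        namespace, stsName, desiredImage string) bool {
        current := getCurrentImageFromStatefulSet(ctx, kubeClient, namespace, stsName)
        return current != "" && current != desiredImage
    }

With that in place, isUpgrade could reduce to one guarded call per component, e.g.:

    if cluster.Spec.StarRocksFeSpec != nil &&
        componentNeedsUpgrade(ctx, kubeClient, cluster.Namespace,
            cluster.Name+"-fe", cluster.Spec.StarRocksFeSpec.Image) {
        return true
    }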

// checkForImageChanges compares the desired StarRocks component images
// against the currently deployed StatefulSet images.
// Returns true if any component image differs.
func checkForImageChanges(ctx context.Context, kubeClient client.Client, cluster *srapi.StarRocksCluster) bool {
if cluster.Spec.StarRocksFeSpec != nil {
desiredImage := cluster.Spec.StarRocksFeSpec.Image
currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-fe")
if currentImage != "" && desiredImage != currentImage {
return true
}
}

if cluster.Spec.StarRocksBeSpec != nil {
desiredImage := cluster.Spec.StarRocksBeSpec.Image
currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-be")
if currentImage != "" && desiredImage != currentImage {
return true
}
}

if cluster.Spec.StarRocksCnSpec != nil {
desiredImage := cluster.Spec.StarRocksCnSpec.Image
currentImage := getCurrentImageFromStatefulSet(ctx, kubeClient, cluster.Namespace, cluster.Name+"-cn")
if currentImage != "" && desiredImage != currentImage {
return true
}
}

return false
}

// getCurrentImageFromStatefulSet returns the container image used in a StatefulSet.
// Returns an empty string if the StatefulSet is missing or has no containers.
func getCurrentImageFromStatefulSet(ctx context.Context, kubeClient client.Client, namespace, name string) string {
var sts appsv1.StatefulSet
if err := kubeClient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &sts); err != nil {
// StatefulSet does not exist
return ""
}

containers := sts.Spec.Template.Spec.Containers
if len(containers) == 0 {
return ""
}

// Prefer a named match for known StarRocks components
for _, c := range containers {
switch c.Name {
case "fe", "be", "cn":
return c.Image
}
}

// Fallback for backward compatibility
return containers[0].Image
}

// SetupWithManager sets up the controller with the Manager.
func (r *StarRocksClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
// cannot add Owns(&v2.HorizontalPodAutoscaler{}), because if a kubernetes version is lower than 1.23,
@@ -82,6 +194,109 @@ func SetupWarehouseReconciler(mgr ctrl.Manager, namespace string) error {
return nil
}

// isComponentReady checks if a component is ready by verifying:
// 1. Its service endpoints have ready addresses (pods are healthy)
// 2. Its StatefulSet rollout is complete (no pending updates)
func isComponentReady(ctx context.Context, k8sClient client.Client, cluster *srapi.StarRocksCluster, componentType string) bool {
logger := logr.FromContextOrDiscard(ctx)

var serviceName string
var statefulSetName string

switch componentType {
case componentTypeFE:
if cluster.Spec.StarRocksFeSpec == nil {
return true // Component not configured, consider it ready
}
serviceName = rutils.ExternalServiceName(cluster.Name, cluster.Spec.StarRocksFeSpec)
statefulSetName = cluster.Name + "-fe"
Collaborator: use load.Name(cluster.Name, cluster.Spec.StarRocksFeSpec), and you can pass a nil pointer for the second parameter.

case componentTypeBE:
if cluster.Spec.StarRocksBeSpec == nil {
return true
}
serviceName = rutils.ExternalServiceName(cluster.Name, cluster.Spec.StarRocksBeSpec)
statefulSetName = cluster.Name + "-be"
Collaborator: the same issue

case componentTypeCN:
if cluster.Spec.StarRocksCnSpec == nil {
return true
}
serviceName = rutils.ExternalServiceName(cluster.Name, cluster.Spec.StarRocksCnSpec)
statefulSetName = cluster.Name + "-cn"
Collaborator: the same issue

default:
return true
}

// Check 1: Service endpoints must have ready addresses
endpoints := corev1.Endpoints{}
if err := k8sClient.Get(ctx, types.NamespacedName{
Namespace: cluster.Namespace,
Name: serviceName,
}, &endpoints); err != nil {
logger.V(5).Info("get component service endpoints failed", "component", componentType, "serviceName", serviceName, "error", err)
return false
}

hasReadyEndpoints := false
for _, sub := range endpoints.Subsets {
if len(sub.Addresses) > 0 {
hasReadyEndpoints = true
break
}
}

if !hasReadyEndpoints {
logger.Info("component not ready: no ready endpoints", "component", componentType, "serviceName", serviceName)
return false
}

// Check 2: StatefulSet rollout must be complete (currentRevision == updateRevision)
sts := &appsv1.StatefulSet{}
if err := k8sClient.Get(ctx, types.NamespacedName{
Namespace: cluster.Namespace,
Name: statefulSetName,
}, sts); err != nil {
logger.V(5).Info("get component StatefulSet failed", "component", componentType, "statefulSetName", statefulSetName, "error", err)
return false
}

// Check if StatefulSet controller has observed our latest spec change
if sts.Generation != sts.Status.ObservedGeneration {
logger.Info("component not ready: StatefulSet spec change not yet observed",
"component", componentType,
"statefulSetName", statefulSetName,
"generation", sts.Generation,
"observedGeneration", sts.Status.ObservedGeneration)
return false
}

// Check if rollout is complete
if sts.Status.CurrentRevision != sts.Status.UpdateRevision {
logger.Info("component not ready: StatefulSet rollout in progress",
"component", componentType,
"statefulSetName", statefulSetName,
"currentRevision", sts.Status.CurrentRevision,
"updateRevision", sts.Status.UpdateRevision)
return false
}

// Check if all replicas are ready
if sts.Status.ReadyReplicas != *sts.Spec.Replicas {
logger.Info("component not ready: waiting for replicas",
"component", componentType,
"statefulSetName", statefulSetName,
"readyReplicas", sts.Status.ReadyReplicas,
"desiredReplicas", *sts.Spec.Replicas)
return false
}

logger.Info("component is ready",
"component", componentType,
"serviceName", serviceName,
"readyAddresses", len(endpoints.Subsets[0].Addresses),
"revision", sts.Status.CurrentRevision)
return true
}
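
For context, a sketch of how a caller might gate the upgrade ordering on readiness (hypothetical wiring; the reconcile loop itself is outside this diff, and the 10-second requeue interval is an arbitrary choice):

    // Requires "time" in the import block.
    for _, ct := range []string{componentTypeBE, componentTypeCN, componentTypeFE} {
        if !isComponentReady(ctx, r.Client, cluster, ct) {
            // requeue until the earlier component settles before touching the next one
            return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
        }
    }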

// SetupWithManager sets up the controller with the Manager.
func (r *StarRocksWarehouseReconciler) SetupWithManager(mgr ctrl.Manager) error {
// cannot add Owns(&v2.HorizontalPodAutoscaler{}), because if a kubernetes version is lower than 1.23,
2 changes: 0 additions & 2 deletions pkg/controllers/controllers_test.go
@@ -91,8 +91,6 @@ func TestSetupWarehouseReconciler(t *testing.T) {
},
}

v1.Register()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := SetupWarehouseReconciler(tt.args.mgr, tt.args.namespace); (err != nil) != tt.wantErr {