90 changes: 77 additions & 13 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -10,12 +10,14 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

batchv1 "k8s.io/api/batch/v1"
rbacv1 "k8s.io/api/rbac/v1"
@@ -44,7 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type reconcileFunc func(context.Context, *rayv1.RayCluster) error
type (
rayClusterConditions map[rayv1.RayClusterConditionType]metav1.Condition
reconcileFunc func(context.Context, *rayv1.RayCluster, rayClusterConditions) error
)

var (
DefaultRequeueDuration = 2 * time.Second
@@ -300,6 +305,9 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
return ctrl.Result{}, nil
}

// conditions should be mutated by the following reconcileXXX functions.
conditions := defaultRayClusterConditions()
Comment from the PR author (Collaborator):
defaultRayClusterConditions returns a simple map. I made it a function so that it can be reused in tests.
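
For illustration, a minimal sketch of how that reuse might look in a test (the test name and assertion are hypothetical, not part of this PR; it assumes the usual testing, rayv1, and metav1 imports):

// Hypothetical unit test reusing defaultRayClusterConditions to assert the
// seed state: RayClusterReplicaFailure starts out as ConditionFalse.
func TestDefaultRayClusterConditions(t *testing.T) {
	conditions := defaultRayClusterConditions()
	cond, ok := conditions[rayv1.RayClusterReplicaFailure]
	if !ok || cond.Status != metav1.ConditionFalse {
		t.Fatalf("expected RayClusterReplicaFailure to default to ConditionFalse, got %+v", cond)
	}
}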


reconcileFuncs := []reconcileFunc{
r.reconcileAutoscalerServiceAccount,
r.reconcileAutoscalerRole,
@@ -312,7 +320,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
}

for _, fn := range reconcileFuncs {
if reconcileErr = fn(ctx, instance); reconcileErr != nil {
if reconcileErr = fn(ctx, instance, conditions); reconcileErr != nil {
funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
break
@@ -325,7 +333,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
if calculateErr != nil {
Comment from a Member:
I prefer not to set conditions in the reconcile functions. Instead, we should set conditions in calculateStatus based on reconcileErr.

func calculateStatus(...) {
    if reconcileErr is PodError {
        // set condition
    }
}
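
Spelled out, that suggestion might look roughly like the following sketch (errPodReconcile is a hypothetical sentinel error and the signature is simplified; this PR defines neither, and errors here is the standard library package, not apimachinery's):

// Hypothetical sketch of the suggested alternative: reconcile functions
// return a recognizable error, and calculateStatus maps it to a condition.
var errPodReconcile = errors.New("pod reconciliation failed") // hypothetical sentinel

func calculateStatus(instance *rayv1.RayCluster, reconcileErr error) {
	if errors.Is(reconcileErr, errPodReconcile) {
		meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
			Type:    string(rayv1.RayClusterReplicaFailure),
			Status:  metav1.ConditionTrue,
			Reason:  "ReconcileError",
			Message: reconcileErr.Error(),
		})
	}
}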

logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
} else {
updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance, conditions)
}

// Return error based on order.
@@ -394,7 +402,7 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context,
return false
}

func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
logger := ctrl.LoggerFrom(ctx)
logger.Info("Reconciling Ingress")
if instance.Spec.HeadGroupSpec.EnableIngress == nil || !*instance.Spec.HeadGroupSpec.EnableIngress {
@@ -474,7 +482,7 @@ func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, i
}

// Return nil only when the head service successfully created or already exists.
func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
logger := ctrl.LoggerFrom(ctx)
services := corev1.ServiceList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
@@ -526,7 +534,7 @@ func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instanc
}

// Return nil only when the serve service successfully created or already exists.
func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
// Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
if enableServeServiceValue, exist := instance.Annotations[utils.EnableServeServiceKey]; !exist || enableServeServiceValue != utils.EnableServeServiceTrue {
return nil
@@ -555,7 +563,7 @@ func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instan
}

// Return nil only when the headless service for multi-host worker groups is successfully created or already exists.
func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
// Check if there are worker groups with NumOfHosts > 1 in the cluster
isMultiHost := false
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
@@ -591,12 +599,17 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
return nil
}

func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster, conditions rayClusterConditions) error {
logger := ctrl.LoggerFrom(ctx)

// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedDeleteAllPods",
Message: err.Error(),
}
return err
}

@@ -632,6 +645,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
logger.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason)
if shouldDelete {
if err := r.Delete(ctx, &headPod); err != nil {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedDeleteHeadPod",
Message: err.Error(),
}
return err
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -644,6 +662,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
logger.Info("reconcilePods", "Found 0 head Pods; creating a head Pod for the RayCluster.", instance.Name)
common.CreatedClustersCounterInc(instance.Namespace)
if err := r.createHeadPod(ctx, *instance); err != nil {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedCreateHeadPod",
Message: err.Error(),
}
common.FailedClustersCounterInc(instance.Namespace)
return err
}
@@ -663,6 +686,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// delete all the extra head Pods
for _, extraHeadPodToDelete := range headPods.Items {
if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedDeleteHeadPod",
Message: err.Error(),
}
return err
}
}
@@ -690,6 +718,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
numDeletedUnhealthyWorkerPods++
deletedWorkers[workerPod.Name] = deleted
if err := r.Delete(ctx, &workerPod); err != nil {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedDeleteWorkerPod",
Message: err.Error(),
}
return err
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -713,6 +746,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
logger.Info("Deleting pod", "namespace", pod.Namespace, "name", pod.Name)
if err := r.Delete(ctx, &pod); err != nil {
if !errors.IsNotFound(err) {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedDeleteWorkerPod",
Message: err.Error(),
}
logger.Info("reconcilePods", "Fail to delete Pod", pod.Name, "error", err)
return err
}
@@ -749,6 +787,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
for i = 0; i < diff; i++ {
logger.Info("reconcilePods", "creating worker for group", worker.GroupName, fmt.Sprintf("index %d", i), fmt.Sprintf("in total %d", diff))
if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedCreateWorkerPod",
Message: err.Error(),
}
return err
}
}
@@ -782,6 +825,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name)
if err := r.Delete(ctx, &randomPodToDelete); err != nil {
if !errors.IsNotFound(err) {
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
Status: metav1.ConditionTrue,
Reason: "FailedDeleteWorkerPod",
Message: err.Error(),
}
return err
}
logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
@@ -796,6 +844,12 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
return nil
}

func defaultRayClusterConditions() rayClusterConditions {
return map[rayv1.RayClusterConditionType]metav1.Condition{
rayv1.RayClusterReplicaFailure: {Status: metav1.ConditionFalse}, // omit the Condition.Type here for simplicity. We will set it later in updateRayClusterStatus().
}
}

// shouldDeletePod returns whether the Pod should be deleted and the reason
//
// @param pod: The Pod to be checked.
@@ -1301,7 +1355,7 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
return nil
}

func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
logger := ctrl.LoggerFrom(ctx)
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
return nil
@@ -1356,7 +1410,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Con
return nil
}

func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
logger := ctrl.LoggerFrom(ctx)
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
return nil
@@ -1397,7 +1451,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, inst
return nil
}

func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Context, instance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
logger := ctrl.LoggerFrom(ctx)
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
return nil
@@ -1438,11 +1492,21 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
return nil
}

func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster, conditions rayClusterConditions) error {
logger := ctrl.LoggerFrom(ctx)
if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {

inconsistent := false
if features.Enabled(features.RayClusterStatusConditions) {
Comment from the PR author (Collaborator):
Conditions will only be set if the gate is enabled.
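
One note for context, since the loop below ORs the return value into inconsistent: meta.SetStatusCondition (from k8s.io/apimachinery/pkg/api/meta) reports whether the condition list actually changed, so repeated reconciles with identical conditions will not by themselves trigger a status update. A minimal standalone illustration (the condition values here are made up):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conds []metav1.Condition
	cond := metav1.Condition{
		Type:   "ReplicaFailure",
		Status: metav1.ConditionFalse,
		Reason: "NoFailure",
	}
	// The first call inserts the condition, so it reports a change.
	fmt.Println(meta.SetStatusCondition(&conds, cond)) // true
	// An identical second call is a no-op and reports false.
	fmt.Println(meta.SetStatusCondition(&conds, cond)) // false
}

(The gate itself is presumably toggled via the operator's feature-gates flag, e.g. --feature-gates=RayClusterStatusConditions=true, though the exact wiring lives in the features package.)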

for typ, condition := range conditions {
condition.Type = string(typ) // make sure the condition.Type is set correctly.
inconsistent = meta.SetStatusCondition(&newInstance.Status.Conditions, condition) || inconsistent
}
}
inconsistent = r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) || inconsistent
if !inconsistent {
return nil
}

logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
err := r.Status().Update(ctx, newInstance)
if err != nil {