@@ -10,12 +10,14 @@ import (
1010 "strings"
1111 "time"
1212
13+ "k8s.io/apimachinery/pkg/api/meta"
1314 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415 "k8s.io/utils/ptr"
1516
1617 "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
1718 "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
1819 "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
20+ "github.com/ray-project/kuberay/ray-operator/pkg/features"
1921
2022 batchv1 "k8s.io/api/batch/v1"
2123 rbacv1 "k8s.io/api/rbac/v1"
@@ -44,7 +46,8 @@ import (
4446 "sigs.k8s.io/controller-runtime/pkg/reconcile"
4547)
4648
47- type reconcileFunc func (context.Context , * rayv1.RayCluster ) error
49+ type rayClusterConditions map [rayv1.RayClusterConditionType ]metav1.Condition
50+ type reconcileFunc func (context.Context , * rayv1.RayCluster , rayClusterConditions ) error
4851
4952var (
5053 DefaultRequeueDuration = 2 * time .Second
@@ -300,6 +303,11 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
300303 return ctrl.Result {}, nil
301304 }
302305
306+ // conditions should be mutated by the following reconcileXXX functions.
307+ conditions := map [rayv1.RayClusterConditionType ]metav1.Condition {
308+ rayv1 .RayClusterReplicaFailure : {Status : metav1 .ConditionFalse }, // omit the Condition.Type here for simplicity. we will set it later in the updateRayClusterStatus().
309+ }
310+
303311 reconcileFuncs := []reconcileFunc {
304312 r .reconcileAutoscalerServiceAccount ,
305313 r .reconcileAutoscalerRole ,
@@ -312,7 +320,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
312320 }
313321
314322 for _ , fn := range reconcileFuncs {
315- if reconcileErr = fn (ctx , instance ); reconcileErr != nil {
323+ if reconcileErr = fn (ctx , instance , conditions ); reconcileErr != nil {
316324 funcName := runtime .FuncForPC (reflect .ValueOf (fn ).Pointer ()).Name ()
317325 logger .Error (reconcileErr , "Error reconcile resources" , "function name" , funcName )
318326 break
@@ -325,7 +333,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
325333 if calculateErr != nil {
326334 logger .Info ("Got error when calculating new status" , "cluster name" , request .Name , "error" , calculateErr )
327335 } else {
328- updateErr = r .updateRayClusterStatus (ctx , originalRayClusterInstance , newInstance )
336+ updateErr = r .updateRayClusterStatus (ctx , originalRayClusterInstance , newInstance , conditions )
329337 }
330338
331339 // Return error based on order.
@@ -394,7 +402,7 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context,
394402 return false
395403}
396404
397- func (r * RayClusterReconciler ) reconcileIngress (ctx context.Context , instance * rayv1.RayCluster ) error {
405+ func (r * RayClusterReconciler ) reconcileIngress (ctx context.Context , instance * rayv1.RayCluster , _ rayClusterConditions ) error {
398406 logger := ctrl .LoggerFrom (ctx )
399407 logger .Info ("Reconciling Ingress" )
400408 if instance .Spec .HeadGroupSpec .EnableIngress == nil || ! * instance .Spec .HeadGroupSpec .EnableIngress {
@@ -474,7 +482,7 @@ func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, i
474482}
475483
476484// Return nil only when the head service successfully created or already exists.
477- func (r * RayClusterReconciler ) reconcileHeadService (ctx context.Context , instance * rayv1.RayCluster ) error {
485+ func (r * RayClusterReconciler ) reconcileHeadService (ctx context.Context , instance * rayv1.RayCluster , _ rayClusterConditions ) error {
478486 logger := ctrl .LoggerFrom (ctx )
479487 services := corev1.ServiceList {}
480488 filterLabels := client.MatchingLabels {utils .RayClusterLabelKey : instance .Name , utils .RayNodeTypeLabelKey : string (rayv1 .HeadNode )}
@@ -526,7 +534,7 @@ func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instanc
526534}
527535
528536// Return nil only when the serve service successfully created or already exists.
529- func (r * RayClusterReconciler ) reconcileServeService (ctx context.Context , instance * rayv1.RayCluster ) error {
537+ func (r * RayClusterReconciler ) reconcileServeService (ctx context.Context , instance * rayv1.RayCluster , _ rayClusterConditions ) error {
530538 // Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
531539 if enableServeServiceValue , exist := instance .Annotations [utils .EnableServeServiceKey ]; ! exist || enableServeServiceValue != utils .EnableServeServiceTrue {
532540 return nil
@@ -555,7 +563,7 @@ func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instan
555563}
556564
557565// Return nil only when the headless service for multi-host worker groups is successfully created or already exists.
558- func (r * RayClusterReconciler ) reconcileHeadlessService (ctx context.Context , instance * rayv1.RayCluster ) error {
566+ func (r * RayClusterReconciler ) reconcileHeadlessService (ctx context.Context , instance * rayv1.RayCluster , _ rayClusterConditions ) error {
559567 // Check if there are worker groups with NumOfHosts > 1 in the cluster
560568 isMultiHost := false
561569 for _ , workerGroup := range instance .Spec .WorkerGroupSpecs {
@@ -591,12 +599,17 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
591599 return nil
592600}
593601
594- func (r * RayClusterReconciler ) reconcilePods (ctx context.Context , instance * rayv1.RayCluster ) error {
602+ func (r * RayClusterReconciler ) reconcilePods (ctx context.Context , instance * rayv1.RayCluster , conditions rayClusterConditions ) error {
595603 logger := ctrl .LoggerFrom (ctx )
596604
597605 // if RayCluster is suspended, delete all pods and skip reconcile
598606 if instance .Spec .Suspend != nil && * instance .Spec .Suspend {
599607 if _ , err := r .deleteAllPods (ctx , common .RayClusterAllPodsAssociationOptions (instance )); err != nil {
608+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
609+ Status : metav1 .ConditionTrue ,
610+ Reason : "FailedDeleteAllPods" ,
611+ Message : err .Error (),
612+ }
600613 return err
601614 }
602615
@@ -632,6 +645,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
632645 logger .Info ("reconcilePods" , "head Pod" , headPod .Name , "shouldDelete" , shouldDelete , "reason" , reason )
633646 if shouldDelete {
634647 if err := r .Delete (ctx , & headPod ); err != nil {
648+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
649+ Status : metav1 .ConditionTrue ,
650+ Reason : "FailedDeleteHeadPod" ,
651+ Message : err .Error (),
652+ }
635653 return err
636654 }
637655 r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "Deleted" ,
@@ -644,6 +662,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
644662 logger .Info ("reconcilePods" , "Found 0 head Pods; creating a head Pod for the RayCluster." , instance .Name )
645663 common .CreatedClustersCounterInc (instance .Namespace )
646664 if err := r .createHeadPod (ctx , * instance ); err != nil {
665+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
666+ Status : metav1 .ConditionTrue ,
667+ Reason : "FailedCreateHeadPod" ,
668+ Message : err .Error (),
669+ }
647670 common .FailedClustersCounterInc (instance .Namespace )
648671 return err
649672 }
@@ -663,6 +686,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
663686 // delete all the extra head pod pods
664687 for _ , extraHeadPodToDelete := range headPods .Items {
665688 if err := r .Delete (ctx , & extraHeadPodToDelete ); err != nil {
689+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
690+ Status : metav1 .ConditionTrue ,
691+ Reason : "FailedDeleteHeadPod" ,
692+ Message : err .Error (),
693+ }
666694 return err
667695 }
668696 }
@@ -690,6 +718,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
690718 numDeletedUnhealthyWorkerPods ++
691719 deletedWorkers [workerPod .Name ] = deleted
692720 if err := r .Delete (ctx , & workerPod ); err != nil {
721+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
722+ Status : metav1 .ConditionTrue ,
723+ Reason : "FailedDeleteWorkerPod" ,
724+ Message : err .Error (),
725+ }
693726 return err
694727 }
695728 r .Recorder .Eventf (instance , corev1 .EventTypeNormal , "Deleted" ,
@@ -713,6 +746,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
713746 logger .Info ("Deleting pod" , "namespace" , pod .Namespace , "name" , pod .Name )
714747 if err := r .Delete (ctx , & pod ); err != nil {
715748 if ! errors .IsNotFound (err ) {
749+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
750+ Status : metav1 .ConditionTrue ,
751+ Reason : "FailedDeleteWorkerPod" ,
752+ Message : err .Error (),
753+ }
716754 logger .Info ("reconcilePods" , "Fail to delete Pod" , pod .Name , "error" , err )
717755 return err
718756 }
@@ -749,6 +787,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
749787 for i = 0 ; i < diff ; i ++ {
750788 logger .Info ("reconcilePods" , "creating worker for group" , worker .GroupName , fmt .Sprintf ("index %d" , i ), fmt .Sprintf ("in total %d" , diff ))
751789 if err := r .createWorkerPod (ctx , * instance , * worker .DeepCopy ()); err != nil {
790+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
791+ Status : metav1 .ConditionTrue ,
792+ Reason : "FailedCreateWorkerPod" ,
793+ Message : err .Error (),
794+ }
752795 return err
753796 }
754797 }
@@ -782,6 +825,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
782825 logger .Info ("Randomly deleting Pod" , "progress" , fmt .Sprintf ("%d / %d" , i + 1 , randomlyRemovedWorkers ), "with name" , randomPodToDelete .Name )
783826 if err := r .Delete (ctx , & randomPodToDelete ); err != nil {
784827 if ! errors .IsNotFound (err ) {
828+ conditions [rayv1 .RayClusterReplicaFailure ] = metav1.Condition {
829+ Status : metav1 .ConditionTrue ,
830+ Reason : "FailedDeleteWorkerPod" ,
831+ Message : err .Error (),
832+ }
785833 return err
786834 }
787835 logger .Info ("reconcilePods" , "The worker Pod has already been deleted" , randomPodToDelete .Name )
@@ -1301,7 +1349,7 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
13011349 return nil
13021350}
13031351
1304- func (r * RayClusterReconciler ) reconcileAutoscalerServiceAccount (ctx context.Context , instance * rayv1.RayCluster ) error {
1352+ func (r * RayClusterReconciler ) reconcileAutoscalerServiceAccount (ctx context.Context , instance * rayv1.RayCluster , _ rayClusterConditions ) error {
13051353 logger := ctrl .LoggerFrom (ctx )
13061354 if instance .Spec .EnableInTreeAutoscaling == nil || ! * instance .Spec .EnableInTreeAutoscaling {
13071355 return nil
@@ -1356,7 +1404,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Con
13561404 return nil
13571405}
13581406
1359- func (r * RayClusterReconciler ) reconcileAutoscalerRole (ctx context.Context , instance * rayv1.RayCluster ) error {
1407+ func (r * RayClusterReconciler ) reconcileAutoscalerRole (ctx context.Context , instance * rayv1.RayCluster , _ rayClusterConditions ) error {
13601408 logger := ctrl .LoggerFrom (ctx )
13611409 if instance .Spec .EnableInTreeAutoscaling == nil || ! * instance .Spec .EnableInTreeAutoscaling {
13621410 return nil
@@ -1397,7 +1445,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, inst
13971445 return nil
13981446}
13991447
1400- func (r * RayClusterReconciler ) reconcileAutoscalerRoleBinding (ctx context.Context , instance * rayv1.RayCluster ) error {
1448+ func (r * RayClusterReconciler ) reconcileAutoscalerRoleBinding (ctx context.Context , instance * rayv1.RayCluster , _ rayClusterConditions ) error {
14011449 logger := ctrl .LoggerFrom (ctx )
14021450 if instance .Spec .EnableInTreeAutoscaling == nil || ! * instance .Spec .EnableInTreeAutoscaling {
14031451 return nil
@@ -1438,11 +1486,21 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
14381486 return nil
14391487}
14401488
1441- func (r * RayClusterReconciler ) updateRayClusterStatus (ctx context.Context , originalRayClusterInstance , newInstance * rayv1.RayCluster ) error {
1489+ func (r * RayClusterReconciler ) updateRayClusterStatus (ctx context.Context , originalRayClusterInstance , newInstance * rayv1.RayCluster , conditions rayClusterConditions ) error {
14421490 logger := ctrl .LoggerFrom (ctx )
1443- if ! r .inconsistentRayClusterStatus (ctx , originalRayClusterInstance .Status , newInstance .Status ) {
1491+
1492+ inconsistent := false
1493+ if features .Enabled (features .RayClusterStatusConditions ) {
1494+ for typ , condition := range conditions {
1495+ condition .Type = string (typ ) // make sure the condition.Type is set correctly.
1496+ inconsistent = meta .SetStatusCondition (& newInstance .Status .Conditions , condition ) || inconsistent
1497+ }
1498+ }
1499+ inconsistent = r .inconsistentRayClusterStatus (ctx , originalRayClusterInstance .Status , newInstance .Status ) || inconsistent
1500+ if ! inconsistent {
14441501 return nil
14451502 }
1503+
14461504 logger .Info ("updateRayClusterStatus" , "name" , originalRayClusterInstance .Name , "old status" , originalRayClusterInstance .Status , "new status" , newInstance .Status )
14471505 err := r .Status ().Update (ctx , newInstance )
14481506 if err != nil {
0 commit comments