Skip to content

Commit 6a5afa1

Browse files
committed
[Feat][RayCluster] Set RayClusterReplicaFailure condition to reflect the result of creating/deleting Pods
Signed-off-by: Rueian <[email protected]>
1 parent abafd17 commit 6a5afa1

File tree

2 files changed

+123
-47
lines changed

2 files changed

+123
-47
lines changed

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@ import (
1010
"strings"
1111
"time"
1212

13+
"k8s.io/apimachinery/pkg/api/meta"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
"k8s.io/utils/ptr"
1516

1617
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
1718
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
1819
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
20+
"github.com/ray-project/kuberay/ray-operator/pkg/features"
1921

2022
batchv1 "k8s.io/api/batch/v1"
2123
rbacv1 "k8s.io/api/rbac/v1"
@@ -44,7 +46,10 @@ import (
4446
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4547
)
4648

47-
type reconcileFunc func(context.Context, *rayv1.RayCluster) error
49+
type (
50+
rayClusterConditions map[rayv1.RayClusterConditionType]metav1.Condition
51+
reconcileFunc func(context.Context, *rayv1.RayCluster, rayClusterConditions) error
52+
)
4853

4954
var (
5055
DefaultRequeueDuration = 2 * time.Second
@@ -300,6 +305,9 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
300305
return ctrl.Result{}, nil
301306
}
302307

308+
// conditions should be mutated by the following reconcileXXX functions.
309+
conditions := defaultRayClusterConditions()
310+
303311
reconcileFuncs := []reconcileFunc{
304312
r.reconcileAutoscalerServiceAccount,
305313
r.reconcileAutoscalerRole,
@@ -312,7 +320,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
312320
}
313321

314322
for _, fn := range reconcileFuncs {
315-
if reconcileErr = fn(ctx, instance); reconcileErr != nil {
323+
if reconcileErr = fn(ctx, instance, conditions); reconcileErr != nil {
316324
funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
317325
logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
318326
break
@@ -325,7 +333,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
325333
if calculateErr != nil {
326334
logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
327335
} else {
328-
updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
336+
updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance, conditions)
329337
}
330338

331339
// Return error based on order.
@@ -394,7 +402,7 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context,
394402
return false
395403
}
396404

397-
func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1.RayCluster) error {
405+
func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
398406
logger := ctrl.LoggerFrom(ctx)
399407
logger.Info("Reconciling Ingress")
400408
if instance.Spec.HeadGroupSpec.EnableIngress == nil || !*instance.Spec.HeadGroupSpec.EnableIngress {
@@ -474,7 +482,7 @@ func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, i
474482
}
475483

476484
// Return nil only when the head service successfully created or already exists.
477-
func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster) error {
485+
func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
478486
logger := ctrl.LoggerFrom(ctx)
479487
services := corev1.ServiceList{}
480488
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
@@ -526,7 +534,7 @@ func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instanc
526534
}
527535

528536
// Return nil only when the serve service successfully created or already exists.
529-
func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instance *rayv1.RayCluster) error {
537+
func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
530538
// Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
531539
if enableServeServiceValue, exist := instance.Annotations[utils.EnableServeServiceKey]; !exist || enableServeServiceValue != utils.EnableServeServiceTrue {
532540
return nil
@@ -555,7 +563,7 @@ func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instan
555563
}
556564

557565
// Return nil only when the headless service for multi-host worker groups is successfully created or already exists.
558-
func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, instance *rayv1.RayCluster) error {
566+
func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
559567
// Check if there are worker groups with NumOfHosts > 1 in the cluster
560568
isMultiHost := false
561569
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
@@ -591,12 +599,17 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
591599
return nil
592600
}
593601

594-
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
602+
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster, conditions rayClusterConditions) error {
595603
logger := ctrl.LoggerFrom(ctx)
596604

597605
// if RayCluster is suspended, delete all pods and skip reconcile
598606
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
599607
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
608+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
609+
Status: metav1.ConditionTrue,
610+
Reason: "FailedDeleteAllPods",
611+
Message: err.Error(),
612+
}
600613
return err
601614
}
602615

@@ -632,6 +645,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
632645
logger.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason)
633646
if shouldDelete {
634647
if err := r.Delete(ctx, &headPod); err != nil {
648+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
649+
Status: metav1.ConditionTrue,
650+
Reason: "FailedDeleteHeadPod",
651+
Message: err.Error(),
652+
}
635653
return err
636654
}
637655
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -644,6 +662,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
644662
logger.Info("reconcilePods", "Found 0 head Pods; creating a head Pod for the RayCluster.", instance.Name)
645663
common.CreatedClustersCounterInc(instance.Namespace)
646664
if err := r.createHeadPod(ctx, *instance); err != nil {
665+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
666+
Status: metav1.ConditionTrue,
667+
Reason: "FailedCreateHeadPod",
668+
Message: err.Error(),
669+
}
647670
common.FailedClustersCounterInc(instance.Namespace)
648671
return err
649672
}
@@ -663,6 +686,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
663686
// delete all the extra head pod pods
664687
for _, extraHeadPodToDelete := range headPods.Items {
665688
if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil {
689+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
690+
Status: metav1.ConditionTrue,
691+
Reason: "FailedDeleteHeadPod",
692+
Message: err.Error(),
693+
}
666694
return err
667695
}
668696
}
@@ -690,6 +718,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
690718
numDeletedUnhealthyWorkerPods++
691719
deletedWorkers[workerPod.Name] = deleted
692720
if err := r.Delete(ctx, &workerPod); err != nil {
721+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
722+
Status: metav1.ConditionTrue,
723+
Reason: "FailedDeleteWorkerPod",
724+
Message: err.Error(),
725+
}
693726
return err
694727
}
695728
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -713,6 +746,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
713746
logger.Info("Deleting pod", "namespace", pod.Namespace, "name", pod.Name)
714747
if err := r.Delete(ctx, &pod); err != nil {
715748
if !errors.IsNotFound(err) {
749+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
750+
Status: metav1.ConditionTrue,
751+
Reason: "FailedDeleteWorkerPod",
752+
Message: err.Error(),
753+
}
716754
logger.Info("reconcilePods", "Fail to delete Pod", pod.Name, "error", err)
717755
return err
718756
}
@@ -749,6 +787,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
749787
for i = 0; i < diff; i++ {
750788
logger.Info("reconcilePods", "creating worker for group", worker.GroupName, fmt.Sprintf("index %d", i), fmt.Sprintf("in total %d", diff))
751789
if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil {
790+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
791+
Status: metav1.ConditionTrue,
792+
Reason: "FailedCreateWorkerPod",
793+
Message: err.Error(),
794+
}
752795
return err
753796
}
754797
}
@@ -782,6 +825,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
782825
logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name)
783826
if err := r.Delete(ctx, &randomPodToDelete); err != nil {
784827
if !errors.IsNotFound(err) {
828+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
829+
Status: metav1.ConditionTrue,
830+
Reason: "FailedDeleteWorkerPod",
831+
Message: err.Error(),
832+
}
785833
return err
786834
}
787835
logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
@@ -796,6 +844,12 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
796844
return nil
797845
}
798846

847+
func defaultRayClusterConditions() rayClusterConditions {
848+
return map[rayv1.RayClusterConditionType]metav1.Condition{
849+
rayv1.RayClusterReplicaFailure: {Status: metav1.ConditionFalse}, // omit the Condition.Type here for simplicity. we will set it later in the updateRayClusterStatus().
850+
}
851+
}
852+
799853
// shouldDeletePod returns whether the Pod should be deleted and the reason
800854
//
801855
// @param pod: The Pod to be checked.
@@ -1301,7 +1355,7 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
13011355
return nil
13021356
}
13031357

1304-
func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Context, instance *rayv1.RayCluster) error {
1358+
func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
13051359
logger := ctrl.LoggerFrom(ctx)
13061360
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
13071361
return nil
@@ -1356,7 +1410,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Con
13561410
return nil
13571411
}
13581412

1359-
func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, instance *rayv1.RayCluster) error {
1413+
func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
13601414
logger := ctrl.LoggerFrom(ctx)
13611415
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
13621416
return nil
@@ -1397,7 +1451,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, inst
13971451
return nil
13981452
}
13991453

1400-
func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Context, instance *rayv1.RayCluster) error {
1454+
func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
14011455
logger := ctrl.LoggerFrom(ctx)
14021456
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
14031457
return nil
@@ -1438,11 +1492,21 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
14381492
return nil
14391493
}
14401494

1441-
func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
1495+
func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster, conditions rayClusterConditions) error {
14421496
logger := ctrl.LoggerFrom(ctx)
1443-
if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
1497+
1498+
inconsistent := false
1499+
if features.Enabled(features.RayClusterStatusConditions) {
1500+
for typ, condition := range conditions {
1501+
condition.Type = string(typ) // make sure the condition.Type is set correctly.
1502+
inconsistent = meta.SetStatusCondition(&newInstance.Status.Conditions, condition) || inconsistent
1503+
}
1504+
}
1505+
inconsistent = r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) || inconsistent
1506+
if !inconsistent {
14441507
return nil
14451508
}
1509+
14461510
logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
14471511
err := r.Status().Update(ctx, newInstance)
14481512
if err != nil {

0 commit comments

Comments
 (0)