Skip to content

Commit 67d300a

Browse files
committed
[Feat][RayCluster] Set RayClusterReplicaFailure condition to reflect the result of creating/deleting Pods
Signed-off-by: Rueian <[email protected]>
1 parent abafd17 commit 67d300a

File tree

2 files changed

+121
-47
lines changed

2 files changed

+121
-47
lines changed

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@ import (
1010
"strings"
1111
"time"
1212

13+
"k8s.io/apimachinery/pkg/api/meta"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
"k8s.io/utils/ptr"
1516

1617
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
1718
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
1819
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
20+
"github.com/ray-project/kuberay/ray-operator/pkg/features"
1921

2022
batchv1 "k8s.io/api/batch/v1"
2123
rbacv1 "k8s.io/api/rbac/v1"
@@ -44,7 +46,8 @@ import (
4446
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4547
)
4648

47-
type reconcileFunc func(context.Context, *rayv1.RayCluster) error
49+
type rayClusterConditions map[rayv1.RayClusterConditionType]metav1.Condition
50+
type reconcileFunc func(context.Context, *rayv1.RayCluster, rayClusterConditions) error
4851

4952
var (
5053
DefaultRequeueDuration = 2 * time.Second
@@ -300,6 +303,9 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
300303
return ctrl.Result{}, nil
301304
}
302305

306+
// conditions should be mutated by the following reconcileXXX functions.
307+
conditions := defaultRayClusterConditions()
308+
303309
reconcileFuncs := []reconcileFunc{
304310
r.reconcileAutoscalerServiceAccount,
305311
r.reconcileAutoscalerRole,
@@ -312,7 +318,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
312318
}
313319

314320
for _, fn := range reconcileFuncs {
315-
if reconcileErr = fn(ctx, instance); reconcileErr != nil {
321+
if reconcileErr = fn(ctx, instance, conditions); reconcileErr != nil {
316322
funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
317323
logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
318324
break
@@ -325,7 +331,7 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
325331
if calculateErr != nil {
326332
logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
327333
} else {
328-
updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
334+
updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance, conditions)
329335
}
330336

331337
// Return error based on order.
@@ -394,7 +400,7 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context,
394400
return false
395401
}
396402

397-
func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1.RayCluster) error {
403+
func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
398404
logger := ctrl.LoggerFrom(ctx)
399405
logger.Info("Reconciling Ingress")
400406
if instance.Spec.HeadGroupSpec.EnableIngress == nil || !*instance.Spec.HeadGroupSpec.EnableIngress {
@@ -474,7 +480,7 @@ func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, i
474480
}
475481

476482
// Return nil only when the head service successfully created or already exists.
477-
func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster) error {
483+
func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
478484
logger := ctrl.LoggerFrom(ctx)
479485
services := corev1.ServiceList{}
480486
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
@@ -526,7 +532,7 @@ func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instanc
526532
}
527533

528534
// Return nil only when the serve service successfully created or already exists.
529-
func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instance *rayv1.RayCluster) error {
535+
func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
530536
// Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
531537
if enableServeServiceValue, exist := instance.Annotations[utils.EnableServeServiceKey]; !exist || enableServeServiceValue != utils.EnableServeServiceTrue {
532538
return nil
@@ -555,7 +561,7 @@ func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instan
555561
}
556562

557563
// Return nil only when the headless service for multi-host worker groups is successfully created or already exists.
558-
func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, instance *rayv1.RayCluster) error {
564+
func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
559565
// Check if there are worker groups with NumOfHosts > 1 in the cluster
560566
isMultiHost := false
561567
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
@@ -591,12 +597,17 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
591597
return nil
592598
}
593599

594-
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
600+
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster, conditions rayClusterConditions) error {
595601
logger := ctrl.LoggerFrom(ctx)
596602

597603
// if RayCluster is suspended, delete all pods and skip reconcile
598604
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
599605
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
606+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
607+
Status: metav1.ConditionTrue,
608+
Reason: "FailedDeleteAllPods",
609+
Message: err.Error(),
610+
}
600611
return err
601612
}
602613

@@ -632,6 +643,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
632643
logger.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason)
633644
if shouldDelete {
634645
if err := r.Delete(ctx, &headPod); err != nil {
646+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
647+
Status: metav1.ConditionTrue,
648+
Reason: "FailedDeleteHeadPod",
649+
Message: err.Error(),
650+
}
635651
return err
636652
}
637653
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -644,6 +660,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
644660
logger.Info("reconcilePods", "Found 0 head Pods; creating a head Pod for the RayCluster.", instance.Name)
645661
common.CreatedClustersCounterInc(instance.Namespace)
646662
if err := r.createHeadPod(ctx, *instance); err != nil {
663+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
664+
Status: metav1.ConditionTrue,
665+
Reason: "FailedCreateHeadPod",
666+
Message: err.Error(),
667+
}
647668
common.FailedClustersCounterInc(instance.Namespace)
648669
return err
649670
}
@@ -663,6 +684,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
663684
// delete all the extra head pod pods
664685
for _, extraHeadPodToDelete := range headPods.Items {
665686
if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil {
687+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
688+
Status: metav1.ConditionTrue,
689+
Reason: "FailedDeleteHeadPod",
690+
Message: err.Error(),
691+
}
666692
return err
667693
}
668694
}
@@ -690,6 +716,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
690716
numDeletedUnhealthyWorkerPods++
691717
deletedWorkers[workerPod.Name] = deleted
692718
if err := r.Delete(ctx, &workerPod); err != nil {
719+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
720+
Status: metav1.ConditionTrue,
721+
Reason: "FailedDeleteWorkerPod",
722+
Message: err.Error(),
723+
}
693724
return err
694725
}
695726
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -713,6 +744,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
713744
logger.Info("Deleting pod", "namespace", pod.Namespace, "name", pod.Name)
714745
if err := r.Delete(ctx, &pod); err != nil {
715746
if !errors.IsNotFound(err) {
747+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
748+
Status: metav1.ConditionTrue,
749+
Reason: "FailedDeleteWorkerPod",
750+
Message: err.Error(),
751+
}
716752
logger.Info("reconcilePods", "Fail to delete Pod", pod.Name, "error", err)
717753
return err
718754
}
@@ -749,6 +785,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
749785
for i = 0; i < diff; i++ {
750786
logger.Info("reconcilePods", "creating worker for group", worker.GroupName, fmt.Sprintf("index %d", i), fmt.Sprintf("in total %d", diff))
751787
if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil {
788+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
789+
Status: metav1.ConditionTrue,
790+
Reason: "FailedCreateWorkerPod",
791+
Message: err.Error(),
792+
}
752793
return err
753794
}
754795
}
@@ -782,6 +823,11 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
782823
logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name)
783824
if err := r.Delete(ctx, &randomPodToDelete); err != nil {
784825
if !errors.IsNotFound(err) {
826+
conditions[rayv1.RayClusterReplicaFailure] = metav1.Condition{
827+
Status: metav1.ConditionTrue,
828+
Reason: "FailedDeleteWorkerPod",
829+
Message: err.Error(),
830+
}
785831
return err
786832
}
787833
logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
@@ -796,6 +842,12 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
796842
return nil
797843
}
798844

845+
func defaultRayClusterConditions() rayClusterConditions {
846+
return map[rayv1.RayClusterConditionType]metav1.Condition{
847+
rayv1.RayClusterReplicaFailure: {Status: metav1.ConditionFalse}, // omit the Condition.Type here for simplicity. we will set it later in the updateRayClusterStatus().
848+
}
849+
}
850+
799851
// shouldDeletePod returns whether the Pod should be deleted and the reason
800852
//
801853
// @param pod: The Pod to be checked.
@@ -1301,7 +1353,7 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
13011353
return nil
13021354
}
13031355

1304-
func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Context, instance *rayv1.RayCluster) error {
1356+
func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
13051357
logger := ctrl.LoggerFrom(ctx)
13061358
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
13071359
return nil
@@ -1356,7 +1408,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerServiceAccount(ctx context.Con
13561408
return nil
13571409
}
13581410

1359-
func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, instance *rayv1.RayCluster) error {
1411+
func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
13601412
logger := ctrl.LoggerFrom(ctx)
13611413
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
13621414
return nil
@@ -1397,7 +1449,7 @@ func (r *RayClusterReconciler) reconcileAutoscalerRole(ctx context.Context, inst
13971449
return nil
13981450
}
13991451

1400-
func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Context, instance *rayv1.RayCluster) error {
1452+
func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Context, instance *rayv1.RayCluster, _ rayClusterConditions) error {
14011453
logger := ctrl.LoggerFrom(ctx)
14021454
if instance.Spec.EnableInTreeAutoscaling == nil || !*instance.Spec.EnableInTreeAutoscaling {
14031455
return nil
@@ -1438,11 +1490,21 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
14381490
return nil
14391491
}
14401492

1441-
func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
1493+
func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster, conditions rayClusterConditions) error {
14421494
logger := ctrl.LoggerFrom(ctx)
1443-
if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
1495+
1496+
inconsistent := false
1497+
if features.Enabled(features.RayClusterStatusConditions) {
1498+
for typ, condition := range conditions {
1499+
condition.Type = string(typ) // make sure the condition.Type is set correctly.
1500+
inconsistent = meta.SetStatusCondition(&newInstance.Status.Conditions, condition) || inconsistent
1501+
}
1502+
}
1503+
inconsistent = r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) || inconsistent
1504+
if !inconsistent {
14441505
return nil
14451506
}
1507+
14461508
logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
14471509
err := r.Status().Update(ctx, newInstance)
14481510
if err != nil {

0 commit comments

Comments
 (0)