Merged
39 changes: 31 additions & 8 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -2,6 +2,7 @@ package ray

import (
"context"
errstd "errors"
"fmt"
"os"
"reflect"
@@ -10,12 +11,14 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

batchv1 "k8s.io/api/batch/v1"
rbacv1 "k8s.io/api/rbac/v1"
@@ -391,6 +394,10 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context,
oldStatus.Endpoints, newStatus.Endpoints, oldStatus.Head, newStatus.Head))
return true
}
if !reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) {
logger.Info("inconsistentRayClusterStatus", "old conditions", oldStatus.Conditions, "new conditions", newStatus.Conditions)
return true
}
return false
}

@@ -597,7 +604,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// if RayCluster is suspended, delete all pods and skip reconcile
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
return err
return errstd.Join(utils.ErrFailedDeleteAllPods, err)
}

r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -632,7 +639,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
logger.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason)
if shouldDelete {
if err := r.Delete(ctx, &headPod); err != nil {
return err
return errstd.Join(utils.ErrFailedDeleteHeadPod, err)
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
"Deleted head Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
@@ -645,7 +652,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
common.CreatedClustersCounterInc(instance.Namespace)
if err := r.createHeadPod(ctx, *instance); err != nil {
common.FailedClustersCounterInc(instance.Namespace)
return err
return errstd.Join(utils.ErrFailedCreateHeadPod, err)
}
common.SuccessfulClustersCounterInc(instance.Namespace)
} else if len(headPods.Items) > 1 {
@@ -663,7 +670,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
// delete all the extra head pod pods
for _, extraHeadPodToDelete := range headPods.Items {
if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil {
return err
return errstd.Join(utils.ErrFailedDeleteHeadPod, err)
}
}
}
@@ -690,7 +697,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
numDeletedUnhealthyWorkerPods++
deletedWorkers[workerPod.Name] = deleted
if err := r.Delete(ctx, &workerPod); err != nil {
return err
return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
}
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
"Deleted worker Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
@@ -714,7 +721,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
if err := r.Delete(ctx, &pod); err != nil {
if !errors.IsNotFound(err) {
logger.Info("reconcilePods", "Fail to delete Pod", pod.Name, "error", err)
return err
return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
}
logger.Info("reconcilePods", "The worker Pod has already been deleted", pod.Name)
} else {
@@ -749,7 +756,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
for i = 0; i < diff; i++ {
logger.Info("reconcilePods", "creating worker for group", worker.GroupName, fmt.Sprintf("index %d", i), fmt.Sprintf("in total %d", diff))
if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil {
return err
return errstd.Join(utils.ErrFailedCreateWorkerPod, err)
}
}
} else if diff == 0 {
@@ -782,7 +789,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name)
if err := r.Delete(ctx, &randomPodToDelete); err != nil {
if !errors.IsNotFound(err) {
return err
return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
}
logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
}
@@ -1155,6 +1162,22 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
// Deep copy the instance, so we don't mutate the original object.
newInstance := instance.DeepCopy()

if features.Enabled(features.RayClusterStatusConditions) {
if reconcileErr != nil {
if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
Type: string(rayv1.RayClusterReplicaFailure),
Status: metav1.ConditionTrue,
Reason: reason,
Message: reconcileErr.Error(),
})
}
} else {
// if reconcileErr == nil, we can safely remove the RayClusterReplicaFailure condition.
meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure))
}
}

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation

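For readers who have not used the apimachinery condition helpers that calculateStatus now relies on, here is a minimal standalone sketch, not KubeRay code; the literal condition type string stands in for string(rayv1.RayClusterReplicaFailure). It shows how meta.SetStatusCondition upserts an entry in a []metav1.Condition slice and how meta.RemoveStatusCondition clears it again, mirroring the error and no-error branches above.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Conditions live in a plain slice; the helpers keep at most one entry
	// per condition type and fill in LastTransitionTime when the status changes.
	var conditions []metav1.Condition

	// Failed reconcile: upsert the failure condition with the extracted reason.
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    "RayClusterReplicaFailure", // stand-in for string(rayv1.RayClusterReplicaFailure)
		Status:  metav1.ConditionTrue,
		Reason:  "FailedCreateHeadPod",
		Message: "example failure message",
	})
	fmt.Println(meta.IsStatusConditionTrue(conditions, "RayClusterReplicaFailure")) // true

	// Successful reconcile: drop the condition again.
	meta.RemoveStatusCondition(&conditions, "RayClusterReplicaFailure")
	fmt.Println(len(conditions)) // 0
}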
18 changes: 18 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
@@ -25,6 +25,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

. "github.com/onsi/ginkgo/v2"
"github.com/stretchr/testify/assert"
@@ -33,6 +34,7 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -1637,6 +1639,11 @@ func TestInconsistentRayClusterStatus(t *testing.T) {
newStatus = oldStatus.DeepCopy()
newStatus.ObservedGeneration = oldStatus.ObservedGeneration + 1
assert.False(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))

// Case 12: `Conditions` is different => return true
newStatus = oldStatus.DeepCopy()
meta.SetStatusCondition(&newStatus.Conditions, metav1.Condition{Type: string(rayv1.RayClusterReplicaFailure), Status: metav1.ConditionTrue})
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))
}

func TestCalculateStatus(t *testing.T) {
@@ -1687,6 +1694,17 @@
assert.Equal(t, headService.Name, newInstance.Status.Head.ServiceName)
assert.NotNil(t, newInstance.Status.StateTransitionTimes, "Cluster state transition timestamp should be created")
assert.Equal(t, newInstance.Status.LastUpdateTime, newInstance.Status.StateTransitionTimes[rayv1.Ready])

// Test reconcilePodsErr with the feature gate disabled
newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
assert.Nil(t, err)
assert.Empty(t, newInstance.Status.Conditions)

// Test reconcilePodsErr with the feature gate enabled
defer features.SetFeatureGateDuringTest(t, features.RayClusterStatusConditions, true)()
newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
assert.Nil(t, err)
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue))
}

func TestStateTransitionTimes_NoStateChange(t *testing.T) {
22 changes: 22 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
@@ -1,5 +1,7 @@
package utils

import "errors"

const (

// Default application name
@@ -194,3 +196,23 @@ const (
func RayOriginatedFromCRDLabelValue(crdType CRDType) string {
return string(crdType)
}

// These are markers used by the calculateStatus() for setting the RayClusterReplicaFailure condition.
var (
errRayClusterReplicaFailure = errors.New("RayClusterReplicaFailure")
ErrFailedDeleteAllPods = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteAllPods"))
ErrFailedDeleteHeadPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteHeadPod"))
ErrFailedCreateHeadPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedCreateHeadPod"))
ErrFailedDeleteWorkerPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteWorkerPod"))
ErrFailedCreateWorkerPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedCreateWorkerPod"))
)

func RayClusterReplicaFailureReason(err error) string {
if e, ok := err.(interface{ Unwrap() []error }); ok {
Review comment (PR author):
err.(interface{ Unwrap() []error }).Unwrap() is the only special trick to unwrap a joined error.

https://cs.opensource.google/go/go/+/refs/tags/go1.22.5:src/errors/join.go;l=60-62

errs := e.Unwrap()
if len(errs) >= 2 && errors.Is(errs[0], errRayClusterReplicaFailure) {
return errs[1].Error()
}
}
return ""
}
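
To make the joined-error technique from the review comment above concrete, here is a minimal standalone sketch (identifiers are illustrative, not the KubeRay ones). errors.Join returns an error whose Unwrap() []error yields the joined parts, so a helper can match the first part against a sentinel marker and read the human-friendly reason from the second; plain errors without that method simply yield an empty reason.

package main

import (
	"errors"
	"fmt"
)

// A sentinel marker joined with a reason, mirroring the ErrFailed* variables above.
var (
	errReplicaFailure      = errors.New("RayClusterReplicaFailure")
	errFailedCreateHeadPod = errors.Join(errReplicaFailure, errors.New("FailedCreateHeadPod"))
)

// reasonOf mimics RayClusterReplicaFailureReason: unwrap the join, match the
// marker in the first element, and return the reason carried by the second.
func reasonOf(err error) string {
	if e, ok := err.(interface{ Unwrap() []error }); ok {
		errs := e.Unwrap()
		if len(errs) >= 2 && errors.Is(errs[0], errReplicaFailure) {
			return errs[1].Error()
		}
	}
	return ""
}

func main() {
	fmt.Println(reasonOf(errFailedCreateHeadPod))                     // FailedCreateHeadPod
	fmt.Println(reasonOf(errors.New("some unrelated error")) == "")   // true
	// errors.Is traverses the join, so callers can also just test for the marker.
	fmt.Println(errors.Is(errFailedCreateHeadPod, errReplicaFailure)) // true
}

Joining the marker with a distinct reason error lets the controller map a reconcile failure onto a stable condition Reason string without any string parsing of the underlying Kubernetes error.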
10 changes: 10 additions & 0 deletions ray-operator/controllers/ray/utils/util_test.go
@@ -2,6 +2,7 @@ package utils

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
@@ -523,3 +524,12 @@ env_vars:
})
}
}

func TestErrRayClusterReplicaFailureReason(t *testing.T) {
Review comment (Member):
Can we test the case where the error is not a RayClusterReplicaFailure?

Reply (PR author):
Sure, I will add a test that RayClusterReplicaFailureReason() returns an empty string for a random error.

assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteAllPods), "FailedDeleteAllPods")
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteHeadPod), "FailedDeleteHeadPod")
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedCreateHeadPod), "FailedCreateHeadPod")
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteWorkerPod), "FailedDeleteWorkerPod")
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedCreateWorkerPod), "FailedCreateWorkerPod")
assert.Equal(t, RayClusterReplicaFailureReason(errors.New("other error")), "")
}