diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index d44f60a6f9c..5bf269d4330 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -2,6 +2,7 @@ package ray import ( "context" + errstd "errors" "fmt" "os" "reflect" @@ -10,12 +11,14 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/ray-project/kuberay/ray-operator/pkg/features" batchv1 "k8s.io/api/batch/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -391,6 +394,10 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context, oldStatus.Endpoints, newStatus.Endpoints, oldStatus.Head, newStatus.Head)) return true } + if !reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) { + logger.Info("inconsistentRayClusterStatus", "old conditions", oldStatus.Conditions, "new conditions", newStatus.Conditions) + return true + } return false } @@ -597,7 +604,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv // if RayCluster is suspended, delete all pods and skip reconcile if instance.Spec.Suspend != nil && *instance.Spec.Suspend { if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil { - return err + return errstd.Join(utils.ErrFailedDeleteAllPods, err) } r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", @@ -632,7 +639,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv logger.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason) if shouldDelete { if err := r.Delete(ctx, &headPod); err != nil 
{ - return err + return errstd.Join(utils.ErrFailedDeleteHeadPod, err) } r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted head Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v", @@ -645,7 +652,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv common.CreatedClustersCounterInc(instance.Namespace) if err := r.createHeadPod(ctx, *instance); err != nil { common.FailedClustersCounterInc(instance.Namespace) - return err + return errstd.Join(utils.ErrFailedCreateHeadPod, err) } common.SuccessfulClustersCounterInc(instance.Namespace) } else if len(headPods.Items) > 1 { @@ -663,7 +670,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv // delete all the extra head pod pods for _, extraHeadPodToDelete := range headPods.Items { if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil { - return err + return errstd.Join(utils.ErrFailedDeleteHeadPod, err) } } } @@ -690,7 +697,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv numDeletedUnhealthyWorkerPods++ deletedWorkers[workerPod.Name] = deleted if err := r.Delete(ctx, &workerPod); err != nil { - return err + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) } r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted", "Deleted worker Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v", @@ -714,7 +721,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv if err := r.Delete(ctx, &pod); err != nil { if !errors.IsNotFound(err) { logger.Info("reconcilePods", "Fail to delete Pod", pod.Name, "error", err) - return err + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) } logger.Info("reconcilePods", "The worker Pod has already been deleted", pod.Name) } else { @@ -749,7 +756,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv for i = 0; i < diff; i++ { 
logger.Info("reconcilePods", "creating worker for group", worker.GroupName, fmt.Sprintf("index %d", i), fmt.Sprintf("in total %d", diff)) if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil { - return err + return errstd.Join(utils.ErrFailedCreateWorkerPod, err) } } } else if diff == 0 { @@ -782,7 +789,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name) if err := r.Delete(ctx, &randomPodToDelete); err != nil { if !errors.IsNotFound(err) { - return err + return errstd.Join(utils.ErrFailedDeleteWorkerPod, err) } logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name) } @@ -1155,6 +1162,22 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra // Deep copy the instance, so we don't mutate the original object. newInstance := instance.DeepCopy() + if features.Enabled(features.RayClusterStatusConditions) { + if reconcileErr != nil { + if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" { + meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{ + Type: string(rayv1.RayClusterReplicaFailure), + Status: metav1.ConditionTrue, + Reason: reason, + Message: reconcileErr.Error(), + }) + } + } else { + // if reconcileErr == nil, we can safely remove the RayClusterReplicaFailure condition. + meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure)) + } + } + // TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not. 
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index c06cc7f428f..ccd578be18f 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -25,6 +25,7 @@ import ( "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" "github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme" + "github.com/ray-project/kuberay/ray-operator/pkg/features" . "github.com/onsi/ginkgo/v2" "github.com/stretchr/testify/assert" @@ -33,6 +34,7 @@ import ( corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -1637,6 +1639,11 @@ func TestInconsistentRayClusterStatus(t *testing.T) { newStatus = oldStatus.DeepCopy() newStatus.ObservedGeneration = oldStatus.ObservedGeneration + 1 assert.False(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus)) + + // Case 12: `Conditions` is different => return true + newStatus = oldStatus.DeepCopy() + meta.SetStatusCondition(&newStatus.Conditions, metav1.Condition{Type: string(rayv1.RayClusterReplicaFailure), Status: metav1.ConditionTrue}) + assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus)) } func TestCalculateStatus(t *testing.T) { @@ -1687,6 +1694,17 @@ func TestCalculateStatus(t *testing.T) { assert.Equal(t, headService.Name, newInstance.Status.Head.ServiceName) assert.NotNil(t, newInstance.Status.StateTransitionTimes, "Cluster state transition timestamp should be created") assert.Equal(t, newInstance.Status.LastUpdateTime, 
newInstance.Status.StateTransitionTimes[rayv1.Ready]) + + // Test reconcilePodsErr with the feature gate disabled + newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod) + assert.Nil(t, err) + assert.Empty(t, newInstance.Status.Conditions) + + // Test reconcilePodsErr with the feature gate enabled + defer features.SetFeatureGateDuringTest(t, features.RayClusterStatusConditions, true)() + newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod) + assert.Nil(t, err) + assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue)) } func TestStateTransitionTimes_NoStateChange(t *testing.T) { diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 4b825dc2988..fe7ab4c9522 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -1,5 +1,7 @@ package utils +import "errors" + const ( // Default application name @@ -194,3 +196,23 @@ const ( func RayOriginatedFromCRDLabelValue(crdType CRDType) string { return string(crdType) } + +// These are markers used by the calculateStatus() for setting the RayClusterReplicaFailure condition. 
+var (
+	errRayClusterReplicaFailure = errors.New("RayClusterReplicaFailure")
+	ErrFailedDeleteAllPods      = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteAllPods"))
+	ErrFailedDeleteHeadPod      = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteHeadPod"))
+	ErrFailedCreateHeadPod      = errors.Join(errRayClusterReplicaFailure, errors.New("FailedCreateHeadPod"))
+	ErrFailedDeleteWorkerPod    = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteWorkerPod"))
+	ErrFailedCreateWorkerPod    = errors.Join(errRayClusterReplicaFailure, errors.New("FailedCreateWorkerPod"))
+)
+
+// RayClusterReplicaFailureReason returns the reason (e.g. "FailedCreateHeadPod") of the marker that err is or wraps, or "" if err carries none.
+func RayClusterReplicaFailureReason(err error) string {
+	for _, marker := range []error{ErrFailedDeleteAllPods, ErrFailedDeleteHeadPod, ErrFailedCreateHeadPod, ErrFailedDeleteWorkerPod, ErrFailedCreateWorkerPod} {
+		if errors.Is(err, marker) { // also matches errors.Join(marker, cause), the shape reconcilePods returns, and %w wrapping
+			return marker.(interface{ Unwrap() []error }).Unwrap()[1].Error() // every marker is Join(family, reason); [1] is the reason
+		}
+	}
+	return ""
+}
diff --git a/ray-operator/controllers/ray/utils/util_test.go b/ray-operator/controllers/ray/utils/util_test.go
index 30075c43abc..1c781cef5b5 100644
--- a/ray-operator/controllers/ray/utils/util_test.go
+++ b/ray-operator/controllers/ray/utils/util_test.go
@@ -2,6 +2,7 @@ package utils
 
 import (
 	"context"
+	"errors"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -523,3 +524,15 @@ env_vars:
 		})
 	}
 }
+
+func TestErrRayClusterReplicaFailureReason(t *testing.T) {
+	assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteAllPods), "FailedDeleteAllPods")
+	assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteHeadPod), "FailedDeleteHeadPod")
+	assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedCreateHeadPod), "FailedCreateHeadPod")
+	assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteWorkerPod), "FailedDeleteWorkerPod")
+	assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedCreateWorkerPod), "FailedCreateWorkerPod")
+	assert.Equal(t, RayClusterReplicaFailureReason(errors.New("other error")), "")
+	// reconcilePods returns errors.Join(marker, cause); the reason must be the marker's, not the cause's message.
+	assert.Equal(t, RayClusterReplicaFailureReason(errors.Join(ErrFailedCreateHeadPod, errors.New("other error"))), "FailedCreateHeadPod")
+	assert.Equal(t, RayClusterReplicaFailureReason(errors.Join(ErrFailedDeleteWorkerPod, errors.New("other error"))), "FailedDeleteWorkerPod")
+}