Skip to content

Commit ca39dc9

Browse files
authored
[Feat][RayCluster] Use a new RayClusterReplicaFailure condition to reflect the result of reconcilePods (#2259)
1 parent ee0a895 commit ca39dc9

File tree

4 files changed

+81
-8
lines changed

4 files changed

+81
-8
lines changed

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ray
22

33
import (
44
"context"
5+
errstd "errors"
56
"fmt"
67
"os"
78
"reflect"
@@ -10,12 +11,14 @@ import (
1011
"strings"
1112
"time"
1213

14+
"k8s.io/apimachinery/pkg/api/meta"
1315
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1416
"k8s.io/utils/ptr"
1517

1618
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
1719
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
1820
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
21+
"github.com/ray-project/kuberay/ray-operator/pkg/features"
1922

2023
batchv1 "k8s.io/api/batch/v1"
2124
rbacv1 "k8s.io/api/rbac/v1"
@@ -391,6 +394,10 @@ func (r *RayClusterReconciler) inconsistentRayClusterStatus(ctx context.Context,
391394
oldStatus.Endpoints, newStatus.Endpoints, oldStatus.Head, newStatus.Head))
392395
return true
393396
}
397+
if !reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) {
398+
logger.Info("inconsistentRayClusterStatus", "old conditions", oldStatus.Conditions, "new conditions", newStatus.Conditions)
399+
return true
400+
}
394401
return false
395402
}
396403

@@ -597,7 +604,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
597604
// if RayCluster is suspended, delete all pods and skip reconcile
598605
if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
599606
if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
600-
return err
607+
return errstd.Join(utils.ErrFailedDeleteAllPods, err)
601608
}
602609

603610
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
@@ -632,7 +639,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
632639
logger.Info("reconcilePods", "head Pod", headPod.Name, "shouldDelete", shouldDelete, "reason", reason)
633640
if shouldDelete {
634641
if err := r.Delete(ctx, &headPod); err != nil {
635-
return err
642+
return errstd.Join(utils.ErrFailedDeleteHeadPod, err)
636643
}
637644
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
638645
"Deleted head Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
@@ -645,7 +652,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
645652
common.CreatedClustersCounterInc(instance.Namespace)
646653
if err := r.createHeadPod(ctx, *instance); err != nil {
647654
common.FailedClustersCounterInc(instance.Namespace)
648-
return err
655+
return errstd.Join(utils.ErrFailedCreateHeadPod, err)
649656
}
650657
common.SuccessfulClustersCounterInc(instance.Namespace)
651658
} else if len(headPods.Items) > 1 {
@@ -663,7 +670,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
663670
// delete all the extra head pod pods
664671
for _, extraHeadPodToDelete := range headPods.Items {
665672
if err := r.Delete(ctx, &extraHeadPodToDelete); err != nil {
666-
return err
673+
return errstd.Join(utils.ErrFailedDeleteHeadPod, err)
667674
}
668675
}
669676
}
@@ -690,7 +697,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
690697
numDeletedUnhealthyWorkerPods++
691698
deletedWorkers[workerPod.Name] = deleted
692699
if err := r.Delete(ctx, &workerPod); err != nil {
693-
return err
700+
return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
694701
}
695702
r.Recorder.Eventf(instance, corev1.EventTypeNormal, "Deleted",
696703
"Deleted worker Pod %s; Pod status: %s; Pod restart policy: %s; Ray container terminated status: %v",
@@ -714,7 +721,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
714721
if err := r.Delete(ctx, &pod); err != nil {
715722
if !errors.IsNotFound(err) {
716723
logger.Info("reconcilePods", "Fail to delete Pod", pod.Name, "error", err)
717-
return err
724+
return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
718725
}
719726
logger.Info("reconcilePods", "The worker Pod has already been deleted", pod.Name)
720727
} else {
@@ -749,7 +756,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
749756
for i = 0; i < diff; i++ {
750757
logger.Info("reconcilePods", "creating worker for group", worker.GroupName, fmt.Sprintf("index %d", i), fmt.Sprintf("in total %d", diff))
751758
if err := r.createWorkerPod(ctx, *instance, *worker.DeepCopy()); err != nil {
752-
return err
759+
return errstd.Join(utils.ErrFailedCreateWorkerPod, err)
753760
}
754761
}
755762
} else if diff == 0 {
@@ -782,7 +789,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
782789
logger.Info("Randomly deleting Pod", "progress", fmt.Sprintf("%d / %d", i+1, randomlyRemovedWorkers), "with name", randomPodToDelete.Name)
783790
if err := r.Delete(ctx, &randomPodToDelete); err != nil {
784791
if !errors.IsNotFound(err) {
785-
return err
792+
return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
786793
}
787794
logger.Info("reconcilePods", "The worker Pod has already been deleted", randomPodToDelete.Name)
788795
}
@@ -1155,6 +1162,22 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
11551162
// Deep copy the instance, so we don't mutate the original object.
11561163
newInstance := instance.DeepCopy()
11571164

1165+
if features.Enabled(features.RayClusterStatusConditions) {
1166+
if reconcileErr != nil {
1167+
if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
1168+
meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
1169+
Type: string(rayv1.RayClusterReplicaFailure),
1170+
Status: metav1.ConditionTrue,
1171+
Reason: reason,
1172+
Message: reconcileErr.Error(),
1173+
})
1174+
}
1175+
} else {
1176+
// if reconcileErr == nil, we can safely remove the RayClusterReplicaFailure condition.
1177+
meta.RemoveStatusCondition(&newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure))
1178+
}
1179+
}
1180+
11581181
// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
11591182
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation
11601183

ray-operator/controllers/ray/raycluster_controller_unit_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
2626
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
2727
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
28+
"github.com/ray-project/kuberay/ray-operator/pkg/features"
2829

2930
. "github.com/onsi/ginkgo/v2"
3031
"github.com/stretchr/testify/assert"
@@ -33,6 +34,7 @@ import (
3334
corev1 "k8s.io/api/core/v1"
3435
rbacv1 "k8s.io/api/rbac/v1"
3536
k8serrors "k8s.io/apimachinery/pkg/api/errors"
37+
"k8s.io/apimachinery/pkg/api/meta"
3638
"k8s.io/apimachinery/pkg/api/resource"
3739
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3840
"k8s.io/apimachinery/pkg/labels"
@@ -1637,6 +1639,11 @@ func TestInconsistentRayClusterStatus(t *testing.T) {
16371639
newStatus = oldStatus.DeepCopy()
16381640
newStatus.ObservedGeneration = oldStatus.ObservedGeneration + 1
16391641
assert.False(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))
1642+
1643+
// Case 12: `Conditions` is different => return true
1644+
newStatus = oldStatus.DeepCopy()
1645+
meta.SetStatusCondition(&newStatus.Conditions, metav1.Condition{Type: string(rayv1.RayClusterReplicaFailure), Status: metav1.ConditionTrue})
1646+
assert.True(t, r.inconsistentRayClusterStatus(ctx, oldStatus, *newStatus))
16401647
}
16411648

16421649
func TestCalculateStatus(t *testing.T) {
@@ -1687,6 +1694,17 @@ func TestCalculateStatus(t *testing.T) {
16871694
assert.Equal(t, headService.Name, newInstance.Status.Head.ServiceName)
16881695
assert.NotNil(t, newInstance.Status.StateTransitionTimes, "Cluster state transition timestamp should be created")
16891696
assert.Equal(t, newInstance.Status.LastUpdateTime, newInstance.Status.StateTransitionTimes[rayv1.Ready])
1697+
1698+
// Test reconcilePodsErr with the feature gate disabled
1699+
newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
1700+
assert.Nil(t, err)
1701+
assert.Empty(t, newInstance.Status.Conditions)
1702+
1703+
// Test reconcilePodsErr with the feature gate enabled
1704+
defer features.SetFeatureGateDuringTest(t, features.RayClusterStatusConditions, true)()
1705+
newInstance, err = r.calculateStatus(ctx, testRayCluster, utils.ErrFailedCreateHeadPod)
1706+
assert.Nil(t, err)
1707+
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue))
16901708
}
16911709

16921710
func TestStateTransitionTimes_NoStateChange(t *testing.T) {

ray-operator/controllers/ray/utils/constant.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package utils
22

3+
import "errors"
4+
35
const (
46

57
// Default application name
@@ -194,3 +196,23 @@ const (
194196
func RayOriginatedFromCRDLabelValue(crdType CRDType) string {
195197
return string(crdType)
196198
}
199+
200+
// These are markers used by the calculateStatus() for setting the RayClusterReplicaFailure condition.
201+
var (
202+
errRayClusterReplicaFailure = errors.New("RayClusterReplicaFailure")
203+
ErrFailedDeleteAllPods = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteAllPods"))
204+
ErrFailedDeleteHeadPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteHeadPod"))
205+
ErrFailedCreateHeadPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedCreateHeadPod"))
206+
ErrFailedDeleteWorkerPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedDeleteWorkerPod"))
207+
ErrFailedCreateWorkerPod = errors.Join(errRayClusterReplicaFailure, errors.New("FailedCreateWorkerPod"))
208+
)
209+
210+
func RayClusterReplicaFailureReason(err error) string {
211+
if e, ok := err.(interface{ Unwrap() []error }); ok {
212+
errs := e.Unwrap()
213+
if len(errs) >= 2 && errors.Is(errs[0], errRayClusterReplicaFailure) {
214+
return errs[1].Error()
215+
}
216+
}
217+
return ""
218+
}

ray-operator/controllers/ray/utils/util_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package utils
22

33
import (
44
"context"
5+
"errors"
56
"testing"
67

78
"github.com/stretchr/testify/assert"
@@ -523,3 +524,12 @@ env_vars:
523524
})
524525
}
525526
}
527+
528+
func TestErrRayClusterReplicaFailureReason(t *testing.T) {
529+
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteAllPods), "FailedDeleteAllPods")
530+
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteHeadPod), "FailedDeleteHeadPod")
531+
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedCreateHeadPod), "FailedCreateHeadPod")
532+
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedDeleteWorkerPod), "FailedDeleteWorkerPod")
533+
assert.Equal(t, RayClusterReplicaFailureReason(ErrFailedCreateWorkerPod), "FailedCreateWorkerPod")
534+
assert.Equal(t, RayClusterReplicaFailureReason(errors.New("other error")), "")
535+
}

0 commit comments

Comments
 (0)