Skip to content

Commit 0c99b42

Browse files
[Refactor][RayCluster] Unify status update to single place (#2249)
1 parent 8bdd7de commit 0c99b42

File tree

2 files changed

+92
-113
lines changed

2 files changed

+92
-113
lines changed

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 83 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"reflect"
8+
"runtime"
89
"strconv"
910
"strings"
1011
"time"
@@ -32,7 +33,7 @@ import (
3233
networkingv1 "k8s.io/api/networking/v1"
3334
"k8s.io/apimachinery/pkg/api/errors"
3435
"k8s.io/apimachinery/pkg/api/resource"
35-
"k8s.io/apimachinery/pkg/runtime"
36+
k8sruntime "k8s.io/apimachinery/pkg/runtime"
3637
ctrl "sigs.k8s.io/controller-runtime"
3738
"sigs.k8s.io/controller-runtime/pkg/builder"
3839
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -43,6 +44,8 @@ import (
4344
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4445
)
4546

47+
type reconcileFunc func(context.Context, *rayv1.RayCluster) error
48+
4649
var (
4750
DefaultRequeueDuration = 2 * time.Second
4851
EnableBatchScheduler bool
@@ -117,7 +120,7 @@ var _ reconcile.Reconciler = &RayClusterReconciler{}
117120
// RayClusterReconciler reconciles a RayCluster object
118121
type RayClusterReconciler struct {
119122
client.Client
120-
Scheme *runtime.Scheme
123+
Scheme *k8sruntime.Scheme
121124
Recorder record.EventRecorder
122125
BatchSchedulerMgr *batchscheduler.SchedulerManager
123126

@@ -194,6 +197,7 @@ func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common
194197
}
195198

196199
func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1.RayCluster) (ctrl.Result, error) {
200+
var reconcileErr error
197201
logger := ctrl.LoggerFrom(ctx)
198202

199203
// Please do NOT modify `originalRayClusterInstance` in the following code.
@@ -296,77 +300,45 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
296300
return ctrl.Result{}, nil
297301
}
298302

299-
if err := r.reconcileAutoscalerServiceAccount(ctx, instance); err != nil {
300-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
301-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
302-
}
303-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
303+
reconcileFuncs := []reconcileFunc{
304+
r.reconcileAutoscalerServiceAccount,
305+
r.reconcileAutoscalerRole,
306+
r.reconcileAutoscalerRoleBinding,
307+
r.reconcileIngress,
308+
r.reconcileHeadService,
309+
r.reconcileHeadlessService,
310+
r.reconcileServeService,
311+
r.reconcilePods,
304312
}
305313

306-
if err := r.reconcileAutoscalerRole(ctx, instance); err != nil {
307-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
308-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
309-
}
310-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
311-
}
312-
if err := r.reconcileAutoscalerRoleBinding(ctx, instance); err != nil {
313-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
314-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
315-
}
316-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
317-
}
318-
if err := r.reconcileIngress(ctx, instance); err != nil {
319-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
320-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
321-
}
322-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
323-
}
324-
if err := r.reconcileHeadService(ctx, instance); err != nil {
325-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
326-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
327-
}
328-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
329-
}
330-
if err := r.reconcileHeadlessService(ctx, instance); err != nil {
331-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
332-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
333-
}
334-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
335-
}
336-
// Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
337-
if enableServeServiceValue, exist := instance.Annotations[utils.EnableServeServiceKey]; exist && enableServeServiceValue == utils.EnableServeServiceTrue {
338-
if err := r.reconcileServeService(ctx, instance); err != nil {
339-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
340-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
341-
}
342-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
343-
}
344-
}
345-
if err := r.reconcilePods(ctx, instance); err != nil {
346-
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
347-
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
348-
}
349-
if updateErr := r.updateClusterReason(ctx, instance, err.Error()); updateErr != nil {
350-
logger.Error(updateErr, "RayCluster update reason error", "cluster name", request.Name)
314+
for _, fn := range reconcileFuncs {
315+
if reconcileErr = fn(ctx, instance); reconcileErr != nil {
316+
funcName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
317+
logger.Error(reconcileErr, "Error reconcile resources", "function name", funcName)
318+
break
351319
}
352-
r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1.PodReconciliationError), err.Error())
353-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
354320
}
355321

356322
// Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
357-
newInstance, err := r.calculateStatus(ctx, instance)
358-
if err != nil {
359-
logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", err)
360-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
323+
newInstance, calculateErr := r.calculateStatus(ctx, instance, reconcileErr)
324+
var updateErr error
325+
if calculateErr != nil {
326+
logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
327+
} else {
328+
updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
361329
}
362330

363-
// Check if need to update the status.
364-
if r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
365-
logger.Info("rayClusterReconcile", "Update CR status", request.Name, "status", newInstance.Status)
366-
if err := r.Status().Update(ctx, newInstance); err != nil {
367-
logger.Info("Got error when updating status", "cluster name", request.Name, "error", err, "RayCluster", newInstance)
368-
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
369-
}
331+
// Return error based on order.
332+
var err error
333+
if reconcileErr != nil {
334+
err = reconcileErr
335+
} else if calculateErr != nil {
336+
err = calculateErr
337+
} else {
338+
err = updateErr
339+
}
340+
if err != nil {
341+
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
370342
}
371343

372344
// Unconditionally requeue after the number of seconds specified in the
@@ -555,6 +527,11 @@ func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instanc
555527

556528
// Return nil only when the serve service successfully created or already exists.
557529
func (r *RayClusterReconciler) reconcileServeService(ctx context.Context, instance *rayv1.RayCluster) error {
530+
// Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
531+
if enableServeServiceValue, exist := instance.Annotations[utils.EnableServeServiceKey]; !exist || enableServeServiceValue != utils.EnableServeServiceTrue {
532+
return nil
533+
}
534+
558535
// Retrieve the Service from the Kubernetes cluster with the name and namespace.
559536
svc := &corev1.Service{}
560537
err := r.Get(ctx, common.RayClusterServeServiceNamespacedName(instance), svc)
@@ -1168,45 +1145,50 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
11681145
Complete(r)
11691146
}
11701147

1171-
func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster) (*rayv1.RayCluster, error) {
1148+
func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster, reconcileErr error) (*rayv1.RayCluster, error) {
11721149
// Deep copy the instance, so we don't mutate the original object.
11731150
newInstance := instance.DeepCopy()
11741151

1175-
// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
1176-
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation
1152+
if reconcileErr != nil {
1153+
newInstance.Status.State = rayv1.Failed
1154+
newInstance.Status.Reason = reconcileErr.Error()
1155+
} else {
1156+
// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
1157+
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation
11771158

1178-
runtimePods := corev1.PodList{}
1179-
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: newInstance.Name}
1180-
if err := r.List(ctx, &runtimePods, client.InNamespace(newInstance.Namespace), filterLabels); err != nil {
1181-
return nil, err
1182-
}
1159+
runtimePods := corev1.PodList{}
1160+
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: newInstance.Name}
1161+
if err := r.List(ctx, &runtimePods, client.InNamespace(newInstance.Namespace), filterLabels); err != nil {
1162+
return nil, err
1163+
}
11831164

1184-
newInstance.Status.ReadyWorkerReplicas = utils.CalculateReadyReplicas(runtimePods)
1185-
newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
1186-
newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(ctx, newInstance)
1187-
newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
1188-
newInstance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(newInstance)
1165+
newInstance.Status.ReadyWorkerReplicas = utils.CalculateReadyReplicas(runtimePods)
1166+
newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
1167+
newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(ctx, newInstance)
1168+
newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
1169+
newInstance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(newInstance)
11891170

1190-
totalResources := utils.CalculateDesiredResources(newInstance)
1191-
newInstance.Status.DesiredCPU = totalResources[corev1.ResourceCPU]
1192-
newInstance.Status.DesiredMemory = totalResources[corev1.ResourceMemory]
1193-
newInstance.Status.DesiredGPU = sumGPUs(totalResources)
1194-
newInstance.Status.DesiredTPU = totalResources[corev1.ResourceName("google.com/tpu")]
1171+
totalResources := utils.CalculateDesiredResources(newInstance)
1172+
newInstance.Status.DesiredCPU = totalResources[corev1.ResourceCPU]
1173+
newInstance.Status.DesiredMemory = totalResources[corev1.ResourceMemory]
1174+
newInstance.Status.DesiredGPU = sumGPUs(totalResources)
1175+
newInstance.Status.DesiredTPU = totalResources[corev1.ResourceName("google.com/tpu")]
11951176

1196-
if utils.CheckAllPodsRunning(ctx, runtimePods) {
1197-
newInstance.Status.State = rayv1.Ready
1198-
}
1177+
if utils.CheckAllPodsRunning(ctx, runtimePods) {
1178+
newInstance.Status.State = rayv1.Ready
1179+
}
11991180

1200-
if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
1201-
newInstance.Status.State = rayv1.Suspended
1202-
}
1181+
if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
1182+
newInstance.Status.State = rayv1.Suspended
1183+
}
12031184

1204-
if err := r.updateEndpoints(ctx, newInstance); err != nil {
1205-
return nil, err
1206-
}
1185+
if err := r.updateEndpoints(ctx, newInstance); err != nil {
1186+
return nil, err
1187+
}
12071188

1208-
if err := r.updateHeadInfo(ctx, newInstance); err != nil {
1209-
return nil, err
1189+
if err := r.updateHeadInfo(ctx, newInstance); err != nil {
1190+
return nil, err
1191+
}
12101192
}
12111193

12121194
timeNow := metav1.Now()
@@ -1456,24 +1438,17 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
14561438
return nil
14571439
}
14581440

1459-
func (r *RayClusterReconciler) updateClusterState(ctx context.Context, instance *rayv1.RayCluster, clusterState rayv1.ClusterState) error {
1441+
func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
14601442
logger := ctrl.LoggerFrom(ctx)
1461-
if instance.Status.State == clusterState {
1443+
if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
14621444
return nil
14631445
}
1464-
instance.Status.State = clusterState
1465-
logger.Info("updateClusterState", "Update CR Status.State", clusterState)
1466-
return r.Status().Update(ctx, instance)
1467-
}
1468-
1469-
func (r *RayClusterReconciler) updateClusterReason(ctx context.Context, instance *rayv1.RayCluster, clusterReason string) error {
1470-
logger := ctrl.LoggerFrom(ctx)
1471-
if instance.Status.Reason == clusterReason {
1472-
return nil
1446+
logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
1447+
err := r.Status().Update(ctx, newInstance)
1448+
if err != nil {
1449+
logger.Info("Error updating status", "name", originalRayClusterInstance.Name, "error", err, "RayCluster", newInstance)
14731450
}
1474-
instance.Status.Reason = clusterReason
1475-
logger.Info("updateClusterReason", "Update CR Status.Reason", clusterReason)
1476-
return r.Status().Update(ctx, instance)
1451+
return err
14771452
}
14781453

14791454
// sumGPUs sums the GPUs in the given resource list.

ray-operator/controllers/ray/raycluster_controller_unit_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,7 +1257,9 @@ func TestReconcile_UpdateClusterReason(t *testing.T) {
12571257
}
12581258
reason := "test reason"
12591259

1260-
err = testRayClusterReconciler.updateClusterReason(ctx, testRayCluster, reason)
1260+
newTestRayCluster := testRayCluster.DeepCopy()
1261+
newTestRayCluster.Status.Reason = reason
1262+
err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
12611263
assert.Nil(t, err, "Fail to update cluster reason")
12621264

12631265
err = fakeClient.Get(ctx, namespacedName, &cluster)
@@ -1496,7 +1498,7 @@ func TestUpdateStatusObservedGeneration(t *testing.T) {
14961498
}
14971499

14981500
// Compare the values of `Generation` and `ObservedGeneration` to check if they match.
1499-
newInstance, err := testRayClusterReconciler.calculateStatus(ctx, testRayCluster)
1501+
newInstance, err := testRayClusterReconciler.calculateStatus(ctx, testRayCluster, nil)
15001502
assert.Nil(t, err)
15011503
err = fakeClient.Get(ctx, namespacedName, &cluster)
15021504
assert.Nil(t, err)
@@ -1532,7 +1534,9 @@ func TestReconcile_UpdateClusterState(t *testing.T) {
15321534
}
15331535

15341536
state := rayv1.Ready
1535-
err = testRayClusterReconciler.updateClusterState(ctx, testRayCluster, state)
1537+
newTestRayCluster := testRayCluster.DeepCopy()
1538+
newTestRayCluster.Status.State = state
1539+
err = testRayClusterReconciler.updateRayClusterStatus(ctx, testRayCluster, newTestRayCluster)
15361540
assert.Nil(t, err, "Fail to update cluster state")
15371541

15381542
err = fakeClient.Get(ctx, namespacedName, &cluster)
@@ -1676,7 +1680,7 @@ func TestCalculateStatus(t *testing.T) {
16761680
}
16771681

16781682
// Test head information
1679-
newInstance, err := r.calculateStatus(ctx, testRayCluster)
1683+
newInstance, err := r.calculateStatus(ctx, testRayCluster, nil)
16801684
assert.Nil(t, err)
16811685
assert.Equal(t, headNodeIP, newInstance.Status.Head.PodIP)
16821686
assert.Equal(t, headServiceIP, newInstance.Status.Head.ServiceIP)
@@ -1729,7 +1733,7 @@ func TestStateTransitionTimes_NoStateChange(t *testing.T) {
17291733
preUpdateTime := metav1.Now()
17301734
testRayCluster.Status.State = rayv1.Ready
17311735
testRayCluster.Status.StateTransitionTimes = map[rayv1.ClusterState]*metav1.Time{rayv1.Ready: &preUpdateTime}
1732-
newInstance, err := r.calculateStatus(ctx, testRayCluster)
1736+
newInstance, err := r.calculateStatus(ctx, testRayCluster, nil)
17331737
assert.Nil(t, err)
17341738
assert.Equal(t, preUpdateTime, *newInstance.Status.StateTransitionTimes[rayv1.Ready], "Cluster state transition timestamp should not be updated")
17351739
}

0 commit comments

Comments
 (0)