diff --git a/docs/reference/api.md b/docs/reference/api.md
index 6d3b803d524..bd22a2c7e91 100644
--- a/docs/reference/api.md
+++ b/docs/reference/api.md
@@ -248,6 +248,7 @@ _Appears in:_
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.
In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
| `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the
entrypoint command. | | |
+| `schedule` _string_ | Schedule specifies a cron-like string for scheduling Ray jobs.
When shutdownAfterJobFinishes is set to true, a new cluster is provisioned
per scheduled job; otherwise, each scheduled job reuses the existing cluster. | | |
| `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
| `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
| `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.
It's only working when ShutdownAfterJobFinishes set to true. | 0 | |
diff --git a/go.mod b/go.mod
index 472e6d593df..f626a9cad9f 100644
--- a/go.mod
+++ b/go.mod
@@ -87,6 +87,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
+ github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
diff --git a/go.sum b/go.sum
index dddab9f7e86..655b400f4dc 100644
--- a/go.sum
+++ b/go.sum
@@ -184,6 +184,8 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
index 8f8679ca607..1167780856e 100644
--- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
+++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
@@ -8091,6 +8091,8 @@ spec:
type: object
runtimeEnvYAML:
type: string
+ schedule:
+ type: string
shutdownAfterJobFinishes:
type: boolean
submissionMode:
@@ -11794,6 +11796,9 @@ spec:
type: string
jobStatus:
type: string
+ lastScheduleTime:
+ format: date-time
+ type: string
message:
type: string
observedGeneration:
diff --git a/helm-chart/ray-cluster/README.md b/helm-chart/ray-cluster/README.md
index 57aef2f5f33..678a40ad765 100644
--- a/helm-chart/ray-cluster/README.md
+++ b/helm-chart/ray-cluster/README.md
@@ -78,6 +78,7 @@ helm uninstall raycluster
| nameOverride | string | `"kuberay"` | String to partially override release name. |
| fullnameOverride | string | `""` | String to fully override release name. |
| imagePullSecrets | list | `[]` | Secrets with credentials to pull images from a private registry |
+| gcsFaultTolerance.enabled | bool | `false` | |
| common.containerEnv | list | `[]` | containerEnv specifies environment variables for the Ray head and worker containers. Follows standard K8s container env schema. |
| head.initContainers | list | `[]` | Init containers to add to the head pod |
| head.labels | object | `{}` | Labels for the head pod |
diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go
index 1ada1e1ae7d..bb600f61dfe 100644
--- a/ray-operator/apis/ray/v1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1/rayjob_types.go
@@ -54,6 +54,8 @@ const (
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
JobDeploymentStatusRetrying JobDeploymentStatus = "Retrying"
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
+ JobDeploymentStatusScheduling JobDeploymentStatus = "Scheduling"
+ JobDeploymentStatusScheduled JobDeploymentStatus = "Scheduled"
)
// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
@@ -181,6 +183,11 @@ type RayJobSpec struct {
// entrypoint command.
// +optional
EntrypointResources string `json:"entrypointResources,omitempty"`
+ // Schedule specifies a cron-like string for scheduling Ray jobs.
+ // When shutdownAfterJobFinishes is set to true, a new cluster is provisioned
+ // per scheduled job; otherwise, each scheduled job reuses the existing cluster.
+ // +optional
+ Schedule string `json:"schedule,omitempty"`
// EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command.
// +optional
EntrypointNumCpus float32 `json:"entrypointNumCpus,omitempty"`
@@ -233,6 +240,9 @@ type RayJobStatus struct {
// or the submitter Job has failed.
// +optional
EndTime *metav1.Time `json:"endTime,omitempty"`
+ // lastScheduleTime is the last time the job was successfully scheduled.
+ // +optional
+ LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
// Succeeded is the number of times this job succeeded.
// +kubebuilder:default:=0
// +optional
diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
index b4cb5decf12..7f80e2ca6f4 100644
--- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
+++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go
@@ -507,6 +507,10 @@ func (in *RayJobStatus) DeepCopyInto(out *RayJobStatus) {
in, out := &in.EndTime, &out.EndTime
*out = (*in).DeepCopy()
}
+ if in.LastScheduleTime != nil {
+ in, out := &in.LastScheduleTime, &out.LastScheduleTime
+ *out = (*in).DeepCopy()
+ }
if in.Succeeded != nil {
in, out := &in.Succeeded, &out.Succeeded
*out = new(int32)
diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
index 8f8679ca607..1167780856e 100644
--- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
+++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -8091,6 +8091,8 @@ spec:
type: object
runtimeEnvYAML:
type: string
+ schedule:
+ type: string
shutdownAfterJobFinishes:
type: boolean
submissionMode:
@@ -11794,6 +11796,9 @@ spec:
type: string
jobStatus:
type: string
+ lastScheduleTime:
+ format: date-time
+ type: string
message:
type: string
observedGeneration:
diff --git a/ray-operator/config/samples/ray-job.schedule.yaml b/ray-operator/config/samples/ray-job.schedule.yaml
new file mode 100644
index 00000000000..ec22a9b2097
--- /dev/null
+++ b/ray-operator/config/samples/ray-job.schedule.yaml
@@ -0,0 +1,121 @@
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+ name: rayjob-schedule
+spec:
+  # schedule specifies a standard cron string that tells the RayJob when to start new scheduled runs.
+  # This example runs every 5 minutes.
+ schedule: "*/5 * * * *"
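+  # Other standard five-field cron expressions work as well, for example "0 2 * * *" (daily at 02:00);
+  # the underlying cron parser (robfig/cron ParseStandard) also accepts descriptors such as "@hourly" and "@every 30m".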
+
+ entrypoint: python /home/ray/samples/sample_code.py
+
+ # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
+  # NOTE: when combined with schedule, setting this to true deletes and recreates the cluster
+  # for each scheduled run; otherwise the same cluster is reused across runs.
+ shutdownAfterJobFinishes: true
+
+ runtimeEnvYAML: |
+ pip:
+ - requests==2.26.0
+ - pendulum==2.1.2
+ env_vars:
+ counter_name: "test_counter"
+
+
+ rayClusterSpec:
+ rayVersion: '2.46.0'
+ headGroupSpec:
+ rayStartParams: {}
+ template:
+ spec:
+ containers:
+ - name: ray-head
+ image: rayproject/ray:2.46.0
+ ports:
+ - containerPort: 6379
+ name: gcs-server
+ - containerPort: 8265
+ name: dashboard
+ - containerPort: 10001
+ name: client
+ resources:
+ limits:
+ cpu: "1"
+ requests:
+ cpu: "200m"
+ volumeMounts:
+ - mountPath: /home/ray/samples
+ name: code-sample
+ volumes:
+ - name: code-sample
+ configMap:
+ name: ray-job-code-sample
+ items:
+ - key: sample_code.py
+ path: sample_code.py
+ workerGroupSpecs:
+ - replicas: 1
+ minReplicas: 1
+ maxReplicas: 5
+ groupName: small-group
+ rayStartParams: {}
+ template:
+ spec:
+ containers:
+ - name: ray-worker
+ image: rayproject/ray:2.46.0
+ resources:
+ limits:
+ cpu: "1"
+ requests:
+ cpu: "200m"
+ # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
+ # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
+ # submitterPodTemplate:
+ # spec:
+ # restartPolicy: Never
+ # containers:
+ # - name: my-custom-rayjob-submitter-pod
+ # image: rayproject/ray:2.46.0
+ # # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
+ # # Specifying Command is not recommended.
+ # # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]
+
+
+######################Ray code sample#################################
+# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
+# it is mounted into the container and executed to show the Ray job at work
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: ray-job-code-sample
+data:
+ sample_code.py: |
+ import ray
+ import os
+ import requests
+
+ ray.init()
+
+ @ray.remote
+ class Counter:
+ def __init__(self):
+ # Used to verify runtimeEnv
+ self.name = os.getenv("counter_name")
+ assert self.name == "test_counter"
+ self.counter = 0
+
+ def inc(self):
+ self.counter += 1
+
+ def get_counter(self):
+ return "{} got {}".format(self.name, self.counter)
+
+ counter = Counter.remote()
+
+ for _ in range(5):
+ ray.get(counter.inc.remote())
+ print(ray.get(counter.get_counter.remote()))
+
+ # Verify that the correct runtime env was used for the job.
+ assert requests.__version__ == "2.26.0"
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 551cd70f606..50e8647dc6b 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/go-logr/logr"
+ "github.com/robfig/cron/v3"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -34,6 +35,8 @@ const (
RayJobDefaultRequeueDuration = 3 * time.Second
RayJobDefaultClusterSelectorKey = "ray.io/cluster"
PythonUnbufferedEnvVarName = "PYTHONUNBUFFERED"
+ // ScheduleBuffer is the grace period after a cron tick during which a scheduled RayJob is still allowed to start.
+ ScheduleBuffer = 100 * time.Millisecond
)
// RayJobReconciler reconciles a RayJob object
@@ -168,6 +171,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
+ // LastScheduleTime is nil only before the first scheduled run.
+ if rayJobInstance.Spec.Schedule != "" && rayJobInstance.Status.LastScheduleTime == nil {
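+ // Park the RayJob in the Scheduled state; the Scheduled case below decides when the first run should start.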
+ rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
+ break
+ }
// Set `Status.JobDeploymentStatus` to `JobDeploymentStatusInitializing`, and initialize `Status.JobId`
// and `Status.RayClusterName` prior to avoid duplicate job submissions and cluster creations.
logger.Info("JobDeploymentStatusNew")
@@ -449,9 +457,58 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
+ if rayJobInstance.Spec.Schedule != "" {
+ logger.Info("Rescheduling RayJob")
+ rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduling
+ break
+ }
// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
+ case rayv1.JobDeploymentStatusScheduling:
+ isJobDeleted, err := r.deleteSubmitterJob(ctx, rayJobInstance)
+ if err != nil {
+ return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+ }
+
+ if !isJobDeleted {
+ logger.Info("The release of the compute resources has not been completed yet. " +
+ "Wait for the resources to be deleted before the status transitions to avoid a resource leak.")
+ return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+ }
+
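+ // Reset the per-run status fields so the next scheduled run starts from a clean slate.
+ // The cluster-related fields are cleared only when a fresh cluster is provisioned per run.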
+ if rayJobInstance.Spec.ShutdownAfterJobFinishes {
+ rayJobInstance.Status.RayClusterStatus = rayv1.RayClusterStatus{}
+ rayJobInstance.Status.RayClusterName = ""
+
+ }
+ rayJobInstance.Status.DashboardURL = ""
+ rayJobInstance.Status.JobId = ""
+ rayJobInstance.Status.Message = ""
+ rayJobInstance.Status.Reason = ""
+ rayJobInstance.Status.RayJobStatusInfo = rayv1.RayJobStatusInfo{}
+
+ rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
+ rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
+ case rayv1.JobDeploymentStatusScheduled:
+ // Compute the durations from the current time to the previous and next cron ticks.
+ // time.Now() is passed as a parameter for easier unit testing and consistency.
+ t1, t2, err := r.getNextAndPreviousScheduleDistance(ctx, time.Now(), rayJobInstance)
+ if err != nil {
+ logger.Error(err, "Could not get the previous and next distances for a cron schedule")
+ return ctrl.Result{}, err
+ }
+ // Check whether the current time is still within the buffer window after the previous cron tick.
+ if t2 <= ScheduleBuffer {
+ logger.Info("The current time is within the buffer window of a cron tick", "NextScheduleTimeDuration", t1, "LastScheduleTimeDuration", t2, "Previous LastScheduleTime", rayJobInstance.Status.LastScheduleTime)
+ rayJobInstance.Status.LastScheduleTime = &metav1.Time{Time: time.Now()}
+ rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
+ rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew
+ } else {
+ logger.Info("Waiting until the next reconcile to determine schedule", "nextScheduleDuration", t1, "currentTime", time.Now(), "lastScheduleTimeDuration", t2)
+ return ctrl.Result{RequeueAfter: t1}, nil
+ }
+
default:
logger.Info("Unknown JobDeploymentStatus", "JobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
@@ -894,6 +951,23 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
return rayCluster, nil
}
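+// getNextAndPreviousScheduleDistance parses the RayJob's cron schedule and returns the
+// durations from currentTime to the next cron tick and since the previous cron tick.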
+func (r *RayJobReconciler) getNextAndPreviousScheduleDistance(ctx context.Context, currentTime time.Time, rayJobInstance *rayv1.RayJob) (time.Duration, time.Duration, error) {
+ logger := ctrl.LoggerFrom(ctx)
+ formattedCron := utils.FormatSchedule(rayJobInstance, r.Recorder)
+ cronSchedule, err := cron.ParseStandard(formattedCron)
+ if err != nil {
+ // This is likely a user error in defining the schedule spec value.
+ // Log the error and do not reconcile this RayJob until the spec is updated.
+ r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", rayJobInstance.Spec.Schedule, err)
+ return 0, 0, fmt.Errorf("the cron schedule provided is unparseable: %w", err)
+ }
+
+ t1 := utils.NextScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
+ t2 := utils.LastScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
+
+ return t1, t2, nil
+}
+
func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
logger := ctrl.LoggerFrom(ctx)
if !rayJob.Spec.Suspend {
diff --git a/ray-operator/controllers/ray/rayjob_controller_scheduled_test.go b/ray-operator/controllers/ray/rayjob_controller_scheduled_test.go
new file mode 100644
index 00000000000..2a93db55461
--- /dev/null
+++ b/ray-operator/controllers/ray/rayjob_controller_scheduled_test.go
@@ -0,0 +1,180 @@
+/*
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ray
+
+import (
+ "context"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ batchv1 "k8s.io/api/batch/v1"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/apimachinery/pkg/types"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+
+ rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+ "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
+ "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+)
+
+var _ = Context("RayJob with schedule operation", func() {
+ Describe("When creating a RayJob with a schedule field and NO cluster deletion", Ordered, func() {
+ // The states should transition from Scheduled -> ... -> Initializing -> Running -> Complete -> Scheduled
+ // In the last scheduled state the cluster should still exist since ShutdownAfterJobFinishes is False
+ ctx := context.Background()
+ namespace := "default"
+ cronSchedule := "0 0 1 1 *"
+ rayJob := rayJobTemplate("rayjob-scheduled-no-deletion", namespace)
+ rayJob.Spec.Schedule = cronSchedule
+ rayJob.Spec.ShutdownAfterJobFinishes = false
+ rayCluster := &rayv1.RayCluster{}
+
+ It("Verify RayJob spec", func() {
+ Expect(rayJob.Spec.ShutdownAfterJobFinishes).To(BeFalse())
+ Expect(rayJob.Spec.Schedule).To(Not(BeEmpty()))
+ })
+
+ It("should create a RayJob object with the schedule", func() {
+ err := k8sClient.Create(ctx, rayJob)
+ Expect(err).NotTo(HaveOccurred(), "failed to create test scheduled RayJob resource")
+ })
+
+ It("should start in the Scheduled state", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*60, time.Microsecond*500).Should(Equal(rayv1.JobDeploymentStatusScheduled),
+ "JobDeploymentStatus should be Scheduled")
+ })
+
+ It("should NOT create a raycluster object while scheduled", func() {
+ Consistently(
+ getRayClusterNameForRayJob(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(BeEmpty())
+ })
+
+ It("should update schedule string in its spec", func() {
+ rayJobLookupKey := types.NamespacedName{Name: rayJob.Name, Namespace: rayJob.Namespace}
+ fetchedRayJob := &rayv1.RayJob{}
+ newSchedule := "*/1 * * * *"
+
+ err := k8sClient.Get(ctx, rayJobLookupKey, fetchedRayJob)
+ Expect(err).NotTo(HaveOccurred(), "failed to get RayJob before schedule update")
+
+ fetchedRayJob.Spec.Schedule = newSchedule
+ err = updateRayJobScheduleField(ctx, fetchedRayJob, newSchedule)
+ Expect(err).NotTo(HaveOccurred(), "failed to update RayJob's schedule in spec")
+ })
+
+ // The schedule fires every minute, so the first run should start within at most one minute.
+ It("should transition to the Initializing", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*70, time.Microsecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing),
+ "JobDeploymentStatus should be Initializing")
+ })
+
+ It("should create a raycluster object", func() {
+ Eventually(
+ getRayClusterNameForRayJob(ctx, rayJob),
+ time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()))
+ Eventually(
+ getResourceFunc(ctx, common.RayJobRayClusterNamespacedName(rayJob), rayCluster),
+ time.Second*3, time.Millisecond*500).Should(Succeed())
+ })
+
+ // Check that LastScheduleTime is set once the job has been scheduled.
+ It("should have LastScheduleTime updated in its status", func() {
+ Eventually(
+ getLastScheduleTime(ctx, k8sClient, rayJob),
+ time.Second*10, time.Millisecond*500,
+ ).ShouldNot(BeNil(), "expected LastScheduleTime to be set")
+ })
+
+ It("should NOT create the underlying K8s job yet because the cluster is not ready", func() {
+ k8sJob := &batchv1.Job{}
+ Consistently(
+ // The k8s client returns a NotFound error when the resource does not exist.
+ func() bool {
+ err := getResourceFunc(ctx, common.RayJobK8sJobNamespacedName(rayJob), k8sJob)()
+ return errors.IsNotFound(err)
+ },
+ time.Second*3, time.Millisecond*500).Should(BeTrue())
+ })
+
+ It("should be able to update all Pods to Running", func() {
+ updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+ updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace)
+ })
+
+ It("Dashboard URL should be set", func() {
+ Eventually(
+ getDashboardURLForRayJob(ctx, rayJob),
+ time.Second*3, time.Millisecond*500).Should(HavePrefix(rayJob.Name), "Dashboard URL = %v", rayJob.Status.DashboardURL)
+ })
+
+ It("should create the underlying Kubernetes Job object", func() {
+ k8sJob := &batchv1.Job{}
+ // The underlying Kubernetes Job should be created when the RayJob is scheduled to run
+ Eventually(
+ getResourceFunc(ctx, common.RayJobK8sJobNamespacedName(rayJob), k8sJob),
+ time.Second*3, time.Millisecond*500).Should(Succeed(), "Expected Kubernetes job to be present")
+ })
+
+ It("should transition to the Running", func() {
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning),
+ "JobDeploymentStatus should be Running")
+ })
+
+ It("RayJobs's JobDeploymentStatus transitions to Scheduled after Complete.", func() {
+ // Update fake dashboard client to return job info with "Succeeded" status.
+ getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required
+ return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded, EndTime: uint64(time.Now().UnixMilli())}, nil
+ }
+ fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo)
+ defer fakeRayDashboardClient.GetJobInfoMock.Store(nil)
+
+ // Update the submitter Kubernetes Job to Complete.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ job := &batchv1.Job{}
+ err := k8sClient.Get(ctx, namespacedName, job)
+ Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job")
+
+ conditions := []batchv1.JobCondition{
+ {Type: batchv1.JobComplete, Status: corev1.ConditionTrue},
+ }
+ job.Status.Conditions = conditions
+ Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed())
+
+ // RayJob transitions to Scheduled.
+ Eventually(
+ getRayJobDeploymentStatus(ctx, rayJob),
+ time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusScheduled), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus)
+ })
+
+ It("The raycluster object should still exist", func() {
+ Eventually(
+ func() bool {
+ err := getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster)()
+ return err == nil
+ },
+ time.Second*15, time.Millisecond*500).Should(BeTrue(), "Expected RayCluster to still exist")
+ })
+ })
+})
diff --git a/ray-operator/controllers/ray/rayjob_controller_unit_test.go b/ray-operator/controllers/ray/rayjob_controller_unit_test.go
index 4e0eda26cb5..221b817c80d 100644
--- a/ray-operator/controllers/ray/rayjob_controller_unit_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_unit_test.go
@@ -624,3 +624,87 @@ func TestEmitRayJobExecutionDuration(t *testing.T) {
})
}
}
+
+func TestGetNextAndPreviousScheduleDistance(t *testing.T) {
+ newScheme := runtime.NewScheme()
+ _ = rayv1.AddToScheme(newScheme)
+ _ = corev1.AddToScheme(newScheme)
+
+ testCases := []struct {
+ currentTime time.Time
+ initialLastTime *metav1.Time
+ name string
+ schedule string
+ expectedNextDelta time.Duration
+ expectedPrevDelta time.Duration
+ expectedErr bool
+ isWithinBuffer bool
+ }{
+ {
+ name: "Test 1: Invalid cron string",
+ schedule: "INVLAID * CRON * STRING * WOW",
+ currentTime: time.Now(),
+ expectedErr: true,
+ },
+ {
+ name: "Test 2: Not within the buffer period - future schedule",
+ schedule: "0 0 * * *", // Every day at midnight
+ currentTime: time.Date(2025, time.July, 1, 10, 0, 0, 0, time.UTC), // 2025-07-01 10:00 AM
+ // Next schedule tick is 2025-07-02 00:00:00 (in 14 hours)
+ // Last schedule tick is 2025-07-01 00:00:00 (10 hours ago)
+ expectedNextDelta: 14 * time.Hour,
+ expectedPrevDelta: 10 * time.Hour,
+ expectedErr: false,
+ isWithinBuffer: false,
+ },
+ {
+ name: "Test 3: Within the buffer period - next schedule",
+ schedule: "*/10 * * * *", // Every 10 minutes
+ currentTime: time.Date(2025, time.July, 1, 10, 10, 0, 0, time.UTC), // 2025-07-01 10:10:00
+ // Next schedule tick is 2025-07-01 10:20:00 (in 10 minutes)
+ // Last schedule tick is 2025-07-01 10:10:00 (0 minutes ago, i.e. right now)
+ expectedNextDelta: 10 * time.Minute,
+ expectedPrevDelta: 0 * time.Minute,
+ expectedErr: false,
+ isWithinBuffer: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ rayJob := &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-rayjob",
+ Namespace: "default",
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: tc.schedule,
+ },
+ }
+
+ fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(rayJob).Build()
+ recorder := record.NewFakeRecorder(100)
+
+ reconciler := &RayJobReconciler{
+ Client: fakeClient,
+ Recorder: recorder,
+ Scheme: newScheme,
+ }
+
+ nextDuration, prevDuration, err := reconciler.getNextAndPreviousScheduleDistance(context.Background(), tc.currentTime, rayJob)
+
+ if tc.expectedErr {
+ require.Error(t, err)
+ assert.Contains(t, <-recorder.Events, "UnparseableSchedule")
+ } else {
+ require.NoError(t, err)
+
+ assert.InDelta(t, tc.expectedNextDelta.Seconds(), nextDuration.Seconds(), 1.0, "NextScheduleTimeDuration mismatch")
+ assert.InDelta(t, tc.expectedPrevDelta.Seconds(), prevDuration.Seconds(), 1.0, "LastScheduleTimeDuration mismatch")
+
+ isCurrentlyWithinBuffer := (nextDuration < ScheduleBuffer) || (prevDuration < ScheduleBuffer)
+ assert.Equal(t, tc.isWithinBuffer, isCurrentlyWithinBuffer, "isWithinBuffer check mismatch")
+ }
+ })
+ }
+}
diff --git a/ray-operator/controllers/ray/suite_helpers_test.go b/ray-operator/controllers/ray/suite_helpers_test.go
index f463ef559d9..62970837256 100644
--- a/ray-operator/controllers/ray/suite_helpers_test.go
+++ b/ray-operator/controllers/ray/suite_helpers_test.go
@@ -205,6 +205,22 @@ func getPendingRayClusterWorkerGroupSpecsFunc(ctx context.Context, rayService *r
}
}
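+// getLastScheduleTime returns a function, suitable for Eventually/Consistently, that fetches
+// the RayJob and reports its Status.LastScheduleTime (nil when it has not been set yet).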
+func getLastScheduleTime(ctx context.Context, k8sClient client.Client, rayJob *rayv1.RayJob) func() (*time.Time, error) {
+ return func() (*time.Time, error) {
+ rayJobLookupKey := client.ObjectKey{Name: rayJob.Name, Namespace: rayJob.Namespace}
+ fetchedRayJob := &rayv1.RayJob{}
+
+ if err := k8sClient.Get(ctx, rayJobLookupKey, fetchedRayJob); err != nil {
+ return nil, err
+ }
+
+ if fetchedRayJob.Status.LastScheduleTime != nil {
+ return &fetchedRayJob.Status.LastScheduleTime.Time, nil
+ }
+ return nil, nil
+ }
+}
+
func checkServiceHealth(ctx context.Context, rayService *rayv1.RayService) func() (bool, error) {
return func() (bool, error) {
if err := k8sClient.Get(ctx, client.ObjectKey{Name: rayService.Name, Namespace: rayService.Namespace}, rayService); err != nil {
@@ -319,6 +335,17 @@ func updateRayJobSuspendField(ctx context.Context, rayJob *rayv1.RayJob, suspend
})
}
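+// updateRayJobScheduleField updates the RayJob's Spec.Schedule, retrying on conflict.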
+func updateRayJobScheduleField(ctx context.Context, rayJob *rayv1.RayJob, schedule string) error {
+ return retry.RetryOnConflict(retry.DefaultRetry, func() error {
+ err := k8sClient.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: rayJob.Name}, rayJob)
+ if err != nil {
+ return err
+ }
+ rayJob.Spec.Schedule = schedule
+ return k8sClient.Update(ctx, rayJob)
+ })
+}
+
func setJobIdOnRayJob(ctx context.Context, rayJob *rayv1.RayJob, jobId string) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: rayJob.Name}, rayJob)
diff --git a/ray-operator/controllers/ray/utils/schedule.go b/ray-operator/controllers/ray/utils/schedule.go
new file mode 100644
index 00000000000..52401ccf1f2
--- /dev/null
+++ b/ray-operator/controllers/ray/utils/schedule.go
@@ -0,0 +1,162 @@
+/*
+Portions of this file are derived from the CronJob resource in the Kubernetes project,
+Kubernetes project: https://github.com/kubernetes/kubernetes
+Licensed under the Apache License, Version 2.0.
+For more information on Kubernetes CronJob, refer to:
+https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
+*/
+
+package utils
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/go-logr/logr"
+ "github.com/robfig/cron/v3"
+ corev1 "k8s.io/api/core/v1"
+ "k8s.io/client-go/tools/record"
+
+ rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+)
+
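+// MissedSchedulesType roughly categorizes how many scheduled runs were missed
+// since the last recorded schedule time (or since creation).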
+type MissedSchedulesType int
+
+const (
+ noneMissed MissedSchedulesType = iota
+ fewMissed
+ manyMissed
+)
+
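+// MostRecentScheduleTime returns the earliest possible schedule time (the RayJob's creation
+// time, or LastScheduleTime if set), the most recent time the schedule should have fired
+// before now (nil if it has not fired yet), and a rough category of missed schedules.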
+func MostRecentScheduleTime(rj *rayv1.RayJob, now time.Time, schedule cron.Schedule) (time.Time, *time.Time, MissedSchedulesType, error) {
+ earliestTime := rj.ObjectMeta.CreationTimestamp.Time
+ missedSchedules := noneMissed
+ if rj.Status.LastScheduleTime != nil {
+ earliestTime = rj.Status.LastScheduleTime.Time
+ }
+
+ t1 := schedule.Next(earliestTime)
+ t2 := schedule.Next(t1)
+
+ if now.Before(t1) {
+ return earliestTime, nil, missedSchedules, nil
+ }
+ if now.Before(t2) {
+ return earliestTime, &t1, missedSchedules, nil
+ }
+
+ // cron.ParseStandard accepts schedules that can never fire, e.g. "59 23 31 2 *"
+ // (minute 59, hour 23, day-of-month 31, month 2): February has no 31st day.
+ // In that case timeBetweenTwoSchedules is 0, and we reject the schedule as invalid.
+ timeBetweenTwoSchedules := int64(t2.Sub(t1).Round(time.Second).Seconds())
+ if timeBetweenTwoSchedules < 1 {
+ return earliestTime, nil, missedSchedules, fmt.Errorf("time difference between two schedules is less than 1 second")
+ }
+ // this logic used for calculating number of missed schedules does a rough
+ // approximation, by calculating a diff between two schedules (t1 and t2),
+ // and counting how many of these will fit in between last schedule and now
+ timeElapsed := int64(now.Sub(t1).Seconds())
+ numberOfMissedSchedules := (timeElapsed / timeBetweenTwoSchedules) + 1
+
+ var mostRecentTime time.Time
+ // To get an accurate most recent time for both regular schedules and ones
+ // specified with the @every form, first estimate a potential earliest time by
+ // multiplying the number of missed schedules by the interval; this is critical
+ // for @every to start at the correct time. That explains numberOfMissedSchedules-1;
+ // the additional -1 steps back one more interval so the cron library can compute
+ // a proper schedule for irregular specs such as "30 6-16/4 * * 1-5".
+ potentialEarliest := t1.Add(time.Duration((numberOfMissedSchedules-1-1)*timeBetweenTwoSchedules) * time.Second)
+ for t := schedule.Next(potentialEarliest); !t.After(now); t = schedule.Next(t) {
+ mostRecentTime = t
+ }
+
+ // An object might miss several starts. For example, if
+ // controller gets wedged on friday at 5:01pm when everyone has
+ // gone home, and someone comes in on tuesday AM and discovers
+ // the problem and restarts the controller, then all the hourly
+ // jobs, more than 80 of them for one hourly cronJob, should
+ // all start running with no further intervention (if the cronJob
+ // allows concurrency and late starts).
+ //
+ // However, if there is a bug somewhere, or incorrect clock
+ // on controller's server or apiservers (for setting creationTimestamp)
+ // then there could be so many missed start times (it could be off
+ // by decades or more), that it would eat up all the CPU and memory
+ // of this controller. In that case, we want to not try to list
+ // all the missed start times.
+ //
+ // I've somewhat arbitrarily picked 100, as more than 80,
+ // but less than "lots".
+ switch {
+ case numberOfMissedSchedules > 100:
+ missedSchedules = manyMissed
+ // inform about few missed, still
+ case numberOfMissedSchedules > 0:
+ missedSchedules = fewMissed
+ }
+
+ if mostRecentTime.IsZero() {
+ return earliestTime, nil, missedSchedules, nil
+ }
+ return earliestTime, &mostRecentTime, missedSchedules, nil
+}
+
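+// FormatSchedule returns the RayJob's schedule string, emitting a warning event when it
+// contains a TZ or CRON_TZ prefix, which is not officially supported.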
+func FormatSchedule(rj *rayv1.RayJob, recorder record.EventRecorder) string {
+ if strings.Contains(rj.Spec.Schedule, "TZ") {
+ if recorder != nil {
+ recorder.Eventf(rj, corev1.EventTypeWarning, "UnsupportedSchedule", "CRON_TZ or TZ used in schedule %q is not officially supported, see https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ for more details", rj.Spec.Schedule)
+ }
+
+ return rj.Spec.Schedule
+ }
+
+ return rj.Spec.Schedule
+}
+
+// NextScheduleTimeDuration returns the duration to requeue for, i.e. the time from
+// now until the next cron tick, based on the schedule and the last schedule time.
+func NextScheduleTimeDuration(logger logr.Logger, rj *rayv1.RayJob, now time.Time, schedule cron.Schedule) time.Duration {
+ earliestTime, mostRecentTime, missedSchedules, err := MostRecentScheduleTime(rj, now, schedule)
+ if err != nil {
+ // we still have to requeue at some point, so aim for the next scheduling slot from now
+ logger.Info("Error in mostRecentScheduleTime, we still have to requeue at some point, so aim for the next scheduling slot from now", "Error", err)
+ mostRecentTime = &now
+ } else if mostRecentTime == nil {
+ logger.Info("mostRecentTime doesn't exist")
+ if missedSchedules == noneMissed {
+ // no missed schedules since earliestTime
+ mostRecentTime = &earliestTime
+ } else {
+ // if there are missed schedules since earliestTime, always use now
+ mostRecentTime = &now
+ }
+ }
+ logger.Info("Successfully calculated earliestTime and mostRecentTime", "mostRecentTime", mostRecentTime, "Current Time", now, "Next time to aim for", schedule.Next(*mostRecentTime))
+ t := schedule.Next(*mostRecentTime).Sub(now)
+ return t
+}
+
+// LastScheduleTimeDuration returns the duration since the most recent time the
+// schedule should have fired, based on the RayJob's creation time (or its last
+// recorded schedule time) and the current time 'now'.
+func LastScheduleTimeDuration(logger logr.Logger, rj *rayv1.RayJob, now time.Time, schedule cron.Schedule) time.Duration {
+ earliestTime, mostRecentTime, missedSchedules, err := MostRecentScheduleTime(rj, now, schedule)
+ if err != nil {
+ // We still have to requeue at some point, so aim for the next scheduling slot from now
+ logger.Info("Error in mostRecentScheduleTime, we still have to requeue at some point", "Error", err)
+ mostRecentTime = &now
+ } else if mostRecentTime == nil {
+ logger.Info("mostRecentTime doesn't exist")
+ if missedSchedules == noneMissed {
+ // No missed schedules since earliestTime
+ mostRecentTime = &earliestTime
+ } else {
+ // If there are missed schedules since earliestTime, always use now
+ mostRecentTime = &now
+ }
+ }
+ return now.Sub(*mostRecentTime)
+}
diff --git a/ray-operator/controllers/ray/utils/schedule_test.go b/ray-operator/controllers/ray/utils/schedule_test.go
new file mode 100644
index 00000000000..a92264a48b8
--- /dev/null
+++ b/ray-operator/controllers/ray/utils/schedule_test.go
@@ -0,0 +1,498 @@
+package utils
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/go-logr/logr/testr"
+ cron "github.com/robfig/cron/v3"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+)
+
+func TestMostRecentScheduleTime(t *testing.T) {
+ metav1TopOfTheHour := metav1.NewTime(*topOfTheHour())
+ metav1HalfPastTheHour := metav1.NewTime(*deltaTimeAfterTopOfTheHour(30 * time.Minute))
+
+ tests := []struct {
+ now time.Time
+ expectedEarliestTime time.Time
+ cj *rayv1.RayJob
+ expectedRecentTime *time.Time
+ name string
+ expectedTooManyMissed MissedSchedulesType
+ includeSDS bool
+ wantErr bool
+ }{
+ {
+ name: "now before next schedule",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 * * * *",
+ },
+ },
+ now: topOfTheHour().Add(30 * time.Second),
+ expectedRecentTime: nil,
+ expectedEarliestTime: *topOfTheHour(),
+ },
+ {
+ name: "now just after next schedule",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 * * * *",
+ },
+ },
+ now: topOfTheHour().Add(61 * time.Minute),
+ expectedRecentTime: deltaTimeAfterTopOfTheHour(60 * time.Minute),
+ expectedEarliestTime: *topOfTheHour(),
+ },
+ {
+ name: "missed 5 schedules",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1.NewTime(*deltaTimeAfterTopOfTheHour(10 * time.Second)),
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 * * * *",
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(301 * time.Minute),
+ expectedRecentTime: deltaTimeAfterTopOfTheHour(300 * time.Minute),
+ expectedEarliestTime: *deltaTimeAfterTopOfTheHour(10 * time.Second),
+ expectedTooManyMissed: fewMissed,
+ },
+ {
+ name: "complex schedule",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "30 6-16/4 * * 1-5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1HalfPastTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(24*time.Hour + 31*time.Minute),
+ expectedRecentTime: deltaTimeAfterTopOfTheHour(24*time.Hour + 30*time.Minute),
+ expectedEarliestTime: *deltaTimeAfterTopOfTheHour(30 * time.Minute),
+ expectedTooManyMissed: fewMissed,
+ },
+ {
+ name: "another complex schedule",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "30 10,11,12 * * 1-5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1HalfPastTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(30*time.Hour + 30*time.Minute),
+ expectedRecentTime: nil,
+ expectedEarliestTime: *deltaTimeAfterTopOfTheHour(30 * time.Minute),
+ expectedTooManyMissed: fewMissed,
+ },
+ {
+ name: "complex schedule with longer diff between executions",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "30 6-16/4 * * 1-5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1HalfPastTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(96*time.Hour + 31*time.Minute),
+ expectedRecentTime: deltaTimeAfterTopOfTheHour(96*time.Hour + 30*time.Minute),
+ expectedEarliestTime: *deltaTimeAfterTopOfTheHour(30 * time.Minute),
+ expectedTooManyMissed: fewMissed,
+ },
+ {
+ name: "complex schedule with shorter diff between executions",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "30 6-16/4 * * 1-5",
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(24*time.Hour + 31*time.Minute),
+ expectedRecentTime: deltaTimeAfterTopOfTheHour(24*time.Hour + 30*time.Minute),
+ expectedEarliestTime: *topOfTheHour(),
+ expectedTooManyMissed: fewMissed,
+ },
+ {
+ name: "rogue cronjob",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1.NewTime(*deltaTimeAfterTopOfTheHour(10 * time.Second)),
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "59 23 31 2 *",
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(1 * time.Hour),
+ expectedRecentTime: nil,
+ wantErr: true,
+ },
+ {
+ name: "earliestTime being CreationTimestamp and LastScheduleTime",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 * * * *",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1TopOfTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(30 * time.Second),
+ expectedEarliestTime: *topOfTheHour(),
+ expectedRecentTime: nil,
+ },
+ {
+ name: "earliestTime being LastScheduleTime",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "*/5 * * * *",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1HalfPastTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(31 * time.Minute),
+ expectedEarliestTime: *deltaTimeAfterTopOfTheHour(30 * time.Minute),
+ expectedRecentTime: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ sched, err := cron.ParseStandard(tt.cj.Spec.Schedule)
+ if err != nil {
+ t.Errorf("error setting up the test, %s", err)
+ }
+ gotEarliestTime, gotRecentTime, gotTooManyMissed, err := MostRecentScheduleTime(tt.cj, tt.now, sched)
+ if tt.wantErr {
+ if err == nil {
+ t.Error("mostRecentScheduleTime() got no error when expected one")
+ }
+ return
+ }
+ if !tt.wantErr && err != nil {
+ t.Error("mostRecentScheduleTime() got error when none expected")
+ }
+ if gotEarliestTime.IsZero() {
+ t.Errorf("earliestTime should never be 0, want %v", tt.expectedEarliestTime)
+ }
+ if !gotEarliestTime.Equal(tt.expectedEarliestTime) {
+ t.Errorf("expectedEarliestTime - got %v, want %v", gotEarliestTime, tt.expectedEarliestTime)
+ }
+ if !reflect.DeepEqual(gotRecentTime, tt.expectedRecentTime) {
+ t.Errorf("expectedRecentTime - got %v, want %v", gotRecentTime, tt.expectedRecentTime)
+ }
+ if gotTooManyMissed != tt.expectedTooManyMissed {
+ t.Errorf("expectedNumberOfMisses - got %v, want %v", gotTooManyMissed, tt.expectedTooManyMissed)
+ }
+ })
+ }
+}
+
+func TestNextScheduleTimeDuration(t *testing.T) {
+ logger := testr.New(t)
+ metav1TopOfTheHour := metav1.NewTime(*topOfTheHour())
+ metav1HalfPastTheHour := metav1.NewTime(*deltaTimeAfterTopOfTheHour(30 * time.Minute))
+ metav1TwoHoursLater := metav1.NewTime(*deltaTimeAfterTopOfTheHour(2 * time.Hour))
+
+ tests := []struct {
+ now time.Time
+ cj *rayv1.RayJob
+ name string
+ expectedDuration time.Duration
+ }{
+ {
+ name: "complex schedule skipping weekend",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "30 6-16/4 * * 1-5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1HalfPastTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(24*time.Hour + 31*time.Minute),
+ expectedDuration: 3*time.Hour + 59*time.Minute,
+ },
+ {
+ name: "another complex schedule skipping weekend",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "30 10,11,12 * * 1-5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1HalfPastTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(30*time.Hour + 30*time.Minute),
+ expectedDuration: 66 * time.Hour,
+ },
+ {
+ name: "once a week cronjob, missed two runs",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 12 * * 4",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1TwoHoursLater,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(19*24*time.Hour + 1*time.Hour + 30*time.Minute),
+ expectedDuration: 48*time.Hour + 30*time.Minute,
+ },
+ {
+ name: "no previous run of a cronjob",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 12 * * 5",
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(6 * time.Hour),
+ expectedDuration: 20 * time.Hour,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ sched, err := cron.ParseStandard(tt.cj.Spec.Schedule)
+ if err != nil {
+ t.Errorf("error setting up the test, %s", err)
+ }
+ gotScheduleTimeDuration := NextScheduleTimeDuration(logger, tt.cj, tt.now, sched)
+ if gotScheduleTimeDuration < 0 {
+ t.Errorf("scheduleTimeDuration should never be less than 0, got %s", gotScheduleTimeDuration)
+ }
+ if !reflect.DeepEqual(&gotScheduleTimeDuration, &tt.expectedDuration) {
+ t.Errorf("scheduleTimeDuration - got %s, want %s, difference: %s", gotScheduleTimeDuration, tt.expectedDuration, gotScheduleTimeDuration-tt.expectedDuration)
+ }
+ })
+ }
+}
+
+func TestLastScheduleTimeDuration(t *testing.T) {
+ logger := testr.New(t)
+ metav1TopOfTheHour := metav1.NewTime(*topOfTheHour())
+ metav1OneHourAgo := metav1.NewTime(*deltaTimeAfterTopOfTheHour(-1 * time.Hour))
+ metav1YesterdayMidday := metav1.NewTime(metav1TopOfTheHour.Add(-28 * time.Hour))
+ metav1TwoDaysAgo := metav1.NewTime(metav1TopOfTheHour.Add(-48 * time.Hour))
+ metav1FourMonthsAgo := metav1.NewTime(metav1TopOfTheHour.AddDate(0, -4, 0))
+ metav1FiveMonthsAgo := metav1.NewTime(metav1TopOfTheHour.AddDate(0, -5, 0))
+
+ tests := []struct {
+ now time.Time
+ cj *rayv1.RayJob
+ name string
+ expectedDuration time.Duration
+ }{
+ {
+ name: "hourly job, last scheduled 30 minutes ago",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 * * * *",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1TopOfTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(30 * time.Minute),
+ expectedDuration: 30 * time.Minute,
+ },
+ {
+ name: "daily job, last scheduled yesterday, now is midday next day",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TwoDaysAgo,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 12 * * *",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1YesterdayMidday,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(26*time.Hour + 5*time.Minute),
+ expectedDuration: 5 * time.Minute,
+ },
+ {
+ name: "weekly job, no previous run, now before first schedule",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 12 * * 5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: nil,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(24 * time.Hour),
+ expectedDuration: 24 * time.Hour,
+ },
+ {
+ name: "weekly job, no previous run, now after first schedule (missed)",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 10 * * 5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: nil,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(8 * 24 * time.Hour),
+ expectedDuration: 0 * time.Minute,
+ },
+ {
+ name: "cronjob with lastScheduleTime equal to now",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 * * * *",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1TopOfTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(0 * time.Minute),
+ expectedDuration: 0 * time.Minute,
+ },
+ {
+ name: "complex schedule, now before first next run after lastScheduleTime",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1OneHourAgo,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "30 6-16/4 * * 1-5",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: func() *metav1.Time {
+ t := metav1.NewTime(*deltaTimeAfterTopOfTheHour(-90 * time.Minute))
+ return &t
+ }(),
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(-60 * time.Minute),
+ expectedDuration: 30 * time.Minute,
+ },
+ {
+ name: "daily job, last scheduled today earlier, now is later",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1TopOfTheHour,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 16 * * *",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1TopOfTheHour,
+ },
+ },
+ now: *deltaTimeAfterTopOfTheHour(1 * time.Hour),
+ expectedDuration: 1 * time.Hour,
+ },
+ {
+ name: "monthly job, missed several months, now is far past last schedule",
+ cj: &rayv1.RayJob{
+ ObjectMeta: metav1.ObjectMeta{
+ CreationTimestamp: metav1FiveMonthsAgo,
+ },
+ Spec: rayv1.RayJobSpec{
+ Schedule: "0 0 1 * *",
+ },
+ Status: rayv1.RayJobStatus{
+ LastScheduleTime: &metav1FourMonthsAgo,
+ },
+ },
+ now: metav1TopOfTheHour.Add(1 * time.Hour),
+ expectedDuration: metav1TopOfTheHour.Add(1 * time.Hour).Sub(time.Date(2016, time.May, 1, 0, 0, 0, 0, time.UTC)),
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ sched, err := cron.ParseStandard(tt.cj.Spec.Schedule)
+ if err != nil {
+ t.Errorf("error setting up the test, %s", err)
+ return
+ }
+ t.Log(tt.now)
+
+ gotScheduleTimeDuration := LastScheduleTimeDuration(logger, tt.cj, tt.now, sched)
+
+ if gotScheduleTimeDuration < 0 {
+ t.Errorf("LastScheduleTimeDuration should never be less than 0, got %s", gotScheduleTimeDuration)
+ }
+
+ if !reflect.DeepEqual(gotScheduleTimeDuration, tt.expectedDuration) {
+ t.Errorf("LastScheduleTimeDuration - got %s, want %s, difference: %s", gotScheduleTimeDuration, tt.expectedDuration, gotScheduleTimeDuration-tt.expectedDuration)
+ }
+ })
+ }
+}
+
+func topOfTheHour() *time.Time {
+ T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
+ if err != nil {
+ panic("test setup error")
+ }
+ return &T1
+}
+
+func deltaTimeAfterTopOfTheHour(duration time.Duration) *time.Time {
+ T1, err := time.Parse(time.RFC3339, "2016-05-19T10:00:00Z")
+ if err != nil {
+ panic("test setup error")
+ }
+ t := T1.Add(duration)
+ return &t
+}
diff --git a/ray-operator/go.mod b/ray-operator/go.mod
index 05e6cc16cde..cd307d87373 100644
--- a/ray-operator/go.mod
+++ b/ray-operator/go.mod
@@ -13,6 +13,8 @@ require (
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.22.0
+ github.com/robfig/cron v1.2.0
+ github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.10.0
go.uber.org/mock v0.5.2
go.uber.org/zap v1.27.0
diff --git a/ray-operator/go.sum b/ray-operator/go.sum
index 0c83565b518..fe91d9f7103 100644
--- a/ray-operator/go.sum
+++ b/ray-operator/go.sum
@@ -112,6 +112,10 @@ github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
index 14083877a44..cebba3a2efe 100644
--- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
+++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobspec.go
@@ -24,6 +24,7 @@ type RayJobSpecApplyConfiguration struct {
JobId *string `json:"jobId,omitempty"`
SubmissionMode *rayv1.JobSubmissionMode `json:"submissionMode,omitempty"`
EntrypointResources *string `json:"entrypointResources,omitempty"`
+ Schedule *string `json:"schedule,omitempty"`
EntrypointNumCpus *float32 `json:"entrypointNumCpus,omitempty"`
EntrypointNumGpus *float32 `json:"entrypointNumGpus,omitempty"`
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`
@@ -161,6 +162,14 @@ func (b *RayJobSpecApplyConfiguration) WithEntrypointResources(value string) *Ra
return b
}
+// WithSchedule sets the Schedule field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the Schedule field is set to the value of the last call.
+func (b *RayJobSpecApplyConfiguration) WithSchedule(value string) *RayJobSpecApplyConfiguration {
+ b.Schedule = &value
+ return b
+}
+
// WithEntrypointNumCpus sets the EntrypointNumCpus field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the EntrypointNumCpus field is set to the value of the last call.
diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobstatus.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobstatus.go
index de1e5fdba42..acdc9a688cf 100644
--- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobstatus.go
+++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayjobstatus.go
@@ -20,6 +20,7 @@ type RayJobStatusApplyConfiguration struct {
Message *string `json:"message,omitempty"`
StartTime *metav1.Time `json:"startTime,omitempty"`
EndTime *metav1.Time `json:"endTime,omitempty"`
+ LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
Succeeded *int32 `json:"succeeded,omitempty"`
Failed *int32 `json:"failed,omitempty"`
RayClusterStatus *RayClusterStatusApplyConfiguration `json:"rayClusterStatus,omitempty"`
@@ -112,6 +113,14 @@ func (b *RayJobStatusApplyConfiguration) WithEndTime(value metav1.Time) *RayJobS
return b
}
+// WithLastScheduleTime sets the LastScheduleTime field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the LastScheduleTime field is set to the value of the last call.
+func (b *RayJobStatusApplyConfiguration) WithLastScheduleTime(value metav1.Time) *RayJobStatusApplyConfiguration {
+ b.LastScheduleTime = &value
+ return b
+}
+
// WithSucceeded sets the Succeeded field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the Succeeded field is set to the value of the last call.