1 change: 0 additions & 1 deletion docs/reference/api.md
@@ -248,7 +248,6 @@ _Appears in:_
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
| `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the<br />entrypoint command. | | |
| `schedule` _string_ | Schedule specifies a cron-like string for scheduling Ray jobs.<br />When shutdownAfterJobFinishes is set to true, a new cluster is provisioned<br />per scheduled job, otherwise the job is scheduled on an existing cluster. | | |
| `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
| `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
| `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.<br />It only works when ShutdownAfterJobFinishes is set to true. | 0 | |
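To make the table concrete, here is a minimal sketch of how these spec fields map onto the Go API. It is written against the pre-revert API (it still sets `Schedule`, the field this PR removes), assumes the `rayv1` type and constant names shown later in this diff (e.g. `rayv1.K8sJobMode`), and uses purely illustrative values.

```go
package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func main() {
	// A hedged sketch, not an official example: populating the RayJobSpec
	// fields from the table above. The entrypoint is hypothetical.
	spec := rayv1.RayJobSpec{
		Entrypoint:               "python script.py",
		SubmissionMode:           rayv1.K8sJobMode, // operator creates a submitter Kubernetes Job
		EntrypointNumCpus:        1,
		EntrypointNumGpus:        0,
		ShutdownAfterJobFinishes: true,
		TTLSecondsAfterFinished:  600,            // only honored when ShutdownAfterJobFinishes is true
		Schedule:                 "*/10 * * * *", // cron-like string; removed by this PR
	}
	fmt.Printf("%+v\n", spec)
}
```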
1 change: 0 additions & 1 deletion go.mod
@@ -87,7 +87,6 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
2 changes: 0 additions & 2 deletions go.sum

Some generated files are not rendered by default.

5 changes: 0 additions & 5 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

10 changes: 0 additions & 10 deletions ray-operator/apis/ray/v1/rayjob_types.go
@@ -54,8 +54,6 @@ const (
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
JobDeploymentStatusRetrying JobDeploymentStatus = "Retrying"
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
JobDeploymentStatusScheduling JobDeploymentStatus = "Scheduling"
JobDeploymentStatusScheduled JobDeploymentStatus = "Scheduled"
)

// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
@@ -183,11 +181,6 @@ type RayJobSpec struct {
// entrypoint command.
// +optional
EntrypointResources string `json:"entrypointResources,omitempty"`
// Schedule specifies a cron-like string for scheduling Ray jobs.
// When shutdownAfterJobFinishes is set to true, a new cluster is provisioned
// per scheduled job, otherwise the job is scheduled on an existing cluster.
// +optional
Schedule string `json:"schedule,omitempty"`
// EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command.
// +optional
EntrypointNumCpus float32 `json:"entrypointNumCpus,omitempty"`
@@ -240,9 +233,6 @@ type RayJobStatus struct {
// or the submitter Job has failed.
// +optional
EndTime *metav1.Time `json:"endTime,omitempty"`
// lastScheduleTime is the last time the job was successfully scheduled.
// +optional
LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
// Succeeded is the number of times this job succeeded.
// +kubebuilder:default:=0
// +optional
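To see how the two deleted statuses fit the RayJob lifecycle, here is a simplified sketch of the transitions the scheduling feature added. This is my own reduction (local stand-in types and a hypothetical `nextStatus` helper), not KubeRay code; the real reconciler also resets the job ID, dashboard URL, and cluster status along the way.

```go
package main

import "fmt"

// Local stand-ins for the JobDeploymentStatus values involved; Scheduling
// and Scheduled are exactly the two values this diff deletes.
type jobDeploymentStatus string

const (
	statusNew        jobDeploymentStatus = "New"
	statusComplete   jobDeploymentStatus = "Complete"
	statusScheduling jobDeploymentStatus = "Scheduling"
	statusScheduled  jobDeploymentStatus = "Scheduled"
)

// nextStatus sketches the removed transitions: a finished job with a
// schedule is cleaned up (Scheduling), parks until the next cron tick
// (Scheduled), then restarts (New).
func nextStatus(cur jobDeploymentStatus, hasSchedule, resourcesDeleted, withinTickBuffer bool) jobDeploymentStatus {
	switch cur {
	case statusComplete:
		if hasSchedule {
			return statusScheduling // release compute resources before rescheduling
		}
	case statusScheduling:
		if resourcesDeleted {
			return statusScheduled // wait for the next cron tick
		}
	case statusScheduled:
		if withinTickBuffer {
			return statusNew // kick off the next run
		}
	}
	return cur
}

func main() {
	fmt.Println(nextStatus(statusComplete, true, false, false)) // Scheduling
}
```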
4 changes: 0 additions & 4 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

5 changes: 0 additions & 5 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default.

121 changes: 0 additions & 121 deletions ray-operator/config/samples/ray-job.schedule.yaml

This file was deleted.

74 changes: 0 additions & 74 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -9,7 +9,6 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -35,8 +34,6 @@ const (
RayJobDefaultRequeueDuration = 3 * time.Second
RayJobDefaultClusterSelectorKey = "ray.io/cluster"
PythonUnbufferedEnvVarName = "PYTHONUNBUFFERED"
// The buffer period in which a scheduled RayJob can run since the last cron tick
ScheduleBuffer = 100 * time.Millisecond
)

// RayJobReconciler reconciles a RayJob object
@@ -171,11 +168,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
// We check the LastScheduleTime to know if it's the first job
if rayJobInstance.Spec.Schedule != "" && rayJobInstance.Status.LastScheduleTime == nil {
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
break
}
// Set `Status.JobDeploymentStatus` to `JobDeploymentStatusInitializing`, and initialize `Status.JobId`
// and `Status.RayClusterName` prior to avoid duplicate job submissions and cluster creations.
logger.Info("JobDeploymentStatusNew")
@@ -457,58 +449,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
if rayJobInstance.Spec.Schedule != "" {
logger.Info("Rescheduling RayJob")
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduling
break
}

// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
case rayv1.JobDeploymentStatusScheduling:
isJobDeleted, err := r.deleteSubmitterJob(ctx, rayJobInstance)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

if !isJobDeleted {
logger.Info("The release of the compute resources has not been completed yet. " +
"Wait for the resources to be deleted before the status transitions to avoid a resource leak.")
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

if rayJobInstance.Spec.ShutdownAfterJobFinishes {
rayJobInstance.Status.RayClusterStatus = rayv1.RayClusterStatus{}
rayJobInstance.Status.RayClusterName = ""

}
rayJobInstance.Status.DashboardURL = ""
rayJobInstance.Status.JobId = ""
rayJobInstance.Status.Message = ""
rayJobInstance.Status.Reason = ""
rayJobInstance.Status.RayJobStatusInfo = rayv1.RayJobStatusInfo{}

rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
case rayv1.JobDeploymentStatusScheduled:
// We get the durations from the current time to the previous and next cron schedule times
// We pass in time.Now() as a parameter for easier unit testing and consistency
t1, t2, err := r.getNextAndPreviousScheduleDistance(ctx, time.Now(), rayJobInstance)
if err != nil {
logger.Error(err, "Could not get the previous and next distances for a cron schedule")
return ctrl.Result{}, err
}
// Check whether we are currently within the buffer window after the previous cron schedule time
if t2 <= ScheduleBuffer {
logger.Info("The current time is within the buffer window of a cron tick", "NextScheduleTimeDuration", t1, "LastScheduleTimeDuration", t2, "Previous LastScheduleTime", rayJobInstance.Status.LastScheduleTime)
rayJobInstance.Status.LastScheduleTime = &metav1.Time{Time: time.Now()}
rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew
} else {
logger.Info("Waiting until the next reconcile to determine schedule", "nextScheduleDuration", t1, "currentTime", time.Now(), "lastScheduleTimeDuration", t2)
return ctrl.Result{RequeueAfter: t1}, nil
}

default:
logger.Info("Unknown JobDeploymentStatus", "JobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
@@ -951,23 +894,6 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
return rayCluster, nil
}

func (r *RayJobReconciler) getNextAndPreviousScheduleDistance(ctx context.Context, currentTime time.Time, rayJobInstance *rayv1.RayJob) (time.Duration, time.Duration, error) {
logger := ctrl.LoggerFrom(ctx)
formattedCron := utils.FormatSchedule(rayJobInstance, r.Recorder)
cronSchedule, err := cron.ParseStandard(formattedCron)
if err != nil {
// This is likely a user error in defining the spec value.
// We should log the error and not reconcile this RayJob until the spec is updated.
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", rayJobInstance.Spec.Schedule, err)
return 0, 0, fmt.Errorf("the cron schedule provided is unparseable: %w", err)
}

t1 := utils.NextScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
t2 := utils.LastScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)

return t1, t2, nil
}

func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
logger := ctrl.LoggerFrom(ctx)
if !rayJob.Spec.Suspend {
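For reference, the removed getNextAndPreviousScheduleDistance built on robfig/cron, which only exposes Next(). The sketch below is a reconstruction under that constraint, not the removed KubeRay code: nextAndPreviousDistance is a hypothetical helper that finds the previous tick by walking forward from the last recorded schedule time, then applies the 100 ms buffer check the Scheduled case used.

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

// scheduleBuffer mirrors the removed ScheduleBuffer constant: a grace
// window after a cron tick in which the job is still allowed to start.
const scheduleBuffer = 100 * time.Millisecond

// nextAndPreviousDistance returns the time until the next cron tick and
// the time since the most recent one, the two durations the Scheduled
// case compared. If no tick occurred between lastSchedule and now, the
// "since" value falls back to the time since lastSchedule itself.
func nextAndPreviousDistance(spec string, lastSchedule, now time.Time) (time.Duration, time.Duration, error) {
	sched, err := cron.ParseStandard(spec)
	if err != nil {
		return 0, 0, fmt.Errorf("the cron schedule provided is unparseable: %w", err)
	}
	// Walk forward from the last recorded schedule time until we pass
	// "now"; the tick just before that is the most recent one.
	prev := lastSchedule
	for t := sched.Next(prev); !t.After(now); t = sched.Next(prev) {
		prev = t
	}
	return sched.Next(now).Sub(now), now.Sub(prev), nil
}

func main() {
	now := time.Now()
	untilNext, sinceLast, err := nextAndPreviousDistance("*/5 * * * *", now.Add(-time.Hour), now)
	if err != nil {
		panic(err)
	}
	if sinceLast <= scheduleBuffer {
		fmt.Println("within the buffer window of a cron tick: start the job now")
	} else {
		fmt.Printf("next tick in %v; requeue until then\n", untilNext)
	}
}
```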