Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 16 additions & 29 deletions kubectl-plugin/pkg/cmd/job/job_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,16 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
return fmt.Errorf("failed to initialize clientset: %w", err)
}

// If submission ID is not provided by the user, generate one.
if options.submissionID == "" {
generatedID, err := generateSubmissionID()
if err != nil {
return fmt.Errorf("failed to generate submission ID: %w", err)
}
options.submissionID = generatedID
fmt.Printf("Generated submission ID for Ray job: %s\n", options.submissionID)
}

if options.fileName == "" {
// Generate the Ray job.
rayJobObject := generation.RayJobYamlObject{
Expand Down Expand Up @@ -358,6 +368,7 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
},
}
rayJobApplyConfig := rayJobObject.GenerateRayJobApplyConfig()
rayJobApplyConfig.Spec.JobId = &options.submissionID

// Print out the yaml if it is a dry run
if options.dryRun {
Expand All @@ -378,6 +389,7 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
options.RayJob = &rayv1.RayJob{}
options.RayJob.SetName(rayJobApplyConfigResult.Name)
} else {
options.RayJob.Spec.JobId = options.submissionID
options.RayJob, err = k8sClients.RayClient().RayV1().RayJobs(options.namespace).Create(ctx, options.RayJob, v1.CreateOptions{FieldManager: util.FieldManager})
if err != nil {
return fmt.Errorf("Error when creating RayJob CR: %w", err)
Expand Down Expand Up @@ -486,16 +498,6 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
fmt.Printf("Using address %s (no port-forwarding)\n", options.address)
}

// If submission ID is not provided by the user, generate one.
if options.submissionID == "" {
generatedID, err := generateSubmissionID()
if err != nil {
return fmt.Errorf("failed to generate submission ID: %w", err)
}
options.submissionID = generatedID
fmt.Printf("Generated submission ID for Ray job: %s\n", options.submissionID)
}

// Submitting ray job to cluster
raySubmitCmd, err := options.raySubmitCmd()
if err != nil {
Expand All @@ -514,13 +516,10 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
return fmt.Errorf("Error while setting up `ray job submit` stderr: %w", err)
}

go func() {
fmt.Printf("Running Ray submit job command...\n")
err := cmd.Start()
if err != nil {
log.Fatalf("error occurred while running command %s: %v", fmt.Sprint(raySubmitCmd), err)
}
}()
fmt.Printf("Running Ray submit job command...\n")
if err := cmd.Start(); err != nil {
log.Fatalf("error occurred while running command %s: %v", fmt.Sprint(raySubmitCmd), err)
}

rayJobID := options.submissionID

Expand Down Expand Up @@ -551,18 +550,6 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
}
}()

// Add annotation to RayJob with the correct Ray job ID and update the CR
options.RayJob, err = k8sClients.RayClient().RayV1().RayJobs(options.namespace).Get(ctx, options.RayJob.GetName(), v1.GetOptions{})
if err != nil {
return fmt.Errorf("Failed to get latest version of Ray job: %w", err)
}
options.RayJob.Spec.JobId = rayJobID
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a reason why we set the job ID after creating the CR. Could you instead resolve the "the object has been modified" conflict by retrying the update?

Copy link
Collaborator

@win5923 win5923 Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is my thought:

  • Before this PR, when a user specified a submissionID, the RayJob CR had already been created without it. To keep the CR aligned with the actual Ray submission, we had to do a post-create Get/Update to set spec.jobId.
  • In this PR, the submissionID is generated (or taken from the user's input) upfront and embedded into the RayJob before applying/creating it (e.g., via rayJobApplyConfig.Spec.JobId). This keeps the submissionID consistent between the CR and the actual Ray submission, and removes the need for a follow-up Get/Update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a reason why we set the job ID after creating the CR. Could you instead resolve the "the object has been modified" conflict by retrying the update?

It is an option, but I wonder why this would be necessary (what is the exact reason for setting the job ID after creating the CR? Doing it the other way works seamlessly for me). The suggested fix looks cleaner, to me at least.


_, err = k8sClients.RayClient().RayV1().RayJobs(options.namespace).Update(ctx, options.RayJob, v1.UpdateOptions{FieldManager: util.FieldManager})
if err != nil {
return fmt.Errorf("Error occurred when trying to add job ID to RayJob: %w", err)
}

// Wait for Ray job submit to finish.
err = cmd.Wait()
if err != nil {
Expand Down
Loading