From 9e9c2fab9ba3efbd6fa75f58090db6e4ad3984d4 Mon Sep 17 00:00:00 2001 From: JosefNagelschmidt Date: Thu, 11 Sep 2025 13:50:21 +0200 Subject: [PATCH] avoid race in jodId --- kubectl-plugin/pkg/cmd/job/job_submit.go | 45 +++++++++--------------- 1 file changed, 16 insertions(+), 29 deletions(-) diff --git a/kubectl-plugin/pkg/cmd/job/job_submit.go b/kubectl-plugin/pkg/cmd/job/job_submit.go index 189cbee82c4..bf563ec5553 100644 --- a/kubectl-plugin/pkg/cmd/job/job_submit.go +++ b/kubectl-plugin/pkg/cmd/job/job_submit.go @@ -324,6 +324,16 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor return fmt.Errorf("failed to initialize clientset: %w", err) } + // If submission ID is not provided by the user, generate one. + if options.submissionID == "" { + generatedID, err := generateSubmissionID() + if err != nil { + return fmt.Errorf("failed to generate submission ID: %w", err) + } + options.submissionID = generatedID + fmt.Printf("Generated submission ID for Ray job: %s\n", options.submissionID) + } + if options.fileName == "" { // Genarate the Ray job. rayJobObject := generation.RayJobYamlObject{ @@ -358,6 +368,7 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor }, } rayJobApplyConfig := rayJobObject.GenerateRayJobApplyConfig() + rayJobApplyConfig.Spec.JobId = &options.submissionID // Print out the yaml if it is a dry run if options.dryRun { @@ -378,6 +389,7 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor options.RayJob = &rayv1.RayJob{} options.RayJob.SetName(rayJobApplyConfigResult.Name) } else { + options.RayJob.Spec.JobId = options.submissionID options.RayJob, err = k8sClients.RayClient().RayV1().RayJobs(options.namespace).Create(ctx, options.RayJob, v1.CreateOptions{FieldManager: util.FieldManager}) if err != nil { return fmt.Errorf("Error when creating RayJob CR: %w", err) @@ -486,16 +498,6 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor fmt.Printf("Using address %s (no port-forwarding)\n", options.address) } - // If submission ID is not provided by the user, generate one. - if options.submissionID == "" { - generatedID, err := generateSubmissionID() - if err != nil { - return fmt.Errorf("failed to generate submission ID: %w", err) - } - options.submissionID = generatedID - fmt.Printf("Generated submission ID for Ray job: %s\n", options.submissionID) - } - // Submitting ray job to cluster raySubmitCmd, err := options.raySubmitCmd() if err != nil { @@ -514,13 +516,10 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor return fmt.Errorf("Error while setting up `ray job submit` stderr: %w", err) } - go func() { - fmt.Printf("Running Ray submit job command...\n") - err := cmd.Start() - if err != nil { - log.Fatalf("error occurred while running command %s: %v", fmt.Sprint(raySubmitCmd), err) - } - }() + fmt.Printf("Running Ray submit job command...\n") + if err := cmd.Start(); err != nil { + log.Fatalf("error occurred while running command %s: %v", fmt.Sprint(raySubmitCmd), err) + } rayJobID := options.submissionID @@ -551,18 +550,6 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor } }() - // Add annotation to RayJob with the correct Ray job ID and update the CR - options.RayJob, err = k8sClients.RayClient().RayV1().RayJobs(options.namespace).Get(ctx, options.RayJob.GetName(), v1.GetOptions{}) - if err != nil { - return fmt.Errorf("Failed to get latest version of Ray job: %w", err) - } - options.RayJob.Spec.JobId = rayJobID - - _, err = k8sClients.RayClient().RayV1().RayJobs(options.namespace).Update(ctx, options.RayJob, v1.UpdateOptions{FieldManager: util.FieldManager}) - if err != nil { - return fmt.Errorf("Error occurred when trying to add job ID to RayJob: %w", err) - } - // Wait for Ray job submit to finish. err = cmd.Wait() if err != nil {