Skip to content

Commit 4e4bc57

Browse files
committed
Merge remote-tracking branch 'upstream/master' into move-unmarshall-envyaml
2 parents e9d9f0d + a614b1d commit 4e4bc57

File tree

18 files changed

+379
-179
lines changed

18 files changed

+379
-179
lines changed

apiserver/pkg/server/ray_job_submission_service_server.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"sigs.k8s.io/yaml"
1919

2020
api "github.com/ray-project/kuberay/proto/go_client"
21+
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
2122
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
2223
)
2324

@@ -31,7 +32,7 @@ type RayJobSubmissionServiceServer struct {
3132
api.UnimplementedRayJobSubmissionServiceServer
3233
options *RayJobSubmissionServiceServerOptions
3334
clusterServer *ClusterServer
34-
dashboardClientFunc func() utils.RayDashboardClientInterface
35+
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
3536
log logr.Logger
3637
}
3738

@@ -49,9 +50,8 @@ func (s *RayJobSubmissionServiceServer) SubmitRayJob(ctx context.Context, req *a
4950
if err != nil {
5051
return nil, err
5152
}
52-
rayDashboardClient := s.dashboardClientFunc()
53-
// TODO: support proxy subresources in kuberay-apiserver
54-
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
53+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
54+
if err != nil {
5555
return nil, err
5656
}
5757
request := &utils.RayJobRequest{Entrypoint: req.Jobsubmission.Entrypoint}
@@ -104,9 +104,8 @@ func (s *RayJobSubmissionServiceServer) GetJobDetails(ctx context.Context, req *
104104
if err != nil {
105105
return nil, err
106106
}
107-
rayDashboardClient := s.dashboardClientFunc()
108-
// TODO: support proxy subresources in kuberay-apiserver
109-
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
107+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
108+
if err != nil {
110109
return nil, err
111110
}
112111
nodeInfo, err := rayDashboardClient.GetJobInfo(ctx, req.Submissionid)
@@ -127,9 +126,8 @@ func (s *RayJobSubmissionServiceServer) GetJobLog(ctx context.Context, req *api.
127126
if err != nil {
128127
return nil, err
129128
}
130-
rayDashboardClient := s.dashboardClientFunc()
131-
// TODO: support proxy subresources in kuberay-apiserver
132-
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
129+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
130+
if err != nil {
133131
return nil, err
134132
}
135133
jlog, err := rayDashboardClient.GetJobLog(ctx, req.Submissionid)
@@ -150,9 +148,8 @@ func (s *RayJobSubmissionServiceServer) ListJobDetails(ctx context.Context, req
150148
if err != nil {
151149
return nil, err
152150
}
153-
rayDashboardClient := s.dashboardClientFunc()
154-
// TODO: support proxy subresources in kuberay-apiserver
155-
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
151+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
152+
if err != nil {
156153
return nil, err
157154
}
158155
nodesInfo, err := rayDashboardClient.ListJobs(ctx)
@@ -174,9 +171,8 @@ func (s *RayJobSubmissionServiceServer) StopRayJob(ctx context.Context, req *api
174171
if err != nil {
175172
return nil, err
176173
}
177-
rayDashboardClient := s.dashboardClientFunc()
178-
// TODO: support proxy subresources in kuberay-apiserver
179-
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
174+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
175+
if err != nil {
180176
return nil, err
181177
}
182178
err = rayDashboardClient.StopJob(ctx, req.Submissionid)
@@ -194,9 +190,8 @@ func (s *RayJobSubmissionServiceServer) DeleteRayJob(ctx context.Context, req *a
194190
if err != nil {
195191
return nil, err
196192
}
197-
rayDashboardClient := s.dashboardClientFunc()
198-
// TODO: support proxy subresources in kuberay-apiserver
199-
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
193+
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
194+
if err != nil {
200195
return nil, err
201196
}
202197
err = rayDashboardClient.DeleteJob(ctx, req.Submissionid)

kubectl-plugin/pkg/cmd/job/job_submit.go

Lines changed: 65 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
const (
3636
dashboardAddr = "http://localhost:8265"
3737
clusterTimeout = 120.0
38-
portforwardtimeout = 60.0
38+
portForwardTimeout = 60.0
3939
)
4040

4141
type SubmitJobOptions struct {
@@ -45,6 +45,7 @@ type SubmitJobOptions struct {
4545
workerNodeSelectors map[string]string
4646
headNodeSelectors map[string]string
4747
logColor string
48+
address string
4849
image string
4950
fileName string
5051
workingDir string
@@ -86,8 +87,9 @@ type JobInfo struct {
8687

8788
var (
8889
jobSubmitLong = templates.LongDesc(`
89-
Submit Ray job to Ray cluster as one would using Ray CLI e.g. 'ray job submit ENTRYPOINT'. Command supports all options that 'ray job submit' supports, except '--address'.
90-
If Ray cluster is already setup, use 'kubectl ray session' instead.
90+
Submit Ray job to Ray cluster as one would using Ray CLI e.g. 'ray job submit ENTRYPOINT'.
91+
If Ray cluster is already setup, use 'kubectl ray session' instead. If '--address' is set, we connect directly
92+
without port-forwarding; if empty, we port-forward to localhost:8265.
9193
9294
If no RayJob YAML file is specified, the command will create a default RayJob for the user.
9395
@@ -149,6 +151,7 @@ func NewJobSubmitCommand(cmdFactory cmdutil.Factory, streams genericclioptions.I
149151
},
150152
}
151153
cmd.Flags().StringVarP(&options.fileName, "filename", "f", "", "Path and name of the Ray Job YAML file")
154+
cmd.Flags().StringVar(&options.address, "address", "", "Ray Dashboard base URL (e.g., https://ray.example.com). If set, skips port-forwarding.")
152155
cmd.Flags().StringVar(&options.submissionID, "submission-id", "", "ID to specify for the Ray job. If not provided, one will be generated")
153156
cmd.Flags().StringVar(&options.runtimeEnv, "runtime-env", "", "Path and name to the runtime env YAML file.")
154157
cmd.Flags().StringVar(&options.workingDir, "working-dir", "", "Directory containing files that your job will run in")
@@ -307,6 +310,11 @@ func (options *SubmitJobOptions) Validate(cmd *cobra.Command) error {
307310
}
308311
}
309312

313+
if cmd.Flags().Changed("address") {
314+
if strings.TrimSpace(options.address) == "" {
315+
return fmt.Errorf("--address was provided but is empty")
316+
}
317+
}
310318
return nil
311319
}
312320

@@ -421,55 +429,62 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
421429
return fmt.Errorf("Timed out waiting for cluster")
422430
}
423431

424-
svcName, err := k8sClients.GetRayHeadSvcName(ctx, options.namespace, util.RayCluster, options.cluster)
425-
if err != nil {
426-
return fmt.Errorf("Failed to find service name: %w", err)
427-
}
428-
429-
// start port forward section
430-
portForwardCmd := portforward.NewCmdPortForward(factory, *options.ioStreams)
431-
portForwardCmd.SetArgs([]string{"service/" + svcName, fmt.Sprintf("%d:%d", 8265, 8265)})
432-
433-
// create new context for port-forwarding so we can cancel the context to stop the port forwarding only
434-
portforwardctx, cancel := context.WithCancel(ctx)
435-
defer cancel()
436-
go func() {
437-
fmt.Printf("Port Forwarding service %s\n", svcName)
438-
if err := portForwardCmd.ExecuteContext(portforwardctx); err != nil {
439-
log.Fatalf("Error occurred while port-forwarding Ray dashboard: %v", err)
432+
if options.address == "" {
433+
svcName, err := k8sClients.GetRayHeadSvcName(ctx, options.namespace, util.RayCluster, options.cluster)
434+
if err != nil {
435+
return fmt.Errorf("Failed to find service name: %w", err)
440436
}
441-
}()
442437

443-
// Wait for port forward to be ready
444-
var portforwardReady bool
445-
portforwardWaitStartTime := time.Now()
446-
currTime = portforwardWaitStartTime
438+
// start port forward section
439+
portForwardCmd := portforward.NewCmdPortForward(factory, *options.ioStreams)
440+
portForwardCmd.SetArgs([]string{"service/" + svcName, fmt.Sprintf("%d:%d", 8265, 8265)})
441+
442+
// create new context for port-forwarding so we can cancel the context to stop the port forwarding only
443+
portForwardCtx, cancel := context.WithCancel(ctx)
444+
defer cancel()
445+
go func() {
446+
fmt.Printf("Port forwarding service %s\n", svcName)
447+
if err := portForwardCmd.ExecuteContext(portForwardCtx); err != nil {
448+
log.Fatalf("Error occurred while port-forwarding Ray dashboard: %v", err)
449+
}
450+
}()
447451

448-
portforwardCheckRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, dashboardAddr, nil)
449-
if err != nil {
450-
return fmt.Errorf("Error occurred when trying to create request to probe cluster endpoint: %w", err)
451-
}
452-
httpClient := http.Client{
453-
Timeout: 5 * time.Second,
454-
}
455-
fmt.Printf("Waiting for portforwarding...")
456-
for !portforwardReady && currTime.Sub(portforwardWaitStartTime).Seconds() <= portforwardtimeout {
457-
time.Sleep(2 * time.Second)
458-
rayDashboardResponse, err := httpClient.Do(portforwardCheckRequest)
452+
// Wait for port forward to be ready
453+
var portForwardReady bool
454+
portForwardWaitStartTime := time.Now()
455+
currTime = portForwardWaitStartTime
456+
457+
portforwardCheckRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, dashboardAddr, nil)
459458
if err != nil {
460-
err = fmt.Errorf("Error occurred when waiting for portforwarding: %w", err)
461-
fmt.Println(err)
459+
return fmt.Errorf("Error occurred when trying to create request to probe cluster endpoint: %w", err)
462460
}
463-
if rayDashboardResponse.StatusCode >= 200 && rayDashboardResponse.StatusCode < 300 {
464-
portforwardReady = true
461+
httpClient := http.Client{
462+
Timeout: 5 * time.Second,
465463
}
466-
rayDashboardResponse.Body.Close()
467-
currTime = time.Now()
468-
}
469-
if !portforwardReady {
470-
return fmt.Errorf("Timed out waiting for port forwarding")
464+
fmt.Printf("Waiting for port forwarding...")
465+
for !portForwardReady && currTime.Sub(portForwardWaitStartTime).Seconds() <= portForwardTimeout {
466+
time.Sleep(2 * time.Second)
467+
rayDashboardResponse, err := httpClient.Do(portforwardCheckRequest)
468+
if err != nil {
469+
err = fmt.Errorf("Error occurred when waiting for port forwarding: %w", err)
470+
fmt.Println(err)
471+
currTime = time.Now()
472+
continue
473+
}
474+
if rayDashboardResponse.StatusCode >= 200 && rayDashboardResponse.StatusCode < 300 {
475+
portForwardReady = true
476+
}
477+
rayDashboardResponse.Body.Close()
478+
currTime = time.Now()
479+
}
480+
if !portForwardReady {
481+
return fmt.Errorf("Timed out waiting for port forwarding")
482+
}
483+
options.address = dashboardAddr
484+
fmt.Printf("Port forwarding started on %s\n", options.address)
485+
} else {
486+
fmt.Printf("Using address %s (no port-forwarding)\n", options.address)
471487
}
472-
fmt.Printf("Portforwarding started on %s\n", dashboardAddr)
473488

474489
// If submission ID is not provided by the user, generate one.
475490
if options.submissionID == "" {
@@ -608,7 +623,11 @@ func (options *SubmitJobOptions) Run(ctx context.Context, factory cmdutil.Factor
608623
}
609624

610625
func (options *SubmitJobOptions) raySubmitCmd() ([]string, error) {
611-
raySubmitCmd := []string{"ray", "job", "submit", "--address", dashboardAddr}
626+
addr := options.address
627+
if addr == "" {
628+
addr = dashboardAddr
629+
}
630+
raySubmitCmd := []string{"ray", "job", "submit", "--address", addr}
612631

613632
if len(options.runtimeEnv) > 0 {
614633
raySubmitCmd = append(raySubmitCmd, "--runtime-env", options.runtimeEnv)

kubectl-plugin/pkg/cmd/job/job_submit_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,66 @@ func TestRayJobSubmitWithoutYamlValidate(t *testing.T) {
195195
}
196196
}
197197

198+
func TestRayJobSubmit_AddressValidation(t *testing.T) {
199+
testStreams, _, _, _ := genericclioptions.NewTestIOStreams()
200+
cmdFactory := cmdutil.NewFactory(genericclioptions.NewConfigFlags(true))
201+
202+
test := []struct {
203+
name string
204+
address string
205+
expectError string
206+
expectNormalized string
207+
flagChanged bool
208+
}{
209+
{
210+
name: "address flag not set: port-forward mode",
211+
address: "",
212+
flagChanged: false,
213+
},
214+
{
215+
name: "address flag set but empty: error",
216+
address: "",
217+
flagChanged: true,
218+
expectError: "--address was provided but is empty",
219+
},
220+
{
221+
name: "valid https address: OK",
222+
address: "https://ingress.example.com",
223+
flagChanged: true,
224+
},
225+
}
226+
227+
for _, tc := range test {
228+
t.Run(tc.name, func(t *testing.T) {
229+
opts := &SubmitJobOptions{
230+
cmdFactory: cmdFactory,
231+
ioStreams: &testStreams,
232+
rayjobName: "rayjob-sample",
233+
workingDir: "Fake/File/Path",
234+
}
235+
236+
cmd := &cobra.Command{}
237+
cmd.Flags().StringVar(&opts.address, "address", "", "Ray Dashboard base URL")
238+
239+
if tc.flagChanged {
240+
require.NoError(t, cmd.Flags().Set("address", tc.address))
241+
} else {
242+
opts.address = tc.address
243+
}
244+
245+
err := opts.Validate(cmd)
246+
if tc.expectError != "" {
247+
require.EqualError(t, err, tc.expectError)
248+
} else {
249+
require.NoError(t, err)
250+
if tc.expectNormalized != "" {
251+
require.Equal(t, tc.expectNormalized, opts.address)
252+
}
253+
}
254+
})
255+
}
256+
}
257+
198258
func TestRayJobSubmitCmdFlagsOverrideYaml(t *testing.T) {
199259
testStreams, _, _, _ := genericclioptions.NewTestIOStreams()
200260
cmdFactory := cmdutil.NewFactory(genericclioptions.NewConfigFlags(true))
@@ -438,6 +498,7 @@ func TestRaySubmitCmd(t *testing.T) {
438498
fakeSubmitJobOptions.verify = "True"
439499
fakeSubmitJobOptions.workingDir = "/fake/working/dir"
440500
fakeSubmitJobOptions.entryPoint = "python fake_python_script.py"
501+
fakeSubmitJobOptions.address = dashboardAddr
441502

442503
actualCmd, err := fakeSubmitJobOptions.raySubmitCmd()
443504
require.NoError(t, err)
@@ -489,3 +550,51 @@ func TestRayJobSubmit_FlagsHaveDefaults(t *testing.T) {
489550
assert.Equal(t, 0, opts.entryPointMemory, "default entrypoint-memory should be 0")
490551
assert.False(t, opts.noWait, "default no-wait should be false")
491552
}
553+
554+
func TestRaySubmitCmd_AddressSelection(t *testing.T) {
555+
streams, _, _, _ := genericclioptions.NewTestIOStreams()
556+
factory := cmdutil.NewFactory(genericclioptions.NewConfigFlags(true))
557+
558+
makeCmd := func(addr string) ([]string, error) {
559+
opts := NewJobSubmitOptions(factory, streams)
560+
opts.workingDir = "/fake/working/dir"
561+
opts.entryPoint = "python fake.py"
562+
opts.address = addr
563+
return opts.raySubmitCmd()
564+
}
565+
566+
tests := []struct {
567+
name string
568+
address string
569+
expectedAddr string
570+
}{
571+
{
572+
name: "no address provided: falls back to dashboardAddr",
573+
address: "",
574+
expectedAddr: dashboardAddr,
575+
},
576+
{
577+
name: "custom address provided: uses custom",
578+
address: "https://ingress.example.com",
579+
expectedAddr: "https://ingress.example.com",
580+
},
581+
}
582+
583+
for _, tc := range tests {
584+
t.Run(tc.name, func(t *testing.T) {
585+
cmd, err := makeCmd(tc.address)
586+
require.NoError(t, err)
587+
588+
require.GreaterOrEqual(t, len(cmd), 5, "command too short")
589+
assert.Equal(t, "ray", cmd[0])
590+
assert.Equal(t, "job", cmd[1])
591+
assert.Equal(t, "submit", cmd[2])
592+
593+
assert.Equal(t, "--address", cmd[3])
594+
assert.Equal(t, tc.expectedAddr, cmd[4])
595+
596+
require.Contains(t, cmd, "--working-dir")
597+
require.Contains(t, cmd, "--")
598+
})
599+
}
600+
}

0 commit comments

Comments
 (0)