Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
cb6f1ea
feat: extract compute template from spec
machichima Aug 15, 2025
2edd8f0
Merge branch 'master' of github.com:ray-project/kuberay into apiserve…
machichima Aug 18, 2025
7dd5a70
feat: add compute template middleware
machichima Aug 18, 2025
7106c01
fix: extract cluster spec from request correctly
machichima Aug 18, 2025
7cc1d4e
Merge branch 'master' of github.com:ray-project/kuberay into apiserve…
machichima Aug 20, 2025
4417877
feat: convert request to map then to clusterSpec
machichima Aug 20, 2025
c8a8f78
feat: parse anno/cpu/mem from compute template to request
machichima Aug 20, 2025
3dc557c
fix: get compute template name without converting to ClusterSpec struct
machichima Aug 21, 2025
c4fc102
feat: apply gpu/extend/toleration
machichima Aug 21, 2025
fa7eae8
feat: forward modified request body
machichima Aug 21, 2025
0f05fed
feat: deal with case with no head/workerGroupSpec set
machichima Aug 21, 2025
ce17d2e
feat: also support for rayjob and rayservice
machichima Aug 21, 2025
1f8db4c
test: structure for test compute template middleware
machichima Aug 27, 2025
062d602
fix: update field and match types
machichima Aug 27, 2025
d6e2cb5
Merge branch 'master' of github.com:ray-project/kuberay into apiserve…
machichima Sep 1, 2025
abeaa5d
feat: use remote execute for apiserver sdk e2e test
machichima Sep 1, 2025
b17b879
feat: pass through if no spec & convert to json
machichima Sep 1, 2025
740e8f4
feat: check request body type for unmarshal
machichima Sep 1, 2025
20efd19
test: enable setting content type in execCommandWithCurlInPod
machichima Sep 5, 2025
b796540
Trigger CI
machichima Sep 5, 2025
7695daf
fix: extract client manager for mocking
machichima Sep 7, 2025
39e8919
Trigger CI
machichima Sep 7, 2025
9c418f5
fix: add missing methods & clean up code
machichima Sep 7, 2025
0af361e
refactor: clean up print
machichima Sep 7, 2025
44213ea
build: go mod tidy
machichima Sep 7, 2025
4ba14e2
Trigger CI
machichima Sep 7, 2025
f772cd5
Trigger CI
machichima Sep 7, 2025
edc99ad
refactor: remove unused args
machichima Sep 11, 2025
8ad0573
refactor: interface to any
machichima Sep 11, 2025
559f2ae
refactor: remove redundant else block
machichima Sep 11, 2025
65bbc6a
Merge branch 'master' of github.com:ray-project/kuberay into apiserve…
machichima Sep 12, 2025
967c314
refactor: status code 500 to 422
machichima Sep 15, 2025
b56867d
feat: support memory unit
machichima Sep 15, 2025
d9a9e2c
Merge branch 'master' of github.com:ray-project/kuberay into apiserve…
machichima Sep 15, 2025
bee12d0
Merge branch 'master' of github.com:ray-project/kuberay into apiserve…
machichima Sep 23, 2025
9561d29
fix: use ProxyRoundTripper in apiserver client & remove remote execut…
machichima Sep 23, 2025
f4df39c
fix: base url use kubernetesConfig.Host
machichima Sep 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apiserver/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ func startHttpProxy() {
KubernetesConfig: kubernetesConfig,
Middleware: corsHandler, // Always set, even if it's a no-op
}

topMux, err = apiserversdk.NewMux(muxConfig)
clientManager := manager.NewClientManager()
topMux, err = apiserversdk.NewMux(muxConfig, &clientManager)
if err != nil {
klog.Fatalf("Failed to create API server mux: %v", err)
}
Expand Down
52 changes: 26 additions & 26 deletions apiserver/pkg/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type KuberayAPIServerClient struct {
// See https://github.com/ray-project/kuberay/pull/3334/files#r2041183495 for details.
//
// Store http request handling function for unit test purpose.
executeHttpRequest func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)
ExecuteHttpRequest func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this public so that we can use it in apiserversdk

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@machichima, can we revert this now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

baseURL string
retryCfg apiserversdkutil.RetryConfig
}
Expand Down Expand Up @@ -68,13 +68,13 @@ func NewKuberayAPIServerClient(baseURL string, httpClient *http.Client, retryCfg
},
retryCfg: retryCfg,
}
client.executeHttpRequest = client.executeRequest
client.ExecuteHttpRequest = client.executeRequest
return client
}

// SetExecuteHttpRequest sets the ExecuteHttpRequest function.
func (krc *KuberayAPIServerClient) SetExecuteHttpRequest(fn func(httpRequest *http.Request, URL string) ([]byte, *rpcStatus.Status, error)) {
krc.executeHttpRequest = fn
krc.ExecuteHttpRequest = fn
}

// CreateComputeTemplate creates a new compute template.
Expand All @@ -94,7 +94,7 @@ func (krc *KuberayAPIServerClient) CreateComputeTemplate(request *api.CreateComp
httpRequest.Header.Add("Accept", "application/json")
httpRequest.Header.Add("Content-Type", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
if err != nil {
return nil, status, err
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (krc *KuberayAPIServerClient) GetComputeTemplate(request *api.GetComputeTem

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -143,7 +143,7 @@ func (krc *KuberayAPIServerClient) GetAllComputeTemplates() (*api.ListAllCompute

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -164,7 +164,7 @@ func (krc *KuberayAPIServerClient) GetAllComputeTemplatesInNamespace(request *ap

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func (krc *KuberayAPIServerClient) CreateCluster(request *api.CreateClusterReque
httpRequest.Header.Add("Accept", "application/json")
httpRequest.Header.Add("Content-Type", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
if err != nil {
return nil, status, err
}
Expand All @@ -219,7 +219,7 @@ func (krc *KuberayAPIServerClient) GetCluster(request *api.GetClusterRequest) (*

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -245,7 +245,7 @@ func (krc *KuberayAPIServerClient) ListClusters(request *api.ListClustersRequest

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -271,7 +271,7 @@ func (krc *KuberayAPIServerClient) ListAllClusters(request *api.ListAllClustersR

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -298,7 +298,7 @@ func (krc *KuberayAPIServerClient) CreateRayJob(request *api.CreateRayJobRequest
httpRequest.Header.Add("Accept", "application/json")
httpRequest.Header.Add("Content-Type", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
if err != nil {
return nil, status, err
}
Expand All @@ -319,7 +319,7 @@ func (krc *KuberayAPIServerClient) GetRayJob(request *api.GetRayJobRequest) (*ap

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -345,7 +345,7 @@ func (krc *KuberayAPIServerClient) ListRayJobs(request *api.ListRayJobsRequest)

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -370,7 +370,7 @@ func (krc *KuberayAPIServerClient) ListAllRayJobs(request *api.ListAllRayJobsReq
httpRequest.URL.RawQuery = q.Encode()
httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand Down Expand Up @@ -403,7 +403,7 @@ func (krc *KuberayAPIServerClient) CreateRayService(request *api.CreateRayServic
httpRequest.Header.Add("Accept", "application/json")
httpRequest.Header.Add("Content-Type", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
if err != nil {
return nil, status, err
}
Expand All @@ -430,7 +430,7 @@ func (krc *KuberayAPIServerClient) UpdateRayService(request *api.UpdateRayServic
httpRequest.Header.Add("Accept", "application/json")
httpRequest.Header.Add("Content-Type", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, updateURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, updateURL)
if err != nil {
return nil, status, err
}
Expand All @@ -451,7 +451,7 @@ func (krc *KuberayAPIServerClient) GetRayService(request *api.GetRayServiceReque

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -476,7 +476,7 @@ func (krc *KuberayAPIServerClient) ListRayServices(request *api.ListRayServicesR
httpRequest.URL.RawQuery = q.Encode()
httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -502,7 +502,7 @@ func (krc *KuberayAPIServerClient) ListAllRayServices(request *api.ListAllRaySer
httpRequest.URL.RawQuery = q.Encode()
httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand Down Expand Up @@ -535,7 +535,7 @@ func (krc *KuberayAPIServerClient) SubmitRayJob(request *api.SubmitRayJobRequest
httpRequest.Header.Add("Accept", "application/json")
httpRequest.Header.Add("Content-Type", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, createURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
if err != nil {
return nil, status, err
}
Expand All @@ -556,7 +556,7 @@ func (krc *KuberayAPIServerClient) GetRayJobDetails(request *api.GetJobDetailsRe

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -577,7 +577,7 @@ func (krc *KuberayAPIServerClient) GetRayJobLog(request *api.GetJobLogRequest) (

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -598,7 +598,7 @@ func (krc *KuberayAPIServerClient) ListRayJobsCluster(request *api.ListJobDetail

httpRequest.Header.Add("Accept", "application/json")

bodyBytes, status, err := krc.executeHttpRequest(httpRequest, getURL)
bodyBytes, status, err := krc.ExecuteHttpRequest(httpRequest, getURL)
if err != nil {
return nil, status, err
}
Expand All @@ -621,7 +621,7 @@ func (krc *KuberayAPIServerClient) StopRayJob(request *api.StopRayJobSubmissionR
httpRequest.Header.Add("Accept", "application/json")
httpRequest.Header.Add("Content-Type", "application/json")

_, status, err := krc.executeHttpRequest(httpRequest, createURL)
_, status, err := krc.ExecuteHttpRequest(httpRequest, createURL)
if err != nil {
return status, err
}
Expand All @@ -640,7 +640,7 @@ func (krc *KuberayAPIServerClient) doDelete(deleteURL string) (*rpcStatus.Status
return nil, fmt.Errorf("failed to create http request for url '%s': %w", deleteURL, err)
}
httpRequest.Header.Add("Accept", "application/json")
_, status, err := krc.executeHttpRequest(httpRequest, deleteURL)
_, status, err := krc.ExecuteHttpRequest(httpRequest, deleteURL)
return status, err
}

Expand Down
4 changes: 2 additions & 2 deletions apiserver/pkg/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestUnmarshalHttpResponseOK(t *testing.T) {
}

client := NewKuberayAPIServerClient("baseurl", nil /*httpClient*/, retryCfg)
client.executeHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
client.ExecuteHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
resp := &api.ListClustersResponse{
Clusters: []*api.Cluster{
{
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestUnmarshalHttpResponseFails(t *testing.T) {
}

client := NewKuberayAPIServerClient("baseurl", nil /*httpClient*/, retryCfg)
client.executeHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
client.ExecuteHttpRequest = func(_ *http.Request, _ string) ([]byte, *rpcStatus.Status, error) {
// Intentionally returning a bad response.
return []byte("helloworld"), nil, nil
}
Expand Down
14 changes: 7 additions & 7 deletions apiserver/pkg/manager/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r *ResourceManager) getKubernetesNamespaceClient() clientv1.NamespaceInter
// clusters
func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Cluster) (*rayv1api.RayCluster, error) {
// populate cluster map
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiCluster.ClusterSpec, apiCluster.Namespace)
computeTemplateDict, err := r.PopulateComputeTemplate(ctx, apiCluster.ClusterSpec, apiCluster.Namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiCluster.Namespace, apiCluster.Name)
}
Expand All @@ -82,13 +82,13 @@ func (r *ResourceManager) CreateCluster(ctx context.Context, apiCluster *api.Clu
}

// Compute template
func (r *ResourceManager) populateComputeTemplate(ctx context.Context, clusterSpec *api.ClusterSpec, nameSpace string) (map[string]*api.ComputeTemplate, error) {
func (r *ResourceManager) PopulateComputeTemplate(ctx context.Context, clusterSpec *api.ClusterSpec, nameSpace string) (map[string]*api.ComputeTemplate, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this public so that we can use it in apiserversdk

dict := map[string]*api.ComputeTemplate{}
// populate head compute template
name := clusterSpec.HeadGroupSpec.ComputeTemplate
configMap, err := r.GetComputeTemplate(ctx, name, nameSpace)
if err != nil {
return nil, err
return nil, fmt.Errorf("Cannot get compute template for name '%s' in namespace '%s', error: %w", name, nameSpace, err)
}
computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
dict[name] = computeTemplate
Expand All @@ -99,7 +99,7 @@ func (r *ResourceManager) populateComputeTemplate(ctx context.Context, clusterSp
if _, exist := dict[name]; !exist {
configMap, err := r.GetComputeTemplate(ctx, name, nameSpace)
if err != nil {
return nil, err
return nil, fmt.Errorf("Cannot get compute template for name '%s' in namespace '%s', error: %w", name, nameSpace, err)
}
computeTemplate := model.FromKubeToAPIComputeTemplate(configMap)
dict[name] = computeTemplate
Expand Down Expand Up @@ -160,7 +160,7 @@ func (r *ResourceManager) CreateJob(ctx context.Context, apiJob *api.RayJob) (*r

// populate cluster map
if apiJob.ClusterSpec != nil {
computeTemplateMap, err = r.populateComputeTemplate(ctx, apiJob.ClusterSpec, apiJob.Namespace)
computeTemplateMap, err = r.PopulateComputeTemplate(ctx, apiJob.ClusterSpec, apiJob.Namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiJob.Namespace, apiJob.JobId)
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func (r *ResourceManager) DeleteJob(ctx context.Context, jobName string, namespa

func (r *ResourceManager) CreateService(ctx context.Context, apiService *api.RayService) (*rayv1api.RayService, error) {
// populate cluster map
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
computeTemplateDict, err := r.PopulateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiService.Namespace, apiService.Name)
}
Expand All @@ -254,7 +254,7 @@ func (r *ResourceManager) UpdateRayService(ctx context.Context, apiService *api.
return nil, util.Wrap(err, fmt.Sprintf("Update service fail, no service named: %s ", name))
}
// populate cluster map
computeTemplateDict, err := r.populateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
computeTemplateDict, err := r.PopulateComputeTemplate(ctx, apiService.ClusterSpec, apiService.Namespace)
if err != nil {
return nil, util.NewInternalServerError(err, "Failed to populate compute template for (%s/%s)", apiService.Namespace, apiService.Name)
}
Expand Down
2 changes: 1 addition & 1 deletion apiserver/pkg/manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestPopulateComputeTemplate(t *testing.T) {

// Run
resourceManager := NewResourceManager(mockClientManager)
computeTemplates, err := resourceManager.populateComputeTemplate(ctx, clusterSpec, namespace)
computeTemplates, err := resourceManager.PopulateComputeTemplate(ctx, clusterSpec, namespace)

// Assert
require.NoError(t, err)
Expand Down
Loading
Loading