
Integration: KAI Scheduler #3886


Open · wants to merge 8 commits into base: master

8 changes: 6 additions & 2 deletions helm-chart/kuberay-operator/values.yaml
Original file line number Diff line number Diff line change
@@ -70,12 +70,16 @@ logging:
# 4. Use PodGroup
# batchScheduler:
#   name: scheduler-plugins
#

# 5. Use Kai Scheduler
# batchScheduler:
#   name: kai-scheduler

batchScheduler:
  # Deprecated. This option will be removed in the future.
  # Note: kept for backwards compatibility. When set to true, it enables the Volcano scheduler integration.
  enabled: false
  # Set the customized scheduler name, supported values are "volcano", "yunikorn" or "scheduler-plugins", do not set
  # Set the customized scheduler name, supported values are "volcano", "yunikorn", "kai-scheduler" or "scheduler-plugins", do not set
  # "batchScheduler.enabled=true" at the same time as it will override this option.
  name: ""

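For illustration, a minimal Helm values override enabling the new option could look like the sketch below (the file name kai-values.yaml is hypothetical; batchScheduler.enabled is left at its default of false so that it does not override the name, as the comment above warns):

# kai-values.yaml (hypothetical override file, passed to helm install with -f kai-values.yaml)
batchScheduler:
  name: kai-scheduler
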
3 changes: 2 additions & 1 deletion ray-operator/apis/config/v1alpha1/config_utils.go
@@ -5,6 +5,7 @@ import (

	"github.com/go-logr/logr"

	kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
	schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
@@ -23,7 +24,7 @@ func ValidateBatchSchedulerConfig(logger logr.Logger, config Configuration) error

	if len(config.BatchScheduler) > 0 {
		// if a customized scheduler is configured, check it is supported
		if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() || config.BatchScheduler == schedulerplugins.GetPluginName() {
		if config.BatchScheduler == volcano.GetPluginName() || config.BatchScheduler == yunikorn.GetPluginName() || config.BatchScheduler == schedulerplugins.GetPluginName() || config.BatchScheduler == kaischeduler.GetPluginName() {
			logger.Info("Feature flag batch-scheduler is enabled",
				"scheduler name", config.BatchScheduler)
		} else {

11 changes: 11 additions & 0 deletions ray-operator/apis/config/v1alpha1/config_utils_test.go
@@ -6,6 +6,7 @@ import (
	"github.com/go-logr/logr"
	"github.com/go-logr/logr/testr"

	kaischeduler "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/kai-scheduler"
	schedulerPlugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
@@ -71,6 +72,16 @@ func TestValidateBatchSchedulerConfig(t *testing.T) {
			},
			wantErr: false,
		},
		{
			name: "valid option, batch-scheduler=kai-scheduler",
			args: args{
				logger: testr.New(t),
				config: Configuration{
					BatchScheduler: kaischeduler.GetPluginName(),
				},
			},
			wantErr: false,
		},
		{
			name: "invalid option, invalid scheduler name",
			args: args{

2 changes: 1 addition & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
@@ -44,7 +44,7 @@ type Configuration struct {
	LogStdoutEncoder string `json:"logStdoutEncoder,omitempty"`

	// BatchScheduler enables the batch scheduler integration with a specific scheduler
	// based on the given name, currently, supported values are volcano and yunikorn.
	// based on the given name; currently supported values are volcano, yunikorn, scheduler-plugins, and kai-scheduler.
	BatchScheduler string `json:"batchScheduler,omitempty"`

	// HeadSidecarContainers includes specification for a sidecar container
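For reference, a sketch of how the BatchScheduler option above could be set in an operator configuration file, assuming the file mirrors the JSON tags on Configuration (only this field is shown; the surrounding fields, apiVersion, and kind are omitted because they do not appear in this diff):

# fragment of a KubeRay operator Configuration (sketch)
batchScheduler: kai-scheduler
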
35 changes: 35 additions & 0 deletions ray-operator/config/samples/ray-cluster.kai-gpu-sharing.yaml
@@ -0,0 +1,35 @@
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-half-gpu
  labels:
    kai.scheduler/queue: team-a
spec:
  headGroupSpec:
    template:
      spec:
        containers:
        - name: head
          image: rayproject/ray:2.46.0
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"

  # ---- Two workers share one GPU (0.5 each) ----
  workerGroupSpecs:
  - groupName: shared-gpu
    replicas: 2
    minReplicas: 2
    template:
      metadata:
        annotations:
          gpu-fraction: "0.5"

Review comment (Member):
What does this mean? Are you using DRA to mount the same GPU to two different Pods?
Additionally, do we need to specify GPUs in the resource requests and limits? If not, KubeRay won’t pass GPU information to Ray, and Ray will be unable to map physical GPU resources in Kubernetes to logical resources within Ray.

Review comment (Member):
Can you add comments for the KAI Scheduler–specific configuration so that users can understand what this YAML is for?

      spec:
        containers:
        - name: worker
          image: rayproject/ray:2.46.0
          resources:
            limits:
              cpu: "1"
              memory: "2Gi"
38 changes: 38 additions & 0 deletions ray-operator/config/samples/ray-cluster.kai-scheduler-queues.yaml
@@ -0,0 +1,38 @@
apiVersion: scheduling.run.ai/v2

Review comment (@fscnick, Contributor, Jul 30, 2025):
It looks like this queue definition is used by the other YAML files. Could the content of this file be put into the other YAML files that use KAI? That would make it easier for end users to apply them, like the other sample YAML files.

Review comment (Contributor):
I think either way would work as long as we have clear documentation.

Review comment (Member):
+1. If we can make it so that users only need to run a single file to follow the doc, we should do that to make the process less error-prone.

kind: Queue
metadata:
  name: department-1
spec:
  resources:
    cpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    gpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    memory:
      quota: -1
      limit: -1
      overQuotaWeight: 1
---
apiVersion: scheduling.run.ai/v2
kind: Queue
metadata:
  name: team-a
spec:
  parentQueue: department-1

Review comment (Member):
What do parentQueue, quota: -1, limit: -1, and overQuotaWeight: 1 mean?

  resources:
    cpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    gpu:
      quota: -1
      limit: -1
      overQuotaWeight: 1
    memory:
      quota: -1
      limit: -1
      overQuotaWeight: 1
31 changes: 31 additions & 0 deletions ray-operator/config/samples/ray-cluster.kai-scheduler.yaml
@@ -0,0 +1,31 @@
# A simple example RayCluster that uses the KAI scheduler
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-sample
  labels:
    kai.scheduler/queue: team-a
spec:
  headGroupSpec:
    template:
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray:2.46.0
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
  workerGroupSpecs:
  - groupName: worker
    replicas: 2
    minReplicas: 2
    template:
      spec:
        containers:
        - name: ray-worker
          image: rayproject/ray:2.46.0
          resources:
            requests:
              cpu: "1"
              memory: "1Gi"

@@ -0,0 +1,66 @@
package kaischeduler

// This KAI plugin relies on KAI-Scheduler's
// built-in PodGrouper to create PodGroups at
// runtime, so the plugin itself only needs to:
// 1. expose the scheduler name,
// 2. stamp pods with schedulerName + queue label.
// No PodGroup create/patch logic is included.

Review comment (Member):
Cool! In that case, I guess it’s possible for the KAI scheduler to support gang scheduling with autoscaling enabled?

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
)

const (
	QueueLabelName = "kai.scheduler/queue"
)

type KaiScheduler struct{}

type KaiSchedulerFactory struct{}

func GetPluginName() string { return "kai-scheduler" }

func (k *KaiScheduler) Name() string { return GetPluginName() }

func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ *rayv1.RayCluster) error {
	return nil
}

func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
	pod.Spec.SchedulerName = k.Name()

	queue, ok := app.Labels[QueueLabelName]
	if !ok || queue == "" {
		logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler")

Review comment (Contributor):
I think it would be better if we put the logger in the KaiScheduler struct so we can reuse it instead of creating a new one every time AddMetadataToPod is called.

		logger.Info("Queue label missing from RayCluster; pods will remain pending",
			"requiredLabel", QueueLabelName,
			"rayCluster", app.Name)
		return
	}
	if pod.Labels == nil {
		pod.Labels = make(map[string]string)
	}
	pod.Labels[QueueLabelName] = queue
}

func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {

Review comment (Contributor):
So we might need to create the logger when constructing a new KaiScheduler.

	return &KaiScheduler{}, nil
}

func (kf *KaiSchedulerFactory) AddToScheme(_ *runtime.Scheme) {
}

func (kf *KaiSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
	return b
}
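
To illustrate the effect of AddMetadataToPod above: for a RayCluster labeled kai.scheduler/queue: team-a, every pod the operator creates ends up with roughly the following fields (a sketch; all other pod fields omitted):

# pod fields stamped by the KAI plugin (sketch)
metadata:
  labels:
    kai.scheduler/queue: team-a   # copied from the RayCluster's queue label
spec:
  schedulerName: kai-scheduler    # always set by AddMetadataToPod
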
@@ -0,0 +1,141 @@
package kaischeduler

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func createTestRayCluster(labels map[string]string) *rayv1.RayCluster {
	return &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-cluster",
			Namespace: "default",
			Labels:    labels,
		},
	}
}

func createTestPod() *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-pod",
			Namespace: "default",
			Labels: map[string]string{
				"ray.io/cluster":   "test-cluster",
				"ray.io/node-type": "worker",
				"app":              "ray",
			},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{
				Name:  "ray-worker",
				Image: "rayproject/ray:latest",
			}},
		},
	}
}

func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
	a := assert.New(t)
	scheduler := &KaiScheduler{}
	ctx := context.Background()

	// Create RayCluster with queue label
	rayCluster := createTestRayCluster(map[string]string{
		QueueLabelName: "test-queue",
	})
	pod := createTestPod()

	// Call AddMetadataToPod
	scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

	// Assert scheduler name is set to kai-scheduler
	a.Equal("kai-scheduler", pod.Spec.SchedulerName)

	// Assert queue label is propagated to pod
	a.NotNil(pod.Labels)
	a.Equal("test-queue", pod.Labels[QueueLabelName])
}

func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
	a := assert.New(t)
	scheduler := &KaiScheduler{}
	ctx := context.Background()

	// Create RayCluster without queue label
	rayCluster := createTestRayCluster(map[string]string{})
	pod := createTestPod()

	// Call AddMetadataToPod
	scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

	// Assert scheduler name is still set (always required)
	a.Equal("kai-scheduler", pod.Spec.SchedulerName)

	// Assert queue label is not added to pod when missing from RayCluster
	if pod.Labels != nil {
		_, exists := pod.Labels[QueueLabelName]
		a.False(exists)
	}
}

func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
	a := assert.New(t)
	scheduler := &KaiScheduler{}
	ctx := context.Background()

	// Create RayCluster with empty queue label
	rayCluster := createTestRayCluster(map[string]string{
		QueueLabelName: "",
	})
	pod := createTestPod()

	// Call AddMetadataToPod
	scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

	// Assert scheduler name is still set
	a.Equal("kai-scheduler", pod.Spec.SchedulerName)

	// Assert empty queue label is treated as missing
	if pod.Labels != nil {
		_, exists := pod.Labels[QueueLabelName]
		a.False(exists)
	}
}

func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
	a := assert.New(t)
	scheduler := &KaiScheduler{}
	ctx := context.Background()

	// Create RayCluster with queue label
	rayCluster := createTestRayCluster(map[string]string{
		QueueLabelName: "test-queue",
	})

	// Create pod with existing labels
	pod := createTestPod()
	pod.Labels = map[string]string{
		"existing-label": "existing-value",
		"app":            "ray",
	}

	// Call AddMetadataToPod
	scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)

	// Assert scheduler name is set
	a.Equal("kai-scheduler", pod.Spec.SchedulerName)

	// Assert queue label is added
	a.Equal("test-queue", pod.Labels[QueueLabelName])

	// Assert existing labels are preserved
	a.Equal("existing-value", pod.Labels["existing-label"])
	a.Equal("ray", pod.Labels["app"])
}