feat(co scheduling): using custom indexer instead of naive lister #931
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Welcome @JasonHe-WQ!
Hi @JasonHe-WQ. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test. Once the patch is verified, the new status will be reflected by the ok-to-test label. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
[APPROVALNOTIFIER] This PR is NOT APPROVED
This pull-request has been approved by: JasonHe-WQ
The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing /approve in a comment.
✅ Deploy Preview for kubernetes-sigs-scheduler-plugins canceled.
It does not seem to be a problem after thorough mocking and testing:

package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
// getKubeConfig tries in-cluster first, then falls back to local kubeconfig.
func getKubeConfig(kubeconfigPath string) (*rest.Config, error) {
// Try in-cluster
if cfg, err := rest.InClusterConfig(); err == nil {
return cfg, nil
}
// Fall back to local kubeconfig
if kubeconfigPath == "" {
home, _ := os.UserHomeDir()
kubeconfigPath = filepath.Join(home, ".kube", "config")
}
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}
func main() {
var (
kubeconfig string
labelKey string
labelValue string
namespace string
repeat int
warmup int
)
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig; empty means in-cluster first, then ~/.kube/config")
flag.StringVar(&labelKey, "label-key", "scheduling.x-k8s.io/pod-group", "Label key to query")
flag.StringVar(&labelValue, "label-value", "exp-20250917-105355-t0", "Label value to query")
flag.StringVar(&namespace, "namespace", "", "Target namespace; empty for all namespaces")
flag.IntVar(&repeat, "repeat", 30, "Number of timed queries to run")
flag.IntVar(&warmup, "warmup", 5, "Warm-up queries before timing (not measured)")
flag.Parse()
cfg, err := getKubeConfig(kubeconfig)
if err != nil {
panic(fmt.Errorf("failed to get kubeconfig: %w", err))
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
panic(fmt.Errorf("failed to create clientset: %w", err))
}
// Create a SharedInformerFactory with 0 resync (event-driven)
factory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := factory.Core().V1().Pods()
inf := podInformer.Informer()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start informers and wait for initial cache sync
factory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
panic("failed to sync informer cache")
}
// Get the lister (this variant uses no custom indexer)
lister := podInformer.Lister()
// Count all pods (cluster-wide or namespaced)
var allPods []*corev1.Pod
var listErr error
if namespace == "" {
allPods, listErr = lister.List(labels.Everything())
} else {
allPods, listErr = lister.Pods(namespace).List(labels.Everything())
}
if listErr != nil {
panic(fmt.Errorf("failed to list all pods: %w", listErr))
}
fmt.Printf("Total pods in cluster (or namespace if set): %d\n", len(allPods))
// Prepare selector for label-based scan
selector := labels.SelectorFromSet(labels.Set{labelKey: labelValue})
fmt.Printf("Looking for pods with label selector: %s=%s\n", labelKey, labelValue)
// Warm-up (not timed): ensures any lazy paths and CPU caches are primed
for i := 0; i < warmup; i++ {
if namespace == "" {
_, _ = lister.List(selector)
} else {
_, _ = lister.Pods(namespace).List(selector)
}
}
// Timed queries using Lister + LabelSelector (linear scan over store)
start := time.Now()
var hits []*corev1.Pod
for i := 0; i < repeat; i++ {
if namespace == "" {
hits, listErr = lister.List(selector)
} else {
hits, listErr = lister.Pods(namespace).List(selector)
}
if listErr != nil {
panic(fmt.Errorf("failed labeled list: %w", listErr))
}
}
elapsed := time.Since(start)
avg := time.Duration(0)
if repeat > 0 {
avg = elapsed / time.Duration(repeat)
}
fmt.Printf("[Lister+Selector] label %q=%q -> hits=%d, repeat=%d, total=%s, avg=%s\n",
labelKey, labelValue, len(hits), repeat, elapsed, avg)
}
After adding the indexer:

package main
import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
// getKubeConfig tries in-cluster first, then falls back to local kubeconfig.
func getKubeConfig(kubeconfigPath string) (*rest.Config, error) {
// Try in-cluster
if cfg, err := rest.InClusterConfig(); err == nil {
return cfg, nil
}
// Fall back to local kubeconfig
if kubeconfigPath == "" {
home, _ := os.UserHomeDir()
kubeconfigPath = filepath.Join(home, ".kube", "config")
}
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
}
func main() {
var (
kubeconfig string
labelKey string
labelValue string
namespace string
repeat int
warmup int
indexName string
)
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig; empty means in-cluster first, then ~/.kube/config")
flag.StringVar(&labelKey, "label-key", "scheduling.x-k8s.io/pod-group", "Label key to index and query")
flag.StringVar(&labelValue, "label-value", "exp-20250917-105355-t0", "Label value to query via index")
flag.StringVar(&namespace, "namespace", "", "Target namespace (REQUIRED for index query)")
flag.IntVar(&repeat, "repeat", 30, "Number of timed queries to run")
flag.IntVar(&warmup, "warmup", 5, "Warm-up queries before timing (not measured)")
flag.StringVar(&indexName, "index-name", "byNamespaceAndPodGroupLabel", "Custom index name")
flag.Parse()
if namespace == "" {
fmt.Fprintln(os.Stderr, "ERROR: --namespace is required when using a namespace+label index key")
os.Exit(2)
}
cfg, err := getKubeConfig(kubeconfig)
if err != nil {
panic(fmt.Errorf("failed to get kubeconfig: %w", err))
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
panic(fmt.Errorf("failed to create clientset: %w", err))
}
// Always build a CLUSTER-WIDE SharedInformerFactory (alignment between lister and indexer).
factory := informers.NewSharedInformerFactory(clientset, 0)
// Typed Pod informer handle.
podInformer := factory.Core().V1().Pods()
// IMPORTANT: instantiate informer BEFORE Start so it is registered and can be started.
inf := podInformer.Informer()
// IMPORTANT: Add indexers BEFORE starting the informer.
// Index key format: "<namespace>/<labelValue>"
indexFunc := func(obj interface{}) ([]string, error) {
pod, ok := obj.(*corev1.Pod)
if !ok || pod == nil {
return nil, nil
}
if pod.Labels == nil {
return nil, nil
}
if val, ok := pod.Labels[labelKey]; ok && val != "" {
return []string{pod.Namespace + "/" + val}, nil
}
return nil, nil
}
if err := inf.AddIndexers(cache.Indexers{indexName: indexFunc}); err != nil {
panic(fmt.Errorf("failed to add indexer: %w", err))
}
// Start informers and wait for THIS informer's cache to sync.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) {
panic("pod informer cache failed to sync")
}
// Reference stats: total pods in cluster (same cluster-wide informer for alignment).
lister := podInformer.Lister()
allPods, err := lister.List(labels.Everything())
if err != nil {
panic(fmt.Errorf("failed to list all pods: %w", err))
}
fmt.Printf("Total pods in cluster: %d\n", len(allPods))
// Optional per-namespace count (uses the same cluster-wide informer).
nsPods, err := lister.Pods(namespace).List(labels.Everything())
if err != nil {
panic(fmt.Errorf("failed to list pods in namespace %q: %w", namespace, err))
}
fmt.Printf("Pods in namespace %q: %d\n", namespace, len(nsPods))
// Prepare index key and warm-up (not timed).
indexer := inf.GetIndexer()
key := namespace + "/" + labelValue
for i := 0; i < warmup; i++ {
_, _ = indexer.ByIndex(indexName, key)
}
// Timed queries using the namespace+label index.
start := time.Now()
var objs []interface{}
for i := 0; i < repeat; i++ {
objs, err = indexer.ByIndex(indexName, key)
if err != nil {
panic(fmt.Errorf("index query failed: %w", err))
}
}
elapsed := time.Since(start)
var avg time.Duration
if repeat > 0 {
avg = elapsed / time.Duration(repeat)
}
// Count hits, asserting the Pod type; results should already be scoped to the namespace by the index key.
hits := 0
for _, o := range objs {
if p, ok := o.(*corev1.Pod); ok && p.Namespace == namespace {
// No extra label check needed; the index key already enforces labelValue.
hits++
}
}
fmt.Printf("[Indexer] ns=%q, label %q=%q (index=%q) -> hits=%d, repeat=%d, total=%s, avg=%s\n",
namespace, labelKey, labelValue, indexName, hits, repeat, elapsed, avg)
}
Hi, thanks for the PR. Can we summarize the performance gains with and without this PR?
ffromani left a comment:
/ok-to-test
I haven't run end-to-end tests yet because we have modified several other places to enable basic preemption. Once this PR is merged, each call is expected to drop to under 100 ns on a 137k-pod cluster (25k pods in the same namespace); details and the relevant code are linked above. Without this PR, the same operation can take up to 16 ms, about 160× slower.
What type of PR is this?
/kind feature
What this PR does / why we need it:
To improve the performance of coscheduling lookups against the informer cache.
When using the original lister, the List implementation naively scans all pods in the namespace, which can run into performance issues at large scale. Although the list call is not a bottleneck, using a custom indexer still improves performance; see the sketch below.
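For illustration only, here is a minimal sketch of the idea (not the actual PR diff): register a namespace/pod-group indexer on the shared pod informer before it is started, then look pod groups up via ByIndex instead of a selector-based List. The package and helper names below are assumptions for the example; the label key and index name match the benchmark code above. Keying the index by "<namespace>/<labelValue>" lets a single cluster-wide informer answer namespaced pod-group lookups without post-filtering.

package podgroupindex

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

const (
	// Label key used by the benchmarks above; assumed to be the pod-group label.
	podGroupLabel = "scheduling.x-k8s.io/pod-group"
	// Custom index name; same as the --index-name default above.
	podGroupIndex = "byNamespaceAndPodGroupLabel"
)

// RegisterPodGroupIndexer adds the custom index. It must be called before the
// informer is started.
func RegisterPodGroupIndexer(podInformer cache.SharedIndexInformer) error {
	return podInformer.AddIndexers(cache.Indexers{
		podGroupIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*corev1.Pod)
			if !ok || pod == nil {
				return nil, nil
			}
			if pg, ok := pod.Labels[podGroupLabel]; ok && pg != "" {
				// Index key format: "<namespace>/<labelValue>"
				return []string{pod.Namespace + "/" + pg}, nil
			}
			return nil, nil
		},
	})
}

// PodsInGroup replaces a selector-based List with an index lookup.
func PodsInGroup(indexer cache.Indexer, namespace, podGroup string) ([]*corev1.Pod, error) {
	objs, err := indexer.ByIndex(podGroupIndex, namespace+"/"+podGroup)
	if err != nil {
		return nil, err
	}
	pods := make([]*corev1.Pod, 0, len(objs))
	for _, o := range objs {
		if p, ok := o.(*corev1.Pod); ok {
			pods = append(pods, p)
		}
	}
	return pods, nil
}

// NewFactoryWithPodGroupIndex wires the indexer into a shared informer factory
// before the factory is started.
func NewFactoryWithPodGroupIndex(clientset kubernetes.Interface) (informers.SharedInformerFactory, error) {
	factory := informers.NewSharedInformerFactory(clientset, 0)
	inf := factory.Core().V1().Pods().Informer()
	if err := RegisterPodGroupIndexer(inf); err != nil {
		return nil, fmt.Errorf("failed to add pod-group indexer: %w", err)
	}
	return factory, nil
}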
Which issue(s) this PR fixes:
Fixes #932
Special notes for your reviewer:
Does this PR introduce a user-facing change?
None