diff --git a/pkg/reconciler/tenancy/logicalcluster/logicalcluster_controller.go b/pkg/reconciler/tenancy/logicalcluster/logicalcluster_controller.go index 535d1d12384..1f5c280da26 100644 --- a/pkg/reconciler/tenancy/logicalcluster/logicalcluster_controller.go +++ b/pkg/reconciler/tenancy/logicalcluster/logicalcluster_controller.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" authenticationv1 "k8s.io/api/authentication/v1" @@ -37,9 +38,11 @@ import ( kcprbacinformers "github.com/kcp-dev/client-go/informers/rbac/v1" kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" kcprbaclisters "github.com/kcp-dev/client-go/listers/rbac/v1" + "github.com/kcp-dev/logicalcluster/v3" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/events" + kcpmetrics "github.com/kcp-dev/kcp/pkg/server/metrics" corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1" corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" @@ -58,6 +61,7 @@ func NewController( kubeClusterClient kcpkubernetesclientset.ClusterInterface, logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, clusterRoleBindingInformer kcprbacinformers.ClusterRoleBindingClusterInformer, + shardName string, ) *Controller { c := &Controller{ queue: workqueue.NewTypedRateLimitingQueueWithConfig( @@ -69,16 +73,26 @@ func NewController( kubeClusterClient: kubeClusterClient, logicalClusterLister: logicalClusterInformer.Lister(), clusterRoleBindingLister: clusterRoleBindingInformer.Lister(), + shardName: shardName, + countedClusters: make(map[string]string), } _, _ = logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { c.enqueue(obj) }, - UpdateFunc: func(obj, _ interface{}) { c.enqueue(obj) }, - DeleteFunc: func(obj interface{}) { c.enqueue(obj) }, + AddFunc: func(obj any) { + 
c.enqueue(obj) + c.handleMetricsOnAdd(obj) + }, + UpdateFunc: func(oldObj, newObj any) { + c.enqueue(newObj) + c.handleMetricsOnUpdate(oldObj, newObj) + }, + DeleteFunc: func(obj any) { + c.enqueue(obj) + c.handleMetricsOnDelete(obj) + }, }) - _, _ = clusterRoleBindingInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { + FilterFunc: func(obj any) bool { crb, ok := obj.(*rbacv1.ClusterRoleBinding) if !ok { return false @@ -86,9 +100,9 @@ func NewController( return crb.Name == workspaceAdminClusterRoleBindingName }, Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { c.enqueueCRB(obj) }, - UpdateFunc: func(obj, _ interface{}) { c.enqueueCRB(obj) }, - DeleteFunc: func(obj interface{}) { c.enqueueCRB(obj) }, + AddFunc: func(obj any) { c.enqueueCRB(obj) }, + UpdateFunc: func(obj, _ any) { c.enqueueCRB(obj) }, + DeleteFunc: func(obj any) { c.enqueueCRB(obj) }, }, })) @@ -104,9 +118,12 @@ type Controller struct { logicalClusterLister corev1alpha1listers.LogicalClusterClusterLister clusterRoleBindingLister kcprbaclisters.ClusterRoleBindingClusterLister + mu sync.Mutex + countedClusters map[string]string + shardName string } -func (c *Controller) enqueue(obj interface{}) { +func (c *Controller) enqueue(obj any) { key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) @@ -117,7 +134,7 @@ func (c *Controller) enqueue(obj interface{}) { c.queue.Add(key) } -func (c *Controller) enqueueCRB(obj interface{}) { +func (c *Controller) enqueueCRB(obj any) { key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) if err != nil { utilruntime.HandleError(err) @@ -189,7 +206,6 @@ func (c *Controller) process(ctx context.Context, key string) error { if !apierrors.IsNotFound(err) { logger.Error(err, "failed to get LogicalCluster from lister", "cluster", clusterName) } - return nil // nothing we can do here } @@ 
-252,3 +268,83 @@ func (c *Controller) process(ctx context.Context, key string) error {
 	_, err = c.kubeClusterClient.Cluster(clusterName.Path()).RbacV1().ClusterRoleBindings().Update(ctx, newBinding, metav1.UpdateOptions{})
 	return err
 }
+
+// handleMetricsOnAdd records a LogicalCluster delivered by an informer add
+// event in the per-phase logical cluster gauge. Objects of any other type are
+// ignored.
+func (c *Controller) handleMetricsOnAdd(obj any) {
+	logicalCluster, ok := obj.(*corev1alpha1.LogicalCluster)
+	if !ok {
+		return
+	}
+
+	c.recordClusterPhase(logicalCluster)
+}
+
+// handleMetricsOnUpdate moves a LogicalCluster's gauge contribution to the
+// phase carried by newObj. The phase to leave is read from countedClusters
+// instead of the old object, so every decrement undoes an increment this
+// controller actually made. Objects of any other type are ignored.
+func (c *Controller) handleMetricsOnUpdate(_, newObj any) {
+	logicalCluster, ok := newObj.(*corev1alpha1.LogicalCluster)
+	if !ok {
+		return
+	}
+
+	c.recordClusterPhase(logicalCluster)
+}
+
+// recordClusterPhase brings the gauge and countedClusters in line with the
+// current phase of logicalCluster. It does nothing when the tracked phase
+// already matches. An empty phase is tracked but never counted.
+func (c *Controller) recordClusterPhase(logicalCluster *corev1alpha1.LogicalCluster) {
+	clusterKey := string(logicalcluster.From(logicalCluster))
+	phase := string(logicalCluster.Status.Phase)
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	previous, tracked := c.countedClusters[clusterKey]
+	if tracked && previous == phase {
+		return
+	}
+	// previous is "" for an untracked cluster, so nothing is decremented for it.
+	if previous != "" {
+		kcpmetrics.DecrementLogicalClusterCount(c.shardName, previous)
+	}
+	if phase != "" {
+		kcpmetrics.IncrementLogicalClusterCount(c.shardName, phase)
+	}
+	c.countedClusters[clusterKey] = phase
+}
+
+// handleMetricsOnDelete removes a deleted LogicalCluster from the gauge. obj is
+// either the LogicalCluster itself or a cache.DeletedFinalStateUnknown
+// tombstone wrapping it; anything else is ignored. The phase that gets
+// decremented is the one recorded in countedClusters.
+func (c *Controller) handleMetricsOnDelete(obj any) {
+	logicalCluster, ok := obj.(*corev1alpha1.LogicalCluster)
+	if !ok {
+		tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
+		if !isTombstone {
+			return
+		}
+		if logicalCluster, ok = tombstone.Obj.(*corev1alpha1.LogicalCluster); !ok {
+			return
+		}
+	}
+
+	clusterKey := string(logicalcluster.From(logicalCluster))
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	phase, tracked := c.countedClusters[clusterKey]
+	if !tracked {
+		return
+	}
+	delete(c.countedClusters, clusterKey)
+	if phase != "" {
+		kcpmetrics.DecrementLogicalClusterCount(c.shardName, phase)
+	}
+}
diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go
index 22e70f094ea..d7826fdaec3 100644
--- a/pkg/server/controllers.go
+++ b/pkg/server/controllers.go
@@ -435,6 +435,7 @@ func (s *Server) installTenancyLogicalClusterController(ctx context.Context, con
 		kubeClusterClient,
 		s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(),
 		s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(),
+		s.Options.Extra.ShardName,
 	)
 
 	return s.registerController(&controllerWrapper{
diff --git a/pkg/server/metrics/metrics.go b/pkg/server/metrics/metrics.go
new file mode 100644
index 00000000000..28489fe1423
--- /dev/null
+++ b/pkg/server/metrics/metrics.go
@@ -0,0 +1,47 @@
+/*
+Copyright 2025 The KCP Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metrics
+
+import (
+	"k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/legacyregistry"
+)
+
+var (
+	logicalClusterCount = metrics.NewGaugeVec(
+		&metrics.GaugeOpts{
+			Name:           "kcp_logicalcluster_count",
+			Help:           "Number of logical clusters currently running with specific phases on this shard.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"shard", "phase"},
+	)
+)
+
+func init() {
+	legacyregistry.MustRegister(logicalClusterCount)
+}
+
+// IncrementLogicalClusterCount increments the count for the given shard and phase.
+func IncrementLogicalClusterCount(shardName, phase string) {
+	logicalClusterCount.With(map[string]string{"shard": shardName, "phase": phase}).Inc()
+}
+
+// DecrementLogicalClusterCount lowers by one the gauge sample labelled with shardName and phase.
+func DecrementLogicalClusterCount(shardName, phase string) {
+	logicalClusterCount.With(map[string]string{"shard": shardName, "phase": phase}).Dec()
+}