Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 100 additions & 11 deletions pkg/reconciler/tenancy/logicalcluster/logicalcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

authenticationv1 "k8s.io/api/authentication/v1"
Expand All @@ -37,9 +38,11 @@ import (
kcprbacinformers "github.com/kcp-dev/client-go/informers/rbac/v1"
kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
kcprbaclisters "github.com/kcp-dev/client-go/listers/rbac/v1"
"github.com/kcp-dev/logicalcluster/v3"

"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/events"
kcpmetrics "github.com/kcp-dev/kcp/pkg/server/metrics"
corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/tenancy/v1alpha1"
corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1"
Expand All @@ -58,6 +61,7 @@ func NewController(
kubeClusterClient kcpkubernetesclientset.ClusterInterface,
logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer,
clusterRoleBindingInformer kcprbacinformers.ClusterRoleBindingClusterInformer,
shardName string,
) *Controller {
c := &Controller{
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
Expand All @@ -69,26 +73,36 @@ func NewController(
kubeClusterClient: kubeClusterClient,
logicalClusterLister: logicalClusterInformer.Lister(),
clusterRoleBindingLister: clusterRoleBindingInformer.Lister(),
shardName: shardName,
countedClusters: make(map[string]string),
}

_, _ = logicalClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueue(obj) },
UpdateFunc: func(obj, _ interface{}) { c.enqueue(obj) },
DeleteFunc: func(obj interface{}) { c.enqueue(obj) },
AddFunc: func(obj any) {
c.enqueue(obj)
c.handleMetricsOnAdd(obj)
},
UpdateFunc: func(oldObj, newObj any) {
c.enqueue(newObj)
c.handleMetricsOnUpdate(oldObj, newObj)
},
DeleteFunc: func(obj any) {
c.enqueue(obj)
c.handleMetricsOnDelete(obj)
},
})

_, _ = clusterRoleBindingInformer.Informer().AddEventHandler(events.WithoutSyncs(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
FilterFunc: func(obj any) bool {
crb, ok := obj.(*rbacv1.ClusterRoleBinding)
if !ok {
return false
}
return crb.Name == workspaceAdminClusterRoleBindingName
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueCRB(obj) },
UpdateFunc: func(obj, _ interface{}) { c.enqueueCRB(obj) },
DeleteFunc: func(obj interface{}) { c.enqueueCRB(obj) },
AddFunc: func(obj any) { c.enqueueCRB(obj) },
UpdateFunc: func(obj, _ any) { c.enqueueCRB(obj) },
DeleteFunc: func(obj any) { c.enqueueCRB(obj) },
},
}))

Expand All @@ -104,9 +118,12 @@ type Controller struct {
logicalClusterLister corev1alpha1listers.LogicalClusterClusterLister

clusterRoleBindingLister kcprbaclisters.ClusterRoleBindingClusterLister
mu sync.Mutex
countedClusters map[string]string
shardName string
}

func (c *Controller) enqueue(obj interface{}) {
func (c *Controller) enqueue(obj any) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
Expand All @@ -117,7 +134,7 @@ func (c *Controller) enqueue(obj interface{}) {
c.queue.Add(key)
}

func (c *Controller) enqueueCRB(obj interface{}) {
func (c *Controller) enqueueCRB(obj any) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(err)
Expand Down Expand Up @@ -189,7 +206,6 @@ func (c *Controller) process(ctx context.Context, key string) error {
if !apierrors.IsNotFound(err) {
logger.Error(err, "failed to get LogicalCluster from lister", "cluster", clusterName)
}

return nil // nothing we can do here
}

Expand Down Expand Up @@ -252,3 +268,76 @@ func (c *Controller) process(ctx context.Context, key string) error {
_, err = c.kubeClusterClient.Cluster(clusterName.Path()).RbacV1().ClusterRoleBindings().Update(ctx, newBinding, metav1.UpdateOptions{})
return err
}

func (c *Controller) handleMetricsOnAdd(obj any) {
logicalCluster, ok := obj.(*corev1alpha1.LogicalCluster)
if !ok {
return
}

c.mu.Lock()
defer c.mu.Unlock()

clusterKey := string(logicalcluster.From(logicalCluster))
phase := string(logicalCluster.Status.Phase)
if _, exists := c.countedClusters[clusterKey]; !exists {
c.countedClusters[clusterKey] = phase
if phase != "" {
kcpmetrics.IncrementLogicalClusterCount(c.shardName, phase)
}
}
}

func (c *Controller) handleMetricsOnUpdate(oldObj, newObj any) {
oldLogicalCluster, ok := oldObj.(*corev1alpha1.LogicalCluster)
if !ok {
return
}

newLogicalCluster, ok := newObj.(*corev1alpha1.LogicalCluster)
if !ok {
return
}

c.mu.Lock()
defer c.mu.Unlock()

clusterKey := string(logicalcluster.From(newLogicalCluster))
oldPhase := string(oldLogicalCluster.Status.Phase)
newPhase := string(newLogicalCluster.Status.Phase)

if oldPhase != newPhase {
if oldPhase != "" {
kcpmetrics.DecrementLogicalClusterCount(c.shardName, oldPhase)
}
if newPhase != "" {
kcpmetrics.IncrementLogicalClusterCount(c.shardName, newPhase)
}
c.countedClusters[clusterKey] = newPhase
}
}

func (c *Controller) handleMetricsOnDelete(obj any) {
logicalCluster, ok := obj.(*corev1alpha1.LogicalCluster)
if !ok {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
logicalCluster, ok = tombstone.Obj.(*corev1alpha1.LogicalCluster)
if !ok {
return
}
} else {
return
}
}

c.mu.Lock()
defer c.mu.Unlock()

clusterKey := string(logicalcluster.From(logicalCluster))
if phase, exists := c.countedClusters[clusterKey]; exists {
delete(c.countedClusters, clusterKey)
if phase != "" {
kcpmetrics.DecrementLogicalClusterCount(c.shardName, phase)
}
}
}
1 change: 1 addition & 0 deletions pkg/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ func (s *Server) installTenancyLogicalClusterController(ctx context.Context, con
kubeClusterClient,
s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(),
s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(),
s.Options.Extra.ShardName,
)

return s.registerController(&controllerWrapper{
Expand Down
47 changes: 47 additions & 0 deletions pkg/server/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2025 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)

var (
logicalClusterCount = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Name: "kcp_logicalcluster_count",
Help: "Number of logical clusters currently running with specific phases on this shard.",
StabilityLevel: metrics.ALPHA,
},
[]string{"shard", "phase"},
)
)

func init() {
legacyregistry.MustRegister(logicalClusterCount)
}

// IncrementLogicalClusterCount increments the count for the given shard and phase.
func IncrementLogicalClusterCount(shardName string, phase string) {
logicalClusterCount.WithLabelValues(shardName, phase).Inc()
}

// DecrementLogicalClusterCount decrements the count for the given shard and phase.
func DecrementLogicalClusterCount(shardName string, phase string) {
logicalClusterCount.WithLabelValues(shardName, phase).Dec()
}