
Node Resource Controller #1

@latermonk

Description


Below is an example Node resource controller, implemented with Kubebuilder and Go, that automatically cordons a node when its CPU and memory utilization are low:

// Note: this is a simplified example; a production deployment needs more
// robust error handling and tuning.

package controllers

import (
	"context"
	"fmt"
	"os"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// NodeReconciler reconciles a Node object
type NodeReconciler struct {
	client.Client
	Scheme        *runtime.Scheme
	Clientset     *kubernetes.Clientset
	MetricsClient *metricsv.Clientset
}

// Default thresholds and check interval; the thresholds can be overridden
// via environment variables.
const (
	defaultCPUThreshold    = 30.0
	defaultMemoryThreshold = 30.0
	checkInterval          = 5 * time.Minute
)

func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Start the periodic check loop in the background (note: this runs on
	// every replica; see the leader-election note below).
	go r.periodicNodeCheck(checkInterval)
	return ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Node{}).
		Complete(r)
}

// Reconcile is required by controller-runtime; in this simplified example
// the periodic loop does the actual work, so it is a no-op.
func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	return ctrl.Result{}, nil
}

func (r *NodeReconciler) periodicNodeCheck(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for range ticker.C {
		r.checkNodes()
	}
}

func (r *NodeReconciler) checkNodes() {
	ctx := context.Background()

	// List all nodes in the cluster
	nodes, err := r.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		fmt.Printf("Error listing nodes: %v\n", err)
		return
	}

	// Fetch current usage for every node from the metrics API
	nodeMetricsList, err := r.MetricsClient.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
	if err != nil {
		fmt.Printf("Error getting node metrics: %v\n", err)
		return
	}

	metricsMap := make(map[string]corev1.ResourceList)
	for _, nm := range nodeMetricsList.Items {
		metricsMap[nm.Name] = nm.Usage
	}

	// Evaluate each node
	for _, node := range nodes.Items {
		// Skip excluded nodes (here: master/control-plane nodes)
		if isMasterNode(&node) {
			continue
		}

		usage := metricsMap[node.Name]
		if len(usage) == 0 {
			fmt.Printf("No metrics available for node %s\n", node.Name)
			continue
		}

		// Compute utilization percentages
		cpuPercent, memPercent := calculateUsage(&node, usage)

		// Read thresholds, falling back to the defaults
		cpuThreshold := getThreshold("CPU_THRESHOLD", defaultCPUThreshold)
		memThreshold := getThreshold("MEMORY_THRESHOLD", defaultMemoryThreshold)

		// Cordon when both CPU and memory usage are below their thresholds
		if cpuPercent < cpuThreshold && memPercent < memThreshold {
			if !node.Spec.Unschedulable {
				fmt.Printf("Cordoning node %s (CPU: %.2f%%, Memory: %.2f%%)\n",
					node.Name, cpuPercent, memPercent)
				if err := r.cordonNode(ctx, &node); err != nil {
					fmt.Printf("Error cordoning node %s: %v\n", node.Name, err)
				}
			}
		}
	}
}

func isMasterNode(node *corev1.Node) bool {
	if _, exists := node.Labels["node-role.kubernetes.io/master"]; exists {
		return true
	}
	if _, exists := node.Labels["node-role.kubernetes.io/control-plane"]; exists {
		return true
	}
	return false
}

func calculateUsage(node *corev1.Node, usage corev1.ResourceList) (float64, float64) {
	// CPU utilization: used millicores / allocatable millicores
	cpuUsage := usage[corev1.ResourceCPU]
	cpuAlloc := node.Status.Allocatable[corev1.ResourceCPU]
	cpuPercent := 0.0
	if cpuAlloc.MilliValue() > 0 {
		cpuPercent = float64(cpuUsage.MilliValue()) / float64(cpuAlloc.MilliValue()) * 100
	}

	// Memory utilization: used bytes / allocatable bytes
	memUsage := usage[corev1.ResourceMemory]
	memAlloc := node.Status.Allocatable[corev1.ResourceMemory]
	memPercent := 0.0
	if memAlloc.Value() > 0 {
		memPercent = float64(memUsage.Value()) / float64(memAlloc.Value()) * 100
	}

	return cpuPercent, memPercent
}

// cordonNode marks the node unschedulable via a merge patch.
func (r *NodeReconciler) cordonNode(ctx context.Context, node *corev1.Node) error {
	patch := client.MergeFrom(node.DeepCopy())
	node.Spec.Unschedulable = true
	return r.Patch(ctx, node, patch)
}

// getThreshold reads a float threshold from the given environment variable,
// falling back to defaultValue if it is unset or unparsable.
func getThreshold(envVar string, defaultValue float64) float64 {
	valueStr := os.Getenv(envVar)
	if valueStr == "" {
		return defaultValue
	}
	var value float64
	_, err := fmt.Sscanf(valueStr, "%f", &value)
	if err != nil {
		return defaultValue
	}
	return value
}
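
For reference, a sketch of how the two extra clientsets might be wired up in the scaffolded main.go (error handling trimmed to os.Exit for brevity; the module path matches the kubebuilder init command below):

package main

import (
	"os"

	"k8s.io/client-go/kubernetes"
	metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
	ctrl "sigs.k8s.io/controller-runtime"

	"example.com/node-cordon/controllers"
)

func main() {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		os.Exit(1)
	}

	// Both clientsets are built from the manager's rest.Config.
	clientset, err := kubernetes.NewForConfig(mgr.GetConfig())
	if err != nil {
		os.Exit(1)
	}
	metricsClient, err := metricsv.NewForConfig(mgr.GetConfig())
	if err != nil {
		os.Exit(1)
	}

	if err := (&controllers.NodeReconciler{
		Client:        mgr.GetClient(),
		Scheme:        mgr.GetScheme(),
		Clientset:     clientset,
		MetricsClient: metricsClient,
	}).SetupWithManager(mgr); err != nil {
		os.Exit(1)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		os.Exit(1)
	}
}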

Deployment notes:

  1. Create the project:

     kubebuilder init --domain example.com --repo example.com/node-cordon
     kubebuilder create api --group node --version v1 --kind CordonPolicy

  2. Replace the generated controller code with the example above.

  3. Add the following to the RBAC configuration (config/rbac/role.yaml); a marker-based alternative is sketched after this list:

     - apiGroups: [""]
       resources: ["nodes"]
       verbs: ["get", "list", "watch", "patch", "update"]
     - apiGroups: ["metrics.k8s.io"]
       resources: ["nodes"]
       verbs: ["get", "list"]

  4. Build and deploy to the cluster:

     make docker-build docker-push IMG=<your-image>
     make deploy

  5. Set the environment variables (in the deployment manifest):

     env:
     - name: CPU_THRESHOLD
       value: "30"
     - name: MEMORY_THRESHOLD
       value: "30"

Notes:

  1. metrics-server must be installed in the cluster.
  2. Adjust the thresholds and check interval to your actual needs.
  3. Consider adding more robust error handling and retry logic.
  4. The configuration could be moved into a CRD (e.g. the CordonPolicy kind scaffolded above).
  5. Leader election needs attention: the check loop is started in SetupWithManager and therefore runs on every replica, so either run a single replica or register the loop as a manager Runnable so it only runs on the elected leader.
  6. Consider adding event recording and monitoring metrics; a sketch follows this list.
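
For note 6, a minimal sketch of event recording. The Recorder field and recordCordon helper are illustrative additions, not part of the example above; the recorder would come from mgr.GetEventRecorderFor("node-cordon") in main.go, with the import "k8s.io/client-go/tools/record":

// Assumed extra field on NodeReconciler, wired in main.go:
//
//	Recorder record.EventRecorder // mgr.GetEventRecorderFor("node-cordon")

// recordCordon emits a Kubernetes event on the node after cordoning it,
// visible in `kubectl describe node`.
func (r *NodeReconciler) recordCordon(node *corev1.Node, cpuPercent, memPercent float64) {
	r.Recorder.Eventf(node, corev1.EventTypeNormal, "NodeCordoned",
		"cordoned: CPU %.2f%% and memory %.2f%% below thresholds", cpuPercent, memPercent)
}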

This controller will:

  • Check all nodes every 5 minutes
  • Skip master/control-plane nodes
  • Automatically cordon a node when both its CPU and memory utilization fall below 30%
  • Let the thresholds be adjusted via environment variables

Possible extensions, depending on your needs:

  • An uncordon mechanism (a sketch follows this list)
  • More sophisticated scheduling policies
  • Prometheus monitoring metrics
  • A smoother rolling cordon strategy instead of cordoning everything in one pass
  • Configurable node-exclusion labels
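
As a starting point for the uncordon mechanism, a sketch that mirrors cordonNode; the hysteresis and ownership concerns in the comments are design notes, not implemented here:

// uncordonNode reverses cordonNode by patching Unschedulable back to false.
// Caveats for a real implementation: require utilization to stay above the
// thresholds for some window before uncordoning (hysteresis), and mark
// controller-cordoned nodes with an annotation so manually cordoned nodes
// are left alone.
func (r *NodeReconciler) uncordonNode(ctx context.Context, node *corev1.Node) error {
	if !node.Spec.Unschedulable {
		return nil // already schedulable
	}
	patch := client.MergeFrom(node.DeepCopy())
	node.Spec.Unschedulable = false
	return r.Patch(ctx, node, patch)
}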
