From 1c28709851777cf6e7d29d20a0d4569f9df5c608 Mon Sep 17 00:00:00 2001
From: David
Date: Thu, 4 Sep 2025 16:17:36 -0400
Subject: [PATCH] drain: re-cordon nodes if needed

If the drain controller is already tracking an ongoing drain for a node
but the node is no longer marked unschedulable, an external actor has
uncordoned it. Detect that case and cordon the node again before
continuing the drain. Also add unit tests covering syncNode's cordon,
uncordon, and re-cordon paths.
---
 pkg/controller/drain/drain_controller.go      |   8 +-
 pkg/controller/drain/drain_controller_test.go | 214 ++++++++++++++++++
 2 files changed, 221 insertions(+), 1 deletion(-)
 create mode 100644 pkg/controller/drain/drain_controller_test.go

diff --git a/pkg/controller/drain/drain_controller.go b/pkg/controller/drain/drain_controller.go
index 09e87047cd..689f16a4cc 100644
--- a/pkg/controller/drain/drain_controller.go
+++ b/pkg/controller/drain/drain_controller.go
@@ -411,7 +411,13 @@ func (ctrl *Controller) drainNode(node *corev1.Node, drainer *drain.Helper) erro
 		return err
 	}
 
-	if !isOngoingDrain {
+	// isOngoingDrain only reflects MCO-managed cordons/drains. We should also check
+	// whether an external actor has uncordoned this node, and force it back into a cordoned state if so.
+	if isOngoingDrain && !node.Spec.Unschedulable {
+		klog.Infof("External actor has unexpectedly uncordoned node %s, cordoning again...", node.Name)
+	}
+
+	if !isOngoingDrain || !node.Spec.Unschedulable {
 		ctrl.logNode(node, "cordoning")
 		// perform cordon
 		if err := ctrl.cordonOrUncordonNode(true, node, drainer); err != nil {
diff --git a/pkg/controller/drain/drain_controller_test.go b/pkg/controller/drain/drain_controller_test.go
new file mode 100644
index 0000000000..87dd57f3ca
--- /dev/null
+++ b/pkg/controller/drain/drain_controller_test.go
@@ -0,0 +1,214 @@
+package drain
+
+import (
+	"context"
+	"encoding/json"
+	"maps"
+	"testing"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	kubeinformers "k8s.io/client-go/informers"
+	k8sfake "k8s.io/client-go/kubernetes/fake"
+	core "k8s.io/client-go/testing"
+
+	configv1 "github.com/openshift/api/config/v1"
+	mcfgv1 "github.com/openshift/api/machineconfiguration/v1"
+	fakemcfgclientset "github.com/openshift/client-go/machineconfiguration/clientset/versioned/fake"
+	mcfginformers "github.com/openshift/client-go/machineconfiguration/informers/externalversions"
+	daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// fakeFeatureGateHandler is a no-op feature gate accessor for tests; every gate reports disabled.
+type fakeFeatureGateHandler struct{}
+
+func (f *fakeFeatureGateHandler) Connect(ctx context.Context) error { return nil }
+func (f *fakeFeatureGateHandler) Enabled(featureName configv1.FeatureGateName) bool { return false }
+func (f *fakeFeatureGateHandler) Exists(featureName configv1.FeatureGateName) bool { return false }
+func (f *fakeFeatureGateHandler) KnownFeatures() []configv1.FeatureGateName { return nil }
+
+const (
+	testNodeName      = "test-node"
+	testPoolName      = "worker"
+	testDrainState    = "drain-test-hash"
+	testUncordonState = "uncordon-test-hash"
+)
+
+func createTestNode(name string, unschedulable bool) *corev1.Node {
+	node := &corev1.Node{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        name,
+			Annotations: make(map[string]string),
+		},
+		Spec: corev1.NodeSpec{
+			Unschedulable: unschedulable,
+		},
+	}
+	return node
+}
+
+func createTestNodeWithAnnotations(name string, unschedulable bool, annotations map[string]string) *corev1.Node {
+	node := createTestNode(name, unschedulable)
+	maps.Copy(node.Annotations, annotations)
+	return node
+}
+
+func createTestMCP(name string) *mcfgv1.MachineConfigPool {
+	return &mcfgv1.MachineConfigPool{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Spec: mcfgv1.MachineConfigPoolSpec{
+			NodeSelector: &metav1.LabelSelector{
+				MatchLabels: map[string]string{
+					"node-role.kubernetes.io/" + name: "",
+				},
+			},
+		},
+	}
+}
+
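+// createTestController builds a drain Controller wired to fake kube and MCO clientsets
+// whose informer caches are pre-populated with the supplied nodes and pools.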
+func createTestController(nodes []*corev1.Node, mcps []*mcfgv1.MachineConfigPool) (*Controller, *k8sfake.Clientset, *fakemcfgclientset.Clientset) {
+	kubeObjs := make([]runtime.Object, len(nodes))
+	for i, node := range nodes {
+		kubeObjs[i] = node
+	}
+	kubeClient := k8sfake.NewSimpleClientset(kubeObjs...)
+
+	mcfgObjs := make([]runtime.Object, len(mcps))
+	for i, mcp := range mcps {
+		mcfgObjs[i] = mcp
+	}
+	mcfgClient := fakemcfgclientset.NewSimpleClientset(mcfgObjs...)
+
+	kubeInformers := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
+	mcfgInformers := mcfginformers.NewSharedInformerFactory(mcfgClient, 0)
+
+	nodeInformer := kubeInformers.Core().V1().Nodes()
+	mcpInformer := mcfgInformers.Machineconfiguration().V1().MachineConfigPools()
+
+	// Start informers to properly initialize them
+	kubeInformers.Start(make(chan struct{}))
+	mcfgInformers.Start(make(chan struct{}))
+
+	// Add nodes to informer
+	for _, node := range nodes {
+		nodeInformer.Informer().GetIndexer().Add(node)
+	}
+	// Add MCPs to informer
+	for _, mcp := range mcps {
+		mcpInformer.Informer().GetIndexer().Add(mcp)
+	}
+
+	cfg := DefaultConfig()
+	cfg.DrainTimeoutDuration = 10 * time.Minute
+	cfg.DrainRequeueDelay = 1 * time.Minute
+	cfg.DrainRequeueFailingDelay = 5 * time.Minute
+	cfg.DrainRequeueFailingThreshold = 5 * time.Minute
+
+	fgHandler := &fakeFeatureGateHandler{}
+
+	ctrl := New(cfg, nodeInformer, mcpInformer, kubeClient, mcfgClient, fgHandler)
+
+	// Initialize ongoing drains map for testing
+	ctrl.ongoingDrains = make(map[string]time.Time)
+
+	return ctrl, kubeClient, mcfgClient
+}
+
+func createDrainTestNode(nodeName string, unschedulable bool, desiredState, lastAppliedState string) *corev1.Node {
+	node := createTestNodeWithAnnotations(nodeName, unschedulable, map[string]string{
+		daemonconsts.DesiredDrainerAnnotationKey:     desiredState,
+		daemonconsts.LastAppliedDrainerAnnotationKey: lastAppliedState,
+	})
+	node.Labels = map[string]string{
+		"node-role.kubernetes.io/" + testPoolName: "",
+	}
+	return node
+}
+
+func setupControllerAndSync(node *corev1.Node, ongoingDrains map[string]time.Time) (*Controller, *k8sfake.Clientset, error) {
+	ctrl, kubeClient, _ := createTestController([]*corev1.Node{node}, []*mcfgv1.MachineConfigPool{createTestMCP(testPoolName)})
+
+	if ongoingDrains != nil {
+		ctrl.ongoingDrains = ongoingDrains
+	}
+
+	err := ctrl.syncNode(testNodeName)
+	return ctrl, kubeClient, err
+}
+
+func verifyDrainPatches(t *testing.T, kubeClient *k8sfake.Clientset, expectedUnschedulable bool, expectedAnnotationValue string) {
+	// Collect all patch actions
+	patchActions := []core.PatchAction{}
+	for _, action := range kubeClient.Actions() {
+		if patchAction, ok := action.(core.PatchAction); ok {
+			patchActions = append(patchActions, patchAction)
+		}
+	}
+
+	// Verify exactly 2 patch operations occurred
+	assert.Len(t, patchActions, 2, "should have made exactly two patch requests")
+
+	// Verify the first patch sets the correct scheduling state
+	firstPatchBytes := patchActions[0].GetPatch()
+	var firstPatch map[string]any
+	err := json.Unmarshal(firstPatchBytes, &firstPatch)
+	assert.NoError(t, err, "unmarshalling first patch failed")
+	if spec, ok := firstPatch["spec"].(map[string]any); ok {
+		if unschedulable, ok := spec["unschedulable"].(bool); ok {
+			assert.Equal(t, expectedUnschedulable, unschedulable, "first patch should set node schedulable state correctly")
+		}
+	}
+
+	// Verify the second patch sets the LastAppliedDrainerAnnotationKey annotation
+	secondPatchBytes := patchActions[1].GetPatch()
+	var secondPatch map[string]any
+	err = json.Unmarshal(secondPatchBytes, &secondPatch)
+	assert.NoError(t, err, "unmarshalling second patch failed")
+	if metadata, ok := secondPatch["metadata"].(map[string]any); ok {
+		if annotations, ok := metadata["annotations"].(map[string]any); ok {
+			if lastApplied, ok := annotations[daemonconsts.LastAppliedDrainerAnnotationKey].(string); ok {
+				assert.Equal(t, expectedAnnotationValue, lastApplied, "LastAppliedDrainerAnnotationKey should be set correctly")
+			}
+		}
+	}
+}
+
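+// TestSyncNode drives syncNode against a single fake node and verifies the cordon/uncordon
+// patch and completion annotation issued for each requested drain state.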
+func TestSyncNode(t *testing.T) {
+	t.Run("uncordon requested", func(t *testing.T) {
+		node := createDrainTestNode(testNodeName, true, testUncordonState, "")
+		_, kubeClient, err := setupControllerAndSync(node, nil)
+		assert.NoError(t, err, "syncNode should not error for uncordon action")
+
+		// Verify patch operations: uncordon (unschedulable=false) + completion annotation
+		verifyDrainPatches(t, kubeClient, false, testUncordonState)
+	})
+
+	t.Run("drain requested", func(t *testing.T) {
+		node := createDrainTestNode(testNodeName, false, testDrainState, "")
+		_, kubeClient, err := setupControllerAndSync(node, nil)
+		assert.NoError(t, err, "syncNode should not error for drain action")
+
+		// Verify patch operations: cordon (unschedulable=true) + completion annotation
+		verifyDrainPatches(t, kubeClient, true, testDrainState)
+	})
+
+	t.Run("re-cordon required", func(t *testing.T) {
+		node := createDrainTestNode(testNodeName, false, testDrainState, "")
+		// Simulate an ongoing drain while the node is not cordoned (external uncordon)
+		ongoingDrains := map[string]time.Time{
+			testNodeName: time.Now().Add(-5 * time.Minute),
+		}
+		_, kubeClient, err := setupControllerAndSync(node, ongoingDrains)
+		assert.NoError(t, err, "syncNode should not error for re-cordon action")
+
+		// Verify patch operations: re-cordon (unschedulable=true) + completion annotation
+		verifyDrainPatches(t, kubeClient, true, testDrainState)
+	})
+}