From 5f4d89d773bc95c8e75de55e935a5b7b88e38a19 Mon Sep 17 00:00:00 2001 From: TrafalgarZZZ Date: Sat, 6 Sep 2025 22:20:33 +0800 Subject: [PATCH 1/6] refactor SyncScheduleInfoToCacheNodes for better readability Signed-off-by: TrafalgarZZZ --- pkg/utils/dataset/lifecycle/node.go | 152 +++++++++++++---------- pkg/utils/dataset/lifecycle/node_test.go | 42 +------ 2 files changed, 85 insertions(+), 109 deletions(-) diff --git a/pkg/utils/dataset/lifecycle/node.go b/pkg/utils/dataset/lifecycle/node.go index dcd35bbff15..b01f1b14a22 100644 --- a/pkg/utils/dataset/lifecycle/node.go +++ b/pkg/utils/dataset/lifecycle/node.go @@ -51,63 +51,111 @@ func init() { rootLog = ctrl.Log.WithName("dataset.lifecycle") } -func SyncScheduleInfoToCacheNodes(runtimeInfo base.RuntimeInfoInterface, client client.Client) (err error) { +func SyncScheduleInfoToCacheNodes(runtimeInfo base.RuntimeInfoInterface, client client.Client) error { defer utils.TimeTrack(time.Now(), "SyncScheduleInfoToCacheNodes", "name", runtimeInfo.GetName(), "namespace", runtimeInfo.GetNamespace()) - var ( - currentCacheNodeNames []string - previousCacheNodeNames []string - ) + // Get current cache nodes from worker pods + desiredNodeNames, err := getDesiredNodesWithScheduleInfo(runtimeInfo, client) + if err != nil { + return err + } + + // Get previously assigned cache nodes + actualNodeNames, err := getActualNodesWithScheduleInfo(runtimeInfo, client) + if err != nil { + return err + } + + // Calculate node differences + nodesToAddScheduleInfo, nodesToRemoveScheduleInfo := calculateNodeDifferences(desiredNodeNames, actualNodeNames) + // Apply changes to nodes + applyNodeChanges(nodesToAddScheduleInfo, nodesToRemoveScheduleInfo, runtimeInfo, client) + + return nil +} + +// getDesiredNodesWithScheduleInfo retrieves the desired cache nodes with schedule info +func getDesiredNodesWithScheduleInfo(runtimeInfo base.RuntimeInfoInterface, client client.Client) ([]string, error) { workers, err := fluidctrl.GetWorkersAsStatefulset(client, types.NamespacedName{Namespace: runtimeInfo.GetNamespace(), Name: runtimeInfo.GetWorkerStatefulsetName()}) if err != nil { if fluiderrs.IsDeprecated(err) { rootLog.Info("Warning: Deprecated mode is not support, so skip handling", "details", err) - return nil + return nil, nil } - return err + return nil, err } workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector) + if err != nil { + return nil, err + } workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector) if err != nil { - return err + return nil, err } + var nodeNames []string for _, pod := range workerPods { - if len(pod.Spec.NodeName) != 0 { - currentCacheNodeNames = append(currentCacheNodeNames, pod.Spec.NodeName) + if pod.Spec.NodeName != "" { + nodeNames = append(nodeNames, pod.Spec.NodeName) } } - // find the nodes which already have the runtime labels - previousCacheNodeNames, err = getAssignedNodes(runtimeInfo, client) + return utils.RemoveDuplicateStr(nodeNames), nil +} - currentCacheNodeNames = utils.RemoveDuplicateStr(currentCacheNodeNames) - previousCacheNodeNames = utils.RemoveDuplicateStr(previousCacheNodeNames) +// getActualNodesWithScheduleInfo retrieves the actual cache nodes with schedule info +func getActualNodesWithScheduleInfo(runtimeInfo base.RuntimeInfoInterface, cli client.Client) (nodeNames []string, err error) { + var ( + nodeList = &corev1.NodeList{} + runtimeLabel = runtimeInfo.GetRuntimeLabelName() + ) - addedCacheNodenames := utils.SubtractString(currentCacheNodeNames, 
previousCacheNodeNames)
-	removedCacheNodenames := utils.SubtractString(previousCacheNodeNames, currentCacheNodeNames)
+	nodeNames = []string{}
+	datasetLabels, err := labels.Parse(fmt.Sprintf("%s=true", runtimeLabel))
+	if err != nil {
+		return
+	}
 
-	if len(addedCacheNodenames) > 0 {
-		for _, nodeName := range addedCacheNodenames {
-			if innerErr := addScheduleInfoToNode(nodeName, runtimeInfo, client); innerErr != nil {
-				rootLog.Info(fmt.Sprintf("error when adding schedule info to node: %v, ignore it and continue", innerErr), "node", nodeName)
-			}
-		}
+	err = cli.List(context.TODO(), nodeList, &client.ListOptions{
+		LabelSelector: datasetLabels,
+	})
+	if err != nil {
+		return
 	}
 
-	if len(removedCacheNodenames) > 0 {
-		for _, nodeName := range removedCacheNodenames {
-			if innerErr := removeScheduleInfoFromNode(nodeName, runtimeInfo, client); innerErr != nil {
-				rootLog.Info(fmt.Sprintf("error when removing schedule info from node: %v, ignore it and continue", innerErr), "node", nodeName)
-			}
+	for _, node := range nodeList.Items {
+		nodeNames = append(nodeNames, node.Name)
+	}
+
+	return utils.RemoveDuplicateStr(nodeNames), nil
+}
+
+// calculateNodeDifferences calculates which nodes need to be added or removed
+func calculateNodeDifferences(currentNodes, previousNodes []string) (nodesToAddLabels, nodesToRemoveLabels []string) {
+	nodesToAddLabels = utils.SubtractString(currentNodes, previousNodes)
+	nodesToRemoveLabels = utils.SubtractString(previousNodes, currentNodes)
+	return
+}
+
+// applyNodeChanges applies the calculated changes to the nodes
+func applyNodeChanges(nodesToAddScheduleInfo, nodesToRemoveScheduleInfo []string, runtimeInfo base.RuntimeInfoInterface, client client.Client) {
+	// Add schedule info to new nodes
+	for _, nodeName := range nodesToAddScheduleInfo {
+		if err := addScheduleInfoToNode(nodeName, runtimeInfo, client); err != nil {
+			rootLog.Info("Failed to add schedule info to node, continuing", "node", nodeName, "error", err)
 		}
 	}
 
-	return nil
+	// Remove schedule info from old nodes
+	for _, nodeName := range nodesToRemoveScheduleInfo {
+		if err := removeScheduleInfoFromNode(nodeName, runtimeInfo, client); err != nil {
+			rootLog.Info("Failed to remove schedule info from node, continuing", "node", nodeName, "error", err)
+		}
+	}
 }
 
 func addScheduleInfoToNode(nodeName string, runtimeInfo base.RuntimeInfoInterface, client client.Client) (err error) {
@@ -119,7 +167,7 @@ func addScheduleInfoToNode(nodeName string, runtimeInfo base.RuntimeInfoInterfac
 		return err
 	}
 
-	if CheckIfRuntimeInNode(node, runtimeInfo) {
+	if hasRuntimeLabel(node, runtimeInfo) {
 		rootLog.Info("Node already added schedule info, skip.", "node", nodeName)
 		return
 	}
@@ -141,7 +189,7 @@ func removeScheduleInfoFromNode(nodeName string, runtimeInfo base.RuntimeInfoInt
 		return err
 	}
 
-	if !CheckIfRuntimeInNode(node, runtimeInfo) {
+	if !hasRuntimeLabel(node, runtimeInfo) {
 		rootLog.Info("Node doesn't have existing schedule info, skip.", "node", nodeName)
 		return
 	}
@@ -154,46 +202,14 @@ func removeScheduleInfoFromNode(nodeName string, runtimeInfo base.RuntimeInfoInt
 	return nil
 }
 
-func getAssignedNodes(runtimeInfo base.RuntimeInfoInterface, cli client.Client) (nodeNames []string, err error) {
-	var (
-		nodeList     = &corev1.NodeList{}
-		runtimeLabel = runtimeInfo.GetRuntimeLabelName()
-	)
-
-	nodeNames = []string{}
-	datasetLabels, err := labels.Parse(fmt.Sprintf("%s=true", runtimeLabel))
-	if err != nil {
-		return
-	}
-
-	err = cli.List(context.TODO(), nodeList, &client.ListOptions{
-		LabelSelector: 
datasetLabels, - }) - if err != nil { - return - } - - for _, node := range nodeList.Items { - nodeNames = append(nodeNames, node.Name) - } - - return -} - -// CheckIfRuntimeInNode checks if the the runtime on this node -func CheckIfRuntimeInNode(node corev1.Node, runtimeInfo base.RuntimeInfoInterface) (found bool) { +// hasRuntimeLabel checks if the node has the runtime label +func hasRuntimeLabel(node corev1.Node, runtimeInfo base.RuntimeInfoInterface) bool { key := runtimeInfo.GetRuntimeLabelName() - return findLabelNameOnNode(node, key) -} - -// findLabelNameOnNode checks if the label exist -func findLabelNameOnNode(node corev1.Node, key string) (found bool) { - labels := node.Labels - if len(labels) == 0 { - return + if len(node.Labels) == 0 { + return false } - _, found = labels[key] - return + _, found := node.Labels[key] + return found } // labelCacheNode adds labels on a selected node to indicate the node is scheduled with corresponding runtime diff --git a/pkg/utils/dataset/lifecycle/node_test.go b/pkg/utils/dataset/lifecycle/node_test.go index 3d5c244b5b1..36af7610135 100644 --- a/pkg/utils/dataset/lifecycle/node_test.go +++ b/pkg/utils/dataset/lifecycle/node_test.go @@ -289,46 +289,6 @@ func TestIncreaseDatasetNum(t *testing.T) { } } -func TestFindLabelNameOnNode(t *testing.T) { - var testCase = []struct { - node *v1.Node - key string - wanted bool - }{ - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "2"}}, - Spec: v1.NodeSpec{}, - }, - key: "abc", - wanted: false, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "1"}}, - Spec: v1.NodeSpec{}, - }, - key: "fluid.io/dataset-num", - wanted: true, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{}, - Spec: v1.NodeSpec{}, - }, - key: "fluid.io/dataset-num", - wanted: false, - }, - } - - for _, test := range testCase { - wanted := findLabelNameOnNode(*test.node, test.key) - if wanted != test.wanted { - t.Errorf("fail to Find the label on node ") - } - } -} - func TestCheckIfRuntimeInNode(t *testing.T) { var testCase = []struct { node *v1.Node @@ -367,7 +327,7 @@ func TestCheckIfRuntimeInNode(t *testing.T) { t.Errorf("fail to create the runtimeInfo with error %v", err) } - found := CheckIfRuntimeInNode(*test.node, runtimeInfo) + found := hasRuntimeLabel(*test.node, runtimeInfo) if found != test.wanted { t.Errorf("fail to Find the label on node ") From aa87bb65974536c38bfe50c6506df66e2a9fe387 Mon Sep 17 00:00:00 2001 From: TrafalgarZZZ Date: Sat, 6 Sep 2025 23:04:05 +0800 Subject: [PATCH 2/6] add unit tests for pkg/utils/dataset/lifecycle.node_test.go Signed-off-by: TrafalgarZZZ --- .../dataset/lifecycle/lifecycle_suite_test.go | 13 + pkg/utils/dataset/lifecycle/node_test.go | 936 +++++++++--------- 2 files changed, 503 insertions(+), 446 deletions(-) create mode 100644 pkg/utils/dataset/lifecycle/lifecycle_suite_test.go diff --git a/pkg/utils/dataset/lifecycle/lifecycle_suite_test.go b/pkg/utils/dataset/lifecycle/lifecycle_suite_test.go new file mode 100644 index 00000000000..d31c920ad60 --- /dev/null +++ b/pkg/utils/dataset/lifecycle/lifecycle_suite_test.go @@ -0,0 +1,13 @@ +package lifecycle_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestLifecycle(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Lifecycle Suite") +} diff --git a/pkg/utils/dataset/lifecycle/node_test.go b/pkg/utils/dataset/lifecycle/node_test.go index 36af7610135..7dee8a5dbf8 100644 --- a/pkg/utils/dataset/lifecycle/node_test.go +++ b/pkg/utils/dataset/lifecycle/node_test.go @@ -17,478 +17,522 @@ package lifecycle import ( - "fmt" - "reflect" - "testing" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - v1 "k8s.io/api/core/v1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" -) + "sigs.k8s.io/controller-runtime/pkg/client" -var ( - testScheme *runtime.Scheme + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) -func init() { - testScheme = runtime.NewScheme() - _ = v1.AddToScheme(testScheme) -} - -func TestLabelCacheNode(t *testing.T) { - runtimeInfoExclusive, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoExclusive.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, +var _ = Describe("Dataset Lifecycle Node Tests", Label("pkg.utils.dataset.lifecycle.node_test.go"), func() { + var ( + client client.Client + runtimeInfo base.RuntimeInfoInterface + testScheme *runtime.Scheme + resources []runtime.Object + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + _ = corev1.AddToScheme(testScheme) + _ = appsv1.AddToScheme(testScheme) + _ = datav1alpha1.AddToScheme(testScheme) + + var err error + runtimeInfo, err = base.BuildRuntimeInfo("hbase", "fluid", common.AlluxioRuntime) + Expect(err).To(BeNil()) }) - runtimeInfoShareSpark, err := base.BuildRuntimeInfo("spark", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoShareSpark.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}, + JustBeforeEach(func() { + client = fake.NewFakeClientWithScheme(testScheme, resources...) 
}) - runtimeInfoShareHbase, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoShareHbase.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}, - }) + Describe("Test labelCacheNode()", func() { + When("given exclusive placement mode runtime", func() { + BeforeEach(func() { + runtimeInfo.SetupWithDataset(&datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, + }) + + resources = []runtime.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-exclusive", + }, + }, + } + }) - tieredStore := datav1alpha1.TieredStore{ - Levels: []datav1alpha1.Level{ - { - MediumType: common.Memory, - Quota: resource.NewQuantity(1, resource.BinarySI), - }, - { - MediumType: common.SSD, - Quota: resource.NewQuantity(2, resource.BinarySI), - }, - { - MediumType: common.HDD, - Quota: resource.NewQuantity(3, resource.BinarySI), - }, - }, - } - runtimeInfoWithTireStore, err := base.BuildRuntimeInfo("spark", "fluid", "alluxio", base.WithTieredStore(tieredStore)) - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoWithTireStore.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, - }) + It("should add exclusive labels to node", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-exclusive", + }, + } + + err := labelCacheNode(node, runtimeInfo, client) + Expect(err).To(BeNil()) + + gotNode, err := kubeclient.GetNode(client, "test-node-exclusive") + Expect(err).To(BeNil()) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/dataset-num", "1")) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/s-alluxio-fluid-hbase", "true")) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/s-fluid-hbase", "true")) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid_exclusive", "fluid_hbase")) + }) + }) + + When("given share placement mode runtime", func() { + BeforeEach(func() { + runtimeInfo.SetupWithDataset(&datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}, + }) + + resources = []runtime.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-share", + }, + }, + } + }) - nodeInputs := []*v1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-exclusive", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-share", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-tireStore", - }, - }, - } - - testNodes := []runtime.Object{} - for _, nodeInput := range nodeInputs { - testNodes = append(testNodes, nodeInput.DeepCopy()) - } - - client := fake.NewFakeClientWithScheme(testScheme, testNodes...) 
- - var testCase = []struct { - node v1.Node - runtimeInfo base.RuntimeInfoInterface - wantedMap map[string]string - }{ - { - node: v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-exclusive", - }, - }, - runtimeInfo: runtimeInfoExclusive, - wantedMap: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-alluxio-fluid-hbase": "true", - "fluid.io/s-fluid-hbase": "true", - "fluid.io/s-h-alluxio-t-fluid-hbase": "0B", - "fluid_exclusive": "fluid_hbase", - }, - }, - { - node: v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-share", - }, - }, - runtimeInfo: runtimeInfoShareSpark, - wantedMap: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-alluxio-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-alluxio-t-fluid-spark": "0B", - }, - }, - { - node: v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-share", - }, - }, - runtimeInfo: runtimeInfoShareHbase, - wantedMap: map[string]string{ - "fluid.io/dataset-num": "2", - "fluid.io/s-alluxio-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-alluxio-t-fluid-spark": "0B", - "fluid.io/s-alluxio-fluid-hbase": "true", - "fluid.io/s-fluid-hbase": "true", - "fluid.io/s-h-alluxio-t-fluid-hbase": "0B", - }, - }, - { - node: v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-tireStore", - }, - }, - runtimeInfo: runtimeInfoWithTireStore, - wantedMap: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-alluxio-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-alluxio-d-fluid-spark": "5B", - "fluid.io/s-h-alluxio-m-fluid-spark": "1B", - "fluid.io/s-h-alluxio-t-fluid-spark": "6B", - "fluid_exclusive": "fluid_spark", - }, - }, - } - - for _, test := range testCase { - err := labelCacheNode(test.node, test.runtimeInfo, client) - if err != nil { - t.Errorf("fail to exec the function with the error %v", err) - } - node, err := kubeclient.GetNode(client, test.node.Name) - if err != nil { - fmt.Println(err) - } - if !reflect.DeepEqual(node.Labels, test.wantedMap) { - t.Errorf("fail to update the labels") - } - } -} - -func TestDecreaseDatasetNum(t *testing.T) { - var testCase = []struct { - node *v1.Node - runtimeInfo base.RuntimeInfo - expectedResult common.LabelsToModify - }{ - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "2"}}, - Spec: v1.NodeSpec{}, - }, - runtimeInfo: base.RuntimeInfo{}, - expectedResult: common.LabelsToModify{}, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "1"}}, - Spec: v1.NodeSpec{}, - }, - runtimeInfo: base.RuntimeInfo{}, - expectedResult: common.LabelsToModify{}, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "test"}}, - Spec: v1.NodeSpec{}, - }, - runtimeInfo: base.RuntimeInfo{}, - expectedResult: common.LabelsToModify{}, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{}, - Spec: v1.NodeSpec{}, - }, - runtimeInfo: base.RuntimeInfo{}, - expectedResult: common.LabelsToModify{}, - }, - } - - testCase[0].expectedResult.Update("fluid.io/dataset-num", "1") - testCase[1].expectedResult.Delete("fluid.io/dataset-num") - - for _, test := range testCase { - var labels common.LabelsToModify - _ = DecreaseDatasetNum(test.node, &test.runtimeInfo, &labels) - if !reflect.DeepEqual(labels.GetLabels(), test.expectedResult.GetLabels()) { - t.Errorf("fail to exec the function with the error ") - } - - } -} - -func 
TestIncreaseDatasetNum(t *testing.T) { - var testCase = []struct { - node *v1.Node - runtimeInfo base.RuntimeInfo - expectedResult common.LabelsToModify - }{ - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "1"}}, - Spec: v1.NodeSpec{}, - }, - runtimeInfo: base.RuntimeInfo{}, - expectedResult: common.LabelsToModify{}, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{}, - Spec: v1.NodeSpec{}, - }, - runtimeInfo: base.RuntimeInfo{}, - expectedResult: common.LabelsToModify{}, - }, - } - - testCase[0].expectedResult.Update("fluid.io/dataset-num", "2") - testCase[1].expectedResult.Add("fluid.io/dataset-num", "1") - - for _, test := range testCase { - var labels common.LabelsToModify - _ = increaseDatasetNum(test.node, &test.runtimeInfo, &labels) - if !reflect.DeepEqual(labels.GetLabels(), test.expectedResult.GetLabels()) { - t.Errorf("fail to exec the function with the error ") - } - } -} - -func TestCheckIfRuntimeInNode(t *testing.T) { - var testCase = []struct { - node *v1.Node - key string - wanted bool - }{ - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "2"}}, - Spec: v1.NodeSpec{}, - }, - key: "abc", - wanted: false, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/s-alluxio-fluid-hbase": "true"}}, - Spec: v1.NodeSpec{}, - }, - - wanted: true, - }, - { - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{}, - Spec: v1.NodeSpec{}, - }, - - wanted: false, - }, - } - - for _, test := range testCase { - runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - - found := hasRuntimeLabel(*test.node, runtimeInfo) - - if found != test.wanted { - t.Errorf("fail to Find the label on node ") - } - } -} - -func TestUnlabelCacheNode(t *testing.T) { - runtimeInfoExclusive, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoExclusive.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, + It("should add share mode labels to node", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-share", + }, + } + + err := labelCacheNode(node, runtimeInfo, client) + Expect(err).To(BeNil()) + + gotNode, err := kubeclient.GetNode(client, "test-node-share") + Expect(err).To(BeNil()) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/dataset-num", "1")) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/s-alluxio-fluid-hbase", "true")) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/s-fluid-hbase", "true")) + Expect(gotNode.Labels).NotTo(HaveKey("fluid_exclusive")) + }) + }) + + When("given runtime with tiered store", func() { + BeforeEach(func() { + tieredStore := datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{ + { + MediumType: common.Memory, + Quota: resource.NewQuantity(1<<30, resource.BinarySI), + }, + { + MediumType: common.SSD, + Quota: resource.NewQuantity(2<<30, resource.BinarySI), + }, + { + MediumType: common.HDD, + Quota: resource.NewQuantity(3<<30, resource.BinarySI), + }, + }, + } + + var err error + runtimeInfo, err = base.BuildRuntimeInfo("spark", "fluid", "alluxio", base.WithTieredStore(tieredStore)) + Expect(err).To(BeNil()) + runtimeInfo.SetupWithDataset(&datav1alpha1.Dataset{ + Spec: 
datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, + }) + + resources = []runtime.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-tiered", + }, + }, + } + }) + + It("should add capacity labels to node", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-tiered", + }, + } + + err := labelCacheNode(node, runtimeInfo, client) + Expect(err).To(BeNil()) + + gotNode, err := kubeclient.GetNode(client, "test-node-tiered") + Expect(err).To(BeNil()) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/s-h-alluxio-m-fluid-spark", "1GiB")) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/s-h-alluxio-d-fluid-spark", "5GiB")) + Expect(gotNode.Labels).To(HaveKeyWithValue("fluid.io/s-h-alluxio-t-fluid-spark", "6GiB")) + }) + }) }) - runtimeInfoShareSpark, err := base.BuildRuntimeInfo("spark", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoShareSpark.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}, + Describe("Test DecreaseDatasetNum()", func() { + When("node has dataset-num label with value 2", func() { + It("should decrease dataset number to 1", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "2"}}, + } + runtimeInfo := &base.RuntimeInfo{} + + var labels common.LabelsToModify + err := DecreaseDatasetNum(node, runtimeInfo, &labels) + Expect(err).To(BeNil()) + + // Check that the label modification is correct + modifications := labels.GetLabels() + Expect(modifications).To(HaveLen(1)) + Expect(modifications[0].GetLabelKey()).To(Equal("fluid.io/dataset-num")) + Expect(modifications[0].GetLabelValue()).To(Equal("1")) + Expect(modifications[0].GetOperationType()).To(Equal(common.UpdateLabel)) + }) + }) + + When("node has dataset-num label with value 1", func() { + It("should delete dataset-num label", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "1"}}, + } + runtimeInfo := &base.RuntimeInfo{} + + var labels common.LabelsToModify + err := DecreaseDatasetNum(node, runtimeInfo, &labels) + Expect(err).To(BeNil()) + + // Check that the label is marked for deletion + modifications := labels.GetLabels() + Expect(modifications).To(HaveLen(1)) + Expect(modifications[0].GetLabelKey()).To(Equal("fluid.io/dataset-num")) + Expect(modifications[0].GetOperationType()).To(Equal(common.DeleteLabel)) + }) + }) + + When("node has invalid dataset-num label", func() { + It("should return error", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "invalid"}}, + } + runtimeInfo := &base.RuntimeInfo{} + + var labels common.LabelsToModify + err := DecreaseDatasetNum(node, runtimeInfo, &labels) + Expect(err).NotTo(BeNil()) + }) + }) + + When("node has no dataset-num label", func() { + It("should not modify labels", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{}, + } + runtimeInfo := &base.RuntimeInfo{} + + var labels common.LabelsToModify + err := DecreaseDatasetNum(node, runtimeInfo, &labels) + Expect(err).To(BeNil()) + Expect(labels.GetLabels()).To(BeEmpty()) + }) + }) }) - runtimeInfoShareHbase, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - 
runtimeInfoShareHbase.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}, + Describe("Test increaseDatasetNum", func() { + When("node has existing dataset-num label", func() { + It("should increase dataset number", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"fluid.io/dataset-num": "1"}}, + } + runtimeInfo := &base.RuntimeInfo{} + + var labels common.LabelsToModify + err := increaseDatasetNum(node, runtimeInfo, &labels) + Expect(err).To(BeNil()) + + modifications := labels.GetLabels() + Expect(modifications).To(HaveLen(1)) + Expect(modifications[0].GetLabelKey()).To(Equal("fluid.io/dataset-num")) + Expect(modifications[0].GetLabelValue()).To(Equal("2")) + Expect(modifications[0].GetOperationType()).To(Equal(common.UpdateLabel)) + }) + }) + + When("node has no dataset-num label", func() { + It("should add dataset-num label with value 1", func() { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{}, + } + runtimeInfo := &base.RuntimeInfo{} + + var labels common.LabelsToModify + err := increaseDatasetNum(node, runtimeInfo, &labels) + Expect(err).To(BeNil()) + + modifications := labels.GetLabels() + Expect(modifications).To(HaveLen(1)) + Expect(modifications[0].GetLabelKey()).To(Equal("fluid.io/dataset-num")) + Expect(modifications[0].GetLabelValue()).To(Equal("1")) + Expect(modifications[0].GetOperationType()).To(Equal(common.AddLabel)) + }) + }) }) - tieredStore := datav1alpha1.TieredStore{ - Levels: []datav1alpha1.Level{ - { - MediumType: common.Memory, - Quota: resource.NewQuantity(1, resource.BinarySI), - }, - { - MediumType: common.SSD, - Quota: resource.NewQuantity(2, resource.BinarySI), - }, - { - MediumType: common.HDD, - Quota: resource.NewQuantity(3, resource.BinarySI), - }, - }, - } - runtimeInfoWithTireStore, err := base.BuildRuntimeInfo("spark", "fluid", "alluxio", base.WithTieredStore(tieredStore)) - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoWithTireStore.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, + Describe("Test hasRuntimeLabel", func() { + When("node has runtime label", func() { + It("should return true", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"fluid.io/s-alluxio-fluid-hbase": "true"}, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") + Expect(err).To(BeNil()) + + found := hasRuntimeLabel(node, runtimeInfo) + Expect(found).To(BeTrue()) + }) + }) + + When("node has no runtime label", func() { + It("should return false", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"other-label": "value"}, + }, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") + Expect(err).To(BeNil()) + + found := hasRuntimeLabel(node, runtimeInfo) + Expect(found).To(BeFalse()) + }) + }) + + When("node has no labels", func() { + It("should return false", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{}, + } + + runtimeInfo, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") + Expect(err).To(BeNil()) + + found := hasRuntimeLabel(node, runtimeInfo) + Expect(found).To(BeFalse()) + }) + }) }) - var testCases = []struct { - name string - node *v1.Node - runtimeInfo base.RuntimeInfoInterface - wantedMap map[string]string - }{ - { - name: "test-node-exclusive", - node: &v1.Node{ - ObjectMeta: 
metav1.ObjectMeta{ - Name: "test-node-exclusive", - Labels: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-alluxio-fluid-hbase": "true", - "fluid.io/s-fluid-hbase": "true", - "fluid.io/s-h-alluxio-t-fluid-hbase": "0B", - "fluid_exclusive": "fluid_hbase", - "test": "abc", + Describe("Test unlabelCacheNode()", func() { + When("given exclusive placement mode runtime", func() { + BeforeEach(func() { + runtimeInfo.SetupWithDataset(&datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, + }) + + resources = []runtime.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-exclusive", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-alluxio-fluid-hbase": "true", + "fluid.io/s-fluid-hbase": "true", + "fluid.io/s-h-alluxio-t-fluid-hbase": "0B", + "fluid_exclusive": "fluid_hbase", + "test": "abc", + }, + }, + }, + } + }) + + It("should remove fluid labels and keep other labels", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-exclusive", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-alluxio-fluid-hbase": "true", + "fluid.io/s-fluid-hbase": "true", + "fluid.io/s-h-alluxio-t-fluid-hbase": "0B", + "fluid_exclusive": "fluid_hbase", + "test": "abc", + }, }, - }, - }, - runtimeInfo: runtimeInfoExclusive, - wantedMap: map[string]string{ - "test": "abc", - }, - }, - { - name: "test-node-share", - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-share", - Labels: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-alluxio-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-alluxio-t-fluid-spark": "0B", + } + + err := unlabelCacheNode(node, runtimeInfo, client) + Expect(err).To(BeNil()) + + gotNode, err := kubeclient.GetNode(client, "test-node-exclusive") + Expect(err).To(BeNil()) + Expect(gotNode.Labels).To(HaveKeyWithValue("test", "abc")) + Expect(gotNode.Labels).NotTo(HaveKey("fluid.io/s-alluxio-fluid-hbase")) + Expect(gotNode.Labels).NotTo(HaveKey("fluid_exclusive")) + }) + }) + + When("given share placement mode runtime", func() { + BeforeEach(func() { + runtimeInfo.SetupWithDataset(&datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}, + }) + + resources = []runtime.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-share", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-alluxio-fluid-hbase": "true", + "fluid.io/s-fluid-hbase": "true", + "fluid.io/s-h-alluxio-t-fluid-hbase": "0B", + }, + }, }, - }, - }, - runtimeInfo: runtimeInfoShareSpark, - wantedMap: map[string]string{}, - }, { - name: "test-node-share-1", - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-share-1", - Labels: map[string]string{ - "fluid.io/dataset-num": "2", - "fluid.io/s-alluxio-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-alluxio-t-fluid-spark": "0B", - "fluid.io/s-alluxio-fluid-hbase": "true", - "fluid.io/s-fluid-hbase": "true", - "fluid.io/s-h-alluxio-t-fluid-hbase": "0B", + } + }) + + It("should remove all fluid labels", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-share", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-alluxio-fluid-spark": "true", + "fluid.io/s-fluid-spark": "true", + "fluid.io/s-h-alluxio-t-fluid-spark": "0B", + }, }, - }, - }, - runtimeInfo: runtimeInfoShareHbase, - wantedMap: 
map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-alluxio-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-alluxio-t-fluid-spark": "0B", - }, - }, { - name: "test-node-tireStore", - node: &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-tireStore", - Labels: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-alluxio-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-alluxio-d-fluid-spark": "5B", - "fluid.io/s-h-alluxio-m-fluid-spark": "1B", - "fluid.io/s-h-alluxio-t-fluid-spark": "6B", - "fluid_exclusive": "fluid_spark", + } + + err := unlabelCacheNode(node, runtimeInfo, client) + Expect(err).To(BeNil()) + + gotNode, err := kubeclient.GetNode(client, "test-node-share") + Expect(err).To(BeNil()) + Expect(gotNode.Labels).To(BeEmpty()) + }) + }) + + When("given runtime with tiered store", func() { + BeforeEach(func() { + tieredStore := datav1alpha1.TieredStore{ + Levels: []datav1alpha1.Level{ + { + MediumType: common.Memory, + Quota: resource.NewQuantity(1, resource.BinarySI), + }, + { + MediumType: common.SSD, + Quota: resource.NewQuantity(2, resource.BinarySI), + }, + { + MediumType: common.HDD, + Quota: resource.NewQuantity(3, resource.BinarySI), + }, }, - }, - }, - runtimeInfo: runtimeInfoWithTireStore, - wantedMap: map[string]string{}, - }, - } - testNodes := []runtime.Object{} - for _, test := range testCases { - testNodes = append(testNodes, test.node) - } - - client := fake.NewFakeClientWithScheme(testScheme, testNodes...) - - for _, test := range testCases { - err := unlabelCacheNode(*test.node, test.runtimeInfo, client) - if err != nil { - t.Errorf("fail to exec the function with the error %v", err) - } - node, err := kubeclient.GetNode(client, test.node.Name) - if err != nil { - fmt.Println(err) - } - if len(node.Labels) == 0 && len(test.wantedMap) == 0 { - continue - } - if !reflect.DeepEqual(node.Labels, test.wantedMap) { - t.Errorf("test case %v fail to delete the labels, wanted %v, got %v", test.name, test.wantedMap, node.Labels) - } - } -} + } + + var err error + runtimeInfo, err = base.BuildRuntimeInfo("spark", "fluid", "alluxio", base.WithTieredStore(tieredStore)) + Expect(err).To(BeNil()) + runtimeInfo.SetupWithDataset(&datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, + }) + + resources = []runtime.Object{ + &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-tiered", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-alluxio-fluid-spark": "true", + "fluid.io/s-fluid-spark": "true", + "fluid.io/s-h-alluxio-d-fluid-spark": "5B", + "fluid.io/s-h-alluxio-m-fluid-spark": "1B", + "fluid.io/s-h-alluxio-t-fluid-spark": "6B", + "fluid_exclusive": "fluid_spark", + }, + }, + }, + } + }) + + It("should remove all fluid labels including capacity labels", func() { + node := corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-tiered", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-alluxio-fluid-spark": "true", + "fluid.io/s-fluid-spark": "true", + "fluid.io/s-h-alluxio-d-fluid-spark": "5B", + "fluid.io/s-h-alluxio-m-fluid-spark": "1B", + "fluid.io/s-h-alluxio-t-fluid-spark": "6B", + "fluid_exclusive": "fluid_spark", + }, + }, + } + + err := unlabelCacheNode(node, runtimeInfo, client) + Expect(err).To(BeNil()) + + gotNode, err := kubeclient.GetNode(client, "test-node-tiered") + Expect(err).To(BeNil()) + Expect(gotNode.Labels).To(BeEmpty()) + }) + }) + }) + + Describe("Test 
calculateNodeDifferences", func() { + Context("when given current and previous node lists", func() { + It("should calculate correct differences", func() { + currentNodes := []string{"node-1", "node-2", "node-3"} + previousNodes := []string{"node-2", "node-3", "node-4"} + + nodesToAdd, nodesToRemove := calculateNodeDifferences(currentNodes, previousNodes) + + Expect(nodesToAdd).To(Equal([]string{"node-1"})) + Expect(nodesToRemove).To(Equal([]string{"node-4"})) + }) + }) + + Context("when given identical node lists", func() { + It("should return empty differences", func() { + currentNodes := []string{"node-1", "node-2"} + previousNodes := []string{"node-1", "node-2"} + + nodesToAdd, nodesToRemove := calculateNodeDifferences(currentNodes, previousNodes) + + Expect(nodesToAdd).To(BeEmpty()) + Expect(nodesToRemove).To(BeEmpty()) + }) + }) + + Context("when given completely different node lists", func() { + It("should return all nodes as differences", func() { + currentNodes := []string{"node-1", "node-2"} + previousNodes := []string{"node-3", "node-4"} + + nodesToAdd, nodesToRemove := calculateNodeDifferences(currentNodes, previousNodes) + + Expect(nodesToAdd).To(Equal([]string{"node-1", "node-2"})) + Expect(nodesToRemove).To(Equal([]string{"node-3", "node-4"})) + }) + }) + }) +}) From 2ba3b96d8af939733ecb54d8edf7cf4adce6e104 Mon Sep 17 00:00:00 2001 From: TrafalgarZZZ Date: Sun, 7 Sep 2025 13:51:47 +0800 Subject: [PATCH 3/6] add unit tests for pkg/utils/dataset/lifecycle.node_test.go Signed-off-by: TrafalgarZZZ --- pkg/utils/dataset/lifecycle/node_test.go | 156 +++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/pkg/utils/dataset/lifecycle/node_test.go b/pkg/utils/dataset/lifecycle/node_test.go index 7dee8a5dbf8..668ac3836c0 100644 --- a/pkg/utils/dataset/lifecycle/node_test.go +++ b/pkg/utils/dataset/lifecycle/node_test.go @@ -17,6 +17,9 @@ package lifecycle import ( + "context" + "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" @@ -27,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" . 
"github.com/onsi/ginkgo/v2" @@ -535,4 +539,156 @@ var _ = Describe("Dataset Lifecycle Node Tests", Label("pkg.utils.dataset.lifecy }) }) }) + + Describe("Test SyncScheduleInfoToCacheNodes", func() { + var ( + nodes []*corev1.Node + pods []*corev1.Pod + sts *appsv1.StatefulSet + ) + + BeforeEach(func() { + nodeNumToMock := 3 + for i := 0; i < nodeNumToMock; i++ { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node-%d", i), + }, + } + nodes = append(nodes, node) + } + + // mock a pod for each node + for i := 0; i < nodeNumToMock; i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("hbase-worker-%d", i), + Namespace: "fluid", + Labels: map[string]string{ + "app": "hbase-worker", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "hbase-worker", + UID: "test-uid", + Controller: ptr.To(true), + }, + }, + }, + Spec: corev1.PodSpec{ + NodeName: fmt.Sprintf("node-%d", i), + }, + } + pods = append(pods, pod) + } + + sts = &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fluid", + Name: "hbase-worker", + UID: "test-uid", + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "hbase-worker", + }, + }, + }, + } + }) + + When("calling SyncScheduleInfoToCacheNodes several times to mock runtime scaling", func() { + + BeforeEach(func() { + // 复用现有的 runtimeInfo 设置 + runtimeInfo.SetupWithDataset(&datav1alpha1.Dataset{ + Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, + }) + + resources = []runtime.Object{sts} + for _, node := range nodes { + resources = append(resources, node) + } + }) + + It("should successfully add or remove schedule info on proper nodes", func() { + By("scaling two pods to node-0 and node-1, the two nodes should have runtime labels", func() { + Expect(client.Create(context.TODO(), pods[0])).To(Succeed()) + Expect(client.Create(context.TODO(), pods[1])).To(Succeed()) + + Expect(SyncScheduleInfoToCacheNodes(runtimeInfo, client)).To(Succeed()) + + gotNode0, err := kubeclient.GetNode(client, "node-0") + Expect(err).To(BeNil()) + Expect(gotNode0).NotTo(BeNil()) + Expect(gotNode0.Labels).To(HaveKeyWithValue(runtimeInfo.GetRuntimeLabelName(), "true")) + + gotNode1, err := kubeclient.GetNode(client, "node-1") + Expect(err).To(BeNil()) + Expect(gotNode1).NotTo(BeNil()) + Expect(gotNode1.Labels).To(HaveKeyWithValue(runtimeInfo.GetRuntimeLabelName(), "true")) + + gotNode2, err := kubeclient.GetNode(client, "node-2") + Expect(err).To(BeNil()) + Expect(gotNode2).NotTo(BeNil()) + Expect(gotNode2.Labels).NotTo(HaveKey(runtimeInfo.GetRuntimeLabelName())) + }) + + By("scaling out a third pod to node-2, all pods should have runtime labels", func() { + Expect(client.Create(context.TODO(), pods[2])).To(Succeed()) + + Expect(SyncScheduleInfoToCacheNodes(runtimeInfo, client)).To(Succeed()) + + for i := 0; i < 3; i++ { + gotNode, err := kubeclient.GetNode(client, fmt.Sprintf("node-%d", i)) + Expect(err).To(BeNil()) + Expect(gotNode).NotTo(BeNil()) + Expect(gotNode.Labels).To(HaveKeyWithValue(runtimeInfo.GetRuntimeLabelName(), "true")) + } + }) + + By("scaling in pods on node-1 and node-2, only node-0 should have runtime labels", func() { + Expect(client.Delete(context.TODO(), pods[2])).To(Succeed()) + Expect(client.Delete(context.TODO(), pods[1])).To(Succeed()) + + 
Expect(SyncScheduleInfoToCacheNodes(runtimeInfo, client)).To(Succeed())
+
+					gotNode0, err := kubeclient.GetNode(client, "node-0")
+					Expect(err).To(BeNil())
+					Expect(gotNode0).NotTo(BeNil())
+					Expect(gotNode0.Labels).To(HaveKeyWithValue(runtimeInfo.GetRuntimeLabelName(), "true"))
+
+					gotNode1, err := kubeclient.GetNode(client, "node-1")
+					Expect(err).To(BeNil())
+					Expect(gotNode1).NotTo(BeNil())
+					Expect(gotNode1.Labels).NotTo(HaveKey(runtimeInfo.GetRuntimeLabelName()))
+
+					gotNode2, err := kubeclient.GetNode(client, "node-2")
+					Expect(err).To(BeNil())
+					Expect(gotNode2).NotTo(BeNil())
+					Expect(gotNode2.Labels).NotTo(HaveKey(runtimeInfo.GetRuntimeLabelName()))
+				})
+
+				By("scaling in the last pod on node-0, now all nodes should not have runtime labels", func() {
+					Expect(client.Delete(context.TODO(), pods[0])).To(Succeed())
+
+					Expect(SyncScheduleInfoToCacheNodes(runtimeInfo, client)).To(Succeed())
+
+					for i := 0; i < 3; i++ {
+						gotNode, err := kubeclient.GetNode(client, fmt.Sprintf("node-%d", i))
+						Expect(err).To(BeNil())
+						Expect(gotNode).NotTo(BeNil())
+						Expect(gotNode.Labels).NotTo(HaveKey(runtimeInfo.GetRuntimeLabelName()))
+					}
+				})
+			})
+		})
+	})
 })

From ad4d49e3597fd0e291a4f01b0f9f6c2fc63b8027 Mon Sep 17 00:00:00 2001
From: TrafalgarZZZ 
Date: Sun, 7 Sep 2025 16:35:48 +0800
Subject: [PATCH 4/6] add unit tests for util functions in package
 dataset/volume

Signed-off-by: TrafalgarZZZ 
---
 pkg/utils/dataset/volume/create_test.go       | 312 ++++++------------
 pkg/utils/dataset/volume/delete_test.go       | 267 ++++-----------
 pkg/utils/dataset/volume/deprecated_test.go   | 105 +++---
 pkg/utils/dataset/volume/get_test.go          | 133 +++++---
 pkg/utils/dataset/volume/volume_suite_test.go |  13 +
 5 files changed, 307 insertions(+), 523 deletions(-)
 create mode 100644 pkg/utils/dataset/volume/volume_suite_test.go

diff --git a/pkg/utils/dataset/volume/create_test.go b/pkg/utils/dataset/volume/create_test.go
index bc0357003b9..0336c1abfd6 100644
--- a/pkg/utils/dataset/volume/create_test.go
+++ b/pkg/utils/dataset/volume/create_test.go
@@ -2,140 +2,46 @@ package volume
 
 import (
 	"context"
-	"testing"
 
 	datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
+	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
 	"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
+	"github.com/go-logr/logr"
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 )
 
-func TestCreatePersistentVolumeForRuntime(t *testing.T) {
-	// runtimeInfoExclusive is a runtimeInfo with ExclusiveMode with a PV already in use.
-	runtimeInfoHbase, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio")
-	if err != nil {
-		t.Errorf("fail to create the runtimeInfo with error %v", err)
-	}
-
-	// runtimeInfoExclusive is a runtimeInfo in global mode with no correspond PV.
-	runtimeInfoSpark, err := base.BuildRuntimeInfo("spark", "fluid", "alluxio")
-	if err != nil {
-		t.Errorf("fail to create the runtimeInfo with error %v", err)
-	}
-	runtimeInfoSpark.SetFuseNodeSelector(map[string]string{"test-node": "true"})
-
-	// runtimeInfoShare is a runtimeInfo in non global mode with no correspond PV.
- runtimeInfoHadoop, err := base.BuildRuntimeInfo("hadoop", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - - testPVInputs := []*v1.PersistentVolume{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fluid-hbase", - Annotations: map[string]string{ - "CreatedBy": "fluid", - }, - }, - Spec: v1.PersistentVolumeSpec{}, - }} - testObjs := []runtime.Object{} - for _, pvInput := range testPVInputs { - testObjs = append(testObjs, pvInput.DeepCopy()) - } - - testDatasetInputs := []*datav1alpha1.Dataset{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - Spec: datav1alpha1.DatasetSpec{}, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "spark", - Namespace: "fluid", - }, - Spec: datav1alpha1.DatasetSpec{}, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "hadoop", - Namespace: "fluid", - }, - Spec: datav1alpha1.DatasetSpec{}, - }, - } - for _, datasetInput := range testDatasetInputs { - testObjs = append(testObjs, datasetInput.DeepCopy()) - } - client := fake.NewFakeClientWithScheme(testScheme, testObjs...) - - var testCase = []struct { - runtimeInfo base.RuntimeInfoInterface - mountPath string - mountType string - expectedPVNum int - }{ - { - runtimeInfo: runtimeInfoHbase, - mountPath: "/runtimeInfoHbase", - mountType: "alluxio", - expectedPVNum: 1, - }, - { - runtimeInfo: runtimeInfoSpark, - mountPath: "/runtimeInfoSpark", - mountType: "alluxio", - expectedPVNum: 2, - }, - { - runtimeInfo: runtimeInfoHadoop, - mountPath: "/runtimeInfoHadoop", - mountType: "alluxio", - expectedPVNum: 3, - }, - } - - for _, test := range testCase { - var log = ctrl.Log.WithName("delete") - err := CreatePersistentVolumeForRuntime(client, test.runtimeInfo, test.mountPath, test.mountType, log) - if err != nil { - t.Errorf("fail to exec the function with error %v", err) - return - } - var pvs v1.PersistentVolumeList - err = client.List(context.TODO(), &pvs) - if err != nil { - t.Errorf("fail to exec the function with error %v", err) - return - } - if len(pvs.Items) != test.expectedPVNum { - t.Errorf("fail to create the pv") - } - } -} - -func TestCreatePersistentVolumeClaimForRuntime(t *testing.T) { - runtimeInfoHbase, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoHbase.SetFuseName("hbase-fuse") - - runtimeInfoSpark, err := base.BuildRuntimeInfo("spark", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoSpark.SetFuseName("spark-fuse") - - testDsInputs := []*appsv1.DaemonSet{ - { +var _ = Describe("Create Volume Tests", Label("pkg.utils.dataset.volume.create_test.go"), func() { + var ( + scheme *runtime.Scheme + client client.Client + runtimeInfo base.RuntimeInfoInterface + dataset *datav1alpha1.Dataset + daemonset *appsv1.DaemonSet + resources []runtime.Object + log logr.Logger + ) + + BeforeEach(func() { + scheme = runtime.NewScheme() + _ = v1.AddToScheme(scheme) + _ = appsv1.AddToScheme(scheme) + _ = datav1alpha1.AddToScheme(scheme) + + var err error + runtimeInfo, err = base.BuildRuntimeInfo("hbase", "fluid", "alluxio") + Expect(err).To(BeNil()) + + dataset = &datav1alpha1.Dataset{ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}} + + daemonset = &appsv1.DaemonSet{ ObjectMeta: metav1.ObjectMeta{ Name: "hbase-fuse", Namespace: "fluid", @@ -145,100 +51,78 @@ func TestCreatePersistentVolumeClaimForRuntime(t *testing.T) { Spec: v1.PodSpec{ 
Containers: []v1.Container{ { - Image: "fuse-image:v1", - }, - }, - }, - }, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "spark-fuse", - Namespace: "fluid", - }, - Spec: appsv1.DaemonSetSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Image: "fuse-image:v1", + Image: "fuse:v1", }, }, }, }, }, - }, - } - - testPVCInputs := []*v1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - Annotations: map[string]string{ - "CreatedBy": "fluid", - }, - }, - Spec: v1.PersistentVolumeClaimSpec{}, - }} - testObjs := []runtime.Object{} - for _, pvcInput := range testPVCInputs { - testObjs = append(testObjs, pvcInput.DeepCopy()) - } - for _, dsInput := range testDsInputs { - testObjs = append(testObjs, dsInput.DeepCopy()) - } - - testDatasetInputs := []*datav1alpha1.Dataset{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - Spec: datav1alpha1.DatasetSpec{}, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "spark", - Namespace: "fluid", - }, - Spec: datav1alpha1.DatasetSpec{}, - }, - } - for _, datasetInput := range testDatasetInputs { - testObjs = append(testObjs, datasetInput.DeepCopy()) - } - - client := fake.NewFakeClientWithScheme(testScheme, testObjs...) - var testCase = []struct { - runtimeInfo base.RuntimeInfoInterface - expectedPVCNum int - }{ - { - runtimeInfo: runtimeInfoHbase, - expectedPVCNum: 1, - }, - { - runtimeInfo: runtimeInfoSpark, - expectedPVCNum: 2, - }, - } - - for _, test := range testCase { - var log = ctrl.Log.WithName("delete") - err := CreatePersistentVolumeClaimForRuntime(client, test.runtimeInfo, log) - if err != nil { - t.Errorf("fail to exec the function with error %v", err) - return } - var pvs v1.PersistentVolumeClaimList - err = client.List(context.TODO(), &pvs) - if err != nil { - t.Errorf("fail to exec the function with error %v", err) - return - } - if len(pvs.Items) != test.expectedPVCNum { - t.Errorf("fail to create the pv") + + resources = []runtime.Object{ + dataset, + daemonset, } - } -} + + log = fake.NullLogger() + }) + + JustBeforeEach(func() { + client = fake.NewFakeClientWithScheme(scheme, resources...) 
+	})
+
+	Context("Test CreatePersistentVolumeForRuntime()", func() {
+		When("runtime info has defined node affinity on it", func() {
+			BeforeEach(func() {
+				runtimeInfo.SetFuseNodeSelector(map[string]string{"test-affinity": "true"})
+			})
+			It("should create PV with node affinity and annotations", func() {
+				Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed())
+				var list v1.PersistentVolumeList
+				Expect(client.List(context.TODO(), &list)).To(Succeed())
+				Expect(list.Items).To(HaveLen(1))
+				pv := list.Items[0]
+				Expect(pv.Spec.CSI).NotTo(BeNil())
+				Expect(pv.Spec.CSI.VolumeAttributes).To(HaveKeyWithValue(common.VolumeAttrFluidPath, "/mnt"))
+				Expect(pv.Spec.NodeAffinity).NotTo(BeNil())
+			})
+		})
+
+		When("PV with fluid annotations already exists", func() {
+			BeforeEach(func() {
+				// Pre-create PV with expected annotations
+				pv := &v1.PersistentVolume{
+					ObjectMeta: metav1.ObjectMeta{
+						Name:        runtimeInfo.GetPersistentVolumeName(),
+						Annotations: common.GetExpectedFluidAnnotations(),
+					},
+				}
+
+				resources = append(resources, pv)
+			})
+			It("should skip creating PV if it already exists with fluid annotations", func() {
+				Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed())
+				var list v1.PersistentVolumeList
+				Expect(client.List(context.TODO(), &list)).To(Succeed())
+				Expect(list.Items).To(HaveLen(1))
+			})
+		})
+	})
+
+	Context("Test CreatePersistentVolumeClaimForRuntime()", func() {
+		It("should create PVC and record fuse generation if exists", func() {
+			runtimeInfo.SetFuseName("hbase-fuse")
+			Expect(CreatePersistentVolumeClaimForRuntime(client, runtimeInfo, log)).To(Succeed())
+			var list v1.PersistentVolumeClaimList
+			Expect(client.List(context.TODO(), &list)).To(Succeed())
+			Expect(list.Items).ToNot(BeEmpty())
+		})
+
+		It("should create PVC even if no fuse daemonset found (no generation label)", func() {
+			Expect(CreatePersistentVolumeClaimForRuntime(client, runtimeInfo, log)).To(Succeed())
+			var list v1.PersistentVolumeClaimList
+			Expect(client.List(context.TODO(), &list)).To(Succeed())
+			Expect(list.Items).ToNot(BeEmpty())
+		})
+	})
+})
diff --git a/pkg/utils/dataset/volume/delete_test.go b/pkg/utils/dataset/volume/delete_test.go
index cd3b727ff48..4d740497961 100644
--- a/pkg/utils/dataset/volume/delete_test.go
+++ b/pkg/utils/dataset/volume/delete_test.go
@@ -17,218 +17,79 @@ limitations under the License.
 package volume
 
 import (
-	"context"
-	"testing"
 	"time"
 
 	"github.com/fluid-cloudnative/fluid/pkg/common"
 	"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
 	"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
+	"github.com/go-logr/logr"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/types"
-	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 
-	apierrs "k8s.io/apimachinery/pkg/api/errors"
+	. "github.com/onsi/ginkgo/v2"
+	. 
"github.com/onsi/gomega" ) -func TestDeleteFusePersistentVolume(t *testing.T) { - runtimeInfoHbase, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - - runtimeInfoHadoop, err := base.BuildRuntimeInfo("hadoop", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - - testPVInputs := []*v1.PersistentVolume{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fluid-hadoop", - Annotations: map[string]string{ - "CreatedBy": "fluid", - }, - }, - Spec: v1.PersistentVolumeSpec{}, - }, { - ObjectMeta: metav1.ObjectMeta{ - Name: "fluid-hbase", - }, - Spec: v1.PersistentVolumeSpec{}, - }} - - testPVs := []runtime.Object{} - for _, pvInput := range testPVInputs { - testPVs = append(testPVs, pvInput.DeepCopy()) - } - client := fake.NewFakeClientWithScheme(testScheme, testPVs...) - var testCase = []struct { - runtimeInfo base.RuntimeInfoInterface - expectedDeleted bool - pvName string - }{ - { - runtimeInfo: runtimeInfoHadoop, - pvName: "fluid-hadoop", - expectedDeleted: true, - }, - { - pvName: "fluid-hbase", - runtimeInfo: runtimeInfoHbase, - expectedDeleted: false, - }, - } - for _, test := range testCase { - key := types.NamespacedName{ - Name: test.pvName, - } - pv := &v1.PersistentVolume{} - var log = ctrl.Log.WithName("delete") - err := client.Get(context.TODO(), key, pv) - if err != nil { - t.Errorf("Expect no error, but got %v", err) - } - err = DeleteFusePersistentVolume(client, test.runtimeInfo, log) - if err != nil { - t.Errorf("failed to call DeleteFusePersistentVolume due to %v", err) - } - - err = client.Get(context.TODO(), key, pv) - if apierrs.IsNotFound(err) != test.expectedDeleted { - t.Errorf("testcase %s Expect deleted %v, but got err %v", test.pvName, test.expectedDeleted, err) - } - - } -} - -func TestDeleteFusePersistentVolumeIfExists(t *testing.T) { - testPVInputs := []*v1.PersistentVolume{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Annotations: map[string]string{ - "CreatedBy": "fluid", - }, - }, - Spec: v1.PersistentVolumeSpec{}, - }} - - testPVs := []runtime.Object{} - for _, pvInput := range testPVInputs { - testPVs = append(testPVs, pvInput.DeepCopy()) - } - client := fake.NewFakeClientWithScheme(testScheme, testPVs...) 
- var testCase = []struct { - pvName string - expectedResult v1.PersistentVolume - }{ - { - pvName: "hadoop", - expectedResult: v1.PersistentVolume{ - TypeMeta: metav1.TypeMeta{ - Kind: "PersistentVolume", - APIVersion: "v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Annotations: map[string]string{ - "CreatedBy": "fluid", - }, - }, - Spec: v1.PersistentVolumeSpec{}, - }, - }, - { - pvName: "hbase", - expectedResult: v1.PersistentVolume{}, - }, - } - for _, test := range testCase { - var log = ctrl.Log.WithName("delete") - _ = deleteFusePersistentVolumeIfExists(client, test.pvName, log) - - key := types.NamespacedName{ - Name: test.pvName, - } - pv := &v1.PersistentVolume{} - err := client.Get(context.TODO(), key, pv) - if !apierrs.IsNotFound(err) { - t.Errorf("testcase %s failed to delete due to %v", test.pvName, err) - } - - } -} - -func TestDeleteFusePersistentVolumeClaim(t *testing.T) { - runtimeInfoHbase, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - - runtimeInfoHadoop, err := base.BuildRuntimeInfo("hadoop", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - - runtimeInfoForceDelete, err := base.BuildRuntimeInfo("force-delete", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - - testPVCInputs := []*v1.PersistentVolumeClaim{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - Finalizers: []string{"kubernetes.io/pvc-protection"}, - Annotations: common.GetExpectedFluidAnnotations(), - }, - Spec: v1.PersistentVolumeClaimSpec{}, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "force-delete", - Namespace: "fluid", - Finalizers: []string{"kubernetes.io/pvc-protection"}, - Annotations: common.GetExpectedFluidAnnotations(), - DeletionTimestamp: &metav1.Time{Time: time.Now().Add(-35 * time.Second)}, - }, - }, - } - - testPVCs := []runtime.Object{} - for _, pvInput := range testPVCInputs { - testPVCs = append(testPVCs, pvInput.DeepCopy()) - } - - client := fake.NewFakeClientWithScheme(testScheme, testPVCs...) - - var testCase = []struct { +var _ = Describe("Delete Volume Tests", Label("pkg.utils.dataset.volume.delete_test.go"), func() { + var ( + scheme *runtime.Scheme + clientObj client.Client runtimeInfo base.RuntimeInfoInterface - isErr bool - }{ - { - runtimeInfo: runtimeInfoHadoop, - isErr: false, - }, - { - runtimeInfo: runtimeInfoHbase, - isErr: true, - }, - { - runtimeInfo: runtimeInfoForceDelete, - isErr: false, - }, - } - for _, test := range testCase { - var log = ctrl.Log.WithName("delete") - if err := DeleteFusePersistentVolumeClaim(client, test.runtimeInfo, log); test.isErr != (err != nil) { - if test.isErr { - t.Errorf("Expected got error, but got nil") - } else { - t.Errorf("Expected no error, but got error: %v", err) - } - } - } -} + resources []runtime.Object + log logr.Logger + ) + + BeforeEach(func() { + scheme = runtime.NewScheme() + _ = v1.AddToScheme(scheme) + resources = nil + log = fake.NullLogger() + }) + + JustBeforeEach(func() { + clientObj = fake.NewFakeClientWithScheme(scheme, resources...) 
+ }) + + Context("Test DeleteFusePersistentVolume()", func() { + When("PV is not annotated by fluid", func() { + BeforeEach(func() { + resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "no-anno"}}) + var err error + runtimeInfo, err = base.BuildRuntimeInfo("no-anno", "fluid", "alluxio") + Expect(err).To(BeNil()) + }) + It("should no-op and return success", func() { + Expect(DeleteFusePersistentVolume(clientObj, runtimeInfo, log)).To(Succeed()) + }) + }) + + When("PV with fluid annotations exists", func() { + BeforeEach(func() { + resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "fluid-hadoop", Annotations: common.GetExpectedFluidAnnotations()}}) + var err error + runtimeInfo, err = base.BuildRuntimeInfo("hadoop", "fluid", "alluxio") + Expect(err).To(BeNil()) + }) + It("should delete the PV successfully", func() { + Expect(DeleteFusePersistentVolume(clientObj, runtimeInfo, log)).To(Succeed()) + }) + }) + }) + + Context("Test DeleteFusePersistentVolumeClaim()", func() { + When("PVC is stuck terminating with pvc-protection finalizer", func() { + BeforeEach(func() { + pvc := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "force-delete", Namespace: "fluid", Finalizers: []string{"kubernetes.io/pvc-protection"}, Annotations: common.GetExpectedFluidAnnotations(), DeletionTimestamp: &metav1.Time{Time: time.Now().Add(-35 * time.Second)}}} + resources = append(resources, pvc) + var err error + runtimeInfo, err = base.BuildRuntimeInfo("force-delete", "fluid", "alluxio") + Expect(err).To(BeNil()) + }) + It("should remove finalizer if needed and succeed", func() { + Expect(DeleteFusePersistentVolumeClaim(clientObj, runtimeInfo, log)).To(Succeed()) + }) + }) + }) +}) diff --git a/pkg/utils/dataset/volume/deprecated_test.go b/pkg/utils/dataset/volume/deprecated_test.go index 6397112793d..9a06dec986f 100644 --- a/pkg/utils/dataset/volume/deprecated_test.go +++ b/pkg/utils/dataset/volume/deprecated_test.go @@ -17,73 +17,66 @@ limitations under the License. package volume import ( - "testing" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" -) + "sigs.k8s.io/controller-runtime/pkg/client" -var ( - testScheme *runtime.Scheme + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" ) -func init() { - testScheme = runtime.NewScheme() - _ = v1.AddToScheme(testScheme) - _ = appsv1.AddToScheme(testScheme) - _ = datav1alpha1.AddToScheme(testScheme) -} - -func TestHasDeprecatedPersistentVolumeName(t *testing.T) { - testPVInputs := []*v1.PersistentVolume{{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Annotations: map[string]string{ - "CreatedBy": "fluid", - }, - }, - Spec: v1.PersistentVolumeSpec{}, - }} +var _ = Describe("Deprecated PV Tests", Label("pkg.utils.dataset.volume.deprecated_test.go"), func() { + var ( + scheme *runtime.Scheme + clientObj client.Client + runtimeInfo base.RuntimeInfoInterface + resources []runtime.Object + log logr.Logger + ) - runtimeInfoSpark, err := base.BuildRuntimeInfo("spark", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } + BeforeEach(func() { + scheme = runtime.NewScheme() + _ = v1.AddToScheme(scheme) + _ = appsv1.AddToScheme(scheme) + _ = datav1alpha1.AddToScheme(scheme) + resources = nil + log = fake.NullLogger() + }) - runtimeInfoHbase, err := base.BuildRuntimeInfo("hbase", "fluid", "alluxio") - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } + JustBeforeEach(func() { + clientObj = fake.NewFakeClientWithScheme(scheme, resources...) + }) - testPVs := []runtime.Object{} - for _, pvInput := range testPVInputs { - testPVs = append(testPVs, pvInput.DeepCopy()) - } - client := fake.NewFakeClientWithScheme(testScheme, testPVs...) + When("deprecated PV exists", func() { + BeforeEach(func() { + resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "hbase", Annotations: map[string]string{"CreatedBy": "fluid"}}}) + var err error + runtimeInfo, err = base.BuildRuntimeInfo("hbase", "fluid", "alluxio") + Expect(err).To(BeNil()) + }) + It("should return true", func() { + deprecated, err := HasDeprecatedPersistentVolumeName(clientObj, runtimeInfo, log) + Expect(err).To(BeNil()) + Expect(deprecated).To(BeTrue()) + }) + }) - var testCase = []struct { - runtimeInfo base.RuntimeInfoInterface - expectedResult bool - }{ - { - runtimeInfo: runtimeInfoSpark, - expectedResult: false, - }, - { - runtimeInfo: runtimeInfoHbase, - expectedResult: true, - }, - } - for _, test := range testCase { - var log = ctrl.Log.WithName("deprecated") - if result, _ := HasDeprecatedPersistentVolumeName(client, test.runtimeInfo, log); result != test.expectedResult { - t.Errorf("fail to exec the function with the error %v", err) - } - } -} + When("no deprecated PV exists", func() { + BeforeEach(func() { + var err error + runtimeInfo, err = base.BuildRuntimeInfo("spark", "fluid", "alluxio") + Expect(err).To(BeNil()) + }) + It("should return false", func() { + deprecated, err := HasDeprecatedPersistentVolumeName(clientObj, runtimeInfo, log) + Expect(err).To(BeNil()) + Expect(deprecated).To(BeFalse()) + }) + }) +}) diff --git a/pkg/utils/dataset/volume/get_test.go b/pkg/utils/dataset/volume/get_test.go index f887c5efe3d..f57ca598f0f 100644 --- a/pkg/utils/dataset/volume/get_test.go +++ b/pkg/utils/dataset/volume/get_test.go @@ -17,61 +17,94 @@ limitations under the License. 
package volume import ( - "context" - "testing" - + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" ) -func TestGetNamespacedNameByVolumeId(t *testing.T) { - testPVs := []runtime.Object{} - client := fake.NewFakeClientWithScheme(testScheme, testPVs...) - testCase := []struct { - volumeId string - pv *v1.PersistentVolume - pvc *v1.PersistentVolumeClaim - expectName string - expectNamespace string - wantErr bool - }{ - { - volumeId: "default-test", - pv: &v1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{Name: "default-test"}, - Spec: v1.PersistentVolumeSpec{ClaimRef: &v1.ObjectReference{ - Namespace: "default", - Name: "test", - }}, - }, - pvc: &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - Labels: map[string]string{ - common.LabelAnnotationStorageCapacityPrefix + "default-test": "", - }, - }, - }, - expectName: "test", - expectNamespace: "default", - wantErr: false, - }, - } - for _, test := range testCase { - _ = client.Create(context.TODO(), test.pv) - _ = client.Create(context.TODO(), test.pvc) - namespace, name, err := GetNamespacedNameByVolumeId(client, test.volumeId) - if err != nil { - t.Errorf("failed to call GetNamespacedNameByVolumeId due to %v", err) - } +var _ = Describe("Get helpers", Label("pkg.utils.dataset.volume.get_test.go"), func() { + var ( + scheme *runtime.Scheme + clientObj client.Client + resources []runtime.Object + log logr.Logger + ) + + BeforeEach(func() { + scheme = runtime.NewScheme() + _ = v1.AddToScheme(scheme) + _ = datav1alpha1.AddToScheme(scheme) + resources = nil + log = fake.NullLogger() + }) + + JustBeforeEach(func() { + clientObj = fake.NewFakeClientWithScheme(scheme, resources...) 
+ }) + + Context("Test GetPVCByVolumeId()", func() { + When("pv is bound to a fluid pvc", func() { + BeforeEach(func() { + resources = append(resources, + &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "ns-name"}, Spec: v1.PersistentVolumeSpec{ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "name"}}}, + &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "ns", Labels: map[string]string{common.LabelAnnotationStorageCapacityPrefix + "ns-name": ""}}}, + ) + }) + It("should return the pvc", func() { + got, err := GetPVCByVolumeId(clientObj, "ns-name") + Expect(err).To(BeNil()) + Expect(got).NotTo(BeNil()) + Expect(got.Name).To(Equal("name")) + _ = log + }) + }) + + When("pv is bound to a non-fluid pvc", func() { + BeforeEach(func() { + resources = append(resources, + &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "x"}, Spec: v1.PersistentVolumeSpec{ClaimRef: &v1.ObjectReference{Namespace: "ns", Name: "n"}}}, + &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "n", Namespace: "ns"}}, + ) + }) + It("should return error", func() { + _, err := GetPVCByVolumeId(clientObj, "x") + Expect(err).ToNot(BeNil()) + }) + }) + }) + + Context("Test GetNamespacedNameByVolumeId()", func() { + When("pv has claimRef and pvc is fluid", func() { + BeforeEach(func() { + resources = append(resources, + &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "ns2-n"}, Spec: v1.PersistentVolumeSpec{ClaimRef: &v1.ObjectReference{Namespace: "ns2", Name: "n"}}}, + &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "n", Namespace: "ns2", Labels: map[string]string{common.LabelAnnotationStorageCapacityPrefix + "ns2-n": ""}}}, + ) + }) + It("should return namespace and name", func() { + ns, name, err := GetNamespacedNameByVolumeId(clientObj, "ns2-n") + Expect(err).To(BeNil()) + Expect(ns).To(Equal("ns2")) + Expect(name).To(Equal("n")) + }) + }) - if name != test.expectName && namespace != test.expectNamespace { - t.Errorf("testcase %s Expect name %s, but got %s, Expect namespace %s, but got %s", - test.volumeId, test.expectName, name, test.expectNamespace, namespace) - } - } -} + When("pv has nil claimRef", func() { + BeforeEach(func() { + resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "v"}}) + }) + It("should return error", func() { + _, _, err := GetNamespacedNameByVolumeId(clientObj, "v") + Expect(err).ToNot(BeNil()) + }) + }) + }) +}) diff --git a/pkg/utils/dataset/volume/volume_suite_test.go b/pkg/utils/dataset/volume/volume_suite_test.go new file mode 100644 index 00000000000..e2a9db5d629 --- /dev/null +++ b/pkg/utils/dataset/volume/volume_suite_test.go @@ -0,0 +1,13 @@ +package volume + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" +) + +func TestVolume(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Volume Suite") +} From 4738cbcc13024721984db6b7d56509a0bdb474fe Mon Sep 17 00:00:00 2001 From: TrafalgarZZZ Date: Mon, 8 Sep 2025 00:08:02 +0800 Subject: [PATCH 5/6] add unit tests for util functions in package dataset/volume Signed-off-by: TrafalgarZZZ --- pkg/utils/dataset/volume/create_test.go | 134 +++++++++++++++++++++++- pkg/utils/dataset/volume/delete_test.go | 79 ++++++++++++-- pkg/utils/dataset/volume/get_test.go | 12 +-- 3 files changed, 208 insertions(+), 17 deletions(-) diff --git a/pkg/utils/dataset/volume/create_test.go b/pkg/utils/dataset/volume/create_test.go index 0336c1abfd6..34e39ce6a87 100644 --- a/pkg/utils/dataset/volume/create_test.go +++ b/pkg/utils/dataset/volume/create_test.go @@ -6,10 +6,12 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -82,9 +84,12 @@ var _ = Describe("Create Volume Tests", Label("pkg.utils.dataset.volume.create_t Expect(client.List(context.TODO(), &list)).To(Succeed()) Expect(list.Items).To(HaveLen(1)) pv := list.Items[0] + Expect(pv.Labels).To(HaveKeyWithValue(runtimeInfo.GetCommonLabelName(), "true")) + Expect(pv.Labels).To(HaveKeyWithValue(common.LabelAnnotationDatasetId, "fluid-hbase")) + Expect(pv.Spec.StorageClassName).To(Equal(common.FluidStorageClass)) Expect(pv.Spec.CSI).NotTo(BeNil()) Expect(pv.Spec.CSI.VolumeAttributes).To(HaveKeyWithValue(common.VolumeAttrFluidPath, "/mnt")) - Expect(pv.Spec.NodeAffinity).NotTo(BeNil()) + Expect(pv.Spec.NodeAffinity.Required.NodeSelectorTerms).To(HaveLen(1)) }) }) @@ -107,6 +112,133 @@ var _ = Describe("Create Volume Tests", Label("pkg.utils.dataset.volume.create_t Expect(list.Items).To(HaveLen(1)) }) }) + + When("Related Dataset has set a explicit access mode", func() { + BeforeEach(func() { + dataset.Spec.AccessModes = append(dataset.Spec.AccessModes, v1.ReadWriteMany) + }) + + It("should create a pv with same access mode", func() { + Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed()) + var list v1.PersistentVolumeList + Expect(client.List(context.TODO(), &list)).To(Succeed()) + Expect(list.Items).To(HaveLen(1)) + pv := list.Items[0] + Expect(pv.Spec.AccessModes).To(HaveLen(1)) + Expect(pv.Spec.AccessModes).To(ContainElement(v1.ReadWriteMany)) + }) + }) + + When("dataset has no explicit access mode", func() { + It("should create pv with default access mode", func() { + Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed()) + var list v1.PersistentVolumeList + Expect(client.List(context.TODO(), &list)).To(Succeed()) + Expect(list.Items).To(HaveLen(1)) + pv := list.Items[0] + Expect(pv.Spec.AccessModes).To(HaveLen(1)) + Expect(pv.Spec.AccessModes).To(ContainElement(v1.ReadOnlyMany)) + }) + }) + + When("dataset has pvc storage capacity annotation set", func() { + BeforeEach(func() { + if dataset.Annotations == nil { + dataset.Annotations = map[string]string{} + } + dataset.Annotations[utils.PVCStorageAnnotation] = "10Gi" + }) + It("should 
use annotated storage capacity", func() { + Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed()) + var list v1.PersistentVolumeList + Expect(client.List(context.TODO(), &list)).To(Succeed()) + pv := list.Items[0] + q := pv.Spec.Capacity[v1.ResourceStorage] + expected := resource.MustParse("10Gi") + Expect(q.Cmp(expected)).To(Equal(0)) + }) + }) + + When("dataset pvc storage capacity annotation is invalid", func() { + BeforeEach(func() { + if dataset.Annotations == nil { + dataset.Annotations = map[string]string{} + } + dataset.Annotations[utils.PVCStorageAnnotation] = "invalid-size" + }) + It("should fallback to default storage capacity", func() { + Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed()) + var list v1.PersistentVolumeList + Expect(client.List(context.TODO(), &list)).To(Succeed()) + pv := list.Items[0] + q := pv.Spec.Capacity[v1.ResourceStorage] + expected := resource.MustParse(utils.DefaultStorageCapacity) + Expect(q.Cmp(expected)).To(Equal(0)) + }) + }) + + When("creating pv csi attributes", func() { + It("should set mountType/namespace/name and claimRef", func() { + Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed()) + var list v1.PersistentVolumeList + Expect(client.List(context.TODO(), &list)).To(Succeed()) + pv := list.Items[0] + Expect(pv.Spec.CSI).NotTo(BeNil()) + attrs := pv.Spec.CSI.VolumeAttributes + Expect(attrs).To(HaveKeyWithValue(common.VolumeAttrMountType, "alluxio")) + Expect(attrs).To(HaveKeyWithValue(common.VolumeAttrNamespace, runtimeInfo.GetNamespace())) + Expect(attrs).To(HaveKeyWithValue(common.VolumeAttrName, runtimeInfo.GetName())) + Expect(pv.Spec.ClaimRef).NotTo(BeNil()) + Expect(pv.Spec.ClaimRef.Name).To(Equal(runtimeInfo.GetName())) + Expect(pv.Spec.ClaimRef.Namespace).To(Equal(runtimeInfo.GetNamespace())) + }) + }) + + When("runtime annotations contain skip-check-mount-ready target", func() { + BeforeEach(func() { + var err error + runtimeInfo, err = base.BuildRuntimeInfo("hbase", "fluid", "alluxio", base.WithAnnotations(map[string]string{ + common.AnnotationSkipCheckMountReadyTarget: "All", + })) + Expect(err).To(BeNil()) + }) + It("should propagate to pv csi volumeAttributes", func() { + Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed()) + var list v1.PersistentVolumeList + Expect(client.List(context.TODO(), &list)).To(Succeed()) + pv := list.Items[0] + Expect(pv.Spec.CSI.VolumeAttributes).To(HaveKeyWithValue(common.AnnotationSkipCheckMountReadyTarget, "All")) + }) + }) + + When("metadataList selects PV with labels/annotations and symlink method", func() { + BeforeEach(func() { + meta := []datav1alpha1.Metadata{{ + PodMetadata: datav1alpha1.PodMetadata{ + Labels: map[string]string{ + common.LabelNodePublishMethod: common.NodePublishMethodSymlink, + "x-extra": "y", + }, + Annotations: map[string]string{ + "a1": "b1", + }, + }, + Selector: metav1.GroupKind{Group: v1.GroupName, Kind: "PersistentVolume"}, + }} + var err error + runtimeInfo, err = base.BuildRuntimeInfo("hbase", "fluid", "alluxio", base.WithMetadataList(meta)) + Expect(err).To(BeNil()) + }) + It("should merge into pv and set node_publish_method", func() { + Expect(CreatePersistentVolumeForRuntime(client, runtimeInfo, "/mnt", "alluxio", log)).To(Succeed()) + var list v1.PersistentVolumeList + Expect(client.List(context.TODO(), &list)).To(Succeed()) + pv := list.Items[0] + 
Expect(pv.Labels).To(HaveKeyWithValue("x-extra", "y")) + Expect(pv.Annotations).To(HaveKeyWithValue("a1", "b1")) + Expect(pv.Spec.CSI.VolumeAttributes).To(HaveKeyWithValue(common.NodePublishMethod, common.NodePublishMethodSymlink)) + }) + }) }) Context("Test CreatePersistentVolumeClaimForRuntime()", func() { diff --git a/pkg/utils/dataset/volume/delete_test.go b/pkg/utils/dataset/volume/delete_test.go index 4d740497961..499459de2f8 100644 --- a/pkg/utils/dataset/volume/delete_test.go +++ b/pkg/utils/dataset/volume/delete_test.go @@ -17,15 +17,19 @@ limitations under the License. package volume import ( + "context" "time" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" . "github.com/onsi/ginkgo/v2" @@ -53,32 +57,87 @@ var _ = Describe("Delete Volume Tests", Label("pkg.utils.dataset.volume.delete_t }) Context("Test DeleteFusePersistentVolume()", func() { - When("PV is not annotated by fluid", func() { + When("PV with fluid annotations exists", func() { BeforeEach(func() { - resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "no-anno"}}) + resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "fluid-hadoop", Annotations: common.GetExpectedFluidAnnotations()}}) var err error - runtimeInfo, err = base.BuildRuntimeInfo("no-anno", "fluid", "alluxio") + runtimeInfo, err = base.BuildRuntimeInfo("hadoop", "fluid", "alluxio") Expect(err).To(BeNil()) }) - It("should no-op and return success", func() { + It("should delete the PV successfully", func() { Expect(DeleteFusePersistentVolume(clientObj, runtimeInfo, log)).To(Succeed()) + gotPV, err := kubeclient.GetPersistentVolume(clientObj, "fluid-hadoop") + Expect(err).NotTo(BeNil()) + Expect(apierrs.IsNotFound(err)).To(BeTrue()) + Expect(gotPV).To(BeNil()) }) }) - When("PV with fluid annotations exists", func() { + When("PV has no fluid-expected annotations", func() { BeforeEach(func() { - resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "fluid-hadoop", Annotations: common.GetExpectedFluidAnnotations()}}) + resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "fluid-no-anno"}}) var err error - runtimeInfo, err = base.BuildRuntimeInfo("hadoop", "fluid", "alluxio") + runtimeInfo, err = base.BuildRuntimeInfo("no-anno", "fluid", "alluxio") Expect(err).To(BeNil()) }) - It("should delete the PV successfully", func() { + It("should no-op and return success", func() { Expect(DeleteFusePersistentVolume(clientObj, runtimeInfo, log)).To(Succeed()) + // The PV should still exists + gotPV, err := kubeclient.GetPersistentVolume(clientObj, "fluid-no-anno") + Expect(err).To(BeNil()) + Expect(gotPV).NotTo(BeNil()) }) }) }) Context("Test DeleteFusePersistentVolumeClaim()", func() { + When("PVC with fluid annotations exists", func() { + BeforeEach(func() { + pvc := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "hadoop", Namespace: "fluid", Annotations: common.GetExpectedFluidAnnotations()}} + resources = append(resources, pvc) + var err error + runtimeInfo, err = base.BuildRuntimeInfo("hadoop", "fluid", 
"alluxio") + Expect(err).To(BeNil()) + }) + It("should delete the PVC successfully", func() { + Expect(DeleteFusePersistentVolumeClaim(clientObj, runtimeInfo, log)).To(Succeed()) + pvc := &v1.PersistentVolumeClaim{} + err := clientObj.Get(context.TODO(), types.NamespacedName{Name: "hadoop", Namespace: "fluid"}, pvc) + Expect(err).NotTo(BeNil()) + Expect(apierrs.IsNotFound(err)).To(BeTrue()) + }) + }) + + When("PVC has no fluid-expected annotations", func() { + BeforeEach(func() { + pvc := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "no-anno", Namespace: "fluid"}} + resources = append(resources, pvc) + var err error + runtimeInfo, err = base.BuildRuntimeInfo("no-anno", "fluid", "alluxio") + Expect(err).To(BeNil()) + }) + It("should no-op and return success", func() { + Expect(DeleteFusePersistentVolumeClaim(clientObj, runtimeInfo, log)).To(Succeed()) + // The PVC should still exist + key := types.NamespacedName{Name: "no-anno", Namespace: "fluid"} + pvc := &v1.PersistentVolumeClaim{} + err := clientObj.Get(context.TODO(), key, pvc) + Expect(err).To(BeNil()) + Expect(pvc).NotTo(BeNil()) + }) + }) + + When("PVC does not exist", func() { + BeforeEach(func() { + var err error + runtimeInfo, err = base.BuildRuntimeInfo("not-exist", "fluid", "alluxio") + Expect(err).To(BeNil()) + }) + It("should no-op and return success", func() { + Expect(DeleteFusePersistentVolumeClaim(clientObj, runtimeInfo, log)).To(Succeed()) + }) + }) + When("PVC is stuck terminating with pvc-protection finalizer", func() { BeforeEach(func() { pvc := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "force-delete", Namespace: "fluid", Finalizers: []string{"kubernetes.io/pvc-protection"}, Annotations: common.GetExpectedFluidAnnotations(), DeletionTimestamp: &metav1.Time{Time: time.Now().Add(-35 * time.Second)}}} @@ -89,6 +148,10 @@ var _ = Describe("Delete Volume Tests", Label("pkg.utils.dataset.volume.delete_t }) It("should remove finalizer if needed and succeed", func() { Expect(DeleteFusePersistentVolumeClaim(clientObj, runtimeInfo, log)).To(Succeed()) + pvc := &v1.PersistentVolumeClaim{} + err := clientObj.Get(context.TODO(), types.NamespacedName{Name: "force-delete", Namespace: "fluid"}, pvc) + Expect(err).NotTo(BeNil()) + Expect(apierrs.IsNotFound(err)).To(BeTrue()) }) }) }) diff --git a/pkg/utils/dataset/volume/get_test.go b/pkg/utils/dataset/volume/get_test.go index f57ca598f0f..e92c9a907ac 100644 --- a/pkg/utils/dataset/volume/get_test.go +++ b/pkg/utils/dataset/volume/get_test.go @@ -20,7 +20,6 @@ import ( datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" - "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -30,12 +29,11 @@ import ( . 
"github.com/onsi/gomega" ) -var _ = Describe("Get helpers", Label("pkg.utils.dataset.volume.get_test.go"), func() { +var _ = Describe("Get helpers related tests", Label("pkg.utils.dataset.volume.get_test.go"), func() { var ( scheme *runtime.Scheme clientObj client.Client resources []runtime.Object - log logr.Logger ) BeforeEach(func() { @@ -43,7 +41,6 @@ var _ = Describe("Get helpers", Label("pkg.utils.dataset.volume.get_test.go"), f _ = v1.AddToScheme(scheme) _ = datav1alpha1.AddToScheme(scheme) resources = nil - log = fake.NullLogger() }) JustBeforeEach(func() { @@ -63,7 +60,6 @@ var _ = Describe("Get helpers", Label("pkg.utils.dataset.volume.get_test.go"), f Expect(err).To(BeNil()) Expect(got).NotTo(BeNil()) Expect(got.Name).To(Equal("name")) - _ = log }) }) @@ -82,11 +78,11 @@ var _ = Describe("Get helpers", Label("pkg.utils.dataset.volume.get_test.go"), f }) Context("Test GetNamespacedNameByVolumeId()", func() { - When("pv has claimRef and pvc is fluid", func() { + When("pv has claimRef and pvc is managed by fluid", func() { BeforeEach(func() { resources = append(resources, &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "ns2-n"}, Spec: v1.PersistentVolumeSpec{ClaimRef: &v1.ObjectReference{Namespace: "ns2", Name: "n"}}}, - &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "n", Namespace: "ns2", Labels: map[string]string{common.LabelAnnotationStorageCapacityPrefix + "ns2-n": ""}}}, + &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "n", Namespace: "ns2", Labels: map[string]string{common.LabelAnnotationStorageCapacityPrefix + "ns2-n": "true"}}}, ) }) It("should return namespace and name", func() { @@ -103,7 +99,7 @@ var _ = Describe("Get helpers", Label("pkg.utils.dataset.volume.get_test.go"), f }) It("should return error", func() { _, _, err := GetNamespacedNameByVolumeId(clientObj, "v") - Expect(err).ToNot(BeNil()) + Expect(err).NotTo(BeNil()) }) }) }) From d493f34f71ae3303ffac93f3bae9d6982bdc7076 Mon Sep 17 00:00:00 2001 From: TrafalgarZZZ Date: Mon, 8 Sep 2025 00:35:34 +0800 Subject: [PATCH 6/6] remove unused namespace_test.go Signed-off-by: TrafalgarZZZ --- pkg/controllers/namespace_test.go | 101 ------------------------------ 1 file changed, 101 deletions(-) delete mode 100644 pkg/controllers/namespace_test.go diff --git a/pkg/controllers/namespace_test.go b/pkg/controllers/namespace_test.go deleted file mode 100644 index 649d7344ae0..00000000000 --- a/pkg/controllers/namespace_test.go +++ /dev/null @@ -1,101 +0,0 @@ -/* -Copyright 2020 The Fluid Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "path/filepath" - "testing" - "time" - - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - // +kubebuilder:scaffold:imports -) - -// These tests use Ginkgo (BDD-style Go testing framework). Refer to -// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. - -var cfg *rest.Config -var k8sClient client.Client -var testEnv *envtest.Environment - -func TestAPIs(t *testing.T) { - RegisterFailHandler(Fail) - - RunSpecs(t, - "Controller Suite") -} - -var _ = BeforeSuite(func(done Done) { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - - By("bootstrapping test environment") - testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, - } - - var err error - cfg, err = testEnv.Start() - Expect(err).ToNot(HaveOccurred()) - Expect(cfg).ToNot(BeNil()) - - err = datav1alpha1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - // +kubebuilder:scaffold:scheme - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - Expect(err).ToNot(HaveOccurred()) - Expect(k8sClient).ToNot(BeNil()) - - close(done) -}, 60) - -var _ = AfterSuite(func() { - By("tearing down the test environment") - err := testEnv.Stop() - Expect(err).ToNot(HaveOccurred()) -}) - -var _ = Describe("Test namespace", func() { - Context("check if given ns exist", func() { - It("this ns exists,and err should be nil", func() { - err := kubeclient.EnsureNamespace(k8sClient, "default") - Expect(err).NotTo(HaveOccurred()) - }) - - It("try to get a non-existed ns,should fail", func() { - err := kubeclient.EnsureNamespace(k8sClient, time.Now().String()) - Expect(err).Should(HaveOccurred()) - }) - }) - Context("test createnamespace", func() { - namespace := "woohoo" - It("check if create ns successfully,err should nil", func() { - err := kubeclient.EnsureNamespace(k8sClient, namespace) - Expect(err).NotTo(HaveOccurred()) - }) - }) -})