Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 0 additions & 101 deletions pkg/controllers/namespace_test.go

This file was deleted.

13 changes: 13 additions & 0 deletions pkg/utils/dataset/lifecycle/lifecycle_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package lifecycle_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestLifecycle(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Lifecycle Suite")
}
152 changes: 84 additions & 68 deletions pkg/utils/dataset/lifecycle/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,63 +51,111 @@ func init() {
rootLog = ctrl.Log.WithName("dataset.lifecycle")
}

func SyncScheduleInfoToCacheNodes(runtimeInfo base.RuntimeInfoInterface, client client.Client) (err error) {
func SyncScheduleInfoToCacheNodes(runtimeInfo base.RuntimeInfoInterface, client client.Client) error {
defer utils.TimeTrack(time.Now(), "SyncScheduleInfoToCacheNodes", "name", runtimeInfo.GetName(), "namespace", runtimeInfo.GetNamespace())

var (
currentCacheNodeNames []string
previousCacheNodeNames []string
)
// Get current cache nodes from worker pods
desiredNodeNames, err := getDesiredNodesWithScheduleInfo(runtimeInfo, client)
if err != nil {
return err
}

// Get previously assigned cache nodes
actualNodeNames, err := getActualNodesWithScheduleInfo(runtimeInfo, client)
if err != nil {
return err
}

// Calculate node differences
nodesToAddScheduleInfo, nodesToRemoveScheduleInfo := calculateNodeDifferences(desiredNodeNames, actualNodeNames)

// Apply changes to nodes
applyNodeChanges(nodesToAddScheduleInfo, nodesToRemoveScheduleInfo, runtimeInfo, client)

return nil
}

// getDesiredNodesWithScheduleInfo retrieves the desired cache nodes with schedule info
func getDesiredNodesWithScheduleInfo(runtimeInfo base.RuntimeInfoInterface, client client.Client) ([]string, error) {
workers, err := fluidctrl.GetWorkersAsStatefulset(client,
types.NamespacedName{Namespace: runtimeInfo.GetNamespace(), Name: runtimeInfo.GetWorkerStatefulsetName()})
if err != nil {
if fluiderrs.IsDeprecated(err) {
rootLog.Info("Warning: Deprecated mode is not support, so skip handling", "details", err)
return nil
return nil, nil
}
return err
return nil, err
}

workerSelector, err := metav1.LabelSelectorAsSelector(workers.Spec.Selector)
if err != nil {
return nil, err
}

workerPods, err := kubeclient.GetPodsForStatefulSet(client, workers, workerSelector)
if err != nil {
return err
return nil, err
}

var nodeNames []string
for _, pod := range workerPods {
if len(pod.Spec.NodeName) != 0 {
currentCacheNodeNames = append(currentCacheNodeNames, pod.Spec.NodeName)
if pod.Spec.NodeName != "" {
nodeNames = append(nodeNames, pod.Spec.NodeName)
}
}

// find the nodes which already have the runtime labels
previousCacheNodeNames, err = getAssignedNodes(runtimeInfo, client)
return utils.RemoveDuplicateStr(nodeNames), nil
}

currentCacheNodeNames = utils.RemoveDuplicateStr(currentCacheNodeNames)
previousCacheNodeNames = utils.RemoveDuplicateStr(previousCacheNodeNames)
// getActualNodesWithScheduleInfo retrieves the actual cache nodes with schedule info
func getActualNodesWithScheduleInfo(runtimeInfo base.RuntimeInfoInterface, cli client.Client) (nodeNames []string, err error) {
var (
nodeList = &corev1.NodeList{}
runtimeLabel = runtimeInfo.GetRuntimeLabelName()
)

addedCacheNodenames := utils.SubtractString(currentCacheNodeNames, previousCacheNodeNames)
removedCacheNodenames := utils.SubtractString(previousCacheNodeNames, currentCacheNodeNames)
nodeNames = []string{}
datasetLabels, err := labels.Parse(fmt.Sprintf("%s=true", runtimeLabel))
if err != nil {
return
}

if len(addedCacheNodenames) > 0 {
for _, nodeName := range addedCacheNodenames {
if innerErr := addScheduleInfoToNode(nodeName, runtimeInfo, client); innerErr != nil {
rootLog.Info(fmt.Sprintf("error when adding schedule info to node: %v, ignore it and continue", innerErr), "node", nodeName)
}
}
err = cli.List(context.TODO(), nodeList, &client.ListOptions{
LabelSelector: datasetLabels,
})
if err != nil {
return
}

if len(removedCacheNodenames) > 0 {
for _, nodeName := range removedCacheNodenames {
if innerErr := removeScheduleInfoFromNode(nodeName, runtimeInfo, client); innerErr != nil {
rootLog.Info(fmt.Sprintf("error when removing schedule info from node: %v, ignore it and continue", innerErr), "node", nodeName)
}
for _, node := range nodeList.Items {
nodeNames = append(nodeNames, node.Name)
}

return utils.RemoveDuplicateStr(nodeNames), nil
}

// calculateNodeDifferences calculates which nodes need to be added or removed
func calculateNodeDifferences(currentNodes, previousNodes []string) (nodesToAddLabels, nodesToRemovedLabels []string) {
nodesToAddLabels = utils.SubtractString(currentNodes, previousNodes)
nodesToRemovedLabels = utils.SubtractString(previousNodes, currentNodes)
return
}

// applyNodeChanges applies the calculated changes to the nodes
func applyNodeChanges(nodesToAddScheduleInfo, nodesToRemoveScheduleInfo []string, runtimeInfo base.RuntimeInfoInterface, client client.Client) {
// Add schedule info to new nodes
for _, nodeName := range nodesToAddScheduleInfo {
if err := addScheduleInfoToNode(nodeName, runtimeInfo, client); err != nil {
rootLog.Info("Failed to add schedule info to node, continuing", "node", nodeName, "error", err)
}
}

return nil
// Remove schedule info from old nodes
for _, nodeName := range nodesToRemoveScheduleInfo {
if err := removeScheduleInfoFromNode(nodeName, runtimeInfo, client); err != nil {
rootLog.Info("Failed to remove schedule info from node, continuing", "node", nodeName, "error", err)
}
}
}

func addScheduleInfoToNode(nodeName string, runtimeInfo base.RuntimeInfoInterface, client client.Client) (err error) {
Expand All @@ -119,7 +167,7 @@ func addScheduleInfoToNode(nodeName string, runtimeInfo base.RuntimeInfoInterfac
return err
}

if CheckIfRuntimeInNode(node, runtimeInfo) {
if hasRuntimeLabel(node, runtimeInfo) {
rootLog.Info("Node already added schedule info, skip.", "node", nodeName)
return
}
Expand All @@ -141,7 +189,7 @@ func removeScheduleInfoFromNode(nodeName string, runtimeInfo base.RuntimeInfoInt
return err
}

if !CheckIfRuntimeInNode(node, runtimeInfo) {
if !hasRuntimeLabel(node, runtimeInfo) {
rootLog.Info("Node doesn't have existing schedule info, skip.", "node", nodeName)
return
}
Expand All @@ -154,46 +202,14 @@ func removeScheduleInfoFromNode(nodeName string, runtimeInfo base.RuntimeInfoInt
return nil
}

func getAssignedNodes(runtimeInfo base.RuntimeInfoInterface, cli client.Client) (nodeNames []string, err error) {
var (
nodeList = &corev1.NodeList{}
runtimeLabel = runtimeInfo.GetRuntimeLabelName()
)

nodeNames = []string{}
datasetLabels, err := labels.Parse(fmt.Sprintf("%s=true", runtimeLabel))
if err != nil {
return
}

err = cli.List(context.TODO(), nodeList, &client.ListOptions{
LabelSelector: datasetLabels,
})
if err != nil {
return
}

for _, node := range nodeList.Items {
nodeNames = append(nodeNames, node.Name)
}

return
}

// CheckIfRuntimeInNode checks if the the runtime on this node
func CheckIfRuntimeInNode(node corev1.Node, runtimeInfo base.RuntimeInfoInterface) (found bool) {
// hasRuntimeLabel checks if the node has the runtime label
func hasRuntimeLabel(node corev1.Node, runtimeInfo base.RuntimeInfoInterface) bool {
key := runtimeInfo.GetRuntimeLabelName()
return findLabelNameOnNode(node, key)
}

// findLabelNameOnNode checks if the label exist
func findLabelNameOnNode(node corev1.Node, key string) (found bool) {
labels := node.Labels
if len(labels) == 0 {
return
if len(node.Labels) == 0 {
return false
}
_, found = labels[key]
return
_, found := node.Labels[key]
return found
}

// labelCacheNode adds labels on a selected node to indicate the node is scheduled with corresponding runtime
Expand Down
Loading
Loading