Commit 13ee13a

committed: use CacheWorkerSet to handle the requests previously handled through a plain StatefulSet

1 parent 10b3cde, commit 13ee13a

File tree: 26 files changed (+679 -124 lines)

api/v1alpha1/alluxioruntime_types.go

Lines changed: 8 additions & 1 deletion

@@ -17,6 +17,7 @@ limitations under the License.
 package v1alpha1

 import (
+    "github.com/fluid-cloudnative/fluid/pkg/types/cacheworkerset"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -237,11 +238,18 @@ type AlluxioRuntimeSpec struct {
     // +optional
     RuntimeManagement RuntimeManagement `json:"management,omitempty"`

+    ScaleConfig AlluxioScaleSpec `json:"scale,omitempty"`
+
     // ImagePullSecrets that will be used to pull images
     // +optional
     ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
 }

+type AlluxioScaleSpec struct {
+    WorkerType   cacheworkerset.WorkerType `json:"workerType,omitempty"`
+    ScaleInSlots []int                     `json:"scaleInSlots,omitempty"`
+}
+
 // +kubebuilder:object:root=true
 // +kubebuilder:subresource:status
 // +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.currentWorkerNumberScheduled,selectorpath=.status.selector
@@ -259,7 +267,6 @@ type AlluxioRuntimeSpec struct {
 // +kubebuilder:resource:scope=Namespaced
 // +kubebuilder:resource:categories={fluid},shortName=alluxio
 // +genclient
-
 // AlluxioRuntime is the Schema for the alluxioruntimes API
 type AlluxioRuntime struct {
     metav1.TypeMeta `json:",inline"`
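
The new scale block is the user-facing part of this change. A minimal sketch (not part of this commit) of how it could be populated on an AlluxioRuntime; the constant cacheworkerset.StatefulSetType is an assumption, since the WorkerType values are defined in pkg/types/cacheworkerset, outside this hunk:

package main

import (
    datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
    "github.com/fluid-cloudnative/fluid/pkg/types/cacheworkerset"
)

func exampleRuntime() *datav1alpha1.AlluxioRuntime {
    return &datav1alpha1.AlluxioRuntime{
        Spec: datav1alpha1.AlluxioRuntimeSpec{
            ScaleConfig: datav1alpha1.AlluxioScaleSpec{
                // WorkerType selects which workload kind backs the cache workers.
                WorkerType: cacheworkerset.StatefulSetType, // assumed constant name
                // ScaleInSlots presumably lists the worker ordinals targeted on scale-in.
                ScaleInSlots: []int{3, 5},
            },
        },
    }
}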

pkg/controllers/v1alpha1/datamigrate/status_handler.go

Lines changed: 1 addition & 1 deletion

@@ -165,7 +165,7 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont
     if currentJob.Spec.Suspend != nil && *currentJob.Spec.Suspend {
         ctx.Log.Info("scale the migrate workers statefulset", "name", utils.GetParallelOperationWorkersName(releaseName))
         // scale the stateful set, the job acts as a worker.
-        err = kubeclient.ScaleStatefulSet(c.Client, utils.GetParallelOperationWorkersName(releaseName), c.dataMigrate.Namespace, c.dataMigrate.Spec.Parallelism-1)
+        err = kubeclient.ScaleCacheWorkerSet(c.Client, utils.GetParallelOperationWorkersName(releaseName), c.dataMigrate.Namespace, c.dataMigrate.Spec.Parallelism-1)
         if err != nil {
             return
         }
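
The only change here swaps kubeclient.ScaleStatefulSet for kubeclient.ScaleCacheWorkerSet; the helper itself is defined elsewhere in the commit. A minimal sketch of what such a helper could look like, assuming it keeps the old signature and simply delegates to the pre-existing StatefulSet helper for StatefulSet-backed workers:

package kubeclient

import (
    "sigs.k8s.io/controller-runtime/pkg/client"
)

// ScaleCacheWorkerSet is sketched as a thin wrapper; the commit's actual
// helper may dispatch on the worker set's backing workload type instead.
func ScaleCacheWorkerSet(c client.Client, name, namespace string, replicas int32) error {
    // The parallel migrate workers are still plain StatefulSets at the call
    // site above, so delegating to ScaleStatefulSet keeps the old behaviour.
    return ScaleStatefulSet(c, name, namespace, replicas)
}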

pkg/ctrl/affinity.go

Lines changed: 178 additions & 8 deletions

@@ -19,14 +19,40 @@ package ctrl
 import (
     datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
     "github.com/fluid-cloudnative/fluid/pkg/common"
+    "github.com/fluid-cloudnative/fluid/pkg/types/cacheworkerset"
     "github.com/fluid-cloudnative/fluid/pkg/utils"
+    openkruise "github.com/openkruise/kruise/apis/apps/v1beta1"

-    appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )

-func (e *Helper) checkWorkerAffinity(workers *appsv1.StatefulSet) (found bool) {
+func (e *Helper) checkWorkerAffinity(workers *cacheworkerset.CacheWorkerSet) (found bool) {
+
+    if workers.GetAffinity() == nil {
+        return
+    }
+
+    if workers.GetNodeAffinity() == nil {
+        return
+    }
+
+    if len(workers.GetNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution()) == 0 {
+        return
+    }
+
+    for _, preferred := range workers.GetNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution() {
+        for _, term := range preferred.Preference.MatchExpressions {
+            if term.Key == e.runtimeInfo.GetFuseLabelName() {
+                found = true
+                return
+            }
+        }
+    }
+
+    return
+}
+func (e *Helper) checkWorkerAffinityForAts(workers *openkruise.StatefulSet) (found bool) {

     if workers.Spec.Template.Spec.Affinity == nil {
         return
@@ -52,12 +78,14 @@ func (e *Helper) checkWorkerAffinity(workers *appsv1.StatefulSet) (found bool) {
     return
 }

-// BuildWorkersAffinity builds workers affinity if it doesn't have
-func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpdate *appsv1.StatefulSet, err error) {
-    // TODO: for now, runtime affinity can't be set by user, so we can assume the affinity is nil in the first time.
-    // We need to enhance it in future
-    workersToUpdate = workers.DeepCopy()
-    if e.checkWorkerAffinity(workersToUpdate) {
+func (e *Helper) BuildCacheWorkersAffinity(workers *cacheworkerset.CacheWorkerSet) (workersToUpdate *cacheworkerset.CacheWorkerSet, err error) {
+    returnV, err := e.BuildWorkersAffinity(workers)
+    return returnV, err
+}
+
+func (e *Helper) BuildWorkersAffinityForAsts(workers *openkruise.StatefulSet) (workersToUpdate *openkruise.StatefulSet, err error) {
+
+    if e.checkWorkerAffinityForAts(workersToUpdate) {
         return
     }
     var (
@@ -168,3 +196,145 @@ func (e *Helper) BuildWorkersAffinity(workers *appsv1.StatefulSet) (workersToUpd

     return
 }
+
+// BuildWorkersAffinity builds workers affinity if it doesn't have
+func (e *Helper) BuildWorkersAffinity(workers *cacheworkerset.CacheWorkerSet) (workersToUpdate *cacheworkerset.CacheWorkerSet, err error) {
+    // TODO: for now, runtime affinity can't be set by user, so we can assume the affinity is nil in the first time.
+    // We need to enhance it in future
+
+    if e.checkWorkerAffinity(workersToUpdate) {
+        return
+    }
+    var (
+        name      = e.runtimeInfo.GetName()
+        namespace = e.runtimeInfo.GetNamespace()
+    )
+
+    if workersToUpdate.GetAffinity() == nil {
+        affinity := &corev1.Affinity{}
+        workersToUpdate.SetAffinity(affinity)
+        dataset, err := utils.GetDataset(e.client, name, namespace)
+        if err != nil {
+            return workersToUpdate, err
+        }
+        // 1. Set pod anti affinity(required) for same dataset (Current using port conflict for scheduling, no need to do)
+
+        // 2. Set pod anti affinity for the different dataset
+        if dataset.IsExclusiveMode() {
+            podAntiAffinity := &corev1.PodAntiAffinity{
+                RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
+                    {
+                        LabelSelector: &metav1.LabelSelector{
+                            MatchExpressions: []metav1.LabelSelectorRequirement{
+                                {
+                                    Key:      "fluid.io/dataset",
+                                    Operator: metav1.LabelSelectorOpExists,
+                                },
+                            },
+                        },
+                        TopologyKey: "kubernetes.io/hostname",
+                    },
+                },
+            }
+            workersToUpdate.SetPodAntiAffinity(podAntiAffinity)
+
+        } else {
+            podAntiAffinity := &corev1.PodAntiAffinity{
+                PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
+                    {
+                        // The default weight is 50
+                        Weight: 50,
+                        PodAffinityTerm: corev1.PodAffinityTerm{
+                            LabelSelector: &metav1.LabelSelector{
+                                MatchExpressions: []metav1.LabelSelectorRequirement{
+                                    {
+                                        Key:      "fluid.io/dataset",
+                                        Operator: metav1.LabelSelectorOpExists,
+                                    },
+                                },
+                            },
+                            TopologyKey: "kubernetes.io/hostname",
+                        },
+                    },
+                },
+
+                RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
+                    {
+                        LabelSelector: &metav1.LabelSelector{
+                            MatchExpressions: []metav1.LabelSelectorRequirement{
+                                {
+                                    Key:      "fluid.io/dataset-placement",
+                                    Operator: metav1.LabelSelectorOpIn,
+                                    Values:   []string{string(datav1alpha1.ExclusiveMode)},
+                                },
+                            },
+                        },
+                        TopologyKey: "kubernetes.io/hostname",
+                    },
+                },
+            }
+            workersToUpdate.SetPodAntiAffinity(podAntiAffinity)
+
+            // TODO: remove this when EFC is ready for spread-first scheduling policy
+            // Currently EFC prefers binpack-first scheduling policy to spread-first scheduling policy. Set PreferredDuringSchedulingIgnoredDuringExecution to empty
+            // to avoid using spread-first scheduling policy
+            if e.runtimeInfo.GetRuntimeType() == common.EFCRuntime {
+                preferredDuringSchedulingIgnoredDuringExecution := []corev1.WeightedPodAffinityTerm{}
+                workersToUpdate.SetPodAntiAffinityPreferredDuringSchedulingIgnoredDuringExecution(preferredDuringSchedulingIgnoredDuringExecution)
+            }
+        }
+
+        // 3. Prefer to locate on the node which already has fuse on it
+        if workersToUpdate.GetNodeAffinity() == nil {
+            NodeAffinity := &corev1.NodeAffinity{}
+            workersToUpdate.SetNodeAffinity(NodeAffinity)
+        }
+
+        if len(workersToUpdate.GetNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution()) == 0 {
+            PreferredDuringSchedulingIgnoredDuringExecution := []corev1.PreferredSchedulingTerm{}
+            workersToUpdate.SetNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution(PreferredDuringSchedulingIgnoredDuringExecution)
+        }
+
+        //workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
+        //    append(workersToUpdate.Spec.Template.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
+        //        corev1.PreferredSchedulingTerm{
+        //            Weight: 100,
+        //            Preference: corev1.NodeSelectorTerm{
+        //                MatchExpressions: []corev1.NodeSelectorRequirement{
+        //                    {
+        //                        Key:      e.runtimeInfo.GetFuseLabelName(),
+        //                        Operator: corev1.NodeSelectorOpIn,
+        //                        Values:   []string{"true"},
+        //                    },
+        //                },
+        //            },
+        //        })
+        PreferredDuringSchedulingIgnoredDuringExecution := corev1.PreferredSchedulingTerm{
+            Weight: 100,
+            Preference: corev1.NodeSelectorTerm{
+                MatchExpressions: []corev1.NodeSelectorRequirement{
+                    {
+                        Key:      e.runtimeInfo.GetFuseLabelName(),
+                        Operator: corev1.NodeSelectorOpIn,
+                        Values:   []string{"true"},
+                    },
+                },
+            },
+        }
+        workersToUpdate.AppendNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution(PreferredDuringSchedulingIgnoredDuringExecution)

+        // 3. set node affinity if possible
+        if dataset.Spec.NodeAffinity != nil {
+            if dataset.Spec.NodeAffinity.Required != nil {
+                workersToUpdate.SetNodeAffinityRequired(dataset.Spec.NodeAffinity.Required)
+            }
+        }
+    }
+
+    return
+}
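
The rewritten BuildWorkersAffinity manipulates affinity only through accessor methods on cacheworkerset.CacheWorkerSet instead of reaching into workers.Spec.Template.Spec directly. The wrapper type lives in pkg/types/cacheworkerset and is not shown in this commit view; a rough sketch of the accessors used above, assuming the wrapper embeds a vanilla StatefulSet (the StatefulSet field name is an assumption, and an advanced-StatefulSet backend would mirror it):

package cacheworkerset

import (
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
)

// CacheWorkerSet abstracts the workload kind that backs cache workers.
type CacheWorkerSet struct {
    StatefulSet *appsv1.StatefulSet // assumed field name
}

// GetAffinity returns the pod-template affinity of the backing workload.
func (c *CacheWorkerSet) GetAffinity() *corev1.Affinity {
    return c.StatefulSet.Spec.Template.Spec.Affinity
}

// SetAffinity replaces the pod-template affinity on the backing workload.
func (c *CacheWorkerSet) SetAffinity(affinity *corev1.Affinity) {
    c.StatefulSet.Spec.Template.Spec.Affinity = affinity
}

// GetNodeAffinity returns the node affinity, if any affinity is set at all.
func (c *CacheWorkerSet) GetNodeAffinity() *corev1.NodeAffinity {
    if aff := c.GetAffinity(); aff != nil {
        return aff.NodeAffinity
    }
    return nil
}

// GetNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution returns the
// preferred node scheduling terms, or nil when no node affinity is set.
func (c *CacheWorkerSet) GetNodeAffinityPreferredDuringSchedulingIgnoredDuringExecution() []corev1.PreferredSchedulingTerm {
    if na := c.GetNodeAffinity(); na != nil {
        return na.PreferredDuringSchedulingIgnoredDuringExecution
    }
    return nil
}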

pkg/ctrl/ctrl.go

Lines changed: 11 additions & 11 deletions

@@ -21,14 +21,13 @@ import (
     "reflect"

     "github.com/fluid-cloudnative/fluid/pkg/ddc/base"
+    cacheworkerset "github.com/fluid-cloudnative/fluid/pkg/types/cacheworkerset"
     "github.com/fluid-cloudnative/fluid/pkg/utils"
     "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
-    appsv1 "k8s.io/api/apps/v1"
+    "github.com/go-logr/logr"
     corev1 "k8s.io/api/core/v1"
     "sigs.k8s.io/controller-runtime/pkg/client"

-    "github.com/go-logr/logr"
-
     datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
 )

@@ -54,18 +53,19 @@ func BuildHelper(runtimeInfo base.RuntimeInfoInterface, client client.Client, lo
 // calls for a status update and finally returns error if anything unexpected happens.
 func (e *Helper) SetupWorkers(runtime base.RuntimeInterface,
     currentStatus datav1alpha1.RuntimeStatus,
-    workers *appsv1.StatefulSet) (err error) {
+    workers *cacheworkerset.CacheWorkerSet) (err error) {

     desireReplicas := runtime.Replicas()
-    if *workers.Spec.Replicas != desireReplicas {
+    if *workers.GetReplicas() != desireReplicas {
         // workerToUpdate, err := e.buildWorkersAffinity(workers)

         workerToUpdate, err := e.BuildWorkersAffinity(workers)
         if err != nil {
             return err
         }

-        workerToUpdate.Spec.Replicas = &desireReplicas
+        //workerToUpdate.Spec.Replicas = &desireReplicas
+        workerToUpdate.SetReplicas(&desireReplicas)
         err = e.client.Update(context.TODO(), workerToUpdate)
         if err != nil {
             return err
@@ -76,11 +76,11 @@ func (e *Helper) SetupWorkers(runtime base.RuntimeInterface,
         e.log.V(1).Info("Nothing to do for syncing")
     }

-    if *workers.Spec.Replicas != runtime.GetStatus().DesiredWorkerNumberScheduled {
+    if *workers.GetReplicas() != runtime.GetStatus().DesiredWorkerNumberScheduled {
         statusToUpdate := runtime.GetStatus()

-        if workers.Status.ReadyReplicas > 0 {
-            if runtime.Replicas() == workers.Status.ReadyReplicas {
+        if workers.GetReadyReplicas() > 0 {
+            if runtime.Replicas() == workers.GetReadyReplicas() {
                 statusToUpdate.WorkerPhase = datav1alpha1.RuntimePhaseReady
             } else {
                 statusToUpdate.WorkerPhase = datav1alpha1.RuntimePhasePartialReady
@@ -114,10 +114,10 @@ func (e *Helper) SetupWorkers(runtime base.RuntimeInterface,
 // CheckWorkersReady checks if workers are ready
 func (e *Helper) CheckWorkersReady(runtime base.RuntimeInterface,
     currentStatus datav1alpha1.RuntimeStatus,
-    workers *appsv1.StatefulSet) (ready bool, err error) {
+    workers *cacheworkerset.CacheWorkerSet) (ready bool, err error) {

     var (
-        phase datav1alpha1.RuntimePhase = kubeclient.GetPhaseFromStatefulset(runtime.Replicas(), *workers)
+        phase datav1alpha1.RuntimePhase = kubeclient.GetPhaseFromCacheWorkset(runtime.Replicas(), workers)
         cond  datav1alpha1.RuntimeCondition = datav1alpha1.RuntimeCondition{}
     )
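
CheckWorkersReady now derives the runtime phase from kubeclient.GetPhaseFromCacheWorkset, which takes the wrapper by pointer rather than a StatefulSet by value. A minimal sketch of the phase derivation, assuming the same rules as the old GetPhaseFromStatefulset helper; datav1alpha1.RuntimePhaseNotReady is assumed to exist alongside the phases referenced above:

package kubeclient

import (
    datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
    "github.com/fluid-cloudnative/fluid/pkg/types/cacheworkerset"
)

// GetPhaseFromCacheWorkset maps ready replicas to a runtime phase; this is a
// sketch, not the commit's actual implementation.
func GetPhaseFromCacheWorkset(replicas int32, workers *cacheworkerset.CacheWorkerSet) datav1alpha1.RuntimePhase {
    if replicas == 0 {
        // No workers requested, nothing to wait for.
        return datav1alpha1.RuntimePhaseReady
    }
    ready := workers.GetReadyReplicas()
    switch {
    case ready == 0:
        return datav1alpha1.RuntimePhaseNotReady // assumed constant
    case ready < replicas:
        return datav1alpha1.RuntimePhasePartialReady
    default:
        return datav1alpha1.RuntimePhaseReady
    }
}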

pkg/ctrl/master.go

Lines changed: 12 additions & 11 deletions

@@ -19,13 +19,14 @@ package ctrl
 import (
     "context"
     "fmt"
+    "github.com/fluid-cloudnative/fluid/pkg/types/cacheworkerset"
     "reflect"

     datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
     "github.com/fluid-cloudnative/fluid/pkg/ddc/base"
     "github.com/fluid-cloudnative/fluid/pkg/utils"
     "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient"
-    appsv1 "k8s.io/api/apps/v1"
+    _ "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/client-go/tools/record"

@@ -37,13 +38,13 @@ import (
 // CheckMasterHealthy checks the sts healthy with role
 func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base.RuntimeInterface,
     currentStatus datav1alpha1.RuntimeStatus,
-    sts *appsv1.StatefulSet) (err error) {
+    sts *cacheworkerset.CacheWorkerSet) (err error) {
     var (
         healthy             bool
         selector            labels.Selector
         unavailablePodNames []types.NamespacedName
     )
-    if sts.Status.Replicas == sts.Status.ReadyReplicas {
+    if *sts.GetReplicas() == sts.GetReadyReplicas() {
         healthy = true
     }

@@ -67,7 +68,7 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base.
     } else {
         // 1. Update the status
         cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeMasterReady, "The master is not ready.",
-            fmt.Sprintf("The master %s in %s is not ready.", sts.Name, sts.Namespace), corev1.ConditionFalse)
+            fmt.Sprintf("The master %s in %s is not ready.", sts.GetName(), sts.GetNamespace()), corev1.ConditionFalse)
         _, oldCond := utils.GetRuntimeCondition(statusToUpdate.Conditions, cond.Type)

         if oldCond == nil || oldCond.Type != cond.Type {
@@ -79,22 +80,22 @@ func (e *Helper) CheckMasterHealthy(recorder record.EventRecorder, runtime base.

         // 2. Record the event

-        selector, err = metav1.LabelSelectorAsSelector(sts.Spec.Selector)
+        selector, err = metav1.LabelSelectorAsSelector(sts.GetSelector())
         if err != nil {
-            return fmt.Errorf("error converting StatefulSet %s in namespace %s selector: %v", sts.Name, sts.Namespace, err)
+            return fmt.Errorf("error converting StatefulSet %s in namespace %s selector: %v", sts.GetName(), sts.GetNamespace(), err)
         }

-        unavailablePodNames, err = kubeclient.GetUnavailablePodNamesForStatefulSet(e.client, sts, selector)
+        unavailablePodNames, err = kubeclient.GetUnavailablePodNamesForCacheWorkerSet(e.client, sts, selector)
         if err != nil {
             return err
         }

         // 3. Set event
         err = fmt.Errorf("the master %s in %s is not ready. The expected number is %d, the actual number is %d, the unhealthy pods are %v",
-            sts.Name,
-            sts.Namespace,
-            sts.Status.Replicas,
-            sts.Status.ReadyReplicas,
+            sts.GetName(),
+            sts.GetNamespace(),
+            sts.GetReplicas(),
+            sts.GetReadyReplicas(),
             unavailablePodNames)

         recorder.Eventf(runtime, corev1.EventTypeWarning, "MasterUnhealthy", err.Error())
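
CheckMasterHealthy now resolves unavailable pods through kubeclient.GetUnavailablePodNamesForCacheWorkerSet, whose body sits elsewhere in the commit. A rough sketch, assuming it lists pods matching the set's selector in the set's namespace and reports those that are not Ready; the isPodReady helper is hypothetical:

package kubeclient

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"

    "github.com/fluid-cloudnative/fluid/pkg/types/cacheworkerset"
)

// GetUnavailablePodNamesForCacheWorkerSet returns the namespaced names of
// pods selected by the cache worker set that are not yet Ready.
func GetUnavailablePodNamesForCacheWorkerSet(c client.Client, sts *cacheworkerset.CacheWorkerSet, selector labels.Selector) ([]types.NamespacedName, error) {
    pods := &corev1.PodList{}
    err := c.List(context.TODO(), pods, &client.ListOptions{
        Namespace:     sts.GetNamespace(),
        LabelSelector: selector,
    })
    if err != nil {
        return nil, err
    }

    var unavailable []types.NamespacedName
    for i := range pods.Items {
        if !isPodReady(&pods.Items[i]) { // hypothetical readiness helper
            unavailable = append(unavailable, types.NamespacedName{
                Namespace: pods.Items[i].Namespace,
                Name:      pods.Items[i].Name,
            })
        }
    }
    return unavailable, nil
}

// isPodReady checks the PodReady condition on a pod.
func isPodReady(pod *corev1.Pod) bool {
    for _, cond := range pod.Status.Conditions {
        if cond.Type == corev1.PodReady {
            return cond.Status == corev1.ConditionTrue
        }
    }
    return false
}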
