diff --git a/pkg/controllers/leaderworkerset_controller.go b/pkg/controllers/leaderworkerset_controller.go
index 82e4fe0aa..9a9d9534a 100644
--- a/pkg/controllers/leaderworkerset_controller.go
+++ b/pkg/controllers/leaderworkerset_controller.go
@@ -271,11 +271,12 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
 
 	// wantReplicas calculates the final replicas if needed.
 	wantReplicas := func(unreadyReplicas int32) int32 {
-		if unreadyReplicas <= int32(maxSurge) {
-			// When we have n unready replicas and n bursted replicas, we should
-			// start to release the burst replica gradually for the accommodation of
-			// the unready ones.
-			finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
+		if unreadyReplicas < int32(maxSurge) {
+			// When fewer replicas than maxSurge are unready, we should start to release
+			// the burst replicas gradually to accommodate the unready ones. We should
+			// keep the burst replicas while the number of unready replicas is still
+			// equal to maxSurge, because the rolling update is not completed yet.
+			finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas))
 			r.Record.Eventf(lws, corev1.EventTypeNormal, GroupsProgressing, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
 			return finalReplicas
 		}
@@ -286,7 +287,7 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
 	// Indicates a new rolling update here.
 	if leaderWorkerSetUpdated {
 		// Processing scaling up/down first prior to rolling update.
-		return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
+		return min(lwsReplicas, stsReplicas), lwsReplicas, nil
 	}
 
 	partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
diff --git a/pkg/controllers/leaderworkerset_controller_test.go b/pkg/controllers/leaderworkerset_controller_test.go
index 1a21fa4a0..173aa4c0b 100644
--- a/pkg/controllers/leaderworkerset_controller_test.go
+++ b/pkg/controllers/leaderworkerset_controller_test.go
@@ -29,10 +29,8 @@ import (
 	coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1"
 	metaapplyv1 "k8s.io/client-go/applyconfigurations/meta/v1"
 	"k8s.io/utils/ptr"
-
-	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
-
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
 	revisionutils "sigs.k8s.io/lws/pkg/utils/revision"
 	"sigs.k8s.io/lws/test/wrappers"
 )
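The e2e expectations below change from 7 to 8 because the surge replica is now kept while the number of unready replicas still equals maxSurge. The following is a minimal, self-contained sketch of that replica math, included only as review context and not part of the patch: nonZeroValue stands in for utils.NonZeroValue, the branch for unreadyReplicas >= maxSurge is a simplification of the controller's surge calculation, and the Replicas=4 / MaxSurge=4 numbers are hypothetical.

package main

import "fmt"

// nonZeroValue is a stand-in for utils.NonZeroValue: negative values clamp to zero.
func nonZeroValue(v int32) int32 {
	if v < 0 {
		return 0
	}
	return v
}

// wantReplicas mirrors the updated closure: surge capacity is released only once
// fewer than maxSurge replicas are unready; while unready == maxSurge the bursted
// replicas are kept because the rolling update has not finished yet.
func wantReplicas(unreadyReplicas, lwsReplicas, maxSurge int32) int32 {
	if unreadyReplicas < maxSurge {
		return lwsReplicas + nonZeroValue(unreadyReplicas)
	}
	// Simplification: keep the full burst while unready >= maxSurge.
	return lwsReplicas + maxSurge
}

func main() {
	// Hypothetical rollout with Replicas=4 and MaxSurge=4: as groups become ready,
	// unready goes 4,3,2,1,0 and the leader sts size goes 8,7,6,5,4.
	for unready := int32(4); unready >= 0; unready-- {
		fmt.Printf("unready=%d -> leader sts replicas=%d\n", unready, wantReplicas(unready, 4, 4))
	}
}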
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index fd9b6b506..a5a1dbf35 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -140,7 +140,7 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 		testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
 
 		// Happen during rolling update.
-		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 7)
+		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 8)
 
 		// Rolling update completes.
 		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
@@ -186,7 +186,7 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 		testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
 
 		// Happen during rolling update.
-		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 7)
+		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 8)
 
 		// Rolling update completes.
 		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
@@ -245,8 +245,8 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 
 		testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
 
-		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 7)
-		testing.ExpectValidServices(ctx, k8sClient, lws, 7)
+		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 8)
+		testing.ExpectValidServices(ctx, k8sClient, lws, 8)
 		// Rolling update completes.
 		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 		testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
diff --git a/test/integration/controllers/leaderworkerset_test.go b/test/integration/controllers/leaderworkerset_test.go
index 9054fbcf8..a51636891 100644
--- a/test/integration/controllers/leaderworkerset_test.go
+++ b/test/integration/controllers/leaderworkerset_test.go
@@ -1061,22 +1061,22 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 					// Rolling update index-1 replica.
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-1", lws)
-						// Reclaim the replica.
-						testing.DeleteLeaderPod(ctx, k8sClient, lws, 4, 5)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
+						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 5)
 						testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 						testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 						testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 						testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
-						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 3)
+						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 5, 4)
 					},
 				},
 				{
 					// Rolling update index-0 replica.
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-0", lws)
+						// Reclaim the replica.
+						testing.DeleteLeaderPod(ctx, k8sClient, lws, 4, 5)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
@@ -1159,6 +1159,83 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 					},
 				},
 			}),
+			ginkgo.Entry("rolling update with one replica", &testCase{
+				makeLeaderWorkerSet: func(nsName string) *wrappers.LeaderWorkerSetWrapper {
+					return wrappers.BuildLeaderWorkerSet(nsName).MaxSurge(1).Replica(1)
+				},
+				updates: []*update{
+					{
+						// Set lws to available condition.
+						lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
+							testing.SetPodGroupsToReady(ctx, k8sClient, lws, 1)
+						},
+						checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
+							testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 1)
+							testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
+							testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
+							testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
+							testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 1, 1)
+						},
+					},
+					{
+						// Update the worker template.
+						lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
+							gomega.Eventually(func() error {
+								var leaderworkerset leaderworkerset.LeaderWorkerSet
+								if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset); err != nil {
+									return err
+								}
+								leaderworkerset.Spec.LeaderWorkerTemplate.WorkerTemplate.Spec.Containers[0].Name = "new-worker"
+								return k8sClient.Update(ctx, &leaderworkerset)
+							}, testing.Timeout, testing.Interval).Should(gomega.Succeed())
+
+							var leaderSts appsv1.StatefulSet
+							testing.GetLeaderStatefulset(ctx, lws, k8sClient, &leaderSts)
+							// Create leader pod for maxSurge.
+							gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 1, 2)).To(gomega.Succeed())
+						},
+						checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
+							testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
+							testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
+							testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
+							testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
+							testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
+						},
+					},
+					{
+						// Rolling update the last replica.
+						lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
+							testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-1", lws)
+						},
+						checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
+							testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
+							testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
+							testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
+							testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
+							testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
+							testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 2, 1)
+						},
+					},
+
+					{
+						// Rolling update the first replica.
+						lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
+							testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-0", lws)
+							// Reclaim the replica.
+							testing.DeleteLeaderPod(ctx, k8sClient, lws, 1, 2)
+						},
+						checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
+							testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 1)
+							testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
+							testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
+							testing.ExpectLeaderWorkerSetNotProgressing(ctx, k8sClient, lws, "Replicas are progressing")
+							testing.ExpectLeaderWorkerSetNoUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
+							testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
+							testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 1, 1)
+						},
+					},
+				},
+			}),
 			ginkgo.Entry("rolling update with replicas scaled down and maxSurge set", &testCase{
 				makeLeaderWorkerSet: func(nsName string) *wrappers.LeaderWorkerSetWrapper {
 					return wrappers.BuildLeaderWorkerSet(nsName).Replica(6).MaxSurge(2)
@@ -1258,10 +1335,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 						var leaderSts appsv1.StatefulSet
 						testing.GetLeaderStatefulset(ctx, lws, k8sClient, &leaderSts)
 						// Create leader pod for maxSurge.
-						gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 2, 3)).To(gomega.Succeed())
+						gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 2, 4)).To(gomega.Succeed())
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
+						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 						testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 						testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 						testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
@@ -1271,7 +1348,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 				{
 					// Set all groups to ready.
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
-						testing.SetPodGroupsToReady(ctx, k8sClient, lws, 3)
+						testing.SetPodGroupsToReady(ctx, k8sClient, lws, 4)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
@@ -1393,30 +1470,31 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 							return k8sClient.Update(ctx, &leaderworkerset)
 						}, testing.Timeout, testing.Interval).Should(gomega.Succeed())
 						testing.DeleteLeaderPod(ctx, k8sClient, lws, 4, 8)
-						// Reclaim the last replica.
-						testing.DeleteLeaderPod(ctx, k8sClient, lws, 3, 4)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
+						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 						testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 						testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 						testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 						testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 1)
-						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 0)
+						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 0)
 					},
 				},
 				{
 					// Rolling update index-2 replica.
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-2", lws)
+						testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-3", lws)
+						// Reclaim the last replica.
+						// testing.DeleteLeaderPod(ctx, k8sClient, lws, 3, 4)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
+						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
 						testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 						testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 						testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 						testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
-						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 1)
+						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 4, 2)
 					},
 				},
 				{
@@ -1424,21 +1502,23 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-1", lws)
 						// Reclaim the last replica.
-						testing.DeleteLeaderPod(ctx, k8sClient, lws, 2, 3)
+						testing.DeleteLeaderPod(ctx, k8sClient, lws, 3, 4)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
+						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 3)
 						testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 						testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
 						testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
 						testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
-						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 2, 1)
+						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 2)
 					},
 				},
 				{
 					// Rolling update index-0 replica.
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-0", lws)
+						// Reclaim the last replica.
+						testing.DeleteLeaderPod(ctx, k8sClient, lws, 2, 3)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
diff --git a/test/wrappers/wrappers.go b/test/wrappers/wrappers.go
index f31dd1aaf..23cd9cb36 100644
--- a/test/wrappers/wrappers.go
+++ b/test/wrappers/wrappers.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"strconv"
 
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -197,6 +198,45 @@ func MakePodWithLabels(setName, groupIndex, workerIndex, namespace string, size
 	}
 }
 
+func MakePodWithLabelsAndStatus(setName, groupIndex, workerIndex, namespace string, size int, status corev1.PodPhase) *corev1.Pod {
+	pod := MakePodWithLabels(setName, groupIndex, workerIndex, namespace, size)
+	pod.Status.Phase = status
+
+	if status == corev1.PodRunning {
+		pod.Status.Conditions = []corev1.PodCondition{
+			{
+				Type:   corev1.PodReady,
+				Status: corev1.ConditionTrue,
+			},
+		}
+	}
+	return pod
+}
+
+func MakeLeaderStatefulSetWithLabels(setName, namespace string, replica int) *appsv1.StatefulSet {
+	return &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      setName,
+			Namespace: namespace,
+			Labels: map[string]string{
+				leaderworkerset.SetNameLabelKey: setName,
+			},
+			Annotations: map[string]string{
+				leaderworkerset.ReplicasAnnotationKey: strconv.Itoa(replica),
+			},
+		},
+		Spec: appsv1.StatefulSetSpec{
+			Replicas: ptr.To(int32(replica)),
+			UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
+				Type: appsv1.RollingUpdateStatefulSetStrategyType,
+				RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
+					Partition: ptr.To[int32](0),
+				},
+			},
+		},
+	}
+}
+
 func MakeWorkerPodSpec() corev1.PodSpec {
 	return corev1.PodSpec{
 		Containers: []corev1.Container{
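For reviewers, a hypothetical usage of the two wrappers added above; this sketch is not part of the patch, and the test name, package name, namespace, and counts are made up. It only illustrates that the returned objects can seed a controller-runtime fake client directly.

package controllers_test

import (
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"sigs.k8s.io/lws/test/wrappers"
)

func TestWrappersSketch(t *testing.T) {
	// A leader StatefulSet annotated with 3 replicas, plus a running and ready
	// leader pod for group 0 / worker 0 (size value is illustrative).
	sts := wrappers.MakeLeaderStatefulSetWithLabels("test-lws", "default", 3)
	pod := wrappers.MakePodWithLabelsAndStatus("test-lws", "0", "0", "default", 2, corev1.PodRunning)

	// Seed a fake client so controller helpers can be exercised without a cluster.
	c := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(sts, pod).Build()
	_ = c
}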