Skip to content

Support scale to zero rabbitMQ #1899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,23 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if err := builder.Update(sts); err != nil {
return ctrl.Result{}, err
}
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
// return when cluster scale down detected; unsupported operation
return ctrl.Result{}, nil
if r.scaleToZero(current, sts) {
err := r.saveReplicasBeforeZero(ctx, rabbitmqCluster, current)
if err != nil {
return ctrl.Result{}, err
}
} else {
if r.scaleDown(ctx, rabbitmqCluster, current, sts) {
// return when cluster scale down detected; unsupported operation
return ctrl.Result{}, nil
}
}
if r.scaleFromZero(current, sts) {
if r.scaleFromZeroToBeforeReplicasConfigured(ctx, rabbitmqCluster, sts) {
// return when cluster scale down from zero detected; unsupported operation
return ctrl.Result{}, nil
}
r.removeReplicasBeforeZeroAnnotationIfExists(ctx, rabbitmqCluster)
}
}

Expand All @@ -213,7 +227,6 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this line break removal improves readability of this function. In fact, I think the opposite. This empty line makes the function cluttered. This line break separates high level steps of the reconcile process. Prior to this line break, it's the logic to reconcile the STS (plus other things earlier on), post this line break is the logic to update the STS and emit the relevant metadata. I believe this separation makes sense, and I want to keep unless there's a compelling argument for the opposite.

var operationResult controllerutil.OperationResult
err = clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
var apiError error
Expand Down Expand Up @@ -249,9 +262,9 @@ func (r *RabbitmqClusterReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

// Set ReconcileSuccess to true and update observedGeneration after all reconciliation steps have finished with no error
rabbitmqCluster.Status.ObservedGeneration = rabbitmqCluster.GetGeneration()

Comment on lines -252 to +267
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before with both the empty line removal and the new line addition.

r.setReconcileSuccess(ctx, rabbitmqCluster, corev1.ConditionTrue, "Success", "Finish reconciling")

logger.Info("Finished reconciling")
Expand Down
95 changes: 95 additions & 0 deletions controllers/reconcile_scale_zero.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package controllers

import (
"context"
"errors"
"fmt"
"strconv"

ctrl "sigs.k8s.io/controller-runtime"

"github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
"github.com/rabbitmq/cluster-operator/v2/internal/status"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)

const beforeZeroReplicasConfigured = "rabbitmq.com/before-zero-replicas-configured"

// scaleToZero checks if the desired replicas is zero and the current replicas is not zero.
func (r *RabbitmqClusterReconciler) scaleToZero(current, sts *appsv1.StatefulSet) bool {
currentReplicas := *current.Spec.Replicas
desiredReplicas := *sts.Spec.Replicas
return desiredReplicas == 0 && currentReplicas > 0
}
Comment on lines +20 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function does not need to be part of the RabbitmqClusterReconciler, because it doesn't make any use of the struct functions or fields, it takes all its information from the arguments.


// scaleFromZero checks if the current replicas is zero and the desired replicas is greater than zero.
func (r *RabbitmqClusterReconciler) scaleFromZero(current, sts *appsv1.StatefulSet) bool {
currentReplicas := *current.Spec.Replicas
desiredReplicas := *sts.Spec.Replicas
return currentReplicas == 0 && desiredReplicas > 0
}
Comment on lines +27 to +31
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function does not need to be part of the RabbitmqClusterReconciler, because it doesn't make any use of the struct functions or fields, it takes all its information from the arguments.


// scaleDownFromZero checks if the current replicas is desired replicas would be greatter than replicas configured before zero state.
func (r *RabbitmqClusterReconciler) scaleFromZeroToBeforeReplicasConfigured(ctx context.Context, cluster *v1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) bool {
var err error
var beforeZeroReplicas int64
desiredReplicas := *sts.Spec.Replicas
annotationValue, ok := cluster.Annotations[beforeZeroReplicasConfigured]
if !ok {
return false
}

beforeZeroReplicas, err = strconv.ParseInt(annotationValue, 10, 32)
if err != nil {
msg := "Failed to convert string to integer for before-zero-replicas-configuration annotation"
reason := "TransformErrorOperation"
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
if err != nil {
return true
}
return true
Comment on lines +48 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my other comment:

We should add a debug log line here indicating there was an error emitting the event and/or setting the status condition. At it is right now, it's silently ignoring the error.

Since the function returns true at this point in either case, you could simply log a debug message (without returning inside the if err) and return true after the if conditional

}
if desiredReplicas != int32(beforeZeroReplicas) {
msg := fmt.Sprintf("Unsupported operation; when scaling from zero, you can only restore the previous number of replicas (%d)", int32(beforeZeroReplicas))
reason := "UnsupportedOperation"
err = r.recordEventsAndSetCondition(ctx, cluster, status.ReconcileSuccess, corev1.ConditionFalse, corev1.EventTypeWarning, reason, msg)
if err != nil {
return true
}
Comment on lines +57 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a debug log line here indicating there was an error emitting the event and/or setting the status condition. At it is right now, it's silently ignoring the error.

In fact, since the function returns true at this point in either case, you could simply log a debug message (without returning inside the if) and return true after the if conditional

return true
}
return false

}

// saveReplicasBeforeZero saves the current replicas count in an annotation before scaling down to zero.
// This is used to prevent scaling down when the cluster change from zero replicas to a number less than the saved replicas count.
func (r *RabbitmqClusterReconciler) saveReplicasBeforeZero(ctx context.Context, cluster *v1beta1.RabbitmqCluster, current *appsv1.StatefulSet) error {
var err error
currentReplicas := *current.Spec.Replicas
logger := ctrl.LoggerFrom(ctx)
msg := "Cluster Scale down to 0 replicas."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
msg := "Cluster Scale down to 0 replicas."
msg := "Cluster Scale down to 0 replicas"

Log and/or event messages should not end with a dot .

reason := "ScaleDownToZero"
logger.Info(msg)
err = r.updateAnnotation(ctx, cluster, cluster.Namespace, cluster.Name, beforeZeroReplicasConfigured, fmt.Sprint(currentReplicas))
r.Recorder.Event(cluster, corev1.EventTypeNormal, reason, msg)
return err
Comment on lines +75 to +77
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err = r.updateAnnotation(ctx, cluster, cluster.Namespace, cluster.Name, beforeZeroReplicasConfigured, fmt.Sprint(currentReplicas))
r.Recorder.Event(cluster, corev1.EventTypeNormal, reason, msg)
return err
r.Recorder.Event(cluster, corev1.EventTypeNormal, reason, msg)
return r.updateAnnotation(ctx, cluster, cluster.Namespace, cluster.Name, beforeZeroReplicasConfigured, fmt.Sprint(currentReplicas))

Event() function does not use err. We can simply return the function call.

}

// If the annotation rabbitmq.com/before-zero-replicas-configured exists it will be deleted.
func (r *RabbitmqClusterReconciler) removeReplicasBeforeZeroAnnotationIfExists(ctx context.Context, cluster *v1beta1.RabbitmqCluster) {
if _, ok := cluster.Annotations[beforeZeroReplicasConfigured]; ok {
r.deleteAnnotation(ctx, cluster, beforeZeroReplicasConfigured)
}
}

func (r *RabbitmqClusterReconciler) recordEventsAndSetCondition(ctx context.Context, cluster *v1beta1.RabbitmqCluster, condType status.RabbitmqClusterConditionType, condStatus corev1.ConditionStatus, eventType, reason, msg string) error {
logger := ctrl.LoggerFrom(ctx)
var statusErr error
logger.Error(errors.New(reason), msg)
Comment on lines +88 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused. Why do we log an error unconditionally here?

r.Recorder.Event(cluster, eventType, reason, msg)
cluster.Status.SetCondition(condType, condStatus, reason, msg)
statusErr = r.Status().Update(ctx, cluster)
return statusErr
}
231 changes: 231 additions & 0 deletions controllers/reconcile_scale_zero_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package controllers_test

import (
"context"
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/v2/api/v1beta1"
"github.com/rabbitmq/cluster-operator/v2/internal/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
runtimeClient "sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Cluster scale to zero", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
waitForClusterDeletion(ctx, cluster, client)
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}).Should(BeTrue())
})

It("scale to zero", func() {
By("update statefulSet replicas to zero", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-to-zero",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: ptr.To(int32(2)),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = ptr.To(int32(0))
})).To(Succeed())

Eventually(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(0)))

})

By("setting ReconcileSuccess to 'true'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 0).Should(Equal("ReconcileSuccess status: True, " +
"with reason: Success " +
"and message: Finish reconciling"))
})
})
})

var _ = Describe("Cluster scale from zero", func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
waitForClusterDeletion(ctx, cluster, client)
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}).Should(BeTrue())
})

It("scale from zero", func() {
By("update statefulSet replicas from zero", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-from-zero",
Namespace: defaultNamespace,
Annotations: map[string]string{
"rabbitmq.com/before-zero-replicas-configured": "2",
},
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: ptr.To(int32(0)),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = ptr.To(int32(2))
})).To(Succeed())

Eventually(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(2)))

})

By("setting ReconcileSuccess to 'true'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 0).Should(Equal("ReconcileSuccess status: True, " +
"with reason: Success " +
"and message: Finish reconciling"))
})
})
})

var _ = Describe("Cluster scale from zero to less replicas configured", Ordered, func() {
var (
cluster *rabbitmqv1beta1.RabbitmqCluster
defaultNamespace = "default"
ctx = context.Background()
)

AfterEach(func() {
Expect(client.Delete(ctx, cluster)).To(Succeed())
waitForClusterDeletion(ctx, cluster, client)
Eventually(func() bool {
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
return apierrors.IsNotFound(err)
}).Should(BeTrue())
})

It("scale from zero to less replicas", func() {
By("update statefulSet replicas from zero", func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-from-zero-to-less",
Namespace: defaultNamespace,
Annotations: map[string]string{
"rabbitmq.com/before-zero-replicas-configured": "2",
},
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: ptr.To(int32(0)),
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
r.Spec.Replicas = ptr.To(int32(1))
})).To(Succeed())

Consistently(func() int32 {
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
return *sts.Spec.Replicas
}, 10, 1).Should(Equal(int32(0)))

})

By("setting 'Warning' events", func() {
Expect(aggregateEventMsgs(ctx, cluster, "UnsupportedOperation")).To(
ContainSubstring("Unsupported operation"))
})

By("setting ReconcileSuccess to 'false'", func() {
Eventually(func() string {
rabbit := &rabbitmqv1beta1.RabbitmqCluster{}
Expect(client.Get(ctx, runtimeClient.ObjectKey{
Name: cluster.Name,
Namespace: defaultNamespace,
}, rabbit)).To(Succeed())

for i := range rabbit.Status.Conditions {
if rabbit.Status.Conditions[i].Type == status.ReconcileSuccess {
return fmt.Sprintf(
"ReconcileSuccess status: %s, with reason: %s and message: %s",
rabbit.Status.Conditions[i].Status,
rabbit.Status.Conditions[i].Reason,
rabbit.Status.Conditions[i].Message)
}
}
return "ReconcileSuccess status: condition not present"
}, 0).Should(Equal("ReconcileSuccess status: False, " +
"with reason: UnsupportedOperation " +
"and message: Unsupported operation; when scaling from zero, you can only restore the previous number of replicas (2)"))
})
})
})
7 changes: 7 additions & 0 deletions internal/status/all_replicas_ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ func AllReplicasReadyCondition(resources []runtime.Object,
if resource.Spec.Replicas != nil {
desiredReplicas = *resource.Spec.Replicas
}

if desiredReplicas == 0 {
condition.Status = corev1.ConditionFalse
condition.Reason = "ScaledToZero"
goto assignLastTransitionTime
}

if desiredReplicas == resource.Status.ReadyReplicas {
condition.Status = corev1.ConditionTrue
condition.Reason = "AllPodsAreReady"
Expand Down