Skip to content

Commit fc2375d

Browse files
toelke and claude committed
Add global rate limiting to prevent rapid deployment churn
Implements global rate limiting using golang.org/x/time/rate.Limiter to prevent Wave from updating deployments too frequently when secrets or configmaps change rapidly. This prevents scenarios where a buggy controller rapidly updating secrets causes Wave to rapidly update deployments, which can overwhelm the Kubernetes API server.

Key features:
- Token bucket rate limiting (default: 1 update/sec globally, burst of 10)
- Global rate limiting shared across all deployments/statefulsets/daemonsets
- Configurable via --update-rate and --update-burst flags
- Configurable via Helm chart (updateRate and updateBurst values)
- Stalls operator pipeline via Wait() instead of requeueing
- Thread-safe implementation using rate.Limiter

Technical details:
- Uses the golang.org/x/time/rate package (one of the Go project's supplementary x/ modules, not part of the standard library) for the token bucket algorithm
- Burst allows initial rapid updates, then enforces steady-state rate
- Infinite rate (math.Inf) disables rate limiting for testing
- All resources share single global rate limiter

Fixes #182

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
1 parent c3bcbe7 commit fc2375d

25 files changed

+575
-45
lines changed

charts/wave/templates/deployment.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ spec:
3535
{{- if .Values.syncPeriod }}
3636
- --sync-period={{ .Values.syncPeriod }}
3737
{{- end }}
38+
{{- if .Values.updateRate }}
39+
- --update-rate={{ .Values.updateRate }}
40+
{{- end }}
41+
{{- if .Values.updateBurst }}
42+
- --update-burst={{ .Values.updateBurst }}
43+
{{- end }}
3844
{{- if .Values.webhooks.enabled }}
3945
- --enable-webhooks=true
4046
{{- end }}

charts/wave/values.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ webhooks:
5353
# Period for reconciliation
5454
# syncPeriod: 5m
5555

56+
# Global rate limiting for updates to prevent rapid deployment churn
57+
# Rate limit is shared across all deployments, statefulsets, and daemonsets
58+
# updateRate: maximum updates per second globally (default: 1.0 = 1 update per second)
59+
# updateBurst: maximum burst size (default: 10 = allow 10 immediate updates)
60+
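# Set updateRate to a very high value to effectively disable rate limiting. Do not use 0 for this: the chart only passes --update-rate when the value is non-zero, so 0 falls back to the manager's built-in default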
61+
updateRate: 1.0
62+
updateBurst: 10
63+
5664
resources: {}
5765
# We usually recommend not to specify default resources and to leave this as a conscious
5866
# choice for the user. This also increases chances charts run on environments with little

cmd/manager/main.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ var (
4747
leaderElectionID = flag.String("leader-election-id", "", "Name of the configmap used by the leader election system")
4848
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace for the configmap used by the leader election system")
4949
syncPeriod = flag.Duration("sync-period", 10*time.Hour, "Reconcile sync period")
50+
updateRate = flag.Float64("update-rate", core.DefaultUpdateRate, "Global maximum update rate per second (default: 1.0 = 1 update per second)")
51+
updateBurst = flag.Int("update-burst", core.DefaultUpdateBurst, "Global maximum burst size for updates (default: 10 immediate updates allowed)")
5052
showVersion = flag.Bool("version", false, "Show version and exit")
5153
enableWebhooks = flag.Bool("enable-webhooks", false, "Enable webhooks")
5254
namespaces = flag.String("namespaces", "", "Comma-separated list of namespaces to watch. Defaults to all namespaces.")
@@ -109,22 +111,26 @@ func main() {
109111

110112
// Setup all Controllers
111113
setupLog.Info("Setting up controller")
112-
if err := controller.AddToManager(mgr); err != nil {
114+
controllerConfig := controller.Config{
115+
UpdateRate: *updateRate,
116+
UpdateBurst: *updateBurst,
117+
}
118+
if err := controller.AddToManager(mgr, controllerConfig); err != nil {
113119
setupLog.Error(err, "unable to register controllers to the manager")
114120
os.Exit(1)
115121
}
116122
if *enableWebhooks {
117-
if err := deployment.AddDeploymentWebhook(mgr); err != nil {
123+
if err := deployment.AddDeploymentWebhook(mgr, *updateRate, *updateBurst); err != nil {
118124
setupLog.Error(err, "unable to create webhook", "webhook", "Deployment")
119125
os.Exit(1)
120126
}
121127

122-
if err := statefulset.AddStatefulSetWebhook(mgr); err != nil {
128+
if err := statefulset.AddStatefulSetWebhook(mgr, *updateRate, *updateBurst); err != nil {
123129
setupLog.Error(err, "unable to create webhook", "webhook", "StatefulSet")
124130
os.Exit(1)
125131
}
126132

127-
if err := daemonset.AddDaemonSetWebhook(mgr); err != nil {
133+
if err := daemonset.AddDaemonSetWebhook(mgr, *updateRate, *updateBurst); err != nil {
128134
setupLog.Error(err, "unable to create webhook", "webhook", "DaemonSet")
129135
os.Exit(1)
130136
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/onsi/ginkgo/v2 v2.27.1
99
github.com/onsi/gomega v1.38.2
1010
github.com/prometheus/client_golang v1.22.0
11+
golang.org/x/time v0.9.0
1112
k8s.io/api v0.34.1
1213
k8s.io/apimachinery v0.34.1
1314
k8s.io/client-go v0.34.1
@@ -71,7 +72,6 @@ require (
7172
golang.org/x/sys v0.37.0 // indirect
7273
golang.org/x/term v0.36.0 // indirect
7374
golang.org/x/text v0.30.0 // indirect
74-
golang.org/x/time v0.9.0 // indirect
7575
golang.org/x/tools v0.38.0 // indirect
7676
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
7777
google.golang.org/protobuf v1.36.10 // indirect

hack/test-update-throttling.sh

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
#!/usr/bin/env bash
2+
#
3+
# Test script for Wave's global rate limiting feature.
4+
#
5+
# Usage: ./hack/test-update-throttling.sh [kubernetes-version] [update-rate] [update-burst]
6+
#
7+
# Arguments:
8+
# kubernetes-version: Kubernetes version for minikube (default: v1.21)
9+
# update-rate: Updates per second globally (default: 0.2 = 1 update per 5 seconds)
10+
# update-burst: Maximum burst size (default: 2)
11+
#
12+
# Examples:
13+
# ./hack/test-update-throttling.sh # Use defaults
14+
# ./hack/test-update-throttling.sh v1.21 0.5 5 # 0.5 updates/sec, burst of 5
15+
# ./hack/test-update-throttling.sh v1.21 1.0 10 # 1 update/sec, burst of 10 (production defaults)
16+
17+
set -eu
18+
set -o pipefail
19+
20+
helm --help > /dev/null 2>&1 || {
21+
echo "helm is not installed"
22+
exit 1
23+
}
24+
25+
kubectl --help > /dev/null 2>&1 || {
26+
echo "kubectl is not installed"
27+
exit 1
28+
}
29+
30+
MINIKUBE_ALREADY_RUNNING=0
31+
kubectl get node minikube >/dev/null 2>&1 && MINIKUBE_ALREADY_RUNNING=1
32+
33+
minikube --help > /dev/null 2>&1 || {
34+
echo "minikube is not installed"
35+
exit 1
36+
}
37+
38+
KUBERNETES_VERSION=${1:-v1.21}
39+
UPDATE_RATE=${2:-0.2}
40+
UPDATE_BURST=${3:-2}
41+
42+
[ $MINIKUBE_ALREADY_RUNNING -eq 0 ] && {
43+
echo Starting minikube...
44+
minikube start --kubernetes-version "$KUBERNETES_VERSION"
45+
trap "minikube delete" EXIT
46+
}
47+
48+
eval $(minikube -p minikube docker-env)
49+
docker build -f ./Dockerfile -t wave-local:local .
50+
51+
echo Installing wave with updateRate=${UPDATE_RATE} updateBurst=${UPDATE_BURST}...
52+
helm install wave charts/wave --set image.name=wave-local --set image.tag=local --set updateRate=${UPDATE_RATE} --set updateBurst=${UPDATE_BURST}
53+
54+
while [ "$(kubectl get pods -n default | grep -cE 'wave-wave')" -gt 1 ]; do echo Waiting for \"wave\" to be scheduled; sleep 10; done
55+
56+
while [ "$(kubectl get pods -A | grep -cEv 'Running|Completed')" -gt 1 ]; do echo Waiting for \"cluster\" to start; sleep 10; done
57+
58+
echo Creating test resources...
59+
kubectl apply -f - <<'EOF'
60+
apiVersion: v1
61+
kind: ConfigMap
62+
metadata:
63+
name: throttle-test
64+
data:
65+
counter: "0"
66+
timestamp: "0"
67+
EOF
68+
69+
kubectl apply -f - <<'EOF'
70+
apiVersion: apps/v1
71+
kind: Deployment
72+
metadata:
73+
name: throttle-test
74+
annotations:
75+
wave.pusher.com/update-on-config-change: "true"
76+
spec:
77+
replicas: 1
78+
selector:
79+
matchLabels:
80+
app: throttle-test
81+
template:
82+
metadata:
83+
labels:
84+
app: throttle-test
85+
spec:
86+
containers:
87+
- name: test
88+
image: nixery.dev/shell/bash
89+
command:
90+
- /bin/bash
91+
- -c
92+
- |
93+
echo "Pod started at $(date +%s)"
94+
echo "Counter: $(cat /etc/config/counter)"
95+
sleep infinity
96+
volumeMounts:
97+
- name: config
98+
mountPath: /etc/config
99+
volumes:
100+
- name: config
101+
configMap:
102+
name: throttle-test
103+
EOF
104+
105+
echo Waiting for initial deployment to be ready...
106+
kubectl wait --for=condition=available --timeout=60s deployment/throttle-test
107+
108+
# Get the initial pod name and creation time
109+
INITIAL_POD=$(kubectl get pods -l app=throttle-test -o jsonpath='{.items[0].metadata.name}')
110+
INITIAL_CREATION=$(kubectl get pod $INITIAL_POD -o jsonpath='{.metadata.creationTimestamp}')
111+
echo "Initial pod: $INITIAL_POD created at $INITIAL_CREATION"
112+
113+
# Record start time
114+
START_TIME=$(date +%s)
115+
116+
echo ""
117+
echo "Testing update throttling by rapidly updating the ConfigMap..."
118+
echo "Expected behavior with updateRate=${UPDATE_RATE} updateBurst=${UPDATE_BURST}:"
119+
echo " - First ${UPDATE_BURST} updates will happen immediately (burst tokens)"
120+
echo " - Subsequent updates will be rate-limited to ${UPDATE_RATE} per second"
121+
echo ""
122+
123+
# Rapidly update the ConfigMap 10 times
124+
for i in 1 2 3 4 5 6 7 8 9 10; do
125+
echo "Update $i at $(date +%H:%M:%S)"
126+
kubectl patch configmap throttle-test --type merge -p "{\"data\":{\"counter\":\"$i\",\"timestamp\":\"$(date +%s)\"}}"
127+
sleep 1
128+
done
129+
130+
echo ""
131+
echo "Waiting 20 seconds to observe throttling behavior..."
132+
sleep 20
133+
134+
# Check how many pod restarts/updates occurred
135+
echo ""
136+
echo "Checking deployment update history..."
137+
kubectl get replicasets -l app=throttle-test -o wide
138+
139+
# Get all pods that were created (including terminated ones)
140+
echo ""
141+
echo "Checking pod creation times..."
142+
POD_COUNT=$(kubectl get pods -l app=throttle-test --show-all 2>/dev/null | grep -c throttle-test || kubectl get pods -l app=throttle-test | grep -c throttle-test)
143+
echo "Total pods created: $POD_COUNT"
144+
145+
# List each ReplicaSet's creation time, container image, and replica count
146+
HASH_CHANGES=$(kubectl get replicasets -l app=throttle-test -o jsonpath='{range .items[*]}{.metadata.creationTimestamp}{"\t"}{.spec.template.spec.containers[0].image}{"\t"}{.spec.replicas}{"\n"}{end}')
147+
echo ""
148+
echo "ReplicaSet history (creation time, image, replicas):"
149+
echo "$HASH_CHANGES"
150+
151+
# Count how many distinct replicasets were created
152+
RS_COUNT=$(kubectl get replicasets -l app=throttle-test --no-headers | wc -l)
153+
echo ""
154+
echo "Number of ReplicaSets created: $RS_COUNT"
155+
156+
# Check wave controller logs for throttling messages
157+
echo ""
158+
echo "Wave controller logs (throttling messages):"
159+
kubectl logs -l app=wave --tail=50 | grep -i "throttl\|delayed" || echo "No throttling messages found in logs"
160+
161+
# Verify throttling worked
162+
# Calculate expected updates: burst + (rate * time)
163+
# With rate=0.2, burst=2, and ~30 seconds total time:
164+
# Expected: 2 (burst) + (0.2 * 30) = 2 + 6 = 8 updates max
165+
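# We'll allow some margin and expect ≤9 (UPDATE_BURST + 7) to account for timing variations; the +7 is tuned for the default rate of 0.2 and is not recalculated when a different update-rate is passed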
166+
echo ""
167+
echo "=== Test Results ==="
168+
EXPECTED_MAX=$((UPDATE_BURST + 7))
169+
if [ "$RS_COUNT" -le "$EXPECTED_MAX" ]; then
170+
echo "✓ PASS: Rate limiting is working correctly"
171+
echo " - ConfigMap was updated 10 times rapidly"
172+
echo " - $RS_COUNT deployment updates occurred (expected ≤${EXPECTED_MAX} with rate=${UPDATE_RATE}/sec, burst=${UPDATE_BURST})"
173+
echo " - Burst allowed ${UPDATE_BURST} immediate updates, then rate-limited to ${UPDATE_RATE} per second"
174+
exit 0
175+
else
176+
echo "✗ FAIL: Rate limiting may not be working correctly"
177+
echo " - ConfigMap was updated 10 times rapidly"
178+
echo " - $RS_COUNT deployment updates occurred (expected ≤${EXPECTED_MAX} with rate=${UPDATE_RATE}/sec, burst=${UPDATE_BURST})"
179+
exit 1
180+
fi

pkg/controller/add_daemonset.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ package controller
1818

1919
import (
2020
"github.com/wave-k8s/wave/pkg/controller/daemonset"
21+
"sigs.k8s.io/controller-runtime/pkg/manager"
2122
)
2223

2324
func init() {
2425
// AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
25-
AddToManagerFuncs = append(AddToManagerFuncs, daemonset.Add)
26+
AddToManagerFuncs = append(AddToManagerFuncs, func(mgr manager.Manager, cfg Config) error {
27+
return daemonset.Add(mgr, cfg.UpdateRate, cfg.UpdateBurst)
28+
})
2629
}

pkg/controller/add_deployment.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ package controller
1818

1919
import (
2020
"github.com/wave-k8s/wave/pkg/controller/deployment"
21+
"sigs.k8s.io/controller-runtime/pkg/manager"
2122
)
2223

2324
func init() {
2425
// AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
25-
AddToManagerFuncs = append(AddToManagerFuncs, deployment.Add)
26+
AddToManagerFuncs = append(AddToManagerFuncs, func(mgr manager.Manager, cfg Config) error {
27+
return deployment.Add(mgr, cfg.UpdateRate, cfg.UpdateBurst)
28+
})
2629
}

pkg/controller/add_statefulset.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ package controller
1818

1919
import (
2020
"github.com/wave-k8s/wave/pkg/controller/statefulset"
21+
"sigs.k8s.io/controller-runtime/pkg/manager"
2122
)
2223

2324
func init() {
2425
// AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
25-
AddToManagerFuncs = append(AddToManagerFuncs, statefulset.Add)
26+
AddToManagerFuncs = append(AddToManagerFuncs, func(mgr manager.Manager, cfg Config) error {
27+
return statefulset.Add(mgr, cfg.UpdateRate, cfg.UpdateBurst)
28+
})
2629
}

pkg/controller/controller.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,19 @@ import (
2020
"sigs.k8s.io/controller-runtime/pkg/manager"
2121
)
2222

23+
// Config holds configuration options for controllers
24+
type Config struct {
25+
UpdateRate float64 // updates per second
26+
UpdateBurst int // maximum burst size
27+
}
28+
2329
// AddToManagerFuncs is a list of functions to add all Controllers to the Manager
24-
var AddToManagerFuncs []func(manager.Manager) error
30+
var AddToManagerFuncs []func(manager.Manager, Config) error
2531

26-
// AddToManager adds all Controllers to the Manager
27-
func AddToManager(m manager.Manager) error {
32+
// AddToManager adds all Controllers to the Manager with the given configuration
33+
func AddToManager(m manager.Manager, cfg Config) error {
2834
for _, f := range AddToManagerFuncs {
29-
if err := f(m); err != nil {
35+
if err := f(m, cfg); err != nil {
3036
return err
3137
}
3238
}

pkg/controller/daemonset/daemonset_controller.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,16 @@ import (
3333

3434
// Add creates a new DaemonSet Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
3535
// and Start it when the Manager is Started.
36-
func Add(mgr manager.Manager) error {
37-
r := newReconciler(mgr)
36+
func Add(mgr manager.Manager, updateRate float64, updateBurst int) error {
37+
r := newReconciler(mgr, updateRate, updateBurst)
3838
return add(mgr, r, r.handler)
3939
}
4040

4141
// newReconciler returns a new reconcile.Reconciler
42-
func newReconciler(mgr manager.Manager) *ReconcileDaemonSet {
42+
func newReconciler(mgr manager.Manager, updateRate float64, updateBurst int) *ReconcileDaemonSet {
4343
return &ReconcileDaemonSet{
4444
scheme: mgr.GetScheme(),
45-
handler: core.NewHandler[*appsv1.DaemonSet](mgr.GetClient(), mgr.GetEventRecorderFor("wave")),
45+
handler: core.NewHandler[*appsv1.DaemonSet](mgr.GetClient(), mgr.GetEventRecorderFor("wave"), updateRate, updateBurst),
4646
}
4747
}
4848

0 commit comments

Comments
 (0)