Skip to content

Commit 32db64f

Browse files
authored
Merge pull request #1323 from rhrmo/restart-csi-sidecars-faster
Restart external-snapshotter faster by releasing leader election lease on sigterm
2 parents 207bf5e + 389d076 commit 32db64f

File tree

1,290 files changed

+306142
-40
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

1,290 files changed

+306142
-40
lines changed

cmd/csi-snapshotter/main.go

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@ import (
2424
"os"
2525
"os/signal"
2626
"strings"
27+
"sync"
2728
"time"
2829

2930
"google.golang.org/grpc"
3031

3132
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3233
"k8s.io/apimachinery/pkg/labels"
3334
"k8s.io/apimachinery/pkg/runtime"
35+
server "k8s.io/apiserver/pkg/server"
3436
utilfeature "k8s.io/apiserver/pkg/util/feature"
3537
coreinformers "k8s.io/client-go/informers"
3638
"k8s.io/client-go/kubernetes"
@@ -301,19 +303,55 @@ func main() {
301303
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
302304
)
303305

304-
run := func(context.Context) {
305-
// run...
306-
stopCh := make(chan struct{})
307-
snapshotContentfactory.Start(stopCh)
308-
factory.Start(stopCh)
309-
coreFactory.Start(stopCh)
310-
go ctrl.Run(*threads, stopCh)
311-
312-
// ...until SIGINT
313-
c := make(chan os.Signal, 1)
314-
signal.Notify(c, os.Interrupt)
315-
<-c
316-
close(stopCh)
306+
// handle SIGTERM and SIGINT by cancelling the context.
307+
var (
308+
terminate func() // called when all controllers are finished
309+
controllerCtx context.Context // shuts down all controllers on a signal
310+
shutdownHandler <-chan struct{} // closed when SIGTERM or SIGINT is received
311+
)
312+
313+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
314+
// ctx waits for all controllers to finish, then shuts down the whole process, incl. leader election
315+
ctx, terminate = context.WithCancel(ctx)
316+
var cancelControllerCtx context.CancelFunc
317+
controllerCtx, cancelControllerCtx = context.WithCancel(ctx)
318+
shutdownHandler = server.SetupSignalHandler()
319+
320+
defer terminate()
321+
322+
go func() {
323+
defer cancelControllerCtx()
324+
<-shutdownHandler
325+
klog.Info("Received SIGTERM or SIGINT signal, shutting down controller.")
326+
}()
327+
}
328+
329+
run := func(ctx context.Context) {
330+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
331+
// run...
332+
stopCh := controllerCtx.Done()
333+
snapshotContentfactory.Start(stopCh)
334+
factory.Start(stopCh)
335+
coreFactory.Start(stopCh)
336+
var controllerWg sync.WaitGroup
337+
go ctrl.Run(*threads, stopCh, &controllerWg)
338+
<-shutdownHandler
339+
controllerWg.Wait()
340+
terminate()
341+
} else {
342+
// run...
343+
stopCh := make(chan struct{})
344+
snapshotContentfactory.Start(stopCh)
345+
factory.Start(stopCh)
346+
coreFactory.Start(stopCh)
347+
go ctrl.Run(*threads, stopCh, nil)
348+
349+
// ...until SIGINT
350+
c := make(chan os.Signal, 1)
351+
signal.Notify(c, os.Interrupt)
352+
<-c
353+
close(stopCh)
354+
}
317355
}
318356

319357
if !*leaderElection {
@@ -338,6 +376,10 @@ func main() {
338376
le.WithLeaseDuration(*leaderElectionLeaseDuration)
339377
le.WithRenewDeadline(*leaderElectionRenewDeadline)
340378
le.WithRetryPeriod(*leaderElectionRetryPeriod)
379+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
380+
le.WithReleaseOnCancel(true)
381+
le.WithContext(ctx)
382+
}
341383

342384
if err := le.Run(); err != nil {
343385
klog.Fatalf("failed to initialize leader election: %v", err)

cmd/snapshot-controller/main.go

Lines changed: 57 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4040
"k8s.io/apimachinery/pkg/runtime"
4141
"k8s.io/apimachinery/pkg/util/wait"
42+
server "k8s.io/apiserver/pkg/server"
4243

4344
klog "k8s.io/klog/v2"
4445

@@ -259,18 +260,55 @@ func main() {
259260
os.Exit(1)
260261
}
261262

262-
run := func(context.Context) {
263-
// run...
264-
stopCh := make(chan struct{})
265-
factory.Start(stopCh)
266-
coreFactory.Start(stopCh)
267-
go ctrl.Run(*threads, stopCh)
268-
269-
// ...until SIGINT
270-
c := make(chan os.Signal, 1)
271-
signal.Notify(c, os.Interrupt)
272-
<-c
273-
close(stopCh)
263+
ctx := context.Background()
264+
265+
// handle SIGTERM and SIGINT by cancelling the context.
266+
var (
267+
terminate func() // called when all controllers are finished
268+
controllerCtx context.Context // shuts down all controllers on a signal
269+
shutdownHandler <-chan struct{} // closed when SIGTERM or SIGINT is received
270+
)
271+
272+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
273+
// ctx waits for all controllers to finish, then shuts down the whole process, incl. leader election
274+
ctx, terminate = context.WithCancel(ctx)
275+
var cancelControllerCtx context.CancelFunc
276+
controllerCtx, cancelControllerCtx = context.WithCancel(ctx)
277+
shutdownHandler = server.SetupSignalHandler()
278+
279+
defer terminate()
280+
281+
go func() {
282+
defer cancelControllerCtx()
283+
<-shutdownHandler
284+
klog.Info("Received SIGTERM or SIGINT signal, shutting down controller.")
285+
}()
286+
}
287+
288+
run := func(ctx context.Context) {
289+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
290+
// run...
291+
stopCh := controllerCtx.Done()
292+
factory.Start(stopCh)
293+
coreFactory.Start(stopCh)
294+
var controllerWg sync.WaitGroup
295+
go ctrl.Run(*threads, stopCh, &controllerWg)
296+
<-shutdownHandler
297+
controllerWg.Wait()
298+
terminate()
299+
} else {
300+
// run...
301+
stopCh := make(chan struct{})
302+
factory.Start(stopCh)
303+
coreFactory.Start(stopCh)
304+
go ctrl.Run(*threads, stopCh, nil)
305+
306+
// ...until SIGINT
307+
c := make(chan os.Signal, 1)
308+
signal.Notify(c, os.Interrupt)
309+
<-c
310+
close(stopCh)
311+
}
274312
}
275313

276314
// start listening & serving http endpoint if set
@@ -289,7 +327,7 @@ func main() {
289327
klog.Infof("Metrics http server successfully started on %s, %s", *httpEndpoint, *metricsPath)
290328

291329
defer func() {
292-
err := srv.Shutdown(context.Background())
330+
err := srv.Shutdown(ctx)
293331
if err != nil {
294332
klog.Errorf("Failed to shutdown metrics server: %s", err.Error())
295333
}
@@ -300,7 +338,7 @@ func main() {
300338
}
301339

302340
if !*leaderElection {
303-
run(context.TODO())
341+
run(ctx)
304342
} else {
305343
lockName := "snapshot-controller-leader"
306344
// Create a new clientset for leader election to prevent throttling
@@ -320,6 +358,11 @@ func main() {
320358
le.WithLeaseDuration(*leaderElectionLeaseDuration)
321359
le.WithRenewDeadline(*leaderElectionRenewDeadline)
322360
le.WithRetryPeriod(*leaderElectionRetryPeriod)
361+
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
362+
le.WithReleaseOnCancel(true)
363+
le.WithContext(ctx)
364+
}
365+
323366
if err := le.Run(); err != nil {
324367
klog.Fatalf("failed to initialize leader election: %v", err)
325368
}

go.mod

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,18 @@ require (
3131
)
3232

3333
require (
34+
cel.dev/expr v0.19.1 // indirect
35+
github.com/NYTimes/gziphandler v1.1.1 // indirect
36+
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
3437
github.com/beorn7/perks v1.0.1 // indirect
3538
github.com/blang/semver/v4 v4.0.0 // indirect
39+
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
3640
github.com/cespare/xxhash/v2 v2.3.0 // indirect
41+
github.com/coreos/go-semver v0.3.1 // indirect
42+
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
3743
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3844
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
45+
github.com/felixge/httpsnoop v1.0.4 // indirect
3946
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
4047
github.com/go-logr/logr v1.4.2 // indirect
4148
github.com/go-logr/stdr v1.2.2 // indirect
@@ -44,39 +51,58 @@ require (
4451
github.com/go-openapi/jsonreference v0.21.0 // indirect
4552
github.com/go-openapi/swag v0.23.0 // indirect
4653
github.com/gogo/protobuf v1.3.2 // indirect
54+
github.com/golang/protobuf v1.5.4 // indirect
55+
github.com/google/cel-go v0.23.2 // indirect
4756
github.com/google/gnostic-models v0.6.9 // indirect
4857
github.com/google/go-cmp v0.7.0 // indirect
4958
github.com/google/uuid v1.6.0 // indirect
59+
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
60+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
5061
github.com/inconshreveable/mousetrap v1.1.0 // indirect
5162
github.com/josharian/intern v1.0.0 // indirect
5263
github.com/json-iterator/go v1.1.12 // indirect
64+
github.com/kylelemons/godebug v1.1.0 // indirect
5365
github.com/mailru/easyjson v0.9.0 // indirect
5466
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
5567
github.com/modern-go/reflect2 v1.0.2 // indirect
5668
github.com/pkg/errors v0.9.1 // indirect
5769
github.com/prometheus/procfs v0.15.1 // indirect
5870
github.com/spf13/cobra v1.8.1 // indirect
5971
github.com/spf13/pflag v1.0.5 // indirect
72+
github.com/stoewer/go-strcase v1.3.0 // indirect
6073
github.com/x448/float16 v0.8.4 // indirect
74+
go.etcd.io/etcd/api/v3 v3.5.21 // indirect
75+
go.etcd.io/etcd/client/pkg/v3 v3.5.21 // indirect
76+
go.etcd.io/etcd/client/v3 v3.5.21 // indirect
6177
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
6278
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect
79+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
6380
go.opentelemetry.io/otel v1.33.0 // indirect
81+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 // indirect
82+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 // indirect
6483
go.opentelemetry.io/otel/metric v1.33.0 // indirect
84+
go.opentelemetry.io/otel/sdk v1.33.0 // indirect
6585
go.opentelemetry.io/otel/trace v1.33.0 // indirect
86+
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
6687
go.uber.org/automaxprocs v1.6.0 // indirect
6788
go.uber.org/multierr v1.11.0 // indirect
6889
go.uber.org/zap v1.27.0 // indirect
90+
golang.org/x/crypto v0.37.0 // indirect
91+
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
6992
golang.org/x/net v0.39.0 // indirect
7093
golang.org/x/oauth2 v0.27.0 // indirect
94+
golang.org/x/sync v0.13.0 // indirect
7195
golang.org/x/sys v0.32.0 // indirect
7296
golang.org/x/term v0.31.0 // indirect
7397
golang.org/x/text v0.24.0 // indirect
7498
golang.org/x/time v0.9.0 // indirect
99+
google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
75100
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
76101
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
77102
gopkg.in/inf.v0 v0.9.1 // indirect
78103
gopkg.in/yaml.v3 v3.0.1 // indirect
79104
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
105+
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect
80106
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect
81107
sigs.k8s.io/randfill v1.0.0 // indirect
82108
sigs.k8s.io/structured-merge-diff/v4 v4.6.0 // indirect

0 commit comments

Comments
 (0)