
Commit ef7c537

Node removal latency metrics added
1 parent 2e528f9 commit ef7c537

File tree: 12 files changed, +324 -29 lines


cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
@@ -342,6 +342,8 @@ type AutoscalingOptions struct {
     ProactiveScaleupEnabled bool
     // PodInjectionLimit limits total number of pods while injecting fake pods.
     PodInjectionLimit int
+    // NodeLatencyTrackingEnabled is used to enable/disable node latency tracking.
+    NodeLatencyTrackingEnabled bool
 }

 // KubeClientOptions specify options for kube client

cluster-autoscaler/config/flags/flags.go

Lines changed: 2 additions & 0 deletions
@@ -227,6 +227,7 @@ var (
     enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
     clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
     checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
+    nodeLatencyTrackingEnabled = flag.Bool("enable-node-latency-tracking", false, "Whether logic for monitoring of node latency is enabled.")

     // Deprecated flags
     ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -408,6 +409,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
         NodeInfoCacheExpireTime: *nodeInfoCacheExpireTime,
         ProactiveScaleupEnabled: *proactiveScaleupEnabled,
         PodInjectionLimit: *podInjectionLimit,
+        NodeLatencyTrackingEnabled: *nodeLatencyTrackingEnabled,
     }
 }
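With this change the tracking can be switched on per deployment: passing --enable-node-latency-tracking=true on the cluster-autoscaler command line (illustrative invocation, other flags omitted) sets NodeLatencyTrackingEnabled in AutoscalingOptions; the default remains false.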

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 7 additions & 1 deletion
@@ -27,6 +27,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -58,6 +59,7 @@ const (
 type Actuator struct {
     ctx *context.AutoscalingContext
     nodeDeletionTracker *deletiontracker.NodeDeletionTracker
+    nodeLatencyTracker *latencytracker.NodeLatencyTracker
     nodeDeletionScheduler *GroupDeletionScheduler
     deleteOptions options.NodeDeleteOptions
     drainabilityRules rules.Rules
@@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface {
 }

 // NewActuator returns a new instance of Actuator.
-func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
+func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, nlt *latencytracker.NodeLatencyTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
     ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
     legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
     var evictor Evictor
@@ -90,6 +92,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
     return &Actuator{
         ctx: ctx,
         nodeDeletionTracker: ndt,
+        nodeLatencyTracker: nlt,
         nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
         budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
         deleteOptions: deleteOptions,
@@ -324,6 +327,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
     }

     for _, node := range nodes {
+        if a.nodeLatencyTracker != nil {
+            a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now())
+        }
         nodeInfo, err := clusterSnapshot.GetNodeInfo(node.Name)
         if err != nil {
             nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerErrorf(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}
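A minimal wiring sketch for the changed constructor signature. Names such as opts, autoscalingCtx, scaleStateNotifier, deleteOptions, drainabilityRules and configGetter are placeholders for values the caller already has; they are not part of this commit. A nil tracker simply skips the ObserveDeletion call guarded above.

    // Hypothetical caller-side setup, assuming opts is the parsed config.AutoscalingOptions.
    var nlt *latencytracker.NodeLatencyTracker
    if opts.NodeLatencyTrackingEnabled {
        nlt = latencytracker.NewNodeLatencyTracker()
    }
    ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
    // nlt may be nil: deleteNodesAsync checks for nil before calling ObserveDeletion.
    actuator := NewActuator(autoscalingCtx, scaleStateNotifier, ndt, nlt, deleteOptions, drainabilityRules, configGetter)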

cluster-autoscaler/core/scaledown/actuation/actuator_test.go

Lines changed: 3 additions & 0 deletions
@@ -39,6 +39,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/config"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     . "k8s.io/autoscaler/cluster-autoscaler/core/test"
     "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
@@ -1279,6 +1280,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
         nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
         budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
         configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
+        nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
     }

     var gotResult status.ScaleDownResult
@@ -1557,6 +1559,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
         ctx: &ctx, nodeDeletionTracker: ndt,
         nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
         budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
+        nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
     }

     for _, nodes := range deleteNodes {
Lines changed: 134 additions & 0 deletions
New file (tests for the latencytracker package); all 134 lines added:

@@ -0,0 +1,134 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package latencytracker

import (
    "sync"
    "testing"
    "time"
)

func TestUpdateStateWithUnneededList_AddsNewNodes(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()
    node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}

    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)

    tracker.Lock()
    defer tracker.Unlock()
    if _, ok := tracker.nodes["node1"]; !ok {
        t.Errorf("expected node1 to be tracked, but was not")
    }
}

func TestUpdateStateWithUnneededList_DoesNotDuplicate(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()
    node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}

    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now.Add(time.Minute))

    tracker.Lock()
    defer tracker.Unlock()
    if len(tracker.nodes) != 1 {
        t.Errorf("expected 1 tracked node, got %d", len(tracker.nodes))
    }
}

func TestObserveDeletion_RemovesNode(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()
    node := NodeInfo{
        Name:          "node1",
        UnneededSince: now.Add(-10 * time.Minute),
        Threshold:     5 * time.Minute,
    }
    tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)

    tracker.ObserveDeletion("node1", now)

    tracker.Lock()
    defer tracker.Unlock()
    if _, ok := tracker.nodes["node1"]; ok {
        t.Errorf("expected node1 removed after ObserveDeletion")
    }
}

func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()

    tracker.ObserveDeletion("node1", now)

    tracker.Lock()
    defer tracker.Unlock()
    if len(tracker.nodes) != 0 {
        t.Errorf("expected no nodes tracked, got %d", len(tracker.nodes))
    }
}

func TestConcurrentUpdatesAndDeletions(t *testing.T) {
    tracker := NewNodeLatencyTracker()
    now := time.Now()

    node := NodeInfo{
        Name:          "node1",
        UnneededSince: now,
        Threshold:     2 * time.Minute,
    }

    var wg sync.WaitGroup
    stop := make(chan struct{})

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case <-stop:
                return
            default:
                tracker.UpdateStateWithUnneededList([]NodeInfo{node}, time.Now())
            }
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case <-stop:
                return
            default:
                tracker.ObserveDeletion("node1", time.Now())
            }
        }
    }()

    time.Sleep(50 * time.Millisecond)
    close(stop)
    wg.Wait()

    tracker.Lock()
    defer tracker.Unlock()
    if len(tracker.nodes) > 1 {
        t.Errorf("expected at most 1 tracked node, got %d", len(tracker.nodes))
    }
}
Lines changed: 66 additions & 0 deletions
New file (latencytracker package implementation); all 66 lines added:

@@ -0,0 +1,66 @@
package latencytracker

import (
    "sync"
    "time"

    "k8s.io/autoscaler/cluster-autoscaler/metrics"

    "k8s.io/klog/v2"
)

// NodeInfo describes a node that is currently considered unneeded, together with
// the time it became unneeded and its ScaleDownUnneededTime threshold.
type NodeInfo struct {
    Name          string
    UnneededSince time.Time
    Threshold     time.Duration
}

// NodeLatencyTracker tracks how long unneeded nodes wait before they are actually deleted.
type NodeLatencyTracker struct {
    sync.Mutex
    nodes map[string]NodeInfo
}

// NewNodeLatencyTracker creates a new tracker.
func NewNodeLatencyTracker() *NodeLatencyTracker {
    return &NodeLatencyTracker{
        nodes: make(map[string]NodeInfo),
    }
}

// UpdateStateWithUnneededList starts tracking any nodes in the current unneeded list
// that are not tracked yet; already-tracked nodes keep their original UnneededSince.
func (t *NodeLatencyTracker) UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time) {
    t.Lock()
    defer t.Unlock()

    currentSet := make(map[string]struct{}, len(list))
    for _, info := range list {
        currentSet[info.Name] = struct{}{}
        _, exists := t.nodes[info.Name]
        if !exists {
            t.nodes[info.Name] = NodeInfo{
                Name:          info.Name,
                UnneededSince: info.UnneededSince,
                Threshold:     info.Threshold,
            }
            klog.V(2).Infof("Started tracking unneeded node %s at %v with ScaleDownUnneededTime=%v",
                info.Name, info.UnneededSince, info.Threshold)
        }
    }
}

// ObserveDeletion is called by the actuator just before node deletion.
func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
    t.Lock()
    defer t.Unlock()

    if info, exists := t.nodes[nodeName]; exists {
        duration := timestamp.Sub(info.UnneededSince)

        klog.V(2).Infof(
            "Observing deletion for node %s, unneeded for %s (threshold was %s).",
            nodeName, duration, info.Threshold,
        )

        metrics.UpdateScaleDownNodeDeletionDuration("true", duration-info.Threshold)
        delete(t.nodes, nodeName)
    }
}
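A minimal usage sketch of the tracker's lifecycle, as a standalone illustration rather than part of the commit: the planner reports nodes that are currently unneeded, and the actuator later reports the deletion time, at which point the tracker emits the latency metric and stops tracking the node. The node name and durations below are made up.

    // Hypothetical example; in the autoscaler the planner and actuator make these calls.
    nlt := latencytracker.NewNodeLatencyTracker()

    unneededSince := time.Now()
    nlt.UpdateStateWithUnneededList([]latencytracker.NodeInfo{
        {Name: "node-1", UnneededSince: unneededSince, Threshold: 10 * time.Minute},
    }, unneededSince)

    // Later, just before the node is deleted: records
    // (deletion time - UnneededSince) - Threshold via
    // metrics.UpdateScaleDownNodeDeletionDuration and forgets node-1.
    nlt.ObserveDeletion("node-1", time.Now())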

cluster-autoscaler/core/scaledown/planner/planner.go

Lines changed: 17 additions & 1 deletion
@@ -26,6 +26,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
@@ -76,10 +77,11 @@ type Planner struct {
     cc controllerReplicasCalculator
     scaleDownSetProcessor nodes.ScaleDownSetProcessor
     scaleDownContext *nodes.ScaleDownContext
+    nodeLatencyTracker *latencytracker.NodeLatencyTracker
 }

 // New creates a new Planner object.
-func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner {
+func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, nlt *latencytracker.NodeLatencyTracker) *Planner {
     resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
     minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
     if minUpdateInterval == 0*time.Nanosecond {
@@ -98,6 +100,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
         scaleDownSetProcessor: processors.ScaleDownSetProcessor,
         scaleDownContext: nodes.NewDefaultScaleDownContext(),
         minUpdateInterval: minUpdateInterval,
+        nodeLatencyTracker: nlt,
     }
 }

@@ -301,6 +304,19 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
         }
     }
     p.unneededNodes.Update(removableList, p.latestUpdate)
+    if p.nodeLatencyTracker != nil {
+        var unneededList []latencytracker.NodeInfo
+        for _, n := range p.unneededNodes.AsList() {
+            if threshold, ok := p.unneededNodes.GetUnneededTimeForNode(p.context, n.Name); ok {
+                unneededList = append(unneededList, latencytracker.NodeInfo{
+                    Name:          n.Name,
+                    UnneededSince: p.latestUpdate,
+                    Threshold:     threshold,
+                })
+            }
+        }
+        p.nodeLatencyTracker.UpdateStateWithUnneededList(unneededList, p.latestUpdate)
+    }
     if unremovableCount > 0 {
         klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
     }
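Callers of the planner now supply the tracker as an extra argument, or nil to leave latency tracking off. A minimal sketch mirroring the test wiring below, where autoscalingCtx, autoscalingProcessors, deleteOptions and drainabilityRules are assumed to already exist in the caller:

    // Hypothetical caller-side wiring; matches how the updated tests construct the Planner.
    nlt := latencytracker.NewNodeLatencyTracker()
    p := New(autoscalingCtx, autoscalingProcessors, deleteOptions, drainabilityRules, nlt)
    // Passing nil instead of nlt skips the unneeded-node latency updates in categorizeNodes.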

cluster-autoscaler/core/scaledown/planner/planner_test.go

Lines changed: 4 additions & 3 deletions
@@ -32,6 +32,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/config"
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
+    "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
@@ -500,7 +501,7 @@ func TestUpdateClusterState(t *testing.T) {
     assert.NoError(t, err)
     clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
     deleteOptions := options.NodeDeleteOptions{}
-    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
     p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
     if tc.isSimulationTimeout {
         context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -696,7 +697,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
     assert.NoError(t, err)
     clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
     deleteOptions := options.NodeDeleteOptions{}
-    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
     p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
     p.minUpdateInterval = tc.updateInterval
     p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -864,7 +865,7 @@ func TestNodesToDelete(t *testing.T) {
     assert.NoError(t, err)
     clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
     deleteOptions := options.NodeDeleteOptions{}
-    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
+    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
     p.latestUpdate = time.Now()
     p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
     p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))

0 commit comments
