2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -342,6 +342,8 @@ type AutoscalingOptions struct {
ProactiveScaleupEnabled bool
// PodInjectionLimit limits total number of pods while injecting fake pods.
PodInjectionLimit int
// NodeLatencyTrackingEnabled is used to enable/disable node latency tracking.
NodeLatencyTrackingEnabled bool
}

// KubeClientOptions specify options for kube client
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/flags/flags.go
@@ -227,6 +227,7 @@ var (
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
nodeLatencyTrackingEnabled = flag.Bool("enable-node-latency-tracking", false, "Whether logic for monitoring of node latency is enabled.")

Review comment: The flag name is unclear; it does not convey that what is being tracked is the latency of node removal.
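
For illustration only, a name that spells out the removal-latency aspect could look like the line below (hypothetical rename, not what the PR currently uses):

nodeLatencyTrackingEnabled = flag.Bool("enable-node-removal-latency-tracking", false, "Whether tracking of node removal latency (the time from a node becoming unneeded until its deletion) is enabled.")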


// Deprecated flags
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)")
@@ -408,6 +409,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeInfoCacheExpireTime: *nodeInfoCacheExpireTime,
ProactiveScaleupEnabled: *proactiveScaleupEnabled,
PodInjectionLimit: *podInjectionLimit,
NodeLatencyTrackingEnabled: *nodeLatencyTrackingEnabled,
}
}

8 changes: 7 additions & 1 deletion cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -58,6 +59,7 @@ const (
type Actuator struct {
ctx *context.AutoscalingContext
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeLatencyTracker *latencytracker.NodeLatencyTracker
nodeDeletionScheduler *GroupDeletionScheduler
deleteOptions options.NodeDeleteOptions
drainabilityRules rules.Rules
@@ -78,7 +80,7 @@ type actuatorNodeGroupConfigGetter interface {
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupchange.NodeGroupChangeObserver, ndt *deletiontracker.NodeDeletionTracker, nlt *latencytracker.NodeLatencyTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, scaleStateNotifier, ndt, ctx.NodeDeletionBatcherInterval)
legacyFlagDrainConfig := SingleRuleDrainConfig(ctx.MaxGracefulTerminationSec)
var evictor Evictor
@@ -90,6 +92,7 @@ func NewActuator(ctx *context.AutoscalingContext, scaleStateNotifier nodegroupch
return &Actuator{
ctx: ctx,
nodeDeletionTracker: ndt,
nodeLatencyTracker: nlt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
@@ -324,6 +327,9 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
}

for _, node := range nodes {
if a.nodeLatencyTracker != nil {

Review comment: This does not seem like the best placement. Deletion can still fail (see the checks below), so we would want to report only successful deletions. (A possible shape is sketched below, after this hunk.)

a.nodeLatencyTracker.ObserveDeletion(node.Name, time.Now())

Review comment: We could consider covering this logic with a test.

}
nodeInfo, err := clusterSnapshot.GetNodeInfo(node.Name)
if err != nil {
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerErrorf(errors.InternalError, "nodeInfos.Get for %q returned error: %v", node.Name, err)}
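
Following up on the placement comment above, a minimal sketch of reporting only after a deletion has succeeded. The helper and its call site are hypothetical; in practice it would be wired wherever the actuator produces a successful NodeDeleteResult:

// Hypothetical helper, called from the point where a node's NodeDeleteResult is known,
// so latency is recorded only for deletions that actually succeeded.
func (a *Actuator) registerSuccessfulDeletion(nodeName string, result status.NodeDeleteResult) {
	if result.ResultType != status.NodeDeleteOk {
		return
	}
	if a.nodeLatencyTracker != nil {
		a.nodeLatencyTracker.ObserveDeletion(nodeName, time.Now())
	}
}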
3 changes: 3 additions & 0 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -39,6 +39,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
@@ -1279,6 +1280,7 @@ func runStartDeletionTest(t *testing.T, tc startDeletionTestCase, force bool) {
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
}

var gotResult status.ScaleDownResult
@@ -1557,6 +1559,7 @@ func TestStartDeletionInBatchBasic(t *testing.T) {
ctx: &ctx, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
nodeLatencyTracker: latencytracker.NewNodeLatencyTracker(),
}

for _, nodes := range deleteNodes {

Review comment: The names of the test file and the implementation file do not match.

Review comment: I don't see any test checking the reported value. (A possible test sketch follows below.)
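
A possible test sketch for the reported value, assuming a small hypothetical refactor in which NodeLatencyTracker gains an injectable reporting hook (a report func(decision string, d time.Duration) field that defaults to metrics.UpdateScaleDownNodeDeletionDuration). This is illustrative only, not the PR's actual API:

func TestObserveDeletion_ReportsTimeAboveThreshold(t *testing.T) {
	tracker := NewNodeLatencyTracker()
	var gotDuration time.Duration
	// "report" is a hypothetical injectable hook standing in for the direct metrics call.
	tracker.report = func(decision string, d time.Duration) { gotDuration = d }

	unneededSince := time.Now().Add(-10 * time.Minute)
	threshold := 5 * time.Minute
	tracker.UpdateStateWithUnneededList(
		[]NodeInfo{{Name: "node1", UnneededSince: unneededSince, Threshold: threshold}}, unneededSince)

	// The node is deleted 10 minutes after it became unneeded, 5 minutes past its threshold.
	tracker.ObserveDeletion("node1", unneededSince.Add(10*time.Minute))

	if want := 5 * time.Minute; gotDuration != want {
		t.Errorf("reported duration = %v, want %v", gotDuration, want)
	}
}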

@@ -0,0 +1,134 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package latencytracker

import (
"sync"
"testing"
"time"
)

func TestUpdateStateWithUnneededList_AddsNewNodes(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}

tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)

tracker.Lock()
defer tracker.Unlock()
if _, ok := tracker.nodes["node1"]; !ok {
t.Errorf("expected node1 to be tracked, but was not")
}
}

func TestUpdateStateWithUnneededList_DoesNotDuplicate(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
node := NodeInfo{Name: "node1", UnneededSince: now, Threshold: 5 * time.Minute}

tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now.Add(time.Minute))

tracker.Lock()
defer tracker.Unlock()
if len(tracker.nodes) != 1 {
t.Errorf("expected 1 tracked node, got %d", len(tracker.nodes))
}
}

func TestObserveDeletion_RemovesNode(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()
node := NodeInfo{
Name: "node1",
UnneededSince: now.Add(-10 * time.Minute),
Threshold: 5 * time.Minute,
}
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, now)

tracker.ObserveDeletion("node1", now)

tracker.Lock()
defer tracker.Unlock()
if _, ok := tracker.nodes["node1"]; ok {
t.Errorf("expected node1 removed after ObserveDeletion")
}
}

func TestObserveDeletion_NoOpIfNodeNotTracked(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()

tracker.ObserveDeletion("node1", now)

tracker.Lock()
defer tracker.Unlock()
if len(tracker.nodes) != 0 {
t.Errorf("expected no nodes tracked, got %d", len(tracker.nodes))
}
}

func TestConcurrentUpdatesAndDeletions(t *testing.T) {
tracker := NewNodeLatencyTracker()
now := time.Now()

node := NodeInfo{
Name: "node1",
UnneededSince: now,
Threshold: 2 * time.Minute,
}

var wg sync.WaitGroup
stop := make(chan struct{})

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
tracker.UpdateStateWithUnneededList([]NodeInfo{node}, time.Now())
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
tracker.ObserveDeletion("node1", time.Now())
}
}
}()

time.Sleep(50 * time.Millisecond)
close(stop)
wg.Wait()

tracker.Lock()
defer tracker.Unlock()
if len(tracker.nodes) > 1 {
t.Errorf("expected at most 1 tracked node, got %d", len(tracker.nodes))
}
}

Review comment: The file name does not follow the naming convention; it uses underscores between words.

@@ -0,0 +1,66 @@
package latencytracker

import (
"sync"
"time"

"k8s.io/autoscaler/cluster-autoscaler/metrics"

"k8s.io/klog/v2"
)

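// NodeInfo holds the tracking data for a single unneeded node.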
type NodeInfo struct {
Name string

Review comment: Looks like duplicated information; Name is already the map key. (See the sketch at the end of this file's diff.)

UnneededSince time.Time
Threshold time.Duration
}

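// NodeLatencyTracker tracks how long unneeded nodes wait before they are deleted.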
type NodeLatencyTracker struct {
sync.Mutex
nodes map[string]NodeInfo
}

// NewNodeLatencyTracker creates a new tracker.
func NewNodeLatencyTracker() *NodeLatencyTracker {
return &NodeLatencyTracker{
nodes: make(map[string]NodeInfo),
}
}

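// UpdateStateWithUnneededList starts tracking any nodes from the given unneeded list that are not tracked yet.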
func (t *NodeLatencyTracker) UpdateStateWithUnneededList(list []NodeInfo, timestamp time.Time) {
t.Lock()
defer t.Unlock()

currentSet := make(map[string]struct{}, len(list))
for _, info := range list {
currentSet[info.Name] = struct{}{}
_, exists := t.nodes[info.Name]
if !exists {
t.nodes[info.Name] = NodeInfo{
Name: info.Name,
UnneededSince: info.UnneededSince,
Threshold: info.Threshold,
}
klog.V(2).Infof("Started tracking unneeded node %s at %v with ScaleDownUnneededTime=%v",
info.Name, info.UnneededSince, info.Threshold)
}
}
}

// ObserveDeletion is called by the actuator just before node deletion.
func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
t.Lock()
defer t.Unlock()

if info, exists := t.nodes[nodeName]; exists {
duration := timestamp.Sub(info.UnneededSince)

klog.V(2).Infof(
"Observing deletion for node %s, unneeded for %s (threshold was %s).",
nodeName, duration, info.Threshold,
)

metrics.UpdateScaleDownNodeDeletionDuration("true", duration-info.Threshold)
Review comment (on lines +58 to +63): Logging and updating the metric don't need to be done under the lock, and shouldn't be. (See the sketch after this file's diff.)

delete(t.nodes, nodeName)
}
}
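
Putting the two review notes above together, a minimal sketch of what ObserveDeletion could look like with the node name kept only as the map key and the log/metric calls moved outside the lock. unneededEntry is a hypothetical rename; this is illustrative only, not the PR's code:

// Hypothetical slimmed-down map value; the node name lives only in the map key.
type unneededEntry struct {
	UnneededSince time.Time
	Threshold     time.Duration
}

// Sketch: copy what is needed while holding the lock, release it, then log and report.
func (t *NodeLatencyTracker) ObserveDeletion(nodeName string, timestamp time.Time) {
	t.Lock()
	entry, exists := t.nodes[nodeName]
	if exists {
		delete(t.nodes, nodeName)
	}
	t.Unlock()

	if !exists {
		return
	}
	duration := timestamp.Sub(entry.UnneededSince)
	klog.V(2).Infof("Observing deletion for node %s, unneeded for %s (threshold was %s).",
		nodeName, duration, entry.Threshold)
	metrics.UpdateScaleDownNodeDeletionDuration("true", duration-entry.Threshold)
}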
18 changes: 17 additions & 1 deletion cluster-autoscaler/core/scaledown/planner/planner.go
@@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
@@ -76,10 +77,11 @@ type Planner struct {
cc controllerReplicasCalculator
scaleDownSetProcessor nodes.ScaleDownSetProcessor
scaleDownContext *nodes.ScaleDownContext
nodeLatencyTracker *latencytracker.NodeLatencyTracker
}

// New creates a new Planner object.
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner {
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, nlt *latencytracker.NodeLatencyTracker) *Planner {
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
if minUpdateInterval == 0*time.Nanosecond {
@@ -98,6 +100,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
scaleDownContext: nodes.NewDefaultScaleDownContext(),
minUpdateInterval: minUpdateInterval,
nodeLatencyTracker: nlt,
}
}

@@ -301,6 +304,19 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
}
}
p.unneededNodes.Update(removableList, p.latestUpdate)
if p.nodeLatencyTracker != nil {
var unneededList []latencytracker.NodeInfo
for _, n := range p.unneededNodes.AsList() {
if threshold, ok := p.unneededNodes.GetUnneededTimeForNode(p.context, n.Name); ok {
unneededList = append(unneededList, latencytracker.NodeInfo{
Name: n.Name,
UnneededSince: p.latestUpdate,
Threshold: threshold,
})
}
}
p.nodeLatencyTracker.UpdateStateWithUnneededList(unneededList, p.latestUpdate)
}
if unremovableCount > 0 {
klog.V(1).Infof("%v nodes found to be unremovable in simulation, will re-check them at %v", unremovableCount, unremovableTimeout)
}
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -32,6 +32,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/latencytracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
@@ -500,7 +501,7 @@ func TestUpdateClusterState(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
if tc.isSimulationTimeout {
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -696,7 +697,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
p.minUpdateInterval = tc.updateInterval
p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -864,7 +865,7 @@ func TestNodesToDelete(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil, latencytracker.NewNodeLatencyTracker())
p.latestUpdate = time.Now()
p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))