Skip to content

Commit a676533

Browse files
authored
fix: optimize node scaler log, and info API (#457)
* fix: optimize logging * fix: optimize node scaler log, and info API
1 parent 98306a1 commit a676533

File tree

6 files changed

+142
-45
lines changed

6 files changed

+142
-45
lines changed

cmd/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func main() {
254254

255255
startCustomResourceController(ctx, mgr, metricsRecorder, allocator, portAllocator, nodeExpander)
256256

257-
startHttpServerForTFClient(ctx, kc, portAllocator, indexAllocator, allocator, scheduler, mgr.Elected())
257+
startHttpServerForTFClient(ctx, kc, portAllocator, indexAllocator, allocator, scheduler, nodeExpander, mgr.Elected())
258258

259259
// +kubebuilder:scaffold:builder
260260
addHealthCheckAPI(mgr)
@@ -306,6 +306,7 @@ func startHttpServerForTFClient(
306306
indexAllocator *indexallocator.IndexAllocator,
307307
allocator *gpuallocator.GpuAllocator,
308308
scheduler *scheduler.Scheduler,
309+
nodeExpander *expander.NodeExpander,
309310
leaderChan <-chan struct{},
310311
) {
311312
client, err := client.NewWithWatch(kc, client.Options{Scheme: scheme})
@@ -333,8 +334,13 @@ func startHttpServerForTFClient(
333334
setupLog.Error(err, "failed to create allocator info router")
334335
os.Exit(1)
335336
}
337+
nodeScalerInfoRouter, err := router.NewNodeScalerInfoRouter(ctx, nodeExpander)
338+
if err != nil {
339+
setupLog.Error(err, "failed to create node scaler info router")
340+
os.Exit(1)
341+
}
336342
httpServer := server.NewHTTPServer(
337-
connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, leaderChan,
343+
connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, nodeScalerInfoRouter, leaderChan,
338344
)
339345
go func() {
340346
err := httpServer.Run()

internal/controller/pod_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
6868
pod := &corev1.Pod{}
6969
if err := r.Get(ctx, req.NamespacedName, pod); err != nil {
7070
if errors.IsNotFound(err) {
71-
r.Expander.RemovePreSchedulePod(req.Name, true)
71+
_ = r.Expander.RemovePreSchedulePod(req.Name, true)
7272
r.Allocator.DeallocByPodIdentifier(ctx, req.NamespacedName)
7373
metrics.RemoveWorkerMetrics(req.Name, time.Now())
7474
log.Info("Released GPU resources when pod deleted", "pod", req.NamespacedName)

internal/scheduler/expander/handler.go

Lines changed: 104 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,17 @@ const (
3434
)
3535

3636
type NodeExpander struct {
37-
client client.Client
38-
scheduler *scheduler.Scheduler
39-
allocator *gpuallocator.GpuAllocator
40-
logger klog.Logger
41-
inFlightNodes map[string][]*tfv1.GPU
42-
preSchedulePods map[string]*tfv1.AllocRequest
43-
preScheduleTimers map[string]*time.Timer
44-
eventRecorder record.EventRecorder
45-
mu sync.RWMutex
46-
ctx context.Context
37+
client client.Client
38+
scheduler *scheduler.Scheduler
39+
allocator *gpuallocator.GpuAllocator
40+
logger klog.Logger
41+
inFlightNodes map[string][]*tfv1.GPU
42+
inFlightNodeClaims sync.Map
43+
preSchedulePods map[string]*tfv1.AllocRequest
44+
preScheduleTimers map[string]*time.Timer
45+
eventRecorder record.EventRecorder
46+
mu sync.RWMutex
47+
ctx context.Context
4748
}
4849

4950
func NewNodeExpander(
@@ -54,15 +55,16 @@ func NewNodeExpander(
5455
) *NodeExpander {
5556

5657
expander := &NodeExpander{
57-
client: allocator.Client,
58-
scheduler: scheduler,
59-
allocator: allocator,
60-
logger: log.FromContext(ctx).WithValues("component", "NodeExpander"),
61-
inFlightNodes: make(map[string][]*tfv1.GPU, 10),
62-
preSchedulePods: make(map[string]*tfv1.AllocRequest, 20),
63-
preScheduleTimers: make(map[string]*time.Timer, 20),
64-
eventRecorder: recorder,
65-
ctx: ctx,
58+
client: allocator.Client,
59+
scheduler: scheduler,
60+
allocator: allocator,
61+
logger: log.FromContext(ctx).WithValues("component", "NodeExpander"),
62+
inFlightNodes: make(map[string][]*tfv1.GPU, 10),
63+
preSchedulePods: make(map[string]*tfv1.AllocRequest, 20),
64+
preScheduleTimers: make(map[string]*time.Timer, 20),
65+
inFlightNodeClaims: sync.Map{},
66+
eventRecorder: recorder,
67+
ctx: ctx,
6668
}
6769
allocator.RegisterBindHandler(func(req *tfv1.AllocRequest) {
6870
obj := &corev1.ObjectReference{
@@ -73,15 +75,68 @@ func NewNodeExpander(
7375
UID: req.PodMeta.UID,
7476
ResourceVersion: req.PodMeta.ResourceVersion,
7577
}
76-
recorder.Eventf(obj, corev1.EventTypeNormal, "NodeExpansionCheck",
77-
"new node provisioned and pod scheduled successfully")
78-
expander.logger.Info("new node provisioned and pod scheduled successfully",
79-
"namespace", req.PodMeta.Namespace, "pod", req.PodMeta.Name)
80-
expander.RemovePreSchedulePod(req.PodMeta.Name, true)
78+
79+
removed := expander.RemovePreSchedulePod(req.PodMeta.Name, true)
80+
if removed {
81+
recorder.Eventf(obj, corev1.EventTypeNormal, "NodeExpansionCheck",
82+
"new node provisioned and pod scheduled successfully")
83+
}
8184
})
85+
86+
// Start checking inFlightNodeClaims every minute to avoid stuck in inFlightNodes
87+
go func() {
88+
for {
89+
time.Sleep(time.Minute)
90+
expander.inFlightNodeClaims.Range(func(key, _ interface{}) bool {
91+
karpenterNodeClaim := &karpv1.NodeClaim{}
92+
if err := expander.client.Get(expander.ctx, client.ObjectKey{Name: key.(string)}, karpenterNodeClaim); err != nil {
93+
if errors.IsNotFound(err) {
94+
expander.inFlightNodeClaims.Delete(key)
95+
expander.RemoveInFlightNode(key.(string))
96+
expander.logger.Info("karpenter node claim not found, remove from inFlightNodeClaims and inFlightNodes", "nodeClaimName", key.(string))
97+
return true
98+
}
99+
expander.logger.Error(err, "failed to get karpenter node claim", "nodeClaimName", key.(string))
100+
return true
101+
}
102+
if !karpenterNodeClaim.DeletionTimestamp.IsZero() {
103+
expander.inFlightNodeClaims.Delete(key)
104+
expander.RemoveInFlightNode(key.(string))
105+
expander.logger.Info("karpenter node claim is deleted, remove from inFlightNodeClaims and inFlightNodes", "nodeClaimName", key.(string))
106+
return true
107+
}
108+
expander.mu.RLock()
109+
defer expander.mu.RUnlock()
110+
if _, ok := expander.inFlightNodes[karpenterNodeClaim.Status.NodeName]; !ok {
111+
expander.inFlightNodeClaims.Delete(key)
112+
expander.logger.Info("karpenter node claim has been provisioned, remove from inFlightNodeClaims", "nodeClaimName", key.(string))
113+
return true
114+
}
115+
return true
116+
})
117+
}
118+
}()
119+
82120
return expander
83121
}
84122

123+
func (e *NodeExpander) GetNodeScalerInfo() any {
124+
e.mu.RLock()
125+
defer e.mu.RUnlock()
126+
127+
inFlightNodeClaimSnapshot := make(map[string]any)
128+
e.inFlightNodeClaims.Range(func(key, value interface{}) bool {
129+
inFlightNodeClaimSnapshot[key.(string)] = value
130+
return true
131+
})
132+
return map[string]any{
133+
"inFlightNodes": e.inFlightNodes,
134+
"inFlightNodeClaims": inFlightNodeClaimSnapshot,
135+
"preSchedulePods": e.preSchedulePods,
136+
"preScheduleTimerNum": len(e.preScheduleTimers),
137+
}
138+
}
139+
85140
func (e *NodeExpander) ProcessExpansion(ctx context.Context, pod *corev1.Pod) error {
86141
if pod == nil {
87142
return fmt.Errorf("pod cannot be nil")
@@ -196,11 +251,11 @@ func (e *NodeExpander) addInFlightNodeAndPreSchedulePod(allocRequest *tfv1.Alloc
196251
err := e.client.Get(e.ctx, client.ObjectKey{Name: podMeta.Name, Namespace: podMeta.Namespace}, currentPod)
197252
if err != nil {
198253
if errors.IsNotFound(err) || !currentPod.DeletionTimestamp.IsZero() {
199-
e.RemovePreSchedulePod(podMeta.Name, false)
254+
_ = e.RemovePreSchedulePod(podMeta.Name, false)
200255
}
201256
e.logger.Error(err, "failed to get pod for node expansion check",
202257
"namespace", podMeta.Namespace, "pod", podMeta.Name)
203-
e.RemovePreSchedulePod(podMeta.Name, false)
258+
_ = e.RemovePreSchedulePod(podMeta.Name, false)
204259
return
205260
}
206261
if currentPod.Spec.NodeName != "" {
@@ -209,14 +264,14 @@ func (e *NodeExpander) addInFlightNodeAndPreSchedulePod(allocRequest *tfv1.Alloc
209264
"new node provisioned and pod scheduled successfully")
210265
e.logger.Info("new node provisioned and pod scheduled successfully",
211266
"namespace", podMeta.Namespace, "pod", podMeta.Name)
212-
e.RemovePreSchedulePod(podMeta.Name, false)
267+
_ = e.RemovePreSchedulePod(podMeta.Name, false)
213268
} else {
214269
// not scheduled, record warning event and remove pre-scheduled pod
215270
e.eventRecorder.Eventf(currentPod, corev1.EventTypeWarning, "NodeExpansionCheck",
216271
"failed to schedule pod after 10 minutes")
217272
e.logger.Info("failed to schedule pod after 10 minutes",
218273
"namespace", podMeta.Namespace, "pod", podMeta.Name)
219-
e.RemovePreSchedulePod(podMeta.Name, false)
274+
_ = e.RemovePreSchedulePod(podMeta.Name, false)
220275
}
221276
})
222277
e.preScheduleTimers[podMeta.Name] = timer
@@ -228,25 +283,32 @@ func (e *NodeExpander) RemoveInFlightNode(nodeName string) {
228283
return
229284
}
230285
e.mu.Lock()
231-
delete(e.inFlightNodes, nodeName)
232-
e.logger.Info("Removed in-flight node", "node", nodeName, "remaining inflight nodes", len(e.inFlightNodes))
286+
if _, ok := e.inFlightNodes[nodeName]; ok {
287+
delete(e.inFlightNodes, nodeName)
288+
e.logger.Info("Removed in-flight node", "node", nodeName, "remaining inflight nodes", len(e.inFlightNodes))
289+
}
233290
e.mu.Unlock()
234291
}
235292

236-
func (e *NodeExpander) RemovePreSchedulePod(podName string, stopTimer bool) {
293+
func (e *NodeExpander) RemovePreSchedulePod(podName string, stopTimer bool) bool {
237294
if e == nil {
238-
return
295+
return false
239296
}
240297
e.mu.Lock()
298+
defer e.mu.Unlock()
241299
if stopTimer {
242300
if timer, ok := e.preScheduleTimers[podName]; ok {
243301
timer.Stop()
244302
}
245303
}
246304
delete(e.preScheduleTimers, podName)
247-
delete(e.preSchedulePods, podName)
248-
e.logger.Info("Removed pre-scheduled pod", "pod", podName, "remaining pre-scheduled pods", len(e.preSchedulePods))
249-
e.mu.Unlock()
305+
306+
if _, ok := e.preSchedulePods[podName]; ok {
307+
delete(e.preSchedulePods, podName)
308+
e.logger.Info("Removed pre-scheduled pod", "pod", podName, "remaining pre-scheduled pods", len(e.preSchedulePods))
309+
return true
310+
}
311+
return false
250312
}
251313

252314
func (e *NodeExpander) prepareNewNodesForScheduleAttempt(
@@ -327,7 +389,7 @@ func (e *NodeExpander) checkGPUFitWithInflightNodes(pod *corev1.Pod, gpus []*tfv
327389
if !preScheduledPodPreAllocated {
328390
e.logger.Info("[Warning] pre-scheduled pod can not set into InFlight node anymore, remove queue and retry later",
329391
"pod", alloc.PodMeta.Name, "namespace", alloc.PodMeta.Namespace)
330-
e.RemovePreSchedulePod(alloc.PodMeta.Name, true)
392+
_ = e.RemovePreSchedulePod(alloc.PodMeta.Name, true)
331393
}
332394
}
333395

@@ -409,13 +471,13 @@ func (e *NodeExpander) createGPUNodeClaim(ctx context.Context, pod *corev1.Pod,
409471
e.logger.Info("node is not owned by any known provisioner, skip expansion", "node", preparedNode.Name)
410472
return fmt.Errorf("node is not owned by any known provisioner, skip expansion")
411473
}
412-
e.logger.Info("start expanding node from existing template node", "tmplNode", preparedNode.Name)
474+
e.logger.Info("start expanding node from existing template node", "newNodeClaimName", preparedNode.Name)
413475
if isKarpenterNodeClaim {
414476
// Check if controllerMeta's parent is GPUNodeClaim using unstructured object
415477
return e.handleKarpenterNodeClaim(ctx, pod, preparedNode, controlledBy)
416478
} else if isGPUNodeClaim {
417479
// Running in Provisioning mode, clone the parent GPUNodeClaim and apply
418-
e.logger.Info("node is controlled by GPUNodeClaim, cloning another to expand node", "tmplNode", preparedNode.Name)
480+
e.logger.Info("node is controlled by GPUNodeClaim, cloning another to expand node", "newNode", preparedNode.Name)
419481
return e.cloneGPUNodeClaim(ctx, pod, preparedNode, controlledBy)
420482
}
421483
return nil
@@ -450,12 +512,12 @@ func (e *NodeExpander) handleKarpenterNodeClaim(ctx context.Context, pod *corev1
450512
if nodeClaimParent != nil && nodeClaimParent.Kind == tfv1.GPUNodeClaimKind {
451513
// Parent is GPUNodeClaim, clone it and let cloudprovider module create real GPUNode
452514
e.logger.Info("NodeClaim parent is GPUNodeClaim, cloning another to expand node",
453-
"nodeClaimName", controlledBy.Name, "gpuNodeClaimParent", nodeClaimParent.Name)
515+
"controlledBy", controlledBy.Name, "gpuNodeClaimParent", nodeClaimParent.Name)
454516
return e.cloneGPUNodeClaim(ctx, pod, preparedNode, nodeClaimParent)
455517
} else if hasNodePoolParent {
456518
// owned by Karpenter node pool, create NodeClaim directly with special label identifier
457519
e.logger.Info("NodeClaim owned by Karpenter Pool, creating Karpenter NodeClaim to expand node",
458-
"nodeClaimName", controlledBy.Name)
520+
"controlledBy", controlledBy.Name)
459521
return e.createKarpenterNodeClaimDirect(ctx, pod, preparedNode, nodeClaim)
460522
} else {
461523
return fmt.Errorf("NodeClaim has no valid parent, can not expand node, should not happen")
@@ -527,9 +589,10 @@ func (e *NodeExpander) createKarpenterNodeClaimDirect(ctx context.Context, pod *
527589
e.eventRecorder.Eventf(pod, corev1.EventTypeWarning, "NodeExpansionFailed", "failed to create new NodeClaim: %v", err)
528590
return fmt.Errorf("failed to create NodeClaim: %w", err)
529591
}
530-
592+
e.inFlightNodeClaims.Store(newNodeClaim.Name, true)
531593
e.eventRecorder.Eventf(pod, corev1.EventTypeNormal, "NodeExpansionCompleted", "created new NodeClaim for node expansion: %s", newNodeClaim.Name)
532594
e.logger.Info("created new NodeClaim for node expansion", "pod", pod.Name, "namespace", pod.Namespace, "nodeClaim", newNodeClaim.Name)
595+
533596
return nil
534597
}
535598

internal/scheduler/expander/handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func testPreScheduledPodManagement(suite *NodeExpanderTestSuite) {
273273
Expect(exists).To(BeTrue())
274274

275275
// Test removing pre-scheduled pod
276-
suite.nodeExpander.RemovePreSchedulePod("test-pod", true)
276+
_ = suite.nodeExpander.RemovePreSchedulePod("test-pod", true)
277277

278278
// Verify pre-scheduled pod is removed
279279
suite.nodeExpander.mu.RLock()
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package router
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
"github.com/NexusGPU/tensor-fusion/internal/scheduler/expander"
8+
"github.com/gin-gonic/gin"
9+
)
10+
11+
type NodeScalerInfoRouter struct {
12+
nodeExpander *expander.NodeExpander
13+
}
14+
15+
func NewNodeScalerInfoRouter(
16+
ctx context.Context,
17+
nodeExpander *expander.NodeExpander,
18+
) (*NodeScalerInfoRouter, error) {
19+
return &NodeScalerInfoRouter{nodeExpander: nodeExpander}, nil
20+
}
21+
22+
func (r *NodeScalerInfoRouter) Get(ctx *gin.Context) {
23+
ctx.JSON(http.StatusOK, gin.H{
24+
"data": r.nodeExpander.GetNodeScalerInfo(),
25+
})
26+
}

internal/server/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ func NewHTTPServer(
1515
ahp *router.AssignHostPortRouter,
1616
ai *router.AssignIndexRouter,
1717
alc *router.AllocatorInfoRouter,
18+
nsi *router.NodeScalerInfoRouter,
1819
leaderChan <-chan struct{},
1920
) *gin.Engine {
2021

@@ -59,5 +60,6 @@ func NewHTTPServer(
5960
apiGroup.GET("/config", func(ctx *gin.Context) {
6061
ctx.JSON(http.StatusOK, gin.H{"config": config.GetGlobalConfig()})
6162
})
63+
apiGroup.GET("/node-scaler", nsi.Get)
6264
return r
6365
}

0 commit comments

Comments
 (0)