Skip to content

Commit 8d36d7d

Browse files
authored
fix: forward leader api auth issue (#459)
* fix: forward leader api auth issue * fix: forward api auth issue
1 parent 86a254b commit 8d36d7d

File tree

8 files changed

+52
-14
lines changed

8 files changed

+52
-14
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
"Hygon",
8585
"iface",
8686
"imageutils",
87+
"indexallocator",
8788
"influxdata",
8889
"internalcache",
8990
"internalqueue",

cmd/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ func startHttpServerForTFClient(
343343
connectionRouter, assignHostPortRouter, assignIndexRouter, allocatorInfoRouter, nodeScalerInfoRouter, leaderChan,
344344
)
345345
go func() {
346+
// Port defaults to 8080, set with env var `PORT`
346347
err := httpServer.Run()
347348
if err != nil {
348349
setupLog.Error(err, "problem running HTTP server")

internal/indexallocator/indexallocator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,12 @@ func (s *IndexAllocator) SetupWithManager(ctx context.Context, mgr manager.Manag
8686
// Index wraps around from 512 to 1 (simple modulo operation)
8787
func (s *IndexAllocator) AssignIndex(podName string) (int, error) {
8888
if !s.IsLeader {
89+
log.FromContext(s.ctx).Error(nil, "only leader can assign index", "podName", podName)
8990
return 0, fmt.Errorf("only leader can assign index")
9091
}
91-
9292
// Atomic increment and wrap around
9393
next := atomic.AddInt64(&s.currentIndex, 1)
9494
index := int((next-1)%IndexRangeEnd) + IndexRangeStart
95-
95+
log.FromContext(s.ctx).Info("assigned index successfully", "podName", podName, "index", index)
9696
return index, nil
9797
}

internal/server/router/assign_host_port.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"net/http"
77

8-
"github.com/NexusGPU/tensor-fusion/internal/constants"
98
"github.com/NexusGPU/tensor-fusion/internal/portallocator"
109
"github.com/NexusGPU/tensor-fusion/internal/utils"
1110
"github.com/gin-gonic/gin"
@@ -26,9 +25,8 @@ func NewAssignHostPortRouter(ctx context.Context, allocator *portallocator.PortA
2625

2726
func (r *AssignHostPortRouter) AssignHostPort(ctx *gin.Context) {
2827
podName := ctx.Query("podName")
29-
token := ctx.Request.Header.Get(constants.AuthorizationHeader)
30-
31-
if token == "" {
28+
token, ok := utils.ExtractBearerToken(ctx)
29+
if !ok {
3230
log.FromContext(ctx).Error(nil, "assigned host port failed, missing token", "podName", podName)
3331
ctx.String(http.StatusUnauthorized, "missing authorization header")
3432
return

internal/server/router/assign_index.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"net/http"
77

8-
"github.com/NexusGPU/tensor-fusion/internal/constants"
98
"github.com/NexusGPU/tensor-fusion/internal/indexallocator"
109
"github.com/NexusGPU/tensor-fusion/internal/utils"
1110
"github.com/gin-gonic/gin"
@@ -26,9 +25,8 @@ func NewAssignIndexRouter(ctx context.Context, allocator *indexallocator.IndexAl
2625

2726
func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) {
2827
podName := ctx.Query("podName")
29-
token := ctx.Request.Header.Get(constants.AuthorizationHeader)
30-
31-
if token == "" {
28+
token, ok := utils.ExtractBearerToken(ctx)
29+
if !ok {
3230
log.FromContext(ctx).Error(nil, "assigned index failed, missing token", "podName", podName)
3331
ctx.String(http.StatusUnauthorized, "missing authorization header")
3432
return
@@ -47,7 +45,8 @@ func (r *AssignIndexRouter) AssignIndex(ctx *gin.Context) {
4745
return
4846
}
4947
if !tokenReview.Status.Authenticated || tokenReview.Status.User.Username != utils.GetSelfServiceAccountNameFull() {
50-
log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName)
48+
log.FromContext(ctx).Error(nil, "assigned index failed, token invalid", "podName", podName,
49+
"authPassed", tokenReview.Status.Authenticated, "username", tokenReview.Status.User.Username)
5150
ctx.String(http.StatusUnauthorized, "token authentication failed")
5251
return
5352
}

internal/utils/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ const (
2929
var selfServiceAccountName string
3030

3131
func InitServiceAccountConfig() {
32+
if os.Getenv("IMPERSONATE_SERVICE_ACCOUNT") != "" {
33+
selfServiceAccountName = os.Getenv("IMPERSONATE_SERVICE_ACCOUNT")
34+
ctrl.Log.Info("impersonate service account mode detected", "name", selfServiceAccountName)
35+
return
36+
}
3237
data, err := os.ReadFile(ServiceAccountTokenPath)
3338
if err != nil {
3439
ctrl.Log.Info("service account token not found, run outside of Kubernetes cluster")

internal/utils/svr.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package utils
2+
3+
import (
4+
"strings"
5+
6+
"github.com/NexusGPU/tensor-fusion/internal/constants"
7+
"github.com/gin-gonic/gin"
8+
)
9+
10+
const BearerPrefix = "Bearer "
11+
12+
// ExtractBearerToken extracts the authorization token from the gin context.
13+
// It handles both cases: token with "Bearer " prefix and token without prefix.
14+
// Returns the token string (with Bearer prefix stripped if present) and true if token exists.
15+
// Returns empty string and false if token is missing.
16+
func ExtractBearerToken(ctx *gin.Context) (string, bool) {
17+
token := ctx.Request.Header.Get(constants.AuthorizationHeader)
18+
if token == "" {
19+
return "", false
20+
}
21+
22+
// Strip Bearer prefix if present
23+
token = strings.TrimPrefix(token, BearerPrefix)
24+
25+
return token, true
26+
}

internal/webhook/v1/pod_webhook.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"io"
2424
"net/http"
25+
"os"
2526
"strconv"
2627
"time"
2728

@@ -46,6 +47,13 @@ import (
4647
)
4748

4849
var httpClient = &http.Client{Timeout: 10 * time.Second}
50+
var operatorPort = "8080"
51+
52+
func init() {
53+
if port := os.Getenv("PORT"); port != "" {
54+
operatorPort = port
55+
}
56+
}
4957

5058
// SetupPodWebhookWithManager registers the webhook for Pod in the manager.
5159
func SetupPodWebhookWithManager(mgr ctrl.Manager, portAllocator *portallocator.PortAllocator, indexAllocator *indexallocator.IndexAllocator, pricingProvider pricing.PricingProvider) error {
@@ -386,7 +394,7 @@ func (m *TensorFusionPodMutator) assignDeviceAllocationIndex(ctx context.Context
386394
var indexErr error
387395
podIdentifier := pod.Name
388396
if podIdentifier == "" {
389-
// For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID
397+
// For Deployment/StatefulSet created pods, Name might be empty, use GenerateName + UID(maybe empty)
390398
podIdentifier = pod.GenerateName + string(pod.UID)
391399
}
392400

@@ -576,7 +584,7 @@ func (m *TensorFusionPodMutator) assignClusterHostPortFromLeader(pod *corev1.Pod
576584
return 0, fmt.Errorf("operator leader IP not found")
577585
}
578586

579-
urlStr := fmt.Sprintf("http://%s:8080/assign-host-port?podName=%s", leaderIP, pod.Name)
587+
urlStr := fmt.Sprintf("http://%s:%s/api/assign-host-port?podName=%s", leaderIP, operatorPort, pod.Name)
580588
req, err := http.NewRequest("GET", urlStr, nil)
581589
if err != nil {
582590
return 0, err
@@ -613,7 +621,7 @@ func (m *TensorFusionPodMutator) assignIndexFromLeader(ctx context.Context, pod
613621
if podIdentifier == "" {
614622
podIdentifier = pod.GenerateName + string(pod.UID)
615623
}
616-
urlStr := fmt.Sprintf("http://%s:8080/assign-index?podName=%s", leaderIP, podIdentifier)
624+
urlStr := fmt.Sprintf("http://%s:%s/api/assign-index?podName=%s", leaderIP, operatorPort, podIdentifier)
617625
req, err := http.NewRequestWithContext(ctx, "GET", urlStr, nil)
618626
if err != nil {
619627
return 0, err

0 commit comments

Comments
 (0)