Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ type StaticConfig struct {
StsScopes []string `toml:"sts_scopes,omitempty"`
CertificateAuthority string `toml:"certificate_authority,omitempty"`
ServerURL string `toml:"server_url,omitempty"`
// HealthEndpoint is the health check endpoint
HealthEndpoint string `toml:"health_endpoint,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every new flag complicates the maintenance and user experience. Why do we need to modify all these endpoints?. Why do we need to add a functionality to modify SSE endpoints, as it is already a deprecated functionality?.

// SSEEndpoint is the SSE endpoint
SSEEndpoint string `toml:"sse_endpoint,omitempty"`
// SSEMessageEndpoint is the SSE message endpoint
SSEMessageEndpoint string `toml:"sse_message_endpoint,omitempty"`
// StreamableHttpEndpoint is the streamable http endpoint
StreamableHttpEndpoint string `toml:"streamable_http_endpoint,omitempty"`
}

type GroupVersionKind struct {
Expand Down
25 changes: 25 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ denied_resources = [

enabled_tools = ["configuration_view", "events_list", "namespaces_list", "pods_list", "resources_list", "resources_get", "resources_create_or_update", "resources_delete"]
disabled_tools = ["pods_delete", "pods_top", "pods_log", "pods_run", "pods_exec"]

health_endpoint = "/k8s/healthz"
streamable_http_endpoint = "/k8s/mcp"
sse_endpoint = "/k8s/sse"
sse_message_endpoint = "/k8s/message"
`)

config, err := ReadConfig(validConfigPath)
Expand Down Expand Up @@ -142,6 +147,26 @@ disabled_tools = ["pods_delete", "pods_top", "pods_log", "pods_run", "pods_exec"
}
}
})
t.Run("health_endpoint parsed correctly", func(t *testing.T) {
if config.HealthEndpoint != "/k8s/healthz" {
t.Fatalf("Unexpected health_endpoint value: %v", config.HealthEndpoint)
}
})
t.Run("streamable_http_endpoint parsed correctly", func(t *testing.T) {
if config.StreamableHttpEndpoint != "/k8s/mcp" {
t.Fatalf("Unexpected streamable_http_endpoint value: %v", config.StreamableHttpEndpoint)
}
})
t.Run("sse_endpoint parsed correctly", func(t *testing.T) {
if config.SSEEndpoint != "/k8s/sse" {
t.Fatalf("Unexpected sse_endpoint value: %v", config.SSEEndpoint)
}
})
t.Run("sse_message_endpoint parsed correctly", func(t *testing.T) {
if config.SSEMessageEndpoint != "/k8s/message" {
t.Fatalf("Unexpected sse_message_endpoint value: %v", config.SSEMessageEndpoint)
}
})
}

func writeConfig(t *testing.T, content string) string {
Expand Down
1 change: 1 addition & 0 deletions pkg/http/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type KubernetesApiTokenVerifier interface {
func AuthorizationMiddleware(staticConfig *config.StaticConfig, oidcProvider *oidc.Provider, verifier KubernetesApiTokenVerifier) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
healthEndpoint := getEndpointOrDefault(staticConfig.HealthEndpoint, defaultHealthEndpoint)
if r.URL.Path == healthEndpoint || slices.Contains(WellKnownEndpoints, r.URL.EscapedPath()) {
next.ServeHTTP(w, r)
return
Expand Down
25 changes: 19 additions & 6 deletions pkg/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ import (
)

const (
healthEndpoint = "/healthz"
mcpEndpoint = "/mcp"
sseEndpoint = "/sse"
sseMessageEndpoint = "/message"
defaultHealthEndpoint = "/healthz"
defaultMcpEndpoint = "/mcp"
defaultSseEndpoint = "/sse"
defaultSseMessageEndpoint = "/message"
)

// getEndpointOrDefault returns the endpoint value, otherwise returns the default value.
func getEndpointOrDefault(configValue, defaultValue string) string {
if configValue != "" {
return configValue
}
return defaultValue
}

func Serve(ctx context.Context, mcpServer *mcp.Server, staticConfig *config.StaticConfig, oidcProvider *oidc.Provider) error {
mux := http.NewServeMux()

Expand All @@ -36,7 +44,12 @@ func Serve(ctx context.Context, mcpServer *mcp.Server, staticConfig *config.Stat
Handler: wrappedMux,
}

sseServer := mcpServer.ServeSse(staticConfig.SSEBaseURL, httpServer)
healthEndpoint := getEndpointOrDefault(staticConfig.HealthEndpoint, defaultHealthEndpoint)
mcpEndpoint := getEndpointOrDefault(staticConfig.StreamableHttpEndpoint, defaultMcpEndpoint)
sseEndpoint := getEndpointOrDefault(staticConfig.SSEEndpoint, defaultSseEndpoint)
sseMessageEndpoint := getEndpointOrDefault(staticConfig.SSEMessageEndpoint, defaultSseMessageEndpoint)

sseServer := mcpServer.ServeSse(staticConfig.SSEBaseURL, sseEndpoint, sseMessageEndpoint, httpServer)
streamableHttpServer := mcpServer.ServeHTTP(httpServer)
mux.Handle(sseEndpoint, sseServer)
mux.Handle(sseMessageEndpoint, sseServer)
Expand All @@ -54,7 +67,7 @@ func Serve(ctx context.Context, mcpServer *mcp.Server, staticConfig *config.Stat

serverErr := make(chan error, 1)
go func() {
klog.V(0).Infof("Streaming and SSE HTTP servers starting on port %s and paths /mcp, /sse, /message", staticConfig.Port)
klog.V(0).Infof("Streaming and SSE HTTP servers starting on port %s and paths %s, %s, %s", staticConfig.Port, mcpEndpoint, sseEndpoint, sseMessageEndpoint)
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
serverErr <- err
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/kubernetes-mcp-server/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type MCPServerOptions struct {
CertificateAuthority string
ServerURL string

HealthEndpoint string
StreamableHttpEndpoint string
SSEEndpoint string
SSEMessageEndpoint string

ConfigPath string
StaticConfig *config.StaticConfig

Expand Down Expand Up @@ -120,6 +125,10 @@ func NewMCPServer(streams genericiooptions.IOStreams) *cobra.Command {
cmd.Flags().BoolVar(&o.ReadOnly, "read-only", o.ReadOnly, "If true, only tools annotated with readOnlyHint=true are exposed")
cmd.Flags().BoolVar(&o.DisableDestructive, "disable-destructive", o.DisableDestructive, "If true, tools annotated with destructiveHint=true are disabled")
cmd.Flags().BoolVar(&o.RequireOAuth, "require-oauth", o.RequireOAuth, "If true, requires OAuth authorization as defined in the Model Context Protocol (MCP) specification. This flag is ignored if transport type is stdio")
cmd.Flags().StringVar(&o.HealthEndpoint, "health-endpoint", "/healthz", "Use for set health endpoint to use for health checks")
cmd.Flags().StringVar(&o.StreamableHttpEndpoint, "streamable-http-endpoint", "/mcp", "Use for set streamable http requests endpoint")
cmd.Flags().StringVar(&o.SSEEndpoint, "sse-endpoint", "/sse", "Use for set sse requests endpoint")
cmd.Flags().StringVar(&o.SSEMessageEndpoint, "sse-message-endpoint", "/message", "Use for set sse message requests endpoint")
_ = cmd.Flags().MarkHidden("require-oauth")
cmd.Flags().StringVar(&o.OAuthAudience, "oauth-audience", o.OAuthAudience, "OAuth audience for token claims validation. Optional. If not set, the audience is not validated. Only valid if require-oauth is enabled.")
_ = cmd.Flags().MarkHidden("oauth-audience")
Expand Down Expand Up @@ -200,6 +209,18 @@ func (m *MCPServerOptions) loadFlags(cmd *cobra.Command) {
if cmd.Flag("certificate-authority").Changed {
m.StaticConfig.CertificateAuthority = m.CertificateAuthority
}
if cmd.Flag("health-endpoint").Changed {
m.StaticConfig.HealthEndpoint = m.HealthEndpoint
}
if cmd.Flag("streamable-http-endpoint").Changed {
m.StaticConfig.StreamableHttpEndpoint = m.StreamableHttpEndpoint
}
if cmd.Flag("sse-endpoint").Changed {
m.StaticConfig.SSEEndpoint = m.SSEEndpoint
}
if cmd.Flag("sse-message-endpoint").Changed {
m.StaticConfig.SSEMessageEndpoint = m.SSEMessageEndpoint
}
}

func (m *MCPServerOptions) initializeLogging() {
Expand Down
8 changes: 7 additions & 1 deletion pkg/mcp/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,18 @@ func (s *Server) ServeStdio() error {
return server.ServeStdio(s.server)
}

func (s *Server) ServeSse(baseUrl string, httpServer *http.Server) *server.SSEServer {
func (s *Server) ServeSse(baseUrl, sseEndpoint, sseMessageEndpoint string, httpServer *http.Server) *server.SSEServer {
options := make([]server.SSEOption, 0)
options = append(options, server.WithSSEContextFunc(contextFunc), server.WithHTTPServer(httpServer))
if baseUrl != "" {
options = append(options, server.WithBaseURL(baseUrl))
}
if sseEndpoint != "" {
options = append(options, server.WithSSEEndpoint(sseEndpoint))
}
if sseMessageEndpoint != "" {
options = append(options, server.WithMessageEndpoint(sseMessageEndpoint))
}
return server.NewSSEServer(s.server, options...)
}

Expand Down