Skip to content

Add opentelemetry tracing instrumentation #1125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
"sigs.k8s.io/gateway-api-inference-extension/pkg/tracing"
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)
Expand Down Expand Up @@ -168,6 +169,16 @@ func (r *Runner) Run(ctx context.Context) error {
})
setupLog.Info("Flags processed", "flags", flags)

// --- Initialize Distributed Tracing ---
tracingConfig := tracing.NewConfigFromEnv()
if tracingShutdown, err := tracing.Initialize(ctx, tracingConfig); err != nil {
setupLog.Error(err, "failed to setup distributed tracing")
return err
} else {
defer tracingShutdown()
setupLog.Info("tracing initialized", "enabled", tracingConfig.Enabled)
}

// --- Load Configurations from Environment Variables ---
sdConfig := saturationdetector.LoadConfigFromEnv()

Expand Down
24 changes: 24 additions & 0 deletions pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/go-logr/logr"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/tracing"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
Expand Down Expand Up @@ -137,6 +139,10 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
loggerTrace := logger.V(logutil.TRACE)
loggerTrace.Info("Processing")

// Create parent span for the entire request processing
ctx, span := tracing.StartGatewaySpan(ctx, tracing.OperationGatewayRequest)
defer span.End()

// Create request context to share states during life time of an HTTP request.
// See https://github.com/envoyproxy/envoy/issues/17540.
reqCtx := &RequestContext{
Expand All @@ -161,8 +167,13 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
defer func(error, *RequestContext) {
if reqCtx.ResponseStatusCode != "" {
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeError))
} else if err != nil {
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeError))
} else {
span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeSuccess))
tracing.SetSpanSuccess(span)
}
if reqCtx.RequestRunning {
metrics.DecRunningRequests(reqCtx.Model)
Expand Down Expand Up @@ -218,13 +229,26 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
reqCtx, err = s.director.HandleRequest(ctx, reqCtx)
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error handling request")
tracing.SetSpanError(span, err)
break
}

// Add span attributes now that we have model and endpoint information
if reqCtx.Model != "" {
span.SetAttributes(attribute.String(tracing.AttrGenAIRequestModel, reqCtx.Model))
}
if reqCtx.ResolvedTargetModel != "" {
span.SetAttributes(attribute.String(tracing.AttrGatewayTargetModel, reqCtx.ResolvedTargetModel))
}
if reqCtx.TargetEndpoint != "" {
span.SetAttributes(attribute.String(tracing.AttrGatewayTargetEndpoint, reqCtx.TargetEndpoint))
}

// Populate the ExtProc protocol responses for the request body.
requestBodyBytes, err := json.Marshal(reqCtx.Request.Body)
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error marshalling request body")
tracing.SetSpanError(span, err)
break
}
reqCtx.RequestSize = len(requestBodyBytes)
Expand Down
46 changes: 43 additions & 3 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/go-logr/logr"
"go.opentelemetry.io/otel/attribute"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
Expand All @@ -36,6 +37,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/tracing"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
Expand Down Expand Up @@ -84,20 +86,31 @@ type Director struct {
//
// It always returns the requestContext even in the error case, as the request context is used in error handling.
func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
ctx, span := tracing.StartGatewaySpan(ctx, tracing.OperationRequestOrchestration)
defer span.End()

logger := log.FromContext(ctx)

// --- 1. Parse Request, Resolve Target Models, and Determine Parameters ---
var ok bool
requestBodyMap := reqCtx.Request.Body
reqCtx.Model, ok = requestBodyMap["model"].(string)
if !ok {
return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"}
err := errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"}
tracing.SetSpanError(span, err)
return reqCtx, err
}
prompt, err := requtil.ExtractPromptFromRequestBody(requestBodyMap)
if err != nil {
tracing.SetSpanError(span, err)
return reqCtx, err
}

span.SetAttributes(
attribute.String(tracing.AttrGenAIRequestModel, reqCtx.Model),
attribute.Int("gateway.prompt_length", len(prompt)),
)

modelObj := d.datastore.ModelGet(reqCtx.Model)
if modelObj == nil {
logger.Info("No associated inferenceModel found, using default", "model", reqCtx.Model)
Expand Down Expand Up @@ -137,29 +150,56 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
ctx = log.IntoContext(ctx, logger)
logger.V(logutil.DEBUG).Info("LLM request assembled")

span.SetAttributes(
attribute.String(tracing.AttrGatewayTargetModel, reqCtx.ResolvedTargetModel),
attribute.String(tracing.AttrGatewayRequestCriticality, string(requestCriticality)),
)

// --- 2. Admission Control check --
if err := d.admitRequest(ctx, requestCriticality); err != nil {
tracing.SetSpanError(span, err)
return reqCtx, err
}

// --- 3. Call Scheduler (with the relevant candidate pods) ---
candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata)
if len(candidatePods) == 0 {
return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
err := errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"}
tracing.SetSpanError(span, err)
return reqCtx, err
}
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)

span.SetAttributes(attribute.Int(tracing.AttrGatewayCandidatePods, len(candidatePods)))

// Create a child span for scheduling operation - this will connect to KV Cache Manager
schedulingCtx, schedulingSpan := tracing.StartGatewaySpan(ctx, tracing.OperationScheduling)
defer schedulingSpan.End()

results, err := d.scheduler.Schedule(schedulingCtx, reqCtx.SchedulingRequest, candidatePods)
if err != nil {
tracing.SetSpanError(schedulingSpan, err)
tracing.SetSpanError(span, err)
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
}

tracing.SetSpanSuccess(schedulingSpan)

// --- 4. Prepare Request (Populates RequestContext and call PreRequest plugins) ---
// Insert target endpoint to instruct Envoy to route requests to the specified target pod and attach the port number.
// Invoke PreRequest registered plugins.
reqCtx, err = d.prepareRequest(ctx, reqCtx, results)
if err != nil {
tracing.SetSpanError(span, err)
return reqCtx, err
}

if reqCtx.TargetEndpoint != "" {
span.SetAttributes(attribute.String(tracing.AttrGatewayTargetEndpoint, reqCtx.TargetEndpoint))
}

span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeSuccess))
tracing.SetSpanSuccess(span)

return reqCtx, nil
}

Expand Down
164 changes: 164 additions & 0 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package tracing provides OpenTelemetry tracing infrastructure for the gateway-api-inference-extension
package tracing

import (
"context"
"fmt"
"os"
"strconv"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
ServiceName = "gateway-api-inference-extension"

envOTELTracingEnabled = "OTEL_TRACING_ENABLED"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we just check the existence of OTEL_EXPORTER_OTLP_ENDPOINT for enablement? If the endpoint is specified then it's likely the user wants to enable the feature.

envOTELExporterEndpoint = "OTEL_EXPORTER_OTLP_ENDPOINT"
envOTELServiceName = "OTEL_SERVICE_NAME"
envOTELSamplingRate = "OTEL_SAMPLING_RATE"

OperationGatewayRequest = "gateway.ext_proc.request"
OperationRequestOrchestration = "gateway.request_orchestration"
OperationScheduling = "gateway.scheduling"

AttrGenAIRequestModel = "gen_ai.request.model"
AttrOperationOutcome = "operation.outcome"

OutcomeSuccess = "success"
OutcomeError = "error"
)

type Config struct {
Enabled bool
ExporterEndpoint string
SamplingRate float64
ServiceName string
}

func NewConfigFromEnv() *Config {
config := &Config{
Enabled: false,
ExporterEndpoint: "http://localhost:4317",
SamplingRate: 0.1,
ServiceName: ServiceName,
}

if enabled := os.Getenv(envOTELTracingEnabled); enabled != "" {
if enabledBool, err := strconv.ParseBool(enabled); err == nil {
config.Enabled = enabledBool
}
}

if endpoint := os.Getenv(envOTELExporterEndpoint); endpoint != "" {
config.ExporterEndpoint = endpoint
}

if serviceName := os.Getenv(envOTELServiceName); serviceName != "" {
config.ServiceName = serviceName
}

if samplingRate := os.Getenv(envOTELSamplingRate); samplingRate != "" {
if rate, err := strconv.ParseFloat(samplingRate, 64); err == nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to inform the user e.g. by failing during the creation whenever a value is not recognized?

config.SamplingRate = rate
}
}

return config
}

// Initialize sets up OpenTelemetry tracing with the given configuration.
// It always sets up context propagation, even if tracing is disabled.
func Initialize(ctx context.Context, config *Config) (func(), error) {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))

if !config.Enabled {
// Return a no-op shutdown function if tracing is disabled
return func() {}, nil
}

conn, err := grpc.DialContext(ctx, config.ExporterEndpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(5*time.Second),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be configurable?

)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}

exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err)
}

res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String(config.ServiceName),
semconv.ServiceVersionKey.String("1.0.0"),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to use debug/buildinfo here?

),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.SamplingRate)),
)

otel.SetTracerProvider(tp)

return func() {
tp.Shutdown(context.Background())
}, nil
}

func StartSpan(ctx context.Context, name, operation string) (context.Context, trace.Span) {
tracer := otel.Tracer(ServiceName)
return tracer.Start(ctx, name)
}

func StartGatewaySpan(ctx context.Context, operation string) (context.Context, trace.Span) {
ctx, span := StartSpan(ctx, operation, operation)
// TODO: Add common gateway attributes here
return ctx, span
}
Comment on lines +150 to +155
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want to replace them with something like:

func SetGatewayAttributes(span trace.Span, gateway Gateway) {
	if gateway == nil {
		return
	}

	span.SetAttributes(
		AttrGatewaySomething.String(gateway.Something()),
	)
}

Allowing a user to set attributes without the need to know the specific Keys?


func SetSpanError(span trace.Span, err error) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

func SetSpanSuccess(span trace.Span) {
span.SetStatus(codes.Ok, "")
}