diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 45acf1a95..cb4410989 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -53,6 +53,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" + "sigs.k8s.io/gateway-api-inference-extension/pkg/tracing" envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -168,6 +169,16 @@ func (r *Runner) Run(ctx context.Context) error { }) setupLog.Info("Flags processed", "flags", flags) + // --- Initialize Distributed Tracing --- + tracingConfig := tracing.NewConfigFromEnv() + if tracingShutdown, err := tracing.Initialize(ctx, tracingConfig); err != nil { + setupLog.Error(err, "failed to setup distributed tracing") + return err + } else { + defer tracingShutdown() + setupLog.Info("tracing initialized", "enabled", tracingConfig.Enabled) + } + // --- Load Configurations from Environment Variables --- sdConfig := saturationdetector.LoadConfigFromEnv() diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 3ac13c892..bcdf2b8a9 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -26,6 +26,7 @@ import ( extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/go-logr/logr" + "go.opentelemetry.io/otel/attribute" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "sigs.k8s.io/controller-runtime/pkg/log" @@ -33,6 +34,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/tracing" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" @@ -137,6 +139,10 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) loggerTrace := logger.V(logutil.TRACE) loggerTrace.Info("Processing") + // Create parent span for the entire request processing + ctx, span := tracing.StartGatewaySpan(ctx, tracing.OperationGatewayRequest) + defer span.End() + // Create request context to share states during life time of an HTTP request. // See https://github.com/envoyproxy/envoy/issues/17540. reqCtx := &RequestContext{ @@ -161,8 +167,13 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) defer func(error, *RequestContext) { if reqCtx.ResponseStatusCode != "" { metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode) + span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeError)) } else if err != nil { metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err)) + span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeError)) + } else { + span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeSuccess)) + tracing.SetSpanSuccess(span) } if reqCtx.RequestRunning { metrics.DecRunningRequests(reqCtx.Model) @@ -218,13 +229,26 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) reqCtx, err = s.director.HandleRequest(ctx, reqCtx) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Error handling request") + tracing.SetSpanError(span, err) break } + // Add span attributes now that we have model and endpoint information + if reqCtx.Model != "" { + span.SetAttributes(attribute.String(tracing.AttrGenAIRequestModel, reqCtx.Model)) + } + if reqCtx.ResolvedTargetModel != "" { + span.SetAttributes(attribute.String(tracing.AttrGatewayTargetModel, reqCtx.ResolvedTargetModel)) + } + if reqCtx.TargetEndpoint != "" { + span.SetAttributes(attribute.String(tracing.AttrGatewayTargetEndpoint, reqCtx.TargetEndpoint)) + } + // Populate the ExtProc protocol responses for the request body. requestBodyBytes, err := json.Marshal(reqCtx.Request.Body) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Error marshalling request body") + tracing.SetSpanError(span, err) break } reqCtx.RequestSize = len(requestBodyBytes) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 2eabde353..96bc9c6a7 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -28,6 +28,7 @@ import ( "time" "github.com/go-logr/logr" + "go.opentelemetry.io/otel/attribute" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" @@ -36,6 +37,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/tracing" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" @@ -84,6 +86,9 @@ type Director struct { // // It always returns the requestContext even in the error case, as the request context is used in error handling. func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { + ctx, span := tracing.StartGatewaySpan(ctx, tracing.OperationRequestOrchestration) + defer span.End() + logger := log.FromContext(ctx) // --- 1. Parse Request, Resolve Target Models, and Determine Parameters --- @@ -91,13 +96,21 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo requestBodyMap := reqCtx.Request.Body reqCtx.Model, ok = requestBodyMap["model"].(string) if !ok { - return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} + err := errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} + tracing.SetSpanError(span, err) + return reqCtx, err } prompt, err := requtil.ExtractPromptFromRequestBody(requestBodyMap) if err != nil { + tracing.SetSpanError(span, err) return reqCtx, err } + span.SetAttributes( + attribute.String(tracing.AttrGenAIRequestModel, reqCtx.Model), + attribute.Int("gateway.prompt_length", len(prompt)), + ) + modelObj := d.datastore.ModelGet(reqCtx.Model) if modelObj == nil { logger.Info("No associated inferenceModel found, using default", "model", reqCtx.Model) @@ -137,29 +150,56 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo ctx = log.IntoContext(ctx, logger) logger.V(logutil.DEBUG).Info("LLM request assembled") + span.SetAttributes( + attribute.String(tracing.AttrGatewayTargetModel, reqCtx.ResolvedTargetModel), + attribute.String(tracing.AttrGatewayRequestCriticality, string(requestCriticality)), + ) + // --- 2. Admission Control check -- if err := d.admitRequest(ctx, requestCriticality); err != nil { + tracing.SetSpanError(span, err) return reqCtx, err } // --- 3. Call Scheduler (with the relevant candidate pods) --- candidatePods := d.getCandidatePodsForScheduling(ctx, reqCtx.Request.Metadata) if len(candidatePods) == 0 { - return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} + err := errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} + tracing.SetSpanError(span, err) + return reqCtx, err } - results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods) + + span.SetAttributes(attribute.Int(tracing.AttrGatewayCandidatePods, len(candidatePods))) + + // Create a child span for scheduling operation - this will connect to KV Cache Manager + schedulingCtx, schedulingSpan := tracing.StartGatewaySpan(ctx, tracing.OperationScheduling) + defer schedulingSpan.End() + + results, err := d.scheduler.Schedule(schedulingCtx, reqCtx.SchedulingRequest, candidatePods) if err != nil { + tracing.SetSpanError(schedulingSpan, err) + tracing.SetSpanError(span, err) return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()} } + tracing.SetSpanSuccess(schedulingSpan) + // --- 4. Prepare Request (Populates RequestContext and call PreRequest plugins) --- // Insert target endpoint to instruct Envoy to route requests to the specified target pod and attach the port number. // Invoke PreRequest registered plugins. reqCtx, err = d.prepareRequest(ctx, reqCtx, results) if err != nil { + tracing.SetSpanError(span, err) return reqCtx, err } + if reqCtx.TargetEndpoint != "" { + span.SetAttributes(attribute.String(tracing.AttrGatewayTargetEndpoint, reqCtx.TargetEndpoint)) + } + + span.SetAttributes(attribute.String(tracing.AttrOperationOutcome, tracing.OutcomeSuccess)) + tracing.SetSpanSuccess(span) + return reqCtx, nil } diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go new file mode 100644 index 000000000..787444f5f --- /dev/null +++ b/pkg/tracing/tracing.go @@ -0,0 +1,164 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package tracing provides OpenTelemetry tracing infrastructure for the gateway-api-inference-extension +package tracing + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + ServiceName = "gateway-api-inference-extension" + + envOTELTracingEnabled = "OTEL_TRACING_ENABLED" + envOTELExporterEndpoint = "OTEL_EXPORTER_OTLP_ENDPOINT" + envOTELServiceName = "OTEL_SERVICE_NAME" + envOTELSamplingRate = "OTEL_SAMPLING_RATE" + + OperationGatewayRequest = "gateway.ext_proc.request" + OperationRequestOrchestration = "gateway.request_orchestration" + OperationScheduling = "gateway.scheduling" + + AttrGenAIRequestModel = "gen_ai.request.model" + AttrOperationOutcome = "operation.outcome" + + OutcomeSuccess = "success" + OutcomeError = "error" +) + +type Config struct { + Enabled bool + ExporterEndpoint string + SamplingRate float64 + ServiceName string +} + +func NewConfigFromEnv() *Config { + config := &Config{ + Enabled: false, + ExporterEndpoint: "http://localhost:4317", + SamplingRate: 0.1, + ServiceName: ServiceName, + } + + if enabled := os.Getenv(envOTELTracingEnabled); enabled != "" { + if enabledBool, err := strconv.ParseBool(enabled); err == nil { + config.Enabled = enabledBool + } + } + + if endpoint := os.Getenv(envOTELExporterEndpoint); endpoint != "" { + config.ExporterEndpoint = endpoint + } + + if serviceName := os.Getenv(envOTELServiceName); serviceName != "" { + config.ServiceName = serviceName + } + + if samplingRate := os.Getenv(envOTELSamplingRate); samplingRate != "" { + if rate, err := strconv.ParseFloat(samplingRate, 64); err == nil { + config.SamplingRate = rate + } + } + + return config +} + +// Initialize sets up OpenTelemetry tracing with the given configuration. +// It always sets up context propagation, even if tracing is disabled. +func Initialize(ctx context.Context, config *Config) (func(), error) { + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + if !config.Enabled { + // Return a no-op shutdown function if tracing is disabled + return func() {}, nil + } + + conn, err := grpc.DialContext(ctx, config.ExporterEndpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithTimeout(5*time.Second), + ) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err) + } + + exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err) + } + + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceNameKey.String(config.ServiceName), + semconv.ServiceVersionKey.String("1.0.0"), + ), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.TraceIDRatioBased(config.SamplingRate)), + ) + + otel.SetTracerProvider(tp) + + return func() { + tp.Shutdown(context.Background()) + }, nil +} + +func StartSpan(ctx context.Context, name, operation string) (context.Context, trace.Span) { + tracer := otel.Tracer(ServiceName) + return tracer.Start(ctx, name) +} + +func StartGatewaySpan(ctx context.Context, operation string) (context.Context, trace.Span) { + ctx, span := StartSpan(ctx, operation, operation) + // TODO: Add common gateway attributes here + return ctx, span +} + +func SetSpanError(span trace.Span, err error) { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) +} + +func SetSpanSuccess(span trace.Span) { + span.SetStatus(codes.Ok, "") +}