Skip to content

[WIP] Add Latency predictor to EPP #1161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
latencypredictor "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/latencypredictorasync"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
Expand Down Expand Up @@ -118,6 +118,9 @@ var (
"totalQueuedRequestsMetric",
runserver.DefaultTotalQueuedRequestsMetric,
"Prometheus metric for the number of queued requests.")
totalRunningRequestsMetric = flag.String("totalRunningRequestsMetric",
runserver.DefaultTotalRunningRequestsMetric,
"Prometheus metric for the number of running requests.")
kvCacheUsagePercentageMetric = flag.String(
"kvCacheUsagePercentageMetric",
runserver.DefaultKvCacheUsagePercentageMetric,
Expand All @@ -137,6 +140,8 @@ var (
runserver.DefaultConfigText,
"The configuration specified as text, in lieu of a file")

enableLatencyPredictor = flag.Bool("enable-latency-predictor", false, "Enable the regression-based latency predictor and scheduler scorer.")

modelServerMetricsPort = flag.Int("modelServerMetricsPort", 0, "Port to scrape metrics from pods. "+
"Default value will be set to InferencePool.Spec.TargetPortNumber if not set.")
modelServerMetricsPath = flag.String("modelServerMetricsPath", "/metrics", "Path to scrape metrics from pods")
Expand Down Expand Up @@ -231,6 +236,7 @@ func (r *Runner) Run(ctx context.Context) error {
// --- Setup Datastore ---
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
)
Expand Down Expand Up @@ -282,6 +288,26 @@ func (r *Runner) Run(ctx context.Context) error {
return err
}

// ===================================================================
// == Latency Predictor Integration
// ===================================================================
var predictor latencypredictor.PredictorInterface // Use the interface type
if *enableLatencyPredictor {
setupLog.Info("Latency predictor is enabled. Initializing...")
predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))

// For the runnable, you'll need to type assert back to the concrete type
concretePredictor := predictor.(*latencypredictor.Predictor)
if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
setupLog.Error(err, "Failed to register latency predictor runnable")
return err
}
} else {
setupLog.Info("Latency predictor is disabled.")
predictor = nil // This will be a true nil interface
}

// ===================================================================
// --- Initialize Core EPP Components ---
scheduler, err := r.initializeScheduler()
if err != nil {
Expand All @@ -291,7 +317,7 @@ func (r *Runner) Run(ctx context.Context) error {

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)

director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig, predictor)

// --- Setup ExtProc Server Runner ---
serverRunner := &runserver.ExtProcServerRunner{
Expand All @@ -306,6 +332,7 @@ func (r *Runner) Run(ctx context.Context) error {
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
Director: director,
SaturationDetector: saturationDetector,
LatencyPredictor: predictor,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup EPP controllers")
Expand Down Expand Up @@ -497,3 +524,22 @@ func setupPprofHandlers(mgr ctrl.Manager) error {
}
return nil
}

// ===================================================================
// == Latency Predictor Plugin and Helpers
// ===================================================================

// predictorRunnable implements controller-runtime's Runnable interface to manage the predictor's lifecycle.
type predictorRunnable struct {
	// predictor is the concrete latency predictor whose background
	// processes are started/stopped with the manager's lifecycle.
	// NOTE(review): stored as the concrete *latencypredictor.Predictor
	// (not the PredictorInterface) because Start/Stop live on the
	// concrete type — confirm against the latencypredictorasync package.
	predictor *latencypredictor.Predictor
}

// Start launches the predictor's background processes, then blocks until
// the supplied context is cancelled. On cancellation it stops the
// predictor and returns nil, satisfying controller-runtime's Runnable
// contract.
//
// NOTE(review): any error from p.predictor.Start (if it returns one) is
// not surfaced here — confirm its signature in latencypredictorasync.
func (p *predictorRunnable) Start(ctx context.Context) error {
	// Deferred calls run LIFO at return: the log line fires first,
	// then the predictor is stopped — same order as a sequential
	// log-then-stop after <-ctx.Done().
	defer p.predictor.Stop()
	defer setupLog.Info("Stopping latency predictor...")

	setupLog.Info("Starting latency predictor...")
	p.predictor.Start(ctx)

	// Block until the manager cancels the context.
	<-ctx.Done()
	return nil
}
Loading