Skip to content

feat: improve logging for plugin selection and scoring #1178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/epp/common/config/loader/configloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1"
configapi "sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

var scheme = runtime.NewScheme()
Expand Down Expand Up @@ -93,7 +95,17 @@ func LoadSchedulerConfig(configProfiles []v1alpha1.SchedulingProfile, handle plu
return nil, errors.New("no profile handler was specified")
}

return scheduling.NewSchedulerConfig(profileHandler, profiles), nil
schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, profiles)

// Log the configuration at startup for visibility
logger := ctrl.Log.WithName("scheduler-config")
logger.V(logutil.DEFAULT).Info("Profile handler enabled", "type", profileHandler.TypedName().Type, "name", profileHandler.TypedName().Name)

for profileName, profile := range profiles {
profile.LogConfiguration(logger, profileName)
}

return schedulerConfig, nil
}

func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error {
Expand Down
63 changes: 59 additions & 4 deletions pkg/epp/scheduling/framework/scheduler_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
Expand Down Expand Up @@ -104,6 +105,50 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...plugins.Plugin) error {
return nil
}

// LogConfiguration logs the configuration of this scheduler profile at startup.
// This provides visibility into which plugins are enabled and their weights.
func (p *SchedulerProfile) LogConfiguration(logger logr.Logger, profileName string) {
logger = logger.V(logutil.DEFAULT) // Use DEFAULT level so it's always visible to operators

// Log filters
if len(p.filters) > 0 {
filterNames := make([]string, 0, len(p.filters))
for _, filter := range p.filters {
filterNames = append(filterNames, filter.TypedName().Type)
}
logger.Info("Scheduler profile filters enabled", "profile", profileName, "filters", filterNames)
} else {
logger.Info("Scheduler profile has no filters", "profile", profileName)
}

// Log scorers with their weights
if len(p.scorers) > 0 {
scorerInfo := make([]string, 0, len(p.scorers))
for _, scorer := range p.scorers {
scorerInfo = append(scorerInfo, fmt.Sprintf("%s(weight=%d)", scorer.TypedName().Type, scorer.Weight()))
}
logger.Info("Scheduler profile scorers enabled", "profile", profileName, "scorers", scorerInfo)
} else {
logger.Info("Scheduler profile has no scorers", "profile", profileName)
}

// Log picker
if p.picker != nil {
logger.Info("Scheduler profile picker enabled", "profile", profileName, "picker", p.picker.TypedName().Type)
} else {
logger.Info("Scheduler profile has no picker", "profile", profileName)
}

// Log post-cycle plugins
if len(p.postCyclePlugins) > 0 {
postCycleNames := make([]string, 0, len(p.postCyclePlugins))
for _, plugin := range p.postCyclePlugins {
postCycleNames = append(postCycleNames, plugin.TypedName().Type)
}
logger.Info("Scheduler profile post-cycle plugins enabled", "profile", profileName, "postCyclePlugins", postCycleNames)
}
}

// RunCycle runs a SchedulerProfile cycle. In other words, it invokes all the SchedulerProfile plugins in this
// order - Filters, Scorers, Picker, PostCyclePlugins. After completing all, it returns the result.
func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, candidatePods []types.Pod) (*types.ProfileRunResult, error) {
Expand Down Expand Up @@ -151,16 +196,26 @@ func (p *SchedulerProfile) runScorerPlugins(ctx context.Context, request *types.
}
// Iterate through each scorer in the chain and accumulate the weighted scores.
for _, scorer := range p.scorers {
loggerDebug.Info("Running scorer", "scorer", scorer.TypedName().Type)
loggerDebug.Info("Running scorer", "scorer", scorer.TypedName().Type, "weight", scorer.Weight())
before := time.Now()
scores := scorer.Score(ctx, cycleState, request, pods)
metrics.RecordSchedulerPluginProcessingLatency(ScorerPluginType, scorer.TypedName().Type, time.Since(before))
for pod, score := range scores { // weight is relative to the sum of weights
weightedScorePerPod[pod] += score * float64(scorer.Weight())

// Log individual scores for debugging
for pod, score := range scores {
weightedScore := score * float64(scorer.Weight())
weightedScorePerPod[pod] += weightedScore
loggerDebug.Info("Scorer result",
"scorer", scorer.TypedName().Type,
"pod", pod.GetPod().NamespacedName,
"rawScore", score,
"weight", scorer.Weight(),
"weightedScore", weightedScore,
"totalScore", weightedScorePerPod[pod])
}
loggerDebug.Info("After running scorer", "scorer", scorer.TypedName().Type)
}
loggerDebug.Info("After running scorer plugins")
loggerDebug.Info("After running scorer plugins", "finalScores", weightedScorePerPod)

return weightedScorePerPod
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/epp/scheduling/framework/scheduler_profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"github.com/google/uuid"
k8stypes "k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

func TestSchedulePlugins(t *testing.T) {
Expand Down Expand Up @@ -270,3 +272,33 @@ func findPods(pods []types.Pod, names ...k8stypes.NamespacedName) []types.Pod {
}
return res
}

func TestSchedulerProfileLogConfiguration(t *testing.T) {
// Create test plugins
tp1 := &testPlugin{TypeRes: "test-filter"}
tp2 := &testPlugin{TypeRes: "test-scorer"}
tp3 := &testPlugin{TypeRes: "test-picker"}
tp4 := &testPlugin{TypeRes: "test-postcycle"}

// Create a scheduler profile with various plugins
profile := NewSchedulerProfile().
WithFilters(tp1).
WithScorers(NewWeightedScorer(tp2, 10)).
WithPicker(tp3).
WithPostCyclePlugins(tp4)

// Test LogConfiguration doesn't panic and can be called
// We can't easily test the actual log output without complex setup,
// but we can ensure the method executes without errors
defer func() {
if r := recover(); r != nil {
t.Errorf("LogConfiguration panicked: %v", r)
}
}()

// Use a test logger context
ctx := logutil.NewTestLoggerIntoContext(context.Background())
logger := log.FromContext(ctx)

profile.LogConfiguration(logger, "test-profile")
}
53 changes: 48 additions & 5 deletions pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,38 @@ type Datastore interface {

// NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration.
func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler {
return &Scheduler{
scheduler := &Scheduler{
profileHandler: config.profileHandler,
profiles: config.profiles,
}

// Log scheduler configuration at startup
scheduler.LogConfiguration()

return scheduler
}

type Scheduler struct {
profileHandler framework.ProfileHandler
profiles map[string]*framework.SchedulerProfile
}

// LogConfiguration logs the overall scheduler configuration at startup.
func (s *Scheduler) LogConfiguration() {
logger := log.Log.WithName("scheduler").V(logutil.DEFAULT)

logger.Info("Scheduler initialized",
"profileHandler", s.profileHandler.TypedName().Type,
"numProfiles", len(s.profiles))

// Log available profiles
profileNames := make([]string, 0, len(s.profiles))
for name := range s.profiles {
profileNames = append(profileNames, name)
}
logger.Info("Available scheduler profiles", "profiles", profileNames)
}

// Schedule finds the target pod based on metrics and the requested lora adapter.
func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (*types.SchedulingResult, error) {
logger := log.FromContext(ctx).WithValues("request", request)
Expand All @@ -65,19 +86,41 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, can
before := time.Now()
profiles := s.profileHandler.Pick(ctx, cycleState, request, s.profiles, profileRunResults)
metrics.RecordSchedulerPluginProcessingLatency(framework.ProfilePickerType, s.profileHandler.TypedName().Type, time.Since(before))
loggerDebug.Info("After running profile handler Pick profiles", "plugin", s.profileHandler.TypedName().Type, "result", profiles)

// Log which profiles were selected
selectedProfileNames := make([]string, 0, len(profiles))
for name := range profiles {
selectedProfileNames = append(selectedProfileNames, name)
}
loggerDebug.Info("Profile handler selected profiles",
"plugin", s.profileHandler.TypedName().Type,
"selectedProfiles", selectedProfileNames,
"totalAvailableProfiles", len(s.profiles))

if len(profiles) == 0 { // profile picker didn't pick any profile to run
loggerDebug.Info("No profiles selected, ending scheduling cycle")
break
}

for name, profile := range profiles {
loggerDebug.Info("Running scheduler profile", "name", name)
loggerDebug.Info("Running scheduler profile", "name", name, "candidatePods", len(candidatePods))
// run the selected profiles and collect results (current code runs all profiles)
profileRunResult, err := profile.Run(ctx, request, cycleState, candidatePods)
if err != nil {
loggerDebug.Info("failed to run scheduler profile", "profile", name, "error", err.Error())
loggerDebug.Info("Failed to run scheduler profile", "profile", name, "error", err.Error())
} else {
loggerDebug.Info("After running scheduler profile succuessfully", "name", name)
// Log successful profile execution with results
var selectedPods []string
if profileRunResult != nil && profileRunResult.TargetPods != nil {
selectedPods = make([]string, 0, len(profileRunResult.TargetPods))
for _, pod := range profileRunResult.TargetPods {
selectedPods = append(selectedPods, pod.GetPod().NamespacedName.String())
}
}
loggerDebug.Info("Successfully ran scheduler profile",
"profile", name,
"selectedPods", selectedPods,
"numSelectedPods", len(selectedPods))
}

profileRunResults[name] = profileRunResult // if profile failed to run, the run result is nil
Expand Down