| 
 | 1 | +/*  | 
 | 2 | +Copyright 2025 The Kubernetes Authors.  | 
 | 3 | +
  | 
 | 4 | +Licensed under the Apache License, Version 2.0 (the "License");  | 
 | 5 | +you may not use this file except in compliance with the License.  | 
 | 6 | +You may obtain a copy of the License at  | 
 | 7 | +
  | 
 | 8 | +    http://www.apache.org/licenses/LICENSE-2.0  | 
 | 9 | +
  | 
 | 10 | +Unless required by applicable law or agreed to in writing, software  | 
 | 11 | +distributed under the License is distributed on an "AS IS" BASIS,  | 
 | 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
 | 13 | +See the License for the specific language governing permissions and  | 
 | 14 | +limitations under the License.  | 
 | 15 | +*/  | 
 | 16 | + | 
 | 17 | +package registry  | 
 | 18 | + | 
 | 19 | +import (  | 
 | 20 | +	"errors"  | 
 | 21 | +	"fmt"  | 
 | 22 | + | 
 | 23 | +	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"  | 
 | 24 | +	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"  | 
 | 25 | +	inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"  | 
 | 26 | +	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"  | 
 | 27 | +	intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"  | 
 | 28 | +	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"  | 
 | 29 | +	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"  | 
 | 30 | +	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"  | 
 | 31 | +)  | 
 | 32 | + | 
 | 33 | +// Config holds the master configuration for the entire `FlowRegistry`. It serves as the top-level blueprint, defining  | 
 | 34 | +// global capacity limits and the structure of its priority bands.  | 
 | 35 | +//  | 
 | 36 | +// This master configuration is validated and defaulted once at startup. It is then partitioned and distributed to each  | 
 | 37 | +// internal `registryShard`, ensuring a consistent and predictable state across the system.  | 
 | 38 | +type Config struct {  | 
 | 39 | +	// MaxBytes defines an optional, global maximum total byte size limit aggregated across all priority bands and shards.  | 
 | 40 | +	// The `controller.FlowController` enforces this limit in addition to per-band capacity limits.  | 
 | 41 | +	//  | 
 | 42 | +	// Optional: Defaults to 0, which signifies that the global limit is ignored.  | 
 | 43 | +	MaxBytes uint64  | 
 | 44 | + | 
 | 45 | +	// PriorityBands defines the set of priority bands managed by the `FlowRegistry`. The configuration for each band,  | 
 | 46 | +	// including its default policies and queue types, is specified here.  | 
 | 47 | +	//  | 
 | 48 | +	// Required: At least one `PriorityBandConfig` must be provided for a functional registry.  | 
 | 49 | +	PriorityBands []PriorityBandConfig  | 
 | 50 | +}  | 
 | 51 | + | 
 | 52 | +// partition calculates and returns a new `Config` with capacity values partitioned for a specific shard.  | 
 | 53 | +// This method ensures that the total capacity is distributed as evenly as possible across all shards.  | 
 | 54 | +func (c *Config) partition(shardIndex, totalShards int) (*Config, error) {  | 
 | 55 | +	if totalShards <= 0 || shardIndex < 0 || shardIndex >= totalShards {  | 
 | 56 | +		return nil, fmt.Errorf("invalid shard partitioning arguments: shardIndex=%d, totalShards=%d",  | 
 | 57 | +			shardIndex, totalShards)  | 
 | 58 | +	}  | 
 | 59 | + | 
 | 60 | +	partitionValue := func(total uint64) uint64 {  | 
 | 61 | +		if total == 0 {  | 
 | 62 | +			return 0  | 
 | 63 | +		}  | 
 | 64 | +		base := total / uint64(totalShards)  | 
 | 65 | +		remainder := total % uint64(totalShards)  | 
 | 66 | +		if uint64(shardIndex) < remainder {  | 
 | 67 | +			return base + 1  | 
 | 68 | +		}  | 
 | 69 | +		return base  | 
 | 70 | +	}  | 
 | 71 | + | 
 | 72 | +	newCfg := &Config{  | 
 | 73 | +		MaxBytes:      partitionValue(c.MaxBytes),  | 
 | 74 | +		PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)),  | 
 | 75 | +	}  | 
 | 76 | + | 
 | 77 | +	for i, band := range c.PriorityBands {  | 
 | 78 | +		newBand := band                                  // Copy the original config  | 
 | 79 | +		newBand.MaxBytes = partitionValue(band.MaxBytes) // Overwrite with the partitioned value  | 
 | 80 | +		newCfg.PriorityBands[i] = newBand  | 
 | 81 | +	}  | 
 | 82 | + | 
 | 83 | +	return newCfg, nil  | 
 | 84 | +}  | 
 | 85 | + | 
 | 86 | +// validateAndApplyDefaults checks the configuration for validity and populates any empty fields with system defaults.  | 
 | 87 | +// This method should be called once by the registry before it initializes any shards.  | 
 | 88 | +func (c *Config) validateAndApplyDefaults() error {  | 
 | 89 | +	if len(c.PriorityBands) == 0 {  | 
 | 90 | +		return errors.New("config validation failed: at least one priority band must be defined")  | 
 | 91 | +	}  | 
 | 92 | + | 
 | 93 | +	priorities := make(map[uint]struct{}) // Keep track of seen priorities  | 
 | 94 | + | 
 | 95 | +	for i := range c.PriorityBands {  | 
 | 96 | +		band := &c.PriorityBands[i]  | 
 | 97 | +		if _, exists := priorities[band.Priority]; exists {  | 
 | 98 | +			return fmt.Errorf("config validation failed: duplicate priority level %d found", band.Priority)  | 
 | 99 | +		}  | 
 | 100 | +		priorities[band.Priority] = struct{}{}  | 
 | 101 | + | 
 | 102 | +		if band.PriorityName == "" {  | 
 | 103 | +			return errors.New("config validation failed: PriorityName is required for all priority bands")  | 
 | 104 | +		}  | 
 | 105 | +		if band.IntraFlowDispatchPolicy == "" {  | 
 | 106 | +			band.IntraFlowDispatchPolicy = fcfs.FCFSPolicyName  | 
 | 107 | +		}  | 
 | 108 | +		if band.InterFlowDispatchPolicy == "" {  | 
 | 109 | +			band.InterFlowDispatchPolicy = besthead.BestHeadPolicyName  | 
 | 110 | +		}  | 
 | 111 | +		if band.Queue == "" {  | 
 | 112 | +			band.Queue = listqueue.ListQueueName  | 
 | 113 | +		}  | 
 | 114 | + | 
 | 115 | +		// After defaulting, validate that the chosen plugins are compatible.  | 
 | 116 | +		if err := validateBandCompatibility(*band); err != nil {  | 
 | 117 | +			return err  | 
 | 118 | +		}  | 
 | 119 | +	}  | 
 | 120 | +	return nil  | 
 | 121 | +}  | 
 | 122 | + | 
 | 123 | +// validateBandCompatibility verifies that a band's default policy is compatible with its default queue type.  | 
 | 124 | +func validateBandCompatibility(band PriorityBandConfig) error {  | 
 | 125 | +	policy, err := intra.NewPolicyFromName(band.IntraFlowDispatchPolicy)  | 
 | 126 | +	if err != nil {  | 
 | 127 | +		return fmt.Errorf("failed to validate policy %q for priority band %d: %w",  | 
 | 128 | +			band.IntraFlowDispatchPolicy, band.Priority, err)  | 
 | 129 | +	}  | 
 | 130 | + | 
 | 131 | +	requiredCapabilities := policy.RequiredQueueCapabilities()  | 
 | 132 | +	if len(requiredCapabilities) == 0 {  | 
 | 133 | +		return nil // Policy has no specific requirements.  | 
 | 134 | +	}  | 
 | 135 | + | 
 | 136 | +	// Create a temporary queue instance to inspect its capabilities.  | 
 | 137 | +	tempQueue, err := queue.NewQueueFromName(band.Queue, nil)  | 
 | 138 | +	if err != nil {  | 
 | 139 | +		return fmt.Errorf("failed to inspect queue type %q for priority band %d: %w", band.Queue, band.Priority, err)  | 
 | 140 | +	}  | 
 | 141 | +	queueCapabilities := tempQueue.Capabilities()  | 
 | 142 | + | 
 | 143 | +	// Build a set of the queue's capabilities for efficient lookup.  | 
 | 144 | +	capabilitySet := make(map[framework.QueueCapability]struct{}, len(queueCapabilities))  | 
 | 145 | +	for _, cap := range queueCapabilities {  | 
 | 146 | +		capabilitySet[cap] = struct{}{}  | 
 | 147 | +	}  | 
 | 148 | + | 
 | 149 | +	// Check if all required capabilities are present.  | 
 | 150 | +	for _, req := range requiredCapabilities {  | 
 | 151 | +		if _, ok := capabilitySet[req]; !ok {  | 
 | 152 | +			return fmt.Errorf(  | 
 | 153 | +				"policy %q is not compatible with queue %q for priority band %d (%s): missing capability %q: %w",  | 
 | 154 | +				policy.Name(),  | 
 | 155 | +				tempQueue.Name(),  | 
 | 156 | +				band.Priority,  | 
 | 157 | +				band.PriorityName,  | 
 | 158 | +				req,  | 
 | 159 | +				contracts.ErrPolicyQueueIncompatible,  | 
 | 160 | +			)  | 
 | 161 | +		}  | 
 | 162 | +	}  | 
 | 163 | + | 
 | 164 | +	return nil  | 
 | 165 | +}  | 
 | 166 | + | 
 | 167 | +// PriorityBandConfig defines the configuration for a single priority band within the `FlowRegistry`. It establishes the  | 
 | 168 | +// default behaviors (such as queueing and dispatch policies) and capacity limits for all flows that operate at this  | 
 | 169 | +// priority level.  | 
 | 170 | +type PriorityBandConfig struct {  | 
 | 171 | +	// Priority is the numerical priority level for this band.  | 
 | 172 | +	// Convention: Lower numerical values indicate higher priority (e.g., 0 is highest).  | 
 | 173 | +	//  | 
 | 174 | +	// Required.  | 
 | 175 | +	Priority uint  | 
 | 176 | + | 
 | 177 | +	// PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard", "Sheddable").  | 
 | 178 | +	//  | 
 | 179 | +	// Required.  | 
 | 180 | +	PriorityName string  | 
 | 181 | + | 
 | 182 | +	// IntraFlowDispatchPolicy specifies the default name of the registered policy used to select a specific request to  | 
 | 183 | +	// dispatch next from within a single flow's queue in this band. This default can be overridden on a per-flow basis.  | 
 | 184 | +	//  | 
 | 185 | +	// Optional: If empty, a system default (e.g., "FCFS") is used.  | 
 | 186 | +	IntraFlowDispatchPolicy intra.RegisteredPolicyName  | 
 | 187 | + | 
 | 188 | +	// InterFlowDispatchPolicy specifies the name of the registered policy used to select which flow's queue to service  | 
 | 189 | +	// next from this band.  | 
 | 190 | +	//  | 
 | 191 | +	// Optional: If empty, a system default (e.g., "BestHead") is used.  | 
 | 192 | +	InterFlowDispatchPolicy inter.RegisteredPolicyName  | 
 | 193 | + | 
 | 194 | +	// Queue specifies the default name of the registered SafeQueue implementation to be used for flow queues within this  | 
 | 195 | +	// band.  | 
 | 196 | +	//  | 
 | 197 | +	// Optional: If empty, a system default (e.g., "ListQueue") is used.  | 
 | 198 | +	Queue queue.RegisteredQueueName  | 
 | 199 | + | 
 | 200 | +	// MaxBytes defines the maximum total byte size for this specific priority band, aggregated across all shards.  | 
 | 201 | +	//  | 
 | 202 | +	// Optional: If not set, a system default (e.g., 1 GB) is applied.  | 
 | 203 | +	MaxBytes uint64  | 
 | 204 | +}  | 
0 commit comments