Skip to content

feat(flowcontrol): Implement registry shard #1187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/epp/flowcontrol/contracts/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package contracts

import "errors"

// Registry Errors
var (
// ErrFlowInstanceNotFound indicates that a requested flow instance (a `ManagedQueue`) does not exist in the registry
// shard, either because the flow is not registered or the specific instance (e.g., a draining queue at a particular
// priority) is not present.
ErrFlowInstanceNotFound = errors.New("flow instance not found")

// ErrPriorityBandNotFound indicates that a requested priority band does not exist in the registry because it was not
// part of the initial configuration.
ErrPriorityBandNotFound = errors.New("priority band not found")

// ErrPolicyQueueIncompatible indicates that a selected policy is not compatible with the capabilities of the queue it
// is intended to operate on. For example, a policy requiring priority-based peeking is used with a simple FIFO queue.
ErrPolicyQueueIncompatible = errors.New("policy is not compatible with queue capabilities")
)
124 changes: 119 additions & 5 deletions pkg/epp/flowcontrol/contracts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,78 @@ limitations under the License.
// primary dependencies. In alignment with a "Ports and Adapters" (or "Hexagonal") architectural style, these
// interfaces represent the "ports" through which the engine communicates.
//
// This package contains the primary service contracts for the Flow Registry and Saturation Detector.
// This package contains the primary service contracts for the Flow Registry, which acts as the control plane for all
// flow state and configuration.
package contracts

import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
)

// RegistryShard defines the read-oriented interface that a `controller.FlowController` worker uses to access its
// specific slice (shard) of the `FlowRegistry`'s state. It provides the necessary methods for a worker to perform its
// dispatch operations by accessing queues and policies in a concurrent-safe manner.
//
// # Conformance
//
// All methods MUST be goroutine-safe.
type RegistryShard interface {
// ID returns a unique identifier for this shard, which must remain stable for the shard's lifetime.
ID() string

// IsActive returns true if the shard should accept new requests for enqueueing. A false value indicates the shard is
// being gracefully drained and should not be given new work.
IsActive() bool

// ActiveManagedQueue returns the currently active `ManagedQueue` for a given flow on this shard. This is the queue to
// which new requests for the flow should be enqueued.
// Returns an error wrapping `ErrFlowInstanceNotFound` if no active instance exists for the given `flowID`.
ActiveManagedQueue(flowID string) (ManagedQueue, error)

// ManagedQueue retrieves a specific (potentially draining) `ManagedQueue` instance from this shard. This allows a
// worker to continue dispatching items from queues that are draining as part of a flow update.
// Returns an error wrapping `ErrFlowInstanceNotFound` if no instance for the given flowID and priority exists.
ManagedQueue(flowID string, priority uint) (ManagedQueue, error)

// IntraFlowDispatchPolicy retrieves a flow's configured `framework.IntraFlowDispatchPolicy` for this shard.
// The registry guarantees that a non-nil default policy (as configured at the priority-band level) is returned if
// none is specified on the flow itself.
// Returns an error wrapping `ErrFlowInstanceNotFound` if the flow instance does not exist.
IntraFlowDispatchPolicy(flowID string, priority uint) (framework.IntraFlowDispatchPolicy, error)

// InterFlowDispatchPolicy retrieves a priority band's configured `framework.InterFlowDispatchPolicy` for this shard.
// The registry guarantees that a non-nil default policy is returned if none is configured for the band.
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
InterFlowDispatchPolicy(priority uint) (framework.InterFlowDispatchPolicy, error)

// PriorityBandAccessor retrieves a read-only accessor for a given priority level, providing a view of the band's
// state as seen by this specific shard. This is the primary entry point for inter-flow dispatch policies that
// need to inspect and compare multiple flow queues within the same priority band.
// Returns an error wrapping `ErrPriorityBandNotFound` if the priority level is not configured.
PriorityBandAccessor(priority uint) (framework.PriorityBandAccessor, error)

// AllOrderedPriorityLevels returns all configured priority levels that this shard is aware of, sorted in ascending
// numerical order. This order corresponds to highest priority (lowest numeric value) to lowest priority (highest
// numeric value).
// The returned slice provides a definitive, ordered list of priority levels for iteration, for example, by a
// `controller.FlowController` worker's dispatch loop.
AllOrderedPriorityLevels() []uint

// Stats returns a snapshot of the statistics for this specific shard.
Stats() ShardStats
}

// ManagedQueue defines the interface for a flow's queue instance on a specific shard.
// It wraps an underlying `framework.SafeQueue`, augmenting it with lifecycle validation against the `FlowRegistry` and
// integrating atomic statistics updates.
//
// Conformance:
// # Conformance
//
// - All methods (including those embedded from `framework.SafeQueue`) MUST be goroutine-safe.
// - Mutating methods (`Add()`, `Remove()`, `CleanupExpired()`, `Drain()`) MUST ensure the flow instance still exists
// and is valid within the `FlowRegistry` before proceeding. They MUST also atomically update relevant statistics
// (e.g., queue length, byte size) at both the queue and priority-band levels.
// - The `Add()` method MUST reject new items if the queue has been marked as "draining" by the `FlowRegistry`,
// ensuring that lifecycle changes are respected even by consumers holding a stale pointer to the queue.
// - All mutating methods (`Add()`, `Remove()`, `Cleanup()`, `Drain()`) MUST atomically update relevant statistics
// (e.g., queue length, byte size).
type ManagedQueue interface {
framework.SafeQueue

Expand All @@ -43,3 +99,61 @@ type ManagedQueue interface {
// Conformance: This method MUST NOT return nil.
FlowQueueAccessor() framework.FlowQueueAccessor
}

// ShardStats holds statistics for a single internal shard within the `FlowRegistry`.
type ShardStats struct {
// TotalCapacityBytes is the optional, maximum total byte size limit aggregated across all priority bands within this
// shard. Its value represents the globally configured limit for the `FlowRegistry` partitioned for this shard.
// The `controller.FlowController` enforces this limit in addition to any per-band capacity limits.
// A value of 0 signifies that this global limit is ignored, and only per-band limits apply.
TotalCapacityBytes uint64
// TotalByteSize is the total byte size of all items currently queued across all priority bands within this shard.
TotalByteSize uint64
// TotalLen is the total number of items currently queued across all priority bands within this shard.
TotalLen uint64
// PerPriorityBandStats maps each configured priority level to its statistics within this shard.
// The key is the numerical priority level.
// All configured priority levels are guaranteed to be represented.
PerPriorityBandStats map[uint]PriorityBandStats
}

// DeepCopy returns a deep copy of the `ShardStats`.
func (s *ShardStats) DeepCopy() ShardStats {
if s == nil {
return ShardStats{}
}
newStats := *s
if s.PerPriorityBandStats != nil {
newStats.PerPriorityBandStats = make(map[uint]PriorityBandStats, len(s.PerPriorityBandStats))
for k, v := range s.PerPriorityBandStats {
newStats.PerPriorityBandStats[k] = v.DeepCopy()
}
}
return newStats
}

// PriorityBandStats holds aggregated statistics for a single priority band.
type PriorityBandStats struct {
// Priority is the numerical priority level this struct describes.
Priority uint
// PriorityName is an optional, human-readable name for the priority level (e.g., "Critical", "Sheddable").
PriorityName string
// CapacityBytes is the configured maximum total byte size for this priority band, aggregated across all items in
// all flow queues within this band. If scoped to a shard, its value represents the configured band limit for the
// `FlowRegistry` partitioned for this shard.
// The `controller.FlowController` enforces this limit.
// A default non-zero value is guaranteed if not configured.
CapacityBytes uint64
// ByteSize is the total byte size of items currently queued in this priority band.
ByteSize uint64
// Len is the total number of items currently queued in this priority band.
Len uint64
}

// DeepCopy returns a deep copy of the `PriorityBandStats`.
func (s *PriorityBandStats) DeepCopy() PriorityBandStats {
if s == nil {
return PriorityBandStats{}
}
return *s
}
204 changes: 204 additions & 0 deletions pkg/epp/flowcontrol/registry/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package registry

import (
"errors"
"fmt"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue/listqueue"
)

// Config holds the master configuration for the entire `FlowRegistry`. It serves as the top-level blueprint, defining
// global capacity limits and the structure of its priority bands.
//
// This master configuration is validated and defaulted once at startup. It is then partitioned and distributed to each
// internal `registryShard`, ensuring a consistent and predictable state across the system.
type Config struct {
// MaxBytes defines an optional, global maximum total byte size limit aggregated across all priority bands and shards.
// The `controller.FlowController` enforces this limit in addition to per-band capacity limits.
//
// Optional: Defaults to 0, which signifies that the global limit is ignored.
MaxBytes uint64

// PriorityBands defines the set of priority bands managed by the `FlowRegistry`. The configuration for each band,
// including its default policies and queue types, is specified here.
//
// Required: At least one `PriorityBandConfig` must be provided for a functional registry.
PriorityBands []PriorityBandConfig
}

// partition calculates and returns a new `Config` with capacity values partitioned for a specific shard.
// This method ensures that the total capacity is distributed as evenly as possible across all shards.
func (c *Config) partition(shardIndex, totalShards int) (*Config, error) {
if totalShards <= 0 || shardIndex < 0 || shardIndex >= totalShards {
return nil, fmt.Errorf("invalid shard partitioning arguments: shardIndex=%d, totalShards=%d",
shardIndex, totalShards)
}

partitionValue := func(total uint64) uint64 {
if total == 0 {
return 0
}
base := total / uint64(totalShards)
remainder := total % uint64(totalShards)
if uint64(shardIndex) < remainder {
return base + 1
}
return base
}

newCfg := &Config{
MaxBytes: partitionValue(c.MaxBytes),
PriorityBands: make([]PriorityBandConfig, len(c.PriorityBands)),
}

for i, band := range c.PriorityBands {
newBand := band // Copy the original config
newBand.MaxBytes = partitionValue(band.MaxBytes) // Overwrite with the partitioned value
newCfg.PriorityBands[i] = newBand
}

return newCfg, nil
}

// validateAndApplyDefaults checks the configuration for validity and populates any empty fields with system defaults.
// This method should be called once by the registry before it initializes any shards.
func (c *Config) validateAndApplyDefaults() error {
if len(c.PriorityBands) == 0 {
return errors.New("config validation failed: at least one priority band must be defined")
}

priorities := make(map[uint]struct{}) // Keep track of seen priorities

for i := range c.PriorityBands {
band := &c.PriorityBands[i]
if _, exists := priorities[band.Priority]; exists {
return fmt.Errorf("config validation failed: duplicate priority level %d found", band.Priority)
}
priorities[band.Priority] = struct{}{}

if band.PriorityName == "" {
return errors.New("config validation failed: PriorityName is required for all priority bands")
}
if band.IntraFlowDispatchPolicy == "" {
band.IntraFlowDispatchPolicy = fcfs.FCFSPolicyName
}
if band.InterFlowDispatchPolicy == "" {
band.InterFlowDispatchPolicy = besthead.BestHeadPolicyName
}
if band.Queue == "" {
band.Queue = listqueue.ListQueueName
}

// After defaulting, validate that the chosen plugins are compatible.
if err := validateBandCompatibility(*band); err != nil {
return err
}
}
return nil
}

// validateBandCompatibility verifies that a band's default policy is compatible with its default queue type.
func validateBandCompatibility(band PriorityBandConfig) error {
policy, err := intra.NewPolicyFromName(band.IntraFlowDispatchPolicy)
if err != nil {
return fmt.Errorf("failed to validate policy %q for priority band %d: %w",
band.IntraFlowDispatchPolicy, band.Priority, err)
}

requiredCapabilities := policy.RequiredQueueCapabilities()
if len(requiredCapabilities) == 0 {
return nil // Policy has no specific requirements.
}

// Create a temporary queue instance to inspect its capabilities.
tempQueue, err := queue.NewQueueFromName(band.Queue, nil)
if err != nil {
return fmt.Errorf("failed to inspect queue type %q for priority band %d: %w", band.Queue, band.Priority, err)
}
queueCapabilities := tempQueue.Capabilities()

// Build a set of the queue's capabilities for efficient lookup.
capabilitySet := make(map[framework.QueueCapability]struct{}, len(queueCapabilities))
for _, cap := range queueCapabilities {
capabilitySet[cap] = struct{}{}
}

// Check if all required capabilities are present.
for _, req := range requiredCapabilities {
if _, ok := capabilitySet[req]; !ok {
return fmt.Errorf(
"policy %q is not compatible with queue %q for priority band %d (%s): missing capability %q: %w",
policy.Name(),
tempQueue.Name(),
band.Priority,
band.PriorityName,
req,
contracts.ErrPolicyQueueIncompatible,
)
}
}

return nil
}

// PriorityBandConfig defines the configuration for a single priority band within the `FlowRegistry`. It establishes the
// default behaviors (such as queueing and dispatch policies) and capacity limits for all flows that operate at this
// priority level.
type PriorityBandConfig struct {
// Priority is the numerical priority level for this band.
// Convention: Lower numerical values indicate higher priority (e.g., 0 is highest).
//
// Required.
Priority uint

// PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard", "Sheddable").
//
// Required.
PriorityName string

// IntraFlowDispatchPolicy specifies the default name of the registered policy used to select a specific request to
// dispatch next from within a single flow's queue in this band. This default can be overridden on a per-flow basis.
//
// Optional: If empty, a system default (e.g., "FCFS") is used.
IntraFlowDispatchPolicy intra.RegisteredPolicyName

// InterFlowDispatchPolicy specifies the name of the registered policy used to select which flow's queue to service
// next from this band.
//
// Optional: If empty, a system default (e.g., "BestHead") is used.
InterFlowDispatchPolicy inter.RegisteredPolicyName

// Queue specifies the default name of the registered SafeQueue implementation to be used for flow queues within this
// band.
//
// Optional: If empty, a system default (e.g., "ListQueue") is used.
Queue queue.RegisteredQueueName

// MaxBytes defines the maximum total byte size for this specific priority band, aggregated across all shards.
//
// Optional: If not set, a system default (e.g., 1 GB) is applied.
MaxBytes uint64
}
Loading