Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/flagd-cli/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ flagd start [flags]
-H, --context-from-header stringToString add key-value pairs to map header values to context values, where key is Header name, value is context key (default [])
-X, --context-value stringToString add arbitrary key value pairs to the flag evaluation context (default [])
-C, --cors-origin strings CORS allowed origins, * will allow all origins
--enable-sync-context Enable or disable sync context. Defaults to false.
-h, --help help for start
-z, --log-format string Set the logging format, e.g. console or json (default "console")
-m, --management-port int32 Port for management operations (default 8014)
Expand Down
4 changes: 4 additions & 0 deletions flagd/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
syncPortFlagName = "sync-port"
syncSocketPathFlagName = "sync-socket-path"
uriFlagName = "uri"
enableSyncContext = "enable-sync-context"
contextValueFlagName = "context-value"
headerToContextKeyFlagName = "context-from-header"
streamDeadlineFlagName = "stream-deadline"
Expand Down Expand Up @@ -89,6 +90,7 @@ func init() {
flags.StringToStringP(headerToContextKeyFlagName, "H", map[string]string{}, "add key-value pairs to map "+
"header values to context values, where key is Header name, value is context key")
flags.Duration(streamDeadlineFlagName, 0, "Set a server-side deadline for flagd sync and event streams (default 0, means no deadline).")
flags.Bool(enableSyncContext, false, "Enable or disable sync context. Defaults to false.")

bindFlags(flags)
}
Expand All @@ -114,6 +116,7 @@ func bindFlags(flags *pflag.FlagSet) {
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
_ = viper.BindPFlag(headerToContextKeyFlagName, flags.Lookup(headerToContextKeyFlagName))
_ = viper.BindPFlag(streamDeadlineFlagName, flags.Lookup(streamDeadlineFlagName))
_ = viper.BindPFlag(enableSyncContext, flags.Lookup(enableSyncContext))
}

// startCmd represents the start command
Expand Down Expand Up @@ -186,6 +189,7 @@ var startCmd = &cobra.Command{
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
StreamDeadline: viper.GetDuration(streamDeadlineFlagName),
EnableSyncContext: viper.GetBool(enableSyncContext),
SyncProviders: syncProviders,
ContextValues: contextValuesToMap,
HeaderToContextKeyMappings: headerToContextKeyMappings,
Expand Down
20 changes: 11 additions & 9 deletions flagd/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Config struct {
SyncServicePort uint16
SyncServiceSocketPath string
StreamDeadline time.Duration
EnableSyncContext bool

SyncProviders []sync.SourceConfig
CORS []string
Expand Down Expand Up @@ -116,15 +117,16 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,

// flag sync service
flagSyncService, err := flagsync.NewSyncService(flagsync.SvcConfigurations{
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: s,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
Logger: logger.WithFields(zap.String("component", "FlagSyncService")),
Port: config.SyncServicePort,
Sources: sources,
Store: s,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
StreamDeadline: config.StreamDeadline,
EnableSyncContext: config.EnableSyncContext,
})
if err != nil {
return nil, fmt.Errorf("error creating sync service: %w", err)
Expand Down
47 changes: 26 additions & 21 deletions flagd/pkg/service/flag-sync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"errors"
"fmt"
"maps"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"maps"
"time"

"buf.build/gen/go/open-feature/flagd/grpc/go/flagd/sync/v1/syncv1grpc"
Expand All @@ -17,10 +17,11 @@ import (

// syncHandler implements the sync contract
type syncHandler struct {
mux *Multiplexer
log *logger.Logger
contextValues map[string]any
deadline time.Duration
mux *Multiplexer
log *logger.Logger
contextValues map[string]any
deadline time.Duration
enableSyncContext bool
}

func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.FlagSyncService_SyncFlagsServer) error {
Expand All @@ -44,24 +45,25 @@ func (s syncHandler) SyncFlags(req *syncv1.SyncFlagsRequest, server syncv1grpc.F
for {
select {
case payload := <-muxPayload:

metadataSrc := make(map[string]any)
maps.Copy(metadataSrc, s.contextValues)

if sources := s.mux.SourcesAsMetadata(); sources != "" {
metadataSrc["sources"] = sources
response := &syncv1.SyncFlagsResponse{FlagConfiguration: payload.flags}

if s.enableSyncContext {
metadataSrc := make(map[string]any)
maps.Copy(metadataSrc, s.contextValues)

if sources := s.mux.SourcesAsMetadata(); sources != "" {
metadataSrc["sources"] = sources
}

metadata, err := structpb.NewStruct(metadataSrc)
if err != nil {
s.log.Error(fmt.Sprintf("error from struct creation: %v", err))
return fmt.Errorf("error constructing metadata response")
}
response.SyncContext = metadata
}

metadata, err := structpb.NewStruct(metadataSrc)
if err != nil {
s.log.Error(fmt.Sprintf("error from struct creation: %v", err))
return fmt.Errorf("error constructing metadata response")
}

err = server.Send(&syncv1.SyncFlagsResponse{
FlagConfiguration: payload.flags,
SyncContext: metadata,
})
err = server.Send(response)
if err != nil {
s.log.Debug(fmt.Sprintf("error sending stream response: %v", err))
return fmt.Errorf("error sending stream response: %w", err)
Expand Down Expand Up @@ -97,6 +99,9 @@ func (s syncHandler) FetchAllFlags(_ context.Context, req *syncv1.FetchAllFlagsR
func (s syncHandler) GetMetadata(_ context.Context, _ *syncv1.GetMetadataRequest) (
*syncv1.GetMetadataResponse, error,
) {
if s.enableSyncContext {
return nil, fmt.Errorf("metadata endpoint disabled")
}
metadataSrc := make(map[string]any)
for k, v := range s.contextValues {
metadataSrc[k] = v
Expand Down
100 changes: 53 additions & 47 deletions flagd/pkg/service/flag-sync/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,54 +54,60 @@ func TestSyncHandler_SyncFlags(t *testing.T) {
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore := store.NewFlags()
mp, err := NewMux(flagStore, tt.sources)
require.NoError(t, err)

handler := syncHandler{
mux: mp,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
}

// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is full removed and deprecated
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)

// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}

go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()

select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)

syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)

// Check the two metadatas are equal
for _, enableSyncContext := range []bool{true, false} {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Shared handler for testing both GetMetadata & SyncFlags methods
flagStore := store.NewFlags()
mp, err := NewMux(flagStore, tt.sources)
require.NoError(t, err)

handler := syncHandler{
mux: mp,
contextValues: tt.contextValues,
log: logger.NewLogger(nil, false),
enableSyncContext: enableSyncContext,
}

// Test getting metadata from `GetMetadata` (deprecated)
// remove when `GetMetadata` is full removed and deprecated
assert.Equal(t, respMetadata, syncMetadata)
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}

})
metaResp, err := handler.GetMetadata(context.Background(), &syncv1.GetMetadataRequest{})
if !enableSyncContext {
require.NoError(t, err)
respMetadata := metaResp.GetMetadata().AsMap()
assert.Equal(t, tt.wantMetadata, respMetadata)
} else {
assert.NotNil(t, err)
}

// Test metadata from sync_context
stream := &mockSyncFlagsServer{
ctx: context.Background(),
mu: sync.Mutex{},
respReady: make(chan struct{}, 1),
}

go func() {
err := handler.SyncFlags(&syncv1.SyncFlagsRequest{}, stream)
assert.NoError(t, err)
}()

select {
case <-stream.respReady:
syncResp := stream.GetLastResponse()
assert.NotNil(t, syncResp)

if enableSyncContext {
syncMetadata := syncResp.GetSyncContext().AsMap()
assert.Equal(t, tt.wantMetadata, syncMetadata)
} else {
assert.Nil(t, syncResp.GetSyncContext())
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for response")
}
})
}
}
}

Expand Down
28 changes: 15 additions & 13 deletions flagd/pkg/service/flag-sync/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ type ISyncService interface {
}

type SvcConfigurations struct {
Logger *logger.Logger
Port uint16
Sources []string
Store *store.State
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
Logger *logger.Logger
Port uint16
Sources []string
Store *store.State
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
StreamDeadline time.Duration
EnableSyncContext bool
}

type Service struct {
Expand Down Expand Up @@ -82,10 +83,11 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
}

syncv1grpc.RegisterFlagSyncServiceServer(server, &syncHandler{
mux: mux,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
mux: mux,
log: l,
contextValues: cfg.ContextValues,
deadline: cfg.StreamDeadline,
enableSyncContext: cfg.EnableSyncContext,
})

var lis net.Listener
Expand Down
Loading
Loading