Skip to content

Commit cb82ba6

Browse files
committed
review feedback
1 parent 30c8fba commit cb82ba6

File tree

4 files changed

+65
-61
lines changed

4 files changed

+65
-61
lines changed

core/capabilities/launcher.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability
594594
var (
595595
// TODO: make this configurable
596596
defaultTargetRequestTimeout = 8 * time.Minute
597-
defaultMaxParallelCapabilityExecuteRequests = 1000
597+
defaultMaxParallelCapabilityExecuteRequests = uint32(1000)
598598
)
599599

600600
// serveCapabilities exposes capabilities that are available on this node, as part of the given DON.
@@ -683,7 +683,11 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
683683
w.cachedShims.executableServers[shimKey] = server
684684
}
685685

686-
remoteConfig := &capabilities.RemoteExecutableConfig{}
686+
remoteConfig := &capabilities.RemoteExecutableConfig{
687+
// deprecated defaults - v2 reads these from onchain config
688+
RequestTimeout: defaultTargetRequestTimeout,
689+
ServerMaxParallelRequests: defaultMaxParallelCapabilityExecuteRequests,
690+
}
687691
if capabilityConfig.RemoteTargetConfig != nil {
688692
remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes
689693
}
@@ -693,12 +697,10 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
693697
info,
694698
don.DON,
695699
idsToDONs,
696-
defaultTargetRequestTimeout,
697-
defaultMaxParallelCapabilityExecuteRequests,
698700
nil,
699701
)
700702
if errCfg != nil {
701-
return nil, fmt.Errorf("failed to set server config: %w", err)
703+
return nil, fmt.Errorf("failed to set server config: %w", errCfg)
702704
}
703705

704706
return server, nil
@@ -729,7 +731,11 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
729731
w.cachedShims.executableServers[shimKey] = server
730732
}
731733

732-
remoteConfig := &capabilities.RemoteExecutableConfig{}
734+
remoteConfig := &capabilities.RemoteExecutableConfig{
735+
// deprecated defaults - v2 reads these from onchain config
736+
RequestTimeout: defaultTargetRequestTimeout,
737+
ServerMaxParallelRequests: defaultMaxParallelCapabilityExecuteRequests,
738+
}
733739
if capabilityConfig.RemoteTargetConfig != nil {
734740
remoteConfig.RequestHashExcludedAttributes = capabilityConfig.RemoteTargetConfig.RequestHashExcludedAttributes
735741
}
@@ -739,12 +745,10 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
739745
info,
740746
don.DON,
741747
idsToDONs,
742-
defaultTargetRequestTimeout,
743-
defaultMaxParallelCapabilityExecuteRequests,
744748
nil,
745749
)
746750
if errCfg != nil {
747-
return nil, fmt.Errorf("failed to set server config: %w", err)
751+
return nil, fmt.Errorf("failed to set server config: %w", errCfg)
748752
}
749753

750754
return server, nil
@@ -991,8 +995,6 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC
991995
info,
992996
myDON.DON,
993997
idsToDONs,
994-
config.RemoteExecutableConfig.RequestTimeout,
995-
int(config.RemoteExecutableConfig.ServerMaxParallelRequests),
996998
requestHasher,
997999
)
9981000
if err != nil {

core/capabilities/remote/executable/endtoend_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,12 @@ func testRemoteExecutableCapability(ctx context.Context, t *testing.T, underlyin
294294
capabilityPeer := capabilityPeers[i]
295295
capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer)
296296
capabilityNode := executable.NewServer(capInfo.ID, "", capabilityPeer, capabilityDispatcher, lggr)
297-
require.NoError(t, capabilityNode.SetConfig(&commoncap.RemoteExecutableConfig{RequestHashExcludedAttributes: []string{}}, underlying, capInfo, capDonInfo, workflowDONs,
298-
capabilityNodeResponseTimeout, 10, nil))
297+
cfg := &commoncap.RemoteExecutableConfig{
298+
RequestHashExcludedAttributes: []string{},
299+
RequestTimeout: capabilityNodeResponseTimeout,
300+
ServerMaxParallelRequests: 10,
301+
}
302+
require.NoError(t, capabilityNode.SetConfig(cfg, underlying, capInfo, capDonInfo, workflowDONs, nil))
299303
servicetest.Run(t, capabilityNode)
300304
broker.RegisterReceiverNode(capabilityPeer, capabilityNode)
301305
capabilityNodes[i] = capabilityNode

core/capabilities/remote/executable/server.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,14 @@ type dynamicServerConfig struct {
5454
capInfo commoncap.CapabilityInfo
5555
localDonInfo commoncap.DON
5656
workflowDONs map[uint32]commoncap.DON
57-
requestTimeout time.Duration
58-
maxParallelRequests int
5957
}
6058

6159
type Server interface {
6260
types.Receiver
6361
services.Service
6462
SetConfig(remoteExecutableConfig *commoncap.RemoteExecutableConfig, underlying commoncap.ExecutableCapability,
6563
capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON,
66-
requestTimeout time.Duration, maxParallelRequests int, messageHasher types.MessageHasher) error
64+
messageHasher types.MessageHasher) error
6765
}
6866

6967
var _ Server = &server{}
@@ -90,13 +88,14 @@ func NewServer(capabilityID, methodName string, peerID p2ptypes.PeerID, dispatch
9088

9189
// SetConfig sets the remote server configuration dynamically
9290
func (r *server) SetConfig(remoteExecutableConfig *commoncap.RemoteExecutableConfig, underlying commoncap.ExecutableCapability,
93-
capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON,
94-
requestTimeout time.Duration, maxParallelRequests int, messageHasher types.MessageHasher) error {
91+
capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, messageHasher types.MessageHasher) error {
92+
currCfg := r.cfg.Load()
9593
if remoteExecutableConfig == nil {
9694
r.lggr.Info("no remote config provided, using default values")
9795
remoteExecutableConfig = &commoncap.RemoteExecutableConfig{}
9896
}
9997
if messageHasher == nil {
98+
r.lggr.Warn("no message hasher provided, using default V1 hasher")
10099
messageHasher = NewV1Hasher(remoteExecutableConfig.RequestHashExcludedAttributes)
101100
}
102101
if capInfo.ID == "" || capInfo.ID != r.capabilityID {
@@ -108,12 +107,17 @@ func (r *server) SetConfig(remoteExecutableConfig *commoncap.RemoteExecutableCon
108107
if len(localDonInfo.Members) == 0 {
109108
return errors.New("empty localDonInfo provided")
110109
}
111-
if requestTimeout <= 0 {
112-
return errors.New("requestTimeout must be positive")
110+
if remoteExecutableConfig.RequestTimeout <= 0 {
111+
return errors.New("cfg.RequestTimeout must be positive")
113112
}
114-
// Use a sensible default if maxParallelRequests is not set or is 0
115-
if maxParallelRequests <= 0 {
116-
maxParallelRequests = 10 // default value
113+
if remoteExecutableConfig.ServerMaxParallelRequests <= 0 {
114+
return errors.New("cfg.ServerMaxParallelRequests must be positive")
115+
}
116+
117+
if currCfg != nil && currCfg.remoteExecutableConfig != nil &&
118+
currCfg.remoteExecutableConfig.ServerMaxParallelRequests > 0 &&
119+
remoteExecutableConfig.ServerMaxParallelRequests != currCfg.remoteExecutableConfig.ServerMaxParallelRequests {
120+
r.lggr.Warn("ServerMaxParallelRequests changed but it won't be applied until node restart")
117121
}
118122

119123
// always replace the whole dynamicServerConfig object to avoid inconsistent state
@@ -124,8 +128,6 @@ func (r *server) SetConfig(remoteExecutableConfig *commoncap.RemoteExecutableCon
124128
capInfo: capInfo,
125129
localDonInfo: localDonInfo,
126130
workflowDONs: workflowDONs,
127-
requestTimeout: requestTimeout,
128-
maxParallelRequests: maxParallelRequests,
129131
})
130132
return nil
131133
}
@@ -138,6 +140,9 @@ func (r *server) Start(ctx context.Context) error {
138140
if cfg == nil {
139141
return errors.New("config not set - call SetConfig() before Start()")
140142
}
143+
if cfg.remoteExecutableConfig == nil {
144+
return errors.New("remote executable config not set - call SetConfig() before Start()")
145+
}
141146
if cfg.underlying == nil {
142147
return errors.New("underlying capability not set - call SetConfig() before Start()")
143148
}
@@ -147,19 +152,18 @@ func (r *server) Start(ctx context.Context) error {
147152
if len(cfg.localDonInfo.Members) == 0 {
148153
return errors.New("local DON info not set - call SetConfig() before Start()")
149154
}
150-
if cfg.requestTimeout <= 0 {
151-
return errors.New("request timeout not set - call SetConfig() before Start()")
155+
if cfg.remoteExecutableConfig.RequestTimeout <= 0 {
156+
return errors.New("cfg.RequestTimeout not set - call SetConfig() before Start()")
152157
}
153-
// maxParallelRequests should always be positive after SetConfig, but just in case
154-
if cfg.maxParallelRequests <= 0 {
155-
return errors.New("maxParallelRequests not set properly - call SetConfig() before Start()")
158+
if cfg.remoteExecutableConfig.ServerMaxParallelRequests <= 0 {
159+
return errors.New("cfg.ServerMaxParallelRequests not set - call SetConfig() before Start()")
156160
}
157161
if r.dispatcher == nil {
158162
return errors.New("dispatcher set to nil, cannot start server")
159163
}
160164

161165
// Initialize parallel executor with the configured max parallel requests
162-
r.parallelExecutor = newParallelExecutor(cfg.maxParallelRequests)
166+
r.parallelExecutor = newParallelExecutor(int(cfg.remoteExecutableConfig.ServerMaxParallelRequests))
163167

164168
r.wg.Add(1)
165169
go func() {
@@ -188,8 +192,8 @@ func (r *server) Start(ctx context.Context) error {
188192
}
189193

190194
func getServerTickerInterval(cfg *dynamicServerConfig) time.Duration {
191-
if cfg != nil && cfg.requestTimeout > 0 {
192-
return cfg.requestTimeout
195+
if cfg.remoteExecutableConfig.RequestTimeout > 0 {
196+
return cfg.remoteExecutableConfig.RequestTimeout
193197
}
194198
return defaultExpiryCheckInterval
195199
}
@@ -284,7 +288,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
284288
}
285289

286290
sr, ierr := request.NewServerRequest(cfg.underlying, msg.Method, cfg.capInfo.ID, cfg.localDonInfo.ID, r.peerID,
287-
callingDon, messageID, r.dispatcher, cfg.requestTimeout, r.capMethodName, r.lggr)
291+
callingDon, messageID, r.dispatcher, cfg.remoteExecutableConfig.RequestTimeout, r.capMethodName, r.lggr)
288292
if ierr != nil {
289293
r.lggr.Errorw("failed to instantiate server request", "err", ierr)
290294
return

core/capabilities/remote/executable/server_test.go

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,9 @@ func testRemoteExecutableCapabilityServer(ctx context.Context, t *testing.T,
362362
capabilityPeer := capabilityPeers[i]
363363
capabilityDispatcher := broker.NewDispatcherForNode(capabilityPeer)
364364
capabilityNode := executable.NewServer(capInfo.ID, "", capabilityPeer, capabilityDispatcher, lggr)
365-
require.NoError(t, capabilityNode.SetConfig(config, underlying, capInfo, capDonInfo, workflowDONs,
366-
capabilityNodeResponseTimeout, 10, messageHasher))
365+
config.RequestTimeout = capabilityNodeResponseTimeout
366+
config.ServerMaxParallelRequests = 10
367+
require.NoError(t, capabilityNode.SetConfig(config, underlying, capInfo, capDonInfo, workflowDONs, messageHasher))
367368
require.NoError(t, capabilityNode.Start(ctx))
368369
broker.RegisterReceiverNode(capabilityPeer, capabilityNode)
369370
capabilityNodes[i] = capabilityNode
@@ -472,28 +473,16 @@ func Test_Server_SetConfig(t *testing.T) {
472473

473474
underlying := &TestCapability{}
474475
requestTimeout := 10 * time.Second
475-
maxParallelRequests := 5
476+
maxParallelRequests := uint32(5)
476477

477478
t.Run("valid config should succeed", func(t *testing.T) {
478479
config := &commoncap.RemoteExecutableConfig{
479480
RequestHashExcludedAttributes: []string{"test"},
481+
RequestTimeout: requestTimeout,
482+
ServerMaxParallelRequests: maxParallelRequests,
480483
}
481484

482-
err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs,
483-
requestTimeout, maxParallelRequests, nil)
484-
require.NoError(t, err)
485-
})
486-
487-
t.Run("nil config should use default", func(t *testing.T) {
488-
err := server.SetConfig(nil, underlying, capInfo, localDonInfo, workflowDONs,
489-
requestTimeout, maxParallelRequests, nil)
490-
require.NoError(t, err)
491-
})
492-
493-
t.Run("nil hasher should create default V1 hasher", func(t *testing.T) {
494-
config := &commoncap.RemoteExecutableConfig{}
495-
err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs,
496-
requestTimeout, maxParallelRequests, nil)
485+
err := server.SetConfig(config, underlying, capInfo, localDonInfo, workflowDONs, nil)
497486
require.NoError(t, err)
498487
})
499488

@@ -504,14 +493,14 @@ func Test_Server_SetConfig(t *testing.T) {
504493
}
505494

506495
err := server.SetConfig(&commoncap.RemoteExecutableConfig{}, underlying, invalidCapInfo,
507-
localDonInfo, workflowDONs, requestTimeout, maxParallelRequests, nil)
496+
localDonInfo, workflowDONs, nil)
508497
require.Error(t, err)
509498
require.Contains(t, err.Error(), "capability info provided does not match")
510499
})
511500

512501
t.Run("nil underlying capability should return error", func(t *testing.T) {
513502
err := server.SetConfig(&commoncap.RemoteExecutableConfig{}, nil, capInfo,
514-
localDonInfo, workflowDONs, requestTimeout, maxParallelRequests, nil)
503+
localDonInfo, workflowDONs, nil)
515504
require.Error(t, err)
516505
require.Contains(t, err.Error(), "underlying capability cannot be nil")
517506
})
@@ -550,9 +539,10 @@ func Test_Server_SetConfig_ConfigReplacement(t *testing.T) {
550539
// Set initial config
551540
config1 := &commoncap.RemoteExecutableConfig{
552541
RequestHashExcludedAttributes: []string{"attr1"},
542+
RequestTimeout: 5 * time.Second,
543+
ServerMaxParallelRequests: 3,
553544
}
554-
err := server.SetConfig(config1, underlying, capInfo, localDonInfo, workflowDONs,
555-
5*time.Second, 3, nil)
545+
err := server.SetConfig(config1, underlying, capInfo, localDonInfo, workflowDONs, nil)
556546
require.NoError(t, err)
557547

558548
// Verify server can start with valid config
@@ -563,9 +553,10 @@ func Test_Server_SetConfig_ConfigReplacement(t *testing.T) {
563553
// Replace with new config
564554
config2 := &commoncap.RemoteExecutableConfig{
565555
RequestHashExcludedAttributes: []string{"attr2", "attr3"},
556+
RequestTimeout: 10 * time.Second,
557+
ServerMaxParallelRequests: 5,
566558
}
567-
err = server.SetConfig(config2, underlying, capInfo, localDonInfo, workflowDONs,
568-
10*time.Second, 5, nil)
559+
err = server.SetConfig(config2, underlying, capInfo, localDonInfo, workflowDONs, nil)
569560
require.NoError(t, err)
570561

571562
// Clean up
@@ -618,9 +609,12 @@ func Test_Server_SetConfig_StartValidation(t *testing.T) {
618609
}
619610

620611
underlying := &TestCapability{}
621-
622-
err := server.SetConfig(&commoncap.RemoteExecutableConfig{}, underlying, capInfo,
623-
localDonInfo, workflowDONs, 10*time.Second, 5, nil)
612+
cfg := &commoncap.RemoteExecutableConfig{
613+
RequestTimeout: 10 * time.Second,
614+
ServerMaxParallelRequests: 5,
615+
}
616+
err := server.SetConfig(cfg, underlying, capInfo,
617+
localDonInfo, workflowDONs, nil)
624618
require.NoError(t, err)
625619

626620
err = server.Start(ctx)

0 commit comments

Comments
 (0)