Skip to content

Commit 30c8fba

Browse files
committed
unit tests
1 parent 3f0ed42 commit 30c8fba

File tree

7 files changed

+506
-63
lines changed

7 files changed

+506
-63
lines changed

core/capabilities/launcher.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,7 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth
843843
if !alreadyExists {
844844
sub = remote.NewTriggerSubscriber(capID, method, w.dispatcher, w.lggr)
845845
cc.SetTriggerSubscriber(method, sub)
846+
// add to cachedShims later, only after startNewShim succeeds
846847
}
847848
// TODO(CRE-590): add support for SignedReportAggregator (needed by LLO Streams Trigger V2)
848849
agg := aggregation.NewDefaultModeAggregator(config.RemoteTriggerConfig.MinResponsesToAggregate)
@@ -864,6 +865,7 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth
864865
if !alreadyExists {
865866
client = executable.NewClient(info.ID, method, w.dispatcher, w.lggr)
866867
cc.SetExecutableClient(method, client)
868+
// add to cachedShims later, only after startNewShim succeeds
867869
}
868870
// Update existing client config
869871
transmissionConfig := &transmission.TransmissionConfig{
@@ -897,7 +899,7 @@ func (w *launcher) addRemoteCapabilityV2(ctx context.Context, capID string, meth
897899
}
898900

899901
func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.ReceiverService, capID string, donID uint32, method string) error {
900-
w.lggr.Debugw("Enabling external access for capability method", "id", capID, "method", method, "donID", donID)
902+
w.lggr.Debugw("Starting new remote shim for capability method", "id", capID, "method", method, "donID", donID)
901903
if err := receiver.Start(ctx); err != nil {
902904
return fmt.Errorf("failed to start receiver for capability %s, method %s: %w", capID, method, err)
903905
}
@@ -906,6 +908,7 @@ func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.Receiv
906908
return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err)
907909
}
908910
w.subServices = append(w.subServices, receiver)
911+
w.lggr.Debugw("New remote shim started successfully for capability method", "id", capID, "method", method, "donID", donID)
909912
return nil
910913
}
911914

@@ -938,7 +941,7 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC
938941
w.dispatcher,
939942
w.lggr,
940943
)
941-
w.cachedShims.triggerPublishers[shimKey] = publisher
944+
// add to cachedShims later, only after startNewShim succeeds
942945
}
943946
if errCfg := publisher.SetConfig(config.RemoteTriggerConfig, underlyingTriggerCapability, myDON.DON, idsToDONs); errCfg != nil {
944947
return fmt.Errorf("failed to set config for trigger publisher: %w", errCfg)
@@ -969,7 +972,7 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC
969972
w.dispatcher,
970973
w.lggr,
971974
)
972-
w.cachedShims.executableServers[shimKey] = server
975+
// add to cachedShims later, only after startNewShim succeeds
973976
}
974977

975978
var requestHasher remotetypes.MessageHasher

core/capabilities/remote/executable/client.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ var _ Client = &client{}
5858
var _ types.Receiver = &client{}
5959
var _ services.Service = &client{}
6060

61-
const expiryCheckInterval = 30 * time.Second
61+
const defaultExpiryCheckInterval = 30 * time.Second
6262

6363
var (
6464
ErrRequestExpired = errors.New("request expired by executable client")
@@ -162,28 +162,26 @@ func (c *client) checkDispatcherReady() {
162162
}
163163

164164
func (c *client) checkForExpiredRequests() {
165-
cfg := c.cfg.Load()
166-
if cfg == nil {
167-
c.lggr.Errorw("config not set, cannot check for expired requests")
168-
return
169-
}
170-
171-
tickerInterval := expiryCheckInterval
172-
if cfg.requestTimeout < tickerInterval {
173-
tickerInterval = cfg.requestTimeout
174-
}
175-
ticker := time.NewTicker(tickerInterval)
165+
ticker := time.NewTicker(getClientTickerInterval(c.cfg.Load()))
176166
defer ticker.Stop()
177167
for {
178168
select {
179169
case <-c.stopCh:
180170
return
181171
case <-ticker.C:
172+
ticker.Reset(getClientTickerInterval(c.cfg.Load()))
182173
c.expireRequests()
183174
}
184175
}
185176
}
186177

178+
func getClientTickerInterval(cfg *dynamicConfig) time.Duration {
179+
if cfg != nil && cfg.requestTimeout > 0 {
180+
return cfg.requestTimeout
181+
}
182+
return defaultExpiryCheckInterval
183+
}
184+
187185
func (c *client) expireRequests() {
188186
c.mutex.Lock()
189187
defer c.mutex.Unlock()

core/capabilities/remote/executable/client_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,3 +370,151 @@ func (t *clientTestServer) sendResponse(messageID string, responseErr error,
370370
}
371371
}
372372
}
373+
374+
func TestClient_SetConfig(t *testing.T) {
375+
lggr := logger.Test(t)
376+
capabilityID := "[email protected]"
377+
378+
// Create broker and dispatcher like other tests
379+
broker := newTestAsyncMessageBroker(t, 100)
380+
peerID := NewP2PPeerID(t)
381+
dispatcher := broker.NewDispatcherForNode(peerID)
382+
client := executable.NewClient(capabilityID, "execute", dispatcher, lggr)
383+
384+
// Create valid test data
385+
validCapInfo := commoncap.CapabilityInfo{
386+
ID: capabilityID,
387+
CapabilityType: commoncap.CapabilityTypeAction,
388+
Description: "Test capability",
389+
}
390+
391+
validDonInfo := commoncap.DON{
392+
ID: 1,
393+
Members: []p2ptypes.PeerID{NewP2PPeerID(t)},
394+
F: 0,
395+
}
396+
397+
validTimeout := 30 * time.Second
398+
399+
t.Run("successful config set", func(t *testing.T) {
400+
transmissionConfig := &transmission.TransmissionConfig{
401+
Schedule: transmission.Schedule_OneAtATime,
402+
DeltaStage: 10 * time.Millisecond,
403+
}
404+
405+
err := client.SetConfig(validCapInfo, validDonInfo, validTimeout, transmissionConfig)
406+
require.NoError(t, err)
407+
408+
// Verify config was set
409+
info, err := client.Info(context.Background())
410+
require.NoError(t, err)
411+
assert.Equal(t, validCapInfo.ID, info.ID)
412+
})
413+
414+
t.Run("mismatched capability ID", func(t *testing.T) {
415+
invalidCapInfo := commoncap.CapabilityInfo{
416+
417+
CapabilityType: commoncap.CapabilityTypeAction,
418+
}
419+
420+
err := client.SetConfig(invalidCapInfo, validDonInfo, validTimeout, nil)
421+
require.Error(t, err)
422+
assert.Contains(t, err.Error(), "capability info provided does not match the client's capabilityID")
423+
assert.Contains(t, err.Error(), "[email protected] != [email protected]")
424+
})
425+
426+
t.Run("empty DON members", func(t *testing.T) {
427+
invalidDonInfo := commoncap.DON{
428+
ID: 1,
429+
Members: []p2ptypes.PeerID{},
430+
F: 0,
431+
}
432+
433+
err := client.SetConfig(validCapInfo, invalidDonInfo, validTimeout, nil)
434+
require.Error(t, err)
435+
assert.Contains(t, err.Error(), "empty localDonInfo provided")
436+
})
437+
438+
t.Run("successful config update", func(t *testing.T) {
439+
// Set initial config
440+
initialTimeout := 10 * time.Second
441+
err := client.SetConfig(validCapInfo, validDonInfo, initialTimeout, nil)
442+
require.NoError(t, err)
443+
444+
// Replace with new config
445+
newTimeout := 60 * time.Second
446+
newDonInfo := commoncap.DON{
447+
ID: 2,
448+
Members: []p2ptypes.PeerID{NewP2PPeerID(t), NewP2PPeerID(t)},
449+
F: 1,
450+
}
451+
452+
err = client.SetConfig(validCapInfo, newDonInfo, newTimeout, nil)
453+
require.NoError(t, err)
454+
455+
// Verify the config was completely replaced
456+
info, err := client.Info(context.Background())
457+
require.NoError(t, err)
458+
assert.Equal(t, validCapInfo.ID, info.ID)
459+
})
460+
}
461+
462+
func TestClient_SetConfig_StartClose(t *testing.T) {
463+
ctx := testutils.Context(t)
464+
lggr := logger.Test(t)
465+
capabilityID := "[email protected]"
466+
467+
// Create broker and dispatcher like other tests
468+
broker := newTestAsyncMessageBroker(t, 100)
469+
peerID := NewP2PPeerID(t)
470+
dispatcher := broker.NewDispatcherForNode(peerID)
471+
client := executable.NewClient(capabilityID, "execute", dispatcher, lggr)
472+
473+
validCapInfo := commoncap.CapabilityInfo{
474+
ID: capabilityID,
475+
CapabilityType: commoncap.CapabilityTypeAction,
476+
Description: "Test capability",
477+
}
478+
479+
validDonInfo := commoncap.DON{
480+
ID: 1,
481+
Members: []p2ptypes.PeerID{NewP2PPeerID(t)},
482+
F: 0,
483+
}
484+
485+
validTimeout := 30 * time.Second
486+
487+
t.Run("start fails without config", func(t *testing.T) {
488+
clientWithoutConfig := executable.NewClient(capabilityID, "execute", dispatcher, lggr)
489+
err := clientWithoutConfig.Start(ctx)
490+
require.Error(t, err)
491+
assert.Contains(t, err.Error(), "config not set - call SetConfig() before Start()")
492+
})
493+
494+
t.Run("start succeeds after config set", func(t *testing.T) {
495+
require.NoError(t, client.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
496+
require.NoError(t, client.Start(ctx))
497+
require.NoError(t, client.Close())
498+
})
499+
500+
t.Run("config can be updated after start", func(t *testing.T) {
501+
// Create a fresh client for this test since services can only be started once
502+
freshClient := executable.NewClient(capabilityID, "execute", dispatcher, lggr)
503+
504+
// Set initial config and start
505+
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
506+
require.NoError(t, freshClient.Start(ctx))
507+
508+
// Update config while running
509+
validCapInfo.Description = "new description"
510+
require.NoError(t, freshClient.SetConfig(validCapInfo, validDonInfo, validTimeout, nil))
511+
512+
// Verify config was updated
513+
info, err := freshClient.Info(ctx)
514+
require.NoError(t, err)
515+
assert.Equal(t, validCapInfo.Description, info.Description)
516+
517+
// Clean up
518+
require.NoError(t, freshClient.Close())
519+
})
520+
}

core/capabilities/remote/executable/server.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,7 @@ func (r *server) Start(ctx context.Context) error {
164164
r.wg.Add(1)
165165
go func() {
166166
defer r.wg.Done()
167-
tickerInterval := min(cfg.requestTimeout, expiryCheckInterval)
168-
ticker := time.NewTicker(tickerInterval)
167+
ticker := time.NewTicker(getServerTickerInterval(cfg))
169168
defer ticker.Stop()
170169

171170
r.lggr.Info("executable capability server started")
@@ -174,6 +173,7 @@ func (r *server) Start(ctx context.Context) error {
174173
case <-r.stopCh:
175174
return
176175
case <-ticker.C:
176+
ticker.Reset(getServerTickerInterval(cfg))
177177
r.expireRequests()
178178
}
179179
}
@@ -187,6 +187,13 @@ func (r *server) Start(ctx context.Context) error {
187187
})
188188
}
189189

190+
func getServerTickerInterval(cfg *dynamicServerConfig) time.Duration {
191+
if cfg != nil && cfg.requestTimeout > 0 {
192+
return cfg.requestTimeout
193+
}
194+
return defaultExpiryCheckInterval
195+
}
196+
190197
func (r *server) Close() error {
191198
return r.StopOnce(r.Name(), func() error {
192199
close(r.stopCh)
@@ -290,22 +297,14 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) {
290297
}
291298

292299
reqAndMsgID := r.requestIDToRequest[requestID]
293-
if r.parallelExecutor != nil {
294-
if executeTaskErr := r.parallelExecutor.ExecuteTask(ctx,
295-
func(ctx context.Context) {
296-
err = reqAndMsgID.request.OnMessage(ctx, msg)
297-
if err != nil {
298-
r.lggr.Errorw("failed to execute on message", "messageID", reqAndMsgID.messageID, "err", err)
299-
}
300-
}); executeTaskErr != nil {
301-
r.lggr.Errorw("failed to execute on message task", "messageID", messageID, "err", executeTaskErr)
302-
}
303-
} else {
304-
// Fallback to direct execution if parallel executor is not initialized
305-
err = reqAndMsgID.request.OnMessage(ctx, msg)
306-
if err != nil {
307-
r.lggr.Errorw("failed to execute on message", "messageID", reqAndMsgID.messageID, "err", err)
308-
}
300+
if executeTaskErr := r.parallelExecutor.ExecuteTask(ctx,
301+
func(ctx context.Context) {
302+
err = reqAndMsgID.request.OnMessage(ctx, msg)
303+
if err != nil {
304+
r.lggr.Errorw("failed to execute on message", "messageID", reqAndMsgID.messageID, "err", err)
305+
}
306+
}); executeTaskErr != nil {
307+
r.lggr.Errorw("failed to execute on message task", "messageID", messageID, "err", executeTaskErr)
309308
}
310309
}
311310

0 commit comments

Comments
 (0)