Skip to content

Commit f35221b

Browse files
committed
[CRE-941] Dynamic config update in TriggerPublisher
1 parent b4763c6 commit f35221b

File tree

3 files changed

+199
-81
lines changed

3 files changed

+199
-81
lines changed

core/capabilities/launcher.go

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ type launcher struct {
6161
type cachedShims struct {
6262
combinedClients map[string]remote.CombinedClient
6363
triggerSubscribers map[string]remote.TriggerSubscriber
64+
triggerPublishers map[string]remote.TriggerPublisher
6465
executableClients map[string]executable.Client
65-
66-
// TODO(CRE-942): add trigger publishers and executable servers
66+
// TODO(CRE-942): add executable servers
6767
}
6868

6969
func shimKey(capID string, donID uint32, method string) string {
@@ -109,6 +109,7 @@ func NewLauncher(
109109
cachedShims: cachedShims{
110110
combinedClients: make(map[string]remote.CombinedClient),
111111
triggerSubscribers: make(map[string]remote.TriggerSubscriber),
112+
triggerPublishers: make(map[string]remote.TriggerPublisher),
112113
executableClients: make(map[string]executable.Client),
113114
},
114115
registry: registry,
@@ -630,17 +631,20 @@ func (w *launcher) serveCapability(ctx context.Context, cid string, c registrysy
630631
if !ok {
631632
return nil, errors.New("capability does not implement TriggerCapability")
632633
}
633-
634-
publisher := remote.NewTriggerPublisher(
635-
capabilityConfig.RemoteTriggerConfig,
636-
triggerCapability,
637-
info,
638-
don.DON,
639-
idsToDONs,
640-
w.dispatcher,
641-
"", // empty method name for v1
642-
w.lggr,
643-
)
634+
shimKey := shimKey(capability.ID, don.ID, "") // empty method name for V1
635+
publisher, alreadyExists := w.cachedShims.triggerPublishers[shimKey]
636+
if !alreadyExists {
637+
publisher = remote.NewTriggerPublisher(
638+
capability.ID,
639+
"", // empty method name for v1
640+
w.dispatcher,
641+
w.lggr,
642+
)
643+
w.cachedShims.triggerPublishers[shimKey] = publisher
644+
}
645+
if errCfg := publisher.SetConfig(capabilityConfig.RemoteTriggerConfig, triggerCapability, don.DON, idsToDONs); errCfg != nil {
646+
return nil, fmt.Errorf("failed to set config for trigger publisher: %w", errCfg)
647+
}
644648
return publisher, nil
645649
}
646650

@@ -890,16 +894,21 @@ func (w *launcher) exposeCapabilityV2(ctx context.Context, capID string, methodC
890894
if !ok {
891895
return fmt.Errorf("capability %s does not implement TriggerCapability", capID)
892896
}
893-
receiver = remote.NewTriggerPublisher(
894-
config.RemoteTriggerConfig,
895-
underlyingTriggerCapability,
896-
info,
897-
myDON.DON,
898-
idsToDONs,
899-
w.dispatcher,
900-
method,
901-
w.lggr,
902-
)
897+
shimKey := shimKey(capID, myDON.ID, method)
898+
publisher, alreadyExists := w.cachedShims.triggerPublishers[shimKey]
899+
if !alreadyExists {
900+
publisher = remote.NewTriggerPublisher(
901+
capID,
902+
method,
903+
w.dispatcher,
904+
w.lggr,
905+
)
906+
w.cachedShims.triggerPublishers[shimKey] = publisher
907+
}
908+
if errCfg := publisher.SetConfig(config.RemoteTriggerConfig, underlyingTriggerCapability, myDON.DON, idsToDONs); errCfg != nil {
909+
return fmt.Errorf("failed to set config for trigger publisher: %w", errCfg)
910+
}
911+
receiver = publisher
903912
}
904913
if receiver == nil && config.RemoteExecutableConfig != nil {
905914
underlyingExecutableCapability, ok := (underlying).(capabilities.ExecutableCapability)

0 commit comments

Comments
 (0)