@@ -20,6 +20,7 @@ import (
2020
2121 goakt "github.com/tochemey/goakt/v3/actor"
2222 "github.com/tochemey/goakt/v3/goaktpb"
23+ "github.com/tochemey/goakt/v3/passivation"
2324
2425 "github.com/rs/xid"
2526 "google.golang.org/protobuf/proto"
@@ -89,15 +90,23 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
8990 initializing : initializing ,
9091 }
9192
93+ agentIdleTimeout := utils .GetDurationFromEnv ("MODUS_AGENT_IDLE_TIMEOUT_SECONDS" , 2 , time .Second )
94+ var agentPassivationStrategy = passivation .NewTimeBasedStrategy (agentIdleTimeout )
95+
9296 actorName := getActorName (agentId )
9397 _ , err := _actorSystem .Spawn (ctx , actorName , actor ,
94- goakt .WithLongLived ( ),
98+ goakt .WithPassivationStrategy ( agentPassivationStrategy ),
9599 goakt .WithDependencies (& wasmAgentInfo {
96100 AgentName : agentName ,
97101 PluginName : pluginName ,
98102 }),
99103 )
100104
105+ if err != nil {
106+ sentryutils .CaptureError (ctx , err , "Error spawning agent actor" ,
107+ sentryutils .WithData ("agent_id" , agentId ))
108+ }
109+
101110 return err
102111}
103112
@@ -195,60 +204,73 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data
195204
196205 actorName := getActorName (agentId )
197206
207+ // Pause passivation to ensure the actor is not passivated while processing the message.
208+ if err := tell (ctx , actorName , & goaktpb.PausePassivation {}); errors .Is (err , goakt .ErrActorNotFound ) {
209+ state , err := db .GetAgentState (ctx , agentId )
210+ if errors .Is (err , db .ErrAgentNotFound ) {
211+ return newAgentMessageErrorResponse (fmt .Sprintf ("agent %s not found" , agentId )), nil
212+ } else if err != nil {
213+ return nil , fmt .Errorf ("error getting agent state for %s: %w" , agentId , err )
214+ }
215+
216+ switch AgentStatus (state .Status ) {
217+ case AgentStatusStopping , AgentStatusTerminated :
218+ return newAgentMessageErrorResponse ("agent is no longer available" ), nil
219+ }
220+
221+ // Restart the agent actor locally if it is not running.
222+ var pluginName string
223+ if plugin , ok := plugins .GetPluginFromContext (ctx ); ok {
224+ pluginName = plugin .Name ()
225+ } else {
226+ return nil , errors .New ("no plugin found in context" )
227+ }
228+ agentName := state .Name
229+ if err := spawnActorForAgent (ctx , pluginName , agentId , agentName , false ); err != nil {
230+ return nil , fmt .Errorf ("error spawning actor for agent %s: %w" , agentId , err )
231+ }
232+
233+ // Try again.
234+ if err := tell (ctx , actorName , & goaktpb.PausePassivation {}); err != nil {
235+ return nil , fmt .Errorf ("error sending message to agent: %w" , err )
236+ }
237+ } else if err != nil {
238+ sentryutils .CaptureError (ctx , err , "Error pausing passivation for agent" ,
239+ sentryutils .WithData ("agent_id" , agentId ))
240+ return nil , fmt .Errorf ("error sending message to agent: %w" , err )
241+ }
242+
243+ defer func () {
244+ // Resume passivation after the message is sent.
245+ if err := tell (ctx , actorName , & goaktpb.ResumePassivation {}); err != nil {
246+ const msg = "Error resuming passivation after sending message to agent."
247+ logger .Error (ctx , err ).Str ("agent_id" , agentId ).Msg (msg )
248+ sentryutils .CaptureError (ctx , err , msg ,
249+ sentryutils .WithData ("agent_id" , agentId ))
250+ }
251+ }()
252+
198253 msg := & messages.AgentRequest {
199254 Name : msgName ,
200255 Data : data ,
201256 Respond : timeout > 0 ,
202257 }
203258
204259 var err error
205- const maxRetries = 3
206- for attempt := 1 ; attempt <= maxRetries ; attempt ++ {
260+ var res proto.Message
261+ if timeout == 0 {
262+ err = tell (ctx , actorName , msg )
263+ } else {
264+ res , err = ask (ctx , actorName , msg , time .Duration (timeout ))
265+ }
207266
208- var res proto.Message
209- if timeout == 0 {
210- err = tell (ctx , actorName , msg )
267+ if err == nil {
268+ if res == nil {
269+ return newAgentMessageDataResponse (nil ), nil
270+ } else if response , ok := res .(* messages.AgentResponse ); ok {
271+ return newAgentMessageDataResponse (response .Data ), nil
211272 } else {
212- res , err = ask (ctx , actorName , msg , time .Duration (timeout ))
213- }
214-
215- if err == nil {
216- if res == nil {
217- return newAgentMessageDataResponse (nil ), nil
218- } else if response , ok := res .(* messages.AgentResponse ); ok {
219- return newAgentMessageDataResponse (response .Data ), nil
220- } else {
221- return nil , fmt .Errorf ("unexpected agent response type: %T" , res )
222- }
223- }
224-
225- if errors .Is (err , goakt .ErrActorNotFound ) {
226- state , err := db .GetAgentState (ctx , agentId )
227- if errors .Is (err , db .ErrAgentNotFound ) {
228- return newAgentMessageErrorResponse (fmt .Sprintf ("agent %s not found" , agentId )), nil
229- } else if err != nil {
230- return nil , fmt .Errorf ("error getting agent state for %s: %w" , agentId , err )
231- }
232-
233- switch AgentStatus (state .Status ) {
234- case AgentStatusStopping , AgentStatusTerminated :
235- return newAgentMessageErrorResponse ("agent is no longer available" ), nil
236- }
237-
238- // Restart the agent actor locally if it is not running.
239- var pluginName string
240- if plugin , ok := plugins .GetPluginFromContext (ctx ); ! ok {
241- return nil , fmt .Errorf ("no plugin found in context" )
242- } else {
243- pluginName = plugin .Name ()
244- }
245- agentName := state .Name
246- if err := spawnActorForAgent (ctx , pluginName , agentId , agentName , false ); err != nil {
247- return nil , fmt .Errorf ("error spawning actor for agent %s: %w" , agentId , err )
248- }
249-
250- // Retry sending the message to the agent actor.
251- continue
273+ return nil , fmt .Errorf ("unexpected agent response type: %T" , res )
252274 }
253275 }
254276
0 commit comments