Skip to content

Commit 7979bd8

Browse files
authored
feat: provide access to session events from executor callbacks (#521)
* provide access to session events from executor callbacks * gemini nitpicks * test cleanup
1 parent 39c4210 commit 7979bd8

File tree

4 files changed

+126
-14
lines changed

4 files changed

+126
-14
lines changed

server/adka2a/executor.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,13 @@ func (e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q
110110
if err != nil {
111111
return fmt.Errorf("a2a message conversion failed: %w", err)
112112
}
113-
r, err := runner.New(e.config.RunnerConfig)
113+
114+
runnerCfg, executorPlugin, err := withExecutorPlugin(e.config.RunnerConfig)
115+
if err != nil {
116+
return fmt.Errorf("failed to install a2a-executor plugin: %w", err)
117+
}
118+
119+
r, err := runner.New(runnerCfg)
114120
if err != nil {
115121
return fmt.Errorf("failed to create a runner: %w", err)
116122
}
@@ -137,10 +143,10 @@ func (e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q
137143

138144
invocationMeta := toInvocationMeta(ctx, e.config, reqCtx)
139145

140-
session, err := e.prepareSession(ctx, invocationMeta)
146+
err = e.prepareSession(ctx, invocationMeta)
141147
if err != nil {
142148
event := toTaskFailedUpdateEvent(reqCtx, err, invocationMeta.eventMeta)
143-
execCtx := newExecutorContext(ctx, invocationMeta, emptySessionState{}, content)
149+
execCtx := newExecutorContext(ctx, invocationMeta, executorPlugin, content)
144150
return e.writeFinalTaskStatus(execCtx, queue, event, err)
145151
}
146152

@@ -151,7 +157,7 @@ func (e *Executor) Execute(ctx context.Context, reqCtx *a2asrv.RequestContext, q
151157
}
152158

153159
processor := newEventProcessor(reqCtx, invocationMeta, e.config.GenAIPartConverter)
154-
executorContext := newExecutorContext(ctx, invocationMeta, session.State(), content)
160+
executorContext := newExecutorContext(ctx, invocationMeta, executorPlugin, content)
155161
return e.process(executorContext, r, processor, queue)
156162
}
157163

@@ -209,26 +215,26 @@ func (e *Executor) writeFinalTaskStatus(ctx ExecutorContext, queue eventqueue.Qu
209215
return nil
210216
}
211217

212-
func (e *Executor) prepareSession(ctx context.Context, meta invocationMeta) (session.Session, error) {
218+
func (e *Executor) prepareSession(ctx context.Context, meta invocationMeta) error {
213219
service := e.config.RunnerConfig.SessionService
214220

215-
getResp, err := service.Get(ctx, &session.GetRequest{
221+
_, err := service.Get(ctx, &session.GetRequest{
216222
AppName: e.config.RunnerConfig.AppName,
217223
UserID: meta.userID,
218224
SessionID: meta.sessionID,
219225
})
220226
if err == nil {
221-
return getResp.Session, nil
227+
return nil
222228
}
223229

224-
createResp, err := service.Create(ctx, &session.CreateRequest{
230+
_, err = service.Create(ctx, &session.CreateRequest{
225231
AppName: e.config.RunnerConfig.AppName,
226232
UserID: meta.userID,
227233
SessionID: meta.sessionID,
228234
State: make(map[string]any),
229235
})
230236
if err != nil {
231-
return nil, fmt.Errorf("failed to create a session: %w", err)
237+
return fmt.Errorf("failed to create a session: %w", err)
232238
}
233-
return createResp.Session, nil
239+
return nil
234240
}

server/adka2a/executor_context.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ type ExecutorContext interface {
3838
AgentName() string
3939
// ReadonlyState provides a view of the current session state.
4040
ReadonlyState() session.ReadonlyState
41+
// Events provides a readonly view of the current session events.
42+
Events() session.Events
4143
// UserContent is a converted A2A message which is passed to runner.Run.
4244
UserContent() *genai.Content
4345
// RequestContext containts information about the original A2A Request, the current task and related tasks.
@@ -47,15 +49,15 @@ type ExecutorContext interface {
4749
type executorContext struct {
4850
context.Context
4951
meta invocationMeta
50-
session session.ReadonlyState
52+
plugin *executorPlugin
5153
userContent *genai.Content
5254
}
5355

54-
func newExecutorContext(ctx context.Context, meta invocationMeta, session session.ReadonlyState, userContent *genai.Content) ExecutorContext {
56+
func newExecutorContext(ctx context.Context, meta invocationMeta, plugin *executorPlugin, userContent *genai.Content) ExecutorContext {
5557
return &executorContext{
5658
Context: ctx,
5759
meta: meta,
58-
session: session,
60+
plugin: plugin,
5961
userContent: userContent,
6062
}
6163
}
@@ -73,7 +75,19 @@ func (ec *executorContext) AgentName() string {
7375
}
7476

7577
func (ec *executorContext) ReadonlyState() session.ReadonlyState {
76-
return ec.session
78+
session := ec.plugin.invocationSession
79+
if session == nil {
80+
return emptySessionState{}
81+
}
82+
return session.State()
83+
}
84+
85+
func (ec *executorContext) Events() session.Events {
86+
session := ec.plugin.invocationSession
87+
if session == nil {
88+
return emptySessionEvents{}
89+
}
90+
return session.Events()
7791
}
7892

7993
func (ec *executorContext) RequestContext() *a2asrv.RequestContext {
@@ -93,3 +107,17 @@ func (emptySessionState) Get(string) (any, error) {
93107
func (emptySessionState) All() iter.Seq2[string, any] {
94108
return func(yield func(string, any) bool) {}
95109
}
110+
111+
type emptySessionEvents struct{}
112+
113+
func (emptySessionEvents) At(i int) *session.Event {
114+
return nil
115+
}
116+
117+
func (emptySessionEvents) All() iter.Seq[*session.Event] {
118+
return func(yield func(*session.Event) bool) {}
119+
}
120+
121+
func (emptySessionEvents) Len() int {
122+
return 0
123+
}

server/adka2a/executor_plugin.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package adka2a
16+
17+
import (
18+
"slices"
19+
20+
"google.golang.org/genai"
21+
22+
"google.golang.org/adk/agent"
23+
"google.golang.org/adk/plugin"
24+
"google.golang.org/adk/runner"
25+
"google.golang.org/adk/session"
26+
)
27+
28+
type executorPlugin struct {
29+
plugin *plugin.Plugin
30+
31+
invocationSession session.Session
32+
}
33+
34+
func withExecutorPlugin(cfg runner.Config) (runner.Config, *executorPlugin, error) {
35+
executorPlugin, err := newExecutorPlugin()
36+
if err != nil {
37+
return cfg, nil, err
38+
}
39+
cfg.PluginConfig.Plugins = append(slices.Clone(cfg.PluginConfig.Plugins), executorPlugin.plugin)
40+
return cfg, executorPlugin, nil
41+
}
42+
43+
func newExecutorPlugin() (*executorPlugin, error) {
44+
execPlugin := &executorPlugin{}
45+
plugin, err := plugin.New(plugin.Config{
46+
Name: "a2a-executor",
47+
BeforeRunCallback: func(ic agent.InvocationContext) (*genai.Content, error) {
48+
execPlugin.invocationSession = ic.Session()
49+
return nil, nil
50+
},
51+
})
52+
if err != nil {
53+
return nil, err
54+
}
55+
execPlugin.plugin = plugin
56+
return execPlugin, nil
57+
}

server/adka2a/executor_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,27 @@ func TestExecutor_Callbacks(t *testing.T) {
404404
newFinalStatusUpdate(task, a2a.TaskStateCompleted, nil),
405405
},
406406
},
407+
{
408+
name: "can access session events",
409+
events: []*session.Event{
410+
{LLMResponse: modelResponseFromParts(genai.NewPartFromText("Hello"))},
411+
{LLMResponse: modelResponseFromParts(genai.NewPartFromText(", world!"))},
412+
},
413+
afterExecution: func(ctx ExecutorContext, finalUpdate *a2a.TaskStatusUpdateEvent, err error) error {
414+
eventCount := ctx.Events().Len()
415+
finalUpdate.Status.Message = a2a.NewMessage(a2a.MessageRoleAgent, a2a.TextPart{Text: fmt.Sprintf("event count = %d", eventCount)})
416+
return nil
417+
},
418+
wantEvents: []a2a.Event{
419+
a2a.NewStatusUpdateEvent(task, a2a.TaskStateWorking, nil),
420+
a2a.NewArtifactEvent(task, a2a.TextPart{Text: "Hello"}),
421+
a2a.NewArtifactUpdateEvent(task, a2a.NewArtifactID(), a2a.TextPart{Text: ", world!"}),
422+
newArtifactLastChunkEvent(task),
423+
newFinalStatusUpdate(task, a2a.TaskStateCompleted,
424+
a2a.NewMessage(a2a.MessageRoleAgent, a2a.TextPart{Text: "event count = 3"}),
425+
),
426+
},
427+
},
407428
{
408429
name: "abort execution",
409430
events: []*session.Event{

0 commit comments

Comments
 (0)