Skip to content

Commit 4057c34

Browse files
committed
feat(obs): basic agent instrumentation
Adds a new `observability` property to Agents that receives events during the Agent's exeuction that can be used for whatever. The default observability implementation prints the events to console, but in the future this could be hooked up to a UI or some other observability tooling.
1 parent 8bf3250 commit 4057c34

File tree

5 files changed

+258
-3
lines changed

5 files changed

+258
-3
lines changed

packages/agents/package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@
5858
"import": "./dist/mcp/do-oauth-client-provider.js",
5959
"require": "./dist/mcp/do-oauth-client-provider.js"
6060
},
61+
"./observability": {
62+
"import": "./dist/observability/index.js",
63+
"require": "./dist/observability/index.js",
64+
"types": "./dist/observability/index.d.ts"
65+
},
6166
"./react": {
6267
"types": "./dist/react.d.ts",
6368
"import": "./dist/react.js",

packages/agents/scripts/build.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ async function main() {
1111
"src/mcp/index.ts",
1212
"src/mcp/client.ts",
1313
"src/mcp/do-oauth-client-provider.ts",
14+
"src/observability/index.ts",
1415
],
1516
external: [
1617
"cloudflare:workers",

packages/agents/src/ai-chat-agent.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,23 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
7676
},
7777
[connection.id]
7878
);
79+
80+
const incomingMessages = this._messagesNotAlreadyInAgent(messages);
7981
await this.persistMessages(messages, [connection.id]);
8082

83+
this.observability?.emit(
84+
{
85+
displayMessage: "Chat message request",
86+
id: data.id,
87+
payload: {
88+
message: incomingMessages,
89+
},
90+
timestamp: Date.now(),
91+
type: "message:request",
92+
},
93+
this.ctx
94+
);
95+
8196
const chatMessageId = data.id;
8297
const abortSignal = this._getAbortSignal(chatMessageId);
8398

@@ -89,8 +104,23 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
89104
responseMessages: response.messages,
90105
});
91106

107+
const outgoingMessages =
108+
this._messagesNotAlreadyInAgent(finalMessages);
92109
await this.persistMessages(finalMessages, [connection.id]);
93110
this._removeAbortController(chatMessageId);
111+
112+
this.observability?.emit(
113+
{
114+
displayMessage: "Chat message response",
115+
id: data.id,
116+
payload: {
117+
message: outgoingMessages,
118+
},
119+
timestamp: Date.now(),
120+
type: "message:response",
121+
},
122+
this.ctx
123+
);
94124
},
95125
abortSignal ? { abortSignal } : undefined
96126
);
@@ -219,6 +249,11 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
219249
);
220250
}
221251

252+
private _messagesNotAlreadyInAgent(messages: ChatMessage[]) {
253+
const existingIds = new Set(this.messages.map((message) => message.id));
254+
return messages.filter((message) => !existingIds.has(message.id));
255+
}
256+
222257
private async _reply(id: string, response: Response) {
223258
// now take chunks out from dataStreamResponse and send them to the client
224259
return this._tryCatchChat(async () => {

packages/agents/src/index.ts

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { camelCaseToKebabCase } from "./client";
2323
import { MCPClientManager } from "./mcp/client";
2424
// import type { MCPClientConnection } from "./mcp/client-connection";
2525
import { DurableObjectOAuthClientProvider } from "./mcp/do-oauth-client-provider";
26+
import { genericObservability, type Observability } from "./observability";
2627

2728
export type { Connection, ConnectionContext, WSMessage } from "partyserver";
2829

@@ -316,6 +317,11 @@ export class Agent<Env, State = unknown> extends Server<Env> {
316317
hibernate: true, // default to hibernate
317318
};
318319

320+
/**
321+
* The observability implementation to use for the Agent
322+
*/
323+
observability?: Observability = genericObservability;
324+
319325
/**
320326
* Execute SQL queries against the Agent's database
321327
* @template T Type of the returned rows
@@ -460,6 +466,23 @@ export class Agent<Env, State = unknown> extends Server<Env> {
460466

461467
// For regular methods, execute and send response
462468
const result = await methodFn.apply(this, args);
469+
470+
this.observability?.emit(
471+
{
472+
displayMessage: `RPC call to ${method}`,
473+
id: nanoid(),
474+
payload: {
475+
args,
476+
method,
477+
streaming: metadata?.streaming,
478+
success: true,
479+
},
480+
timestamp: Date.now(),
481+
type: "rpc",
482+
},
483+
this.ctx
484+
);
485+
463486
const response: RPCResponse = {
464487
done: true,
465488
id,
@@ -512,6 +535,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
512535
})
513536
);
514537

538+
this.observability?.emit(
539+
{
540+
displayMessage: "Connection established",
541+
id: nanoid(),
542+
payload: {
543+
connectionId: connection.id,
544+
},
545+
timestamp: Date.now(),
546+
type: "connect",
547+
},
548+
this.ctx
549+
);
515550
return this._tryCatch(() => _onConnect(connection, ctx));
516551
}, 20);
517552
}
@@ -561,6 +596,7 @@ export class Agent<Env, State = unknown> extends Server<Env> {
561596
state: State,
562597
source: Connection | "server" = "server"
563598
) {
599+
const previousState = this._state;
564600
this._state = state;
565601
this.sql`
566602
INSERT OR REPLACE INTO cf_agents_state (id, state)
@@ -582,6 +618,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
582618
return agentContext.run(
583619
{ agent: this, connection, request },
584620
async () => {
621+
this.observability?.emit(
622+
{
623+
displayMessage: "State updated",
624+
id: nanoid(),
625+
payload: {
626+
previousState,
627+
state,
628+
},
629+
timestamp: Date.now(),
630+
type: "state:update",
631+
},
632+
this.ctx
633+
);
585634
return this.onStateUpdate(state, source);
586635
}
587636
);
@@ -677,6 +726,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
677726
): Promise<Schedule<T>> {
678727
const id = nanoid(9);
679728

729+
const emitScheduleCreate = (schedule: Schedule<T>) =>
730+
this.observability?.emit(
731+
{
732+
displayMessage: `Schedule ${schedule.id} created`,
733+
id: nanoid(),
734+
payload: schedule,
735+
timestamp: Date.now(),
736+
type: "schedule:create",
737+
},
738+
this.ctx
739+
);
740+
680741
if (typeof callback !== "string") {
681742
throw new Error("Callback must be a string");
682743
}
@@ -696,13 +757,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
696757

697758
await this._scheduleNextAlarm();
698759

699-
return {
760+
const schedule: Schedule<T> = {
700761
callback: callback,
701762
id,
702763
payload: payload as T,
703764
time: timestamp,
704765
type: "scheduled",
705766
};
767+
768+
emitScheduleCreate(schedule);
769+
770+
return schedule;
706771
}
707772
if (typeof when === "number") {
708773
const time = new Date(Date.now() + when * 1000);
@@ -717,14 +782,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
717782

718783
await this._scheduleNextAlarm();
719784

720-
return {
785+
const schedule: Schedule<T> = {
721786
callback: callback,
722787
delayInSeconds: when,
723788
id,
724789
payload: payload as T,
725790
time: timestamp,
726791
type: "delayed",
727792
};
793+
794+
emitScheduleCreate(schedule);
795+
796+
return schedule;
728797
}
729798
if (typeof when === "string") {
730799
const nextExecutionTime = getNextCronTime(when);
@@ -739,14 +808,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
739808

740809
await this._scheduleNextAlarm();
741810

742-
return {
811+
const schedule: Schedule<T> = {
743812
callback: callback,
744813
cron: when,
745814
id,
746815
payload: payload as T,
747816
time: timestamp,
748817
type: "cron",
749818
};
819+
820+
emitScheduleCreate(schedule);
821+
822+
return schedule;
750823
}
751824
throw new Error("Invalid schedule type");
752825
}
@@ -822,6 +895,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
822895
* @returns true if the task was cancelled, false otherwise
823896
*/
824897
async cancelSchedule(id: string): Promise<boolean> {
898+
const schedule = await this.getSchedule(id);
899+
if (schedule) {
900+
this.observability?.emit(
901+
{
902+
displayMessage: `Schedule ${id} cancelled`,
903+
id: nanoid(),
904+
payload: schedule,
905+
timestamp: Date.now(),
906+
type: "schedule:cancel",
907+
},
908+
this.ctx
909+
);
910+
}
825911
this.sql`DELETE FROM cf_agents_schedules WHERE id = ${id}`;
826912

827913
await this._scheduleNextAlarm();
@@ -870,6 +956,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
870956
{ agent: this, connection: undefined, request: undefined },
871957
async () => {
872958
try {
959+
this.observability?.emit(
960+
{
961+
displayMessage: `Schedule ${row.id} executed`,
962+
id: nanoid(),
963+
payload: row,
964+
timestamp: Date.now(),
965+
type: "schedule:execute",
966+
},
967+
this.ctx
968+
);
969+
873970
await (
874971
callback as (
875972
payload: unknown,
@@ -914,6 +1011,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
9141011
await this.ctx.storage.deleteAlarm();
9151012
await this.ctx.storage.deleteAll();
9161013
this.ctx.abort("destroyed"); // enforce that the agent is evicted
1014+
1015+
this.observability?.emit(
1016+
{
1017+
displayMessage: "Agent destroyed",
1018+
id: nanoid(),
1019+
payload: {},
1020+
timestamp: Date.now(),
1021+
type: "destroy",
1022+
},
1023+
this.ctx
1024+
);
9171025
}
9181026

9191027
/**

0 commit comments

Comments
 (0)