Skip to content

Commit b46cc86

Browse files
committed
Add getResponse API with multiple consumption patterns
Implements a new client.getResponse() method that provides flexible ways to consume streaming responses: - await response.text - Get complete text - await response.message - Get complete AssistantMessage - response.textStream - Stream text deltas - response.newMessagesStream - Stream incremental AssistantMessage updates - response.reasoningStream - Stream reasoning deltas - response.toolStream - Stream tool call deltas - response.fullResponsesStream - Stream all raw events - response.fullChatStream - Stream in chat-compatible format All consumption patterns support both concurrent and sequential access, allowing users to mix and match approaches as needed. Key implementation details: - ReusableReadableStream enables multiple concurrent consumers without blocking - ResponseWrapper provides lazy initialization with cached promises - Stream transformers convert ResponsesAPI events to different formats - Returns AssistantMessage type for consistency with chat API Tests: 19 passing, 1 skipped (model unavailable)
1 parent 94c987e commit b46cc86

File tree

8 files changed

+1357
-1
lines changed

8 files changed

+1357
-1
lines changed

src/funcs/getResponse.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { OpenRouterCore } from "../core.js";
2+
import { RequestOptions } from "../lib/sdks.js";
3+
import { ResponseWrapper } from "../lib/response-wrapper.js";
4+
import * as models from "../models/index.js";
5+
6+
/**
7+
* Get a response with multiple consumption patterns
8+
*
9+
* @remarks
10+
* Creates a response using the OpenResponses API in streaming mode and returns
11+
* a wrapper that allows consuming the response in multiple ways:
12+
*
13+
* - `await response.message` - Get the completed message
14+
* - `await response.text` - Get just the text content
15+
* - `for await (const delta of response.textStream)` - Stream text deltas
16+
* - `for await (const delta of response.reasoningStream)` - Stream reasoning deltas
17+
* - `for await (const delta of response.toolStream)` - Stream tool call argument deltas
18+
* - `for await (const msg of response.newMessagesStream)` - Stream incremental message updates
19+
* - `for await (const event of response.fullResponsesStream)` - Stream all response events
20+
* - `for await (const chunk of response.fullChatStream)` - Stream in chat-compatible format
21+
*
22+
* All consumption patterns can be used concurrently on the same response.
23+
*
24+
* @example
25+
* ```typescript
26+
* // Simple text extraction
27+
* const response = await openrouter.beta.responses.get({
28+
* model: "anthropic/claude-3-opus",
29+
* input: [{ role: "user", content: "Hello!" }]
30+
* });
31+
* const text = await response.text;
32+
* console.log(text);
33+
*
34+
* // Streaming text
35+
* const response = openrouter.beta.responses.get({
36+
* model: "anthropic/claude-3-opus",
37+
* input: [{ role: "user", content: "Hello!" }]
38+
* });
39+
* for await (const delta of response.textStream) {
40+
* process.stdout.write(delta);
41+
* }
42+
*
43+
* // Full message with metadata
44+
* const response = openrouter.beta.responses.get({
45+
* model: "anthropic/claude-3-opus",
46+
* input: [{ role: "user", content: "Hello!" }]
47+
* });
48+
* const message = await response.message;
49+
* console.log(message.content);
50+
* ```
51+
*/
52+
export function getResponse(
53+
client: OpenRouterCore,
54+
request: Omit<models.OpenResponsesRequest, "stream">,
55+
options?: RequestOptions,
56+
): ResponseWrapper {
57+
return new ResponseWrapper({
58+
client,
59+
request: { ...request },
60+
options: options ?? {},
61+
});
62+
}

src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,9 @@ export * from "./lib/config.js";
66
export * as files from "./lib/files.js";
77
export { HTTPClient } from "./lib/http.js";
88
export type { Fetcher, HTTPClientOptions } from "./lib/http.js";
9+
// #region imports
10+
export { ResponseWrapper } from "./lib/response-wrapper.js";
11+
export type { GetResponseOptions } from "./lib/response-wrapper.js";
12+
export { ReusableReadableStream } from "./lib/reusable-stream.js";
13+
// #endregion
914
export * from "./sdk/sdk.js";

src/lib/response-wrapper.ts

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
import { OpenRouterCore } from "../core.js";
2+
import { EventStream } from "./event-streams.js";
3+
import { RequestOptions } from "./sdks.js";
4+
import * as models from "../models/index.js";
5+
import { betaResponsesSend } from "../funcs/betaResponsesSend.js";
6+
import { ReusableReadableStream } from "./reusable-stream.js";
7+
import {
8+
extractTextDeltas,
9+
extractReasoningDeltas,
10+
extractToolDeltas,
11+
buildMessageStream,
12+
consumeStreamForCompletion,
13+
extractMessageFromResponse,
14+
extractTextFromResponse,
15+
} from "./stream-transformers.js";
16+
17+
export interface GetResponseOptions {
18+
request: models.OpenResponsesRequest;
19+
client: OpenRouterCore;
20+
options?: RequestOptions;
21+
}
22+
23+
/**
24+
* A wrapper around a streaming response that provides multiple consumption patterns.
25+
*
26+
* Allows consuming the response in multiple ways:
27+
* - `await response.message` - Get the completed message
28+
* - `await response.text` - Get just the text
29+
* - `for await (const delta of response.textStream)` - Stream text deltas
30+
* - `for await (const msg of response.messageStream)` - Stream incremental message updates
31+
* - `for await (const event of response.fullResponsesStream)` - Stream all response events
32+
*
33+
* All consumption patterns can be used concurrently thanks to the underlying
34+
* ReusableReadableStream implementation.
35+
*/
36+
export class ResponseWrapper {
37+
private reusableStream: ReusableReadableStream<models.OpenResponsesStreamEvent> | null = null;
38+
private streamPromise: Promise<EventStream<models.OpenResponsesStreamEvent>> | null = null;
39+
private messagePromise: Promise<models.AssistantMessage> | null = null;
40+
private textPromise: Promise<string> | null = null;
41+
private options: GetResponseOptions;
42+
private initPromise: Promise<void> | null = null;
43+
44+
constructor(options: GetResponseOptions) {
45+
this.options = options;
46+
}
47+
48+
/**
49+
* Initialize the stream if not already started
50+
* This is idempotent - multiple calls will return the same promise
51+
*/
52+
private initStream(): Promise<void> {
53+
if (this.initPromise) {
54+
return this.initPromise;
55+
}
56+
57+
this.initPromise = (async () => {
58+
// Force stream mode
59+
const request = { ...this.options.request, stream: true as const };
60+
61+
// Create the stream promise
62+
this.streamPromise = betaResponsesSend(
63+
this.options.client,
64+
request,
65+
this.options.options,
66+
).then((result) => {
67+
if (!result.ok) {
68+
throw result.error;
69+
}
70+
return result.value;
71+
});
72+
73+
// Wait for the stream and create the reusable stream
74+
const eventStream = await this.streamPromise;
75+
this.reusableStream = new ReusableReadableStream(eventStream);
76+
})();
77+
78+
return this.initPromise;
79+
}
80+
81+
/**
82+
* Get the completed message from the response.
83+
* This will consume the stream until completion and extract the first message.
84+
* Returns an AssistantMessage in chat format.
85+
*/
86+
get message(): Promise<models.AssistantMessage> {
87+
if (this.messagePromise) {
88+
return this.messagePromise;
89+
}
90+
91+
this.messagePromise = (async (): Promise<models.AssistantMessage> => {
92+
await this.initStream();
93+
if (!this.reusableStream) {
94+
throw new Error("Stream not initialized");
95+
}
96+
97+
const completedResponse = await consumeStreamForCompletion(this.reusableStream);
98+
return extractMessageFromResponse(completedResponse);
99+
})();
100+
101+
return this.messagePromise;
102+
}
103+
104+
/**
105+
* Get just the text content from the response.
106+
* This will consume the stream until completion and extract the text.
107+
*/
108+
get text(): Promise<string> {
109+
if (this.textPromise) {
110+
return this.textPromise;
111+
}
112+
113+
this.textPromise = (async () => {
114+
await this.initStream();
115+
if (!this.reusableStream) {
116+
throw new Error("Stream not initialized");
117+
}
118+
119+
const completedResponse = await consumeStreamForCompletion(this.reusableStream);
120+
return extractTextFromResponse(completedResponse);
121+
})();
122+
123+
return this.textPromise;
124+
}
125+
126+
/**
127+
* Stream all response events as they arrive.
128+
* Multiple consumers can iterate over this stream concurrently.
129+
*/
130+
get fullResponsesStream(): AsyncIterableIterator<models.OpenResponsesStreamEvent> {
131+
return (async function* (this: ResponseWrapper) {
132+
await this.initStream();
133+
if (!this.reusableStream) {
134+
throw new Error("Stream not initialized");
135+
}
136+
137+
const consumer = this.reusableStream.createConsumer();
138+
yield* consumer;
139+
}.call(this));
140+
}
141+
142+
/**
143+
* Stream only text deltas as they arrive.
144+
* This filters the full event stream to only yield text content.
145+
*/
146+
get textStream(): AsyncIterableIterator<string> {
147+
return (async function* (this: ResponseWrapper) {
148+
await this.initStream();
149+
if (!this.reusableStream) {
150+
throw new Error("Stream not initialized");
151+
}
152+
153+
yield* extractTextDeltas(this.reusableStream);
154+
}.call(this));
155+
}
156+
157+
/**
158+
* Stream incremental message updates as content is added.
159+
* Each iteration yields an updated version of the message with new content.
160+
* Returns AssistantMessage in chat format.
161+
*/
162+
get newMessagesStream(): AsyncIterableIterator<models.AssistantMessage> {
163+
return (async function* (this: ResponseWrapper) {
164+
await this.initStream();
165+
if (!this.reusableStream) {
166+
throw new Error("Stream not initialized");
167+
}
168+
169+
yield* buildMessageStream(this.reusableStream);
170+
}.call(this));
171+
}
172+
173+
/**
174+
* Stream only reasoning deltas as they arrive.
175+
* This filters the full event stream to only yield reasoning content.
176+
*/
177+
get reasoningStream(): AsyncIterableIterator<string> {
178+
return (async function* (this: ResponseWrapper) {
179+
await this.initStream();
180+
if (!this.reusableStream) {
181+
throw new Error("Stream not initialized");
182+
}
183+
184+
yield* extractReasoningDeltas(this.reusableStream);
185+
}.call(this));
186+
}
187+
188+
/**
189+
* Stream only tool call argument deltas as they arrive.
190+
* This filters the full event stream to only yield function call arguments.
191+
*/
192+
get toolStream(): AsyncIterableIterator<string> {
193+
return (async function* (this: ResponseWrapper) {
194+
await this.initStream();
195+
if (!this.reusableStream) {
196+
throw new Error("Stream not initialized");
197+
}
198+
199+
yield* extractToolDeltas(this.reusableStream);
200+
}.call(this));
201+
}
202+
203+
/**
204+
* Stream events in chat format (compatibility layer).
205+
* Note: This transforms responses API events into a chat-like format.
206+
*
207+
* @remarks
208+
* This is a compatibility method that attempts to transform the responses API
209+
* stream into a format similar to the chat API. Due to differences in the APIs,
210+
* this may not be a perfect mapping.
211+
*/
212+
get fullChatStream(): AsyncIterableIterator<any> {
213+
return (async function* (this: ResponseWrapper) {
214+
await this.initStream();
215+
if (!this.reusableStream) {
216+
throw new Error("Stream not initialized");
217+
}
218+
219+
const consumer = this.reusableStream.createConsumer();
220+
221+
for await (const event of consumer) {
222+
if (!("type" in event)) continue;
223+
224+
// Transform responses events to chat-like format
225+
// This is a simplified transformation - you may need to adjust based on your needs
226+
if (event.type === "response.output_text.delta") {
227+
const deltaEvent = event as models.OpenResponsesStreamEventResponseOutputTextDelta;
228+
yield {
229+
type: "content.delta",
230+
delta: deltaEvent.delta,
231+
};
232+
} else if (event.type === "response.completed") {
233+
const completedEvent = event as models.OpenResponsesStreamEventResponseCompleted;
234+
yield {
235+
type: "message.complete",
236+
response: completedEvent.response,
237+
};
238+
} else {
239+
// Pass through other events
240+
yield {
241+
type: event.type,
242+
event,
243+
};
244+
}
245+
}
246+
}.call(this));
247+
}
248+
249+
/**
250+
* Cancel the underlying stream and all consumers
251+
*/
252+
async cancel(): Promise<void> {
253+
if (this.reusableStream) {
254+
await this.reusableStream.cancel();
255+
}
256+
}
257+
}

0 commit comments

Comments
 (0)