Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 118 additions & 70 deletions agents/src/stream/deferred_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,126 @@
import { type ReadableStream } from 'node:stream/web';
import { IdentityTransform } from './identity_transform.js';

/**
* Check if error is related to reader.read after release lock
*
* Invalid state: Releasing reader
* Invalid state: The reader is not attached to a stream
*/
function isStreamReaderReleaseError(e: unknown) {
const allowedMessages = [
'Invalid state: Releasing reader',
'Invalid state: The reader is not attached to a stream',
];

if (e instanceof TypeError) {
return allowedMessages.some((message) => e.message.includes(message));
}
// /**
// * Check if error is related to reader.read after release lock
// *
// * Invalid state: Releasing reader
// * Invalid state: The reader is not attached to a stream
// */
// function isStreamReaderReleaseError(e: unknown) {
// const allowedMessages = [
// 'Invalid state: Releasing reader',
// 'Invalid state: The reader is not attached to a stream',
// ];

return false;
}
// if (e instanceof TypeError) {
// return allowedMessages.some((message) => e.message.includes(message));
// }

export class DeferredReadableStream<T> {
private transform: IdentityTransform<T>;
private writer: WritableStreamDefaultWriter<T>;
private sourceReader?: ReadableStreamDefaultReader<T>;
// return false;
// }

// export class DeferredReadableStream<T> {
// private transform: IdentityTransform<T>;
// private writer: WritableStreamDefaultWriter<T>;
// private sourceReader?: ReadableStreamDefaultReader<T>;

// constructor() {
// this.transform = new IdentityTransform<T>();
// this.writer = this.transform.writable.getWriter();
// }

// get stream() {
// return this.transform.readable;
// }

// get isSourceSet() {
// return !!this.sourceReader;
// }

// /**
// * Call once the actual source is ready.
// */
// setSource(source: ReadableStream<T>) {
// if (this.isSourceSet) {
// throw new Error('Stream source already set');
// }

// this.sourceReader = source.getReader();
// this.pump();
// }

// private async pump() {
// let sourceError: unknown;

// try {
// while (true) {
// const { done, value } = await this.sourceReader!.read();
// if (done) break;
// await this.writer.write(value);
// }
// } catch (e) {
// if (isStreamReaderReleaseError(e)) return;
// sourceError = e;
// } finally {
// // any other error from source will be propagated to the consumer
// if (sourceError) {
// this.writer.abort(sourceError);
// return;
// }

// // release lock so this.stream.getReader().read() will terminate with done: true
// this.writer.releaseLock();

// // we only close the writable stream after done
// try {
// await this.transform.writable.close();
// // NOTE: we do not cancel this.transform.readable as there might be access to
// // this.transform.readable.getReader() outside that blocks this cancellation
// // hence, user is responsible for canceling reader on their own
// } catch (e) {
// // ignore TypeError: Invalid state: WritableStream is closed
// // in case stream reader is already closed, this will throw
// // but we ignore it as we are closing the stream anyway
// }
// }
// }

// /**
// * Detach the source stream and clean up resources.
// */
// async detachSource() {
// if (!this.isSourceSet) {
// throw new Error('Source not set');
// }

// // release lock will make any pending read() throw TypeError
// // which are expected, and we intentionally catch those error
// // using isStreamReaderReleaseError
// // this will unblock any pending read() inside the async for loop
// this.sourceReader!.releaseLock();
// }
// }

class SourceDetachedError extends Error {
constructor() {
this.transform = new IdentityTransform<T>();
this.writer = this.transform.writable.getWriter();
super('Source detached');
this.name = 'SourceDetachedError';
}
}

export class DeferredReadableStream<T> {
private transform: IdentityTransform<T>;
private abortController: AbortController;
private source?: ReadableStream<T>;

get stream() {
return this.transform.readable;
}

get isSourceSet() {
return !!this.sourceReader;
constructor() {
this.abortController = new AbortController();
this.transform = new IdentityTransform<T>();
}

/**
Expand All @@ -48,59 +133,22 @@ export class DeferredReadableStream<T> {
if (this.isSourceSet) {
throw new Error('Stream source already set');
}

this.sourceReader = source.getReader();
this.pump();
source.pipeTo(this.transform.writable, {
signal: this.abortController.signal,
});
this.source = source;
}

private async pump() {
let sourceError: unknown;

try {
while (true) {
const { done, value } = await this.sourceReader!.read();
if (done) break;
await this.writer.write(value);
}
} catch (e) {
if (isStreamReaderReleaseError(e)) return;
sourceError = e;
} finally {
// any other error from source will be propagated to the consumer
if (sourceError) {
this.writer.abort(sourceError);
return;
}

// release lock so this.stream.getReader().read() will terminate with done: true
this.writer.releaseLock();

// we only close the writable stream after done
try {
await this.transform.writable.close();
// NOTE: we do not cancel this.transform.readable as there might be access to
// this.transform.readable.getReader() outside that blocks this cancellation
// hence, user is responsible for canceling reader on their own
} catch (e) {
// ignore TypeError: Invalid state: WritableStream is closed
// in case stream reader is already closed, this will throw
// but we ignore it as we are closing the stream anyway
}
}
get isSourceSet() {
return !!this.source;
}

/**
* Detach the source stream and clean up resources.
*/
async detachSource() {
if (!this.isSourceSet) {
throw new Error('Source not set');
}

// release lock will make any pending read() throw TypeError
// which are expected, and we intentionally catch those error
// using isStreamReaderReleaseError
// this will unblock any pending read() inside the async for loop
this.sourceReader!.releaseLock();
this.abortController.abort(new SourceDetachedError());
this.source = undefined;
}
}
14 changes: 11 additions & 3 deletions agents/src/stream/identity_transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@ import { TransformStream } from 'node:stream/web';

export class IdentityTransform<T> extends TransformStream<T, T> {
constructor() {
super({
transform: (chunk, controller) => controller.enqueue(chunk),
});
super(
{
transform: (chunk, controller) => controller.enqueue(chunk),
},
{
highWaterMark: 1,
},
{
highWaterMark: 1,
},
);
}
}
62 changes: 30 additions & 32 deletions agents/tests/stream/deferred_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { ReadableStream } from 'node:stream/web';
import { describe, expect, it } from 'vitest';
import { DeferredReadableStream } from '../../src/stream/deferred_stream.js';


describe('DeferredReadableStream', { timeout: 2000 }, () => {
it('should create a readable stream that can be read after setting source', async () => {
const deferred = new DeferredReadableStream<string>();
Expand Down Expand Up @@ -490,11 +489,11 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {

// The read should reject with the error
try {
await readPromise;
expect.fail('readPromise should have rejected');
await readPromise;
expect.fail('readPromise should have rejected');
} catch (e: any) {
expect(e).toBeInstanceOf(Error);
expect(e.message).toBe('Source error');
expect(e).toBeInstanceOf(Error);
expect(e.message).toBe('Source error');
}

reader.releaseLock();
Expand Down Expand Up @@ -540,7 +539,7 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {
start(controller) {
controller.enqueue('data');
controller.close();
}
},
});

deferred.setSource(source);
Expand All @@ -554,8 +553,7 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {
reader.releaseLock();
});


it('reads after detaching source should return undefined', async () => {
it('reads after detaching source should throw SourceDetachedError', async () => {
const deferred = new DeferredReadableStream<string>();
const reader = deferred.stream.getReader();
const readPromise = reader.read();
Expand All @@ -565,7 +563,7 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {
controller.enqueue('first');
controller.enqueue('second');
controller.close();
}
},
});

deferred.setSource(source);
Expand All @@ -577,20 +575,18 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {
expect(result.done).toBe(false);
expect(result.value).toBe('first');

const result2 = await reader.read();
expect(result2.done).toBe(true);
expect(result2.value).toBeUndefined();
reader.releaseLock();
await expect(reader.read()).rejects.toThrow('Source detached');

const reader2 = source.getReader();
const result3 = await reader2.read();
expect(result3.done).toBe(false);
expect(result3.value).toBe('second');
// TODO: not sure what expected behavior is here after detaching
// const reader2 = source.getReader();
// const result3 = await reader2.read();
// expect(result3.done).toBe(false);
// expect(result3.value).toBe('second');

const result4 = await reader2.read();
expect(result4.done).toBe(true);
expect(result4.value).toBeUndefined();
reader.releaseLock();
// // const result4 = await reader2.read();
// // expect(result4.done).toBe(true);
// // expect(result4.value).toBeUndefined();
// reader2.releaseLock();
});

it('should handle empty source stream', async () => {
Expand Down Expand Up @@ -619,7 +615,7 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {

it('source can be set by another deferred stream after calling detach', async () => {
const deferred = new DeferredReadableStream<string>();

// Create a new source stream
const source = new ReadableStream<string>({
start(controller) {
Expand All @@ -643,6 +639,8 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {

// read second chunk
const result2 = await result2Promise;

// TODO: I don't think we would want a stream to transition
expect(result2.done).toBe(true);
expect(result2.value).toBeUndefined();

Expand All @@ -666,19 +664,19 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {
reader2.releaseLock();
});

it("a non-terminating source reader releases lock after detaching", async () => {
it('a non-terminating source reader releases lock after detaching', async () => {
const deferred = new DeferredReadableStream<string>();
const reader = deferred.stream.getReader();
const readPromise = reader.read();
let resumeSource = false;

const source = new ReadableStream<string>({
async start(controller) {
while (!resumeSource) await delay(10);
async start(controller) {
while (!resumeSource) await delay(10);

controller.enqueue('data');
controller.close();
}
controller.enqueue('data');
controller.close();
},
});

deferred.setSource(source);
Expand All @@ -703,12 +701,12 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {
expect(result3.value).toBeUndefined();

reader2.releaseLock();
})
});

it("should transfer source between deferred streams while reading is ongoing", async () => {
it('should transfer source between deferred streams while reading is ongoing', async () => {
const deferred1 = new DeferredReadableStream<string>();
const deferred2 = new DeferredReadableStream<string>();

// Create a source that slowly emits data
let emitCount = 0;
const source = new ReadableStream<string>({
Expand All @@ -720,7 +718,7 @@ describe('DeferredReadableStream', { timeout: 2000 }, () => {
await delay(20); // Small delay between chunks
}
controller.close();
}
},
});

deferred1.setSource(source);
Expand Down
Loading