Skip to content

Commit 1a14f7f

Browse files
committed
No need to use keys and a Map for stream management
1 parent 2246e6e commit 1a14f7f

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

packages/core/src/v3/realtimeStreams/manager.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
1919
private baseUrl: string,
2020
private debug: boolean = false
2121
) {}
22-
// Add a Map to track active streams with their abort controllers
23-
private activeStreams = new Map<
24-
string,
25-
{ wait: () => Promise<void>; abortController: AbortController }
26-
>();
22+
// Track active streams - using a Set allows multiple streams for the same key to coexist
23+
private activeStreams = new Set<{
24+
wait: () => Promise<void>;
25+
abortController: AbortController;
26+
}>();
2727

2828
reset(): void {
2929
this.activeStreams.clear();
@@ -85,10 +85,12 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
8585
maxRetries: parsedResponse.maxRetries,
8686
});
8787

88-
this.activeStreams.set(key, { wait: () => streamInstance.wait(), abortController });
88+
// Register this stream
89+
const streamInfo = { wait: () => streamInstance.wait(), abortController };
90+
this.activeStreams.add(streamInfo);
8991

9092
// Clean up when stream completes
91-
streamInstance.wait().finally(() => this.activeStreams.delete(key));
93+
streamInstance.wait().finally(() => this.activeStreams.delete(streamInfo));
9294

9395
return {
9496
wait: () => streamInstance.wait(),
@@ -108,7 +110,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
108110
return;
109111
}
110112

111-
const promises = Array.from(this.activeStreams.values()).map((stream) => stream.wait());
113+
const promises = Array.from(this.activeStreams).map((stream) => stream.wait());
112114

113115
// Create a timeout promise that resolves to a special sentinel value
114116
const TIMEOUT_SENTINEL = Symbol("timeout");
@@ -123,9 +125,9 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
123125
if (result === TIMEOUT_SENTINEL) {
124126
// Timeout occurred - abort all active streams
125127
const abortedCount = this.activeStreams.size;
126-
for (const [key, streamInfo] of this.activeStreams.entries()) {
128+
for (const streamInfo of this.activeStreams) {
127129
streamInfo.abortController.abort();
128-
this.activeStreams.delete(key);
130+
this.activeStreams.delete(streamInfo);
129131
}
130132

131133
throw new Error(

0 commit comments

Comments
 (0)