Skip to content

Commit cbf0413

Browse files
committed
feat: usefetch support sse
1 parent 61eecc4 commit cbf0413

File tree

5 files changed

+726
-613
lines changed

5 files changed

+726
-613
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
"husky": "^9.1.6",
6565
"jsdom": "^25.0.1",
6666
"lint-staged": "^15.2.10",
67-
"msw": "1.0.1",
67+
"msw": "^2.10.5",
6868
"node-fetch": "^3.3.2",
6969
"prettier": "^3.3.3",
7070
"sass": "^1.81.0",

packages/core/useFetch/index.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,34 @@ export function useFetch<T>(
285285
response.value = fetchResponse;
286286
statusCode.value = fetchResponse.status;
287287

288-
responseData = await fetchResponse.clone()[config.type]();
288+
const isStream =
289+
fetchResponse.headers.get("Content-Type") === "text/event-stream" &&
290+
fetchResponse.body instanceof ReadableStream;
291+
292+
// handle stream response
293+
if (isStream) {
294+
const reader = fetchResponse
295+
.clone()
296+
.body?.getReader() as ReadableStreamDefaultReader<
297+
Uint8Array<ArrayBuffer>
298+
>;
299+
const decoder = new TextDecoder();
300+
let received = "";
301+
302+
while (true) {
303+
const { done, value } = await reader.read();
304+
305+
if (done) break;
306+
307+
const chunk = decoder.decode(value, { stream: true });
308+
promise$.next(chunk as T);
309+
received += chunk;
310+
}
311+
312+
responseData = received;
313+
} else {
314+
responseData = await fetchResponse.clone()[config.type]();
315+
}
289316

290317
// see: https://www.tjvantoll.com/2015/09/13/fetch-and-errors/
291318
if (!fetchResponse.ok) {
@@ -310,7 +337,9 @@ export function useFetch<T>(
310337
cacheSetting.expiration,
311338
);
312339
}
313-
promise$.next(Promise.resolve(responseData));
340+
if (!isStream) {
341+
promise$.next(Promise.resolve(responseData));
342+
}
314343

315344
responseEvent.trigger(fetchResponse);
316345
return responseData;

packages/core/useFetch/test/mockServer.ts

Lines changed: 151 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,127 @@
44
*/
55

66
import { setupServer } from "msw/node";
7-
import type { RestContext, RestRequest } from "msw";
8-
import { rest } from "msw";
7+
import { http, HttpResponse } from "msw";
98
import { afterAll, afterEach, beforeAll } from "vitest";
109

1110
const defaultJsonMessage = { hello: "world" };
1211
const defaultTextMessage = "Hello World";
1312
const baseUrl = "https://example.com";
1413

15-
function commonTransformers(req: RestRequest, _: any, ctx: RestContext) {
16-
const t: any[] = [];
17-
const qs = req.url.searchParams;
14+
/**
15+
* Create a Server-Sent Events (SSE) stream response
16+
* @param count Number of events to send (default: 3)
17+
* @param interval Interval between events in milliseconds (default: 1000)
18+
* @param customData Custom data to send in events
19+
*/
20+
function createSSEStream(count = 3, interval = 1000, customData?: string) {
21+
const encoder = new TextEncoder();
22+
23+
const stream = new ReadableStream({
24+
async start(controller) {
25+
try {
26+
for (let eventCount = 1; eventCount <= count; eventCount++) {
27+
let message;
28+
if (customData) {
29+
if (count <= customData.length) {
30+
// Split customData into count parts
31+
const partSize = Math.ceil(customData.length / count);
32+
const startIndex = (eventCount - 1) * partSize;
33+
const endIndex = Math.min(
34+
startIndex + partSize,
35+
customData.length,
36+
);
37+
message = customData.slice(startIndex, endIndex);
38+
} else {
39+
// Send one character at a time if count > customData length
40+
if (eventCount <= customData.length) {
41+
message = customData[eventCount - 1];
42+
} else {
43+
// If we've sent all characters, send empty or repeat last char
44+
message = "";
45+
}
46+
}
47+
} else {
48+
// Default structure
49+
message = `Event ${eventCount}`;
50+
}
51+
52+
const data = `{"type":"data","message":"${message}","timestamp":"${new Date().toISOString()}","count":${eventCount},"total":${count}}`;
53+
const event = `data: ${data}\n\n`;
54+
55+
controller.enqueue(encoder.encode(event));
56+
57+
// Wait for interval before next event (except for last event)
58+
if (eventCount < count) {
59+
await new Promise((resolve) => setTimeout(resolve, interval));
60+
}
61+
}
62+
63+
// Send final completion event
64+
const finalEvent = `data: {"type":"complete","message":"Stream completed","count":${count}}\n\n`;
65+
controller.enqueue(encoder.encode(finalEvent));
66+
controller.close();
67+
} catch (error) {
68+
controller.error(error);
69+
}
70+
},
71+
72+
cancel() {
73+
// Cleanup if stream is cancelled
74+
},
75+
});
76+
77+
return new Response(stream, {
78+
headers: {
79+
"Content-Type": "text/event-stream",
80+
"Cache-Control": "no-cache",
81+
Connection: "keep-alive",
82+
"Access-Control-Allow-Origin": "*",
83+
"Access-Control-Allow-Headers": "Cache-Control",
84+
},
85+
});
86+
}
87+
88+
function createResponse(request: Request) {
89+
const url = new URL(request.url);
90+
const qs = url.searchParams;
91+
92+
let response: Response;
1893

19-
if (qs.get("delay")) t.push(ctx.delay(Number(qs.get("delay"))));
20-
if (qs.get("status")) t.push(ctx.status(Number(qs.get("status"))));
21-
if (qs.get("text") != null) {
22-
t.push(ctx.text(qs.get("text") ?? defaultTextMessage));
94+
if (qs.get("stream") != null) {
95+
// Handle SSE stream response
96+
const count = qs.get("count") ? Number(qs.get("count")) : 3;
97+
const interval = qs.get("interval") ? Number(qs.get("interval")) : 1000;
98+
const customData = qs.get("data") || undefined;
99+
response = createSSEStream(count, interval, customData);
100+
} else if (qs.get("text") != null) {
101+
response = HttpResponse.text(qs.get("text") ?? defaultTextMessage);
23102
} else if (qs.get("json") != null) {
24103
const jsonVal = qs.get("json");
25-
const jsonTransformer = ctx.json(jsonVal?.length ? JSON.parse(jsonVal) : defaultJsonMessage);
26-
t.push(jsonTransformer);
104+
const jsonData = jsonVal?.length ? JSON.parse(jsonVal) : defaultJsonMessage;
105+
response = HttpResponse.json(jsonData);
106+
} else {
107+
response = HttpResponse.json(defaultJsonMessage);
108+
}
109+
110+
// Apply status code if specified
111+
if (qs.get("status")) {
112+
const status = Number(qs.get("status"));
113+
response = new HttpResponse(response.body, {
114+
status,
115+
headers: response.headers,
116+
});
27117
}
28-
return t;
118+
119+
// Apply delay if specified
120+
if (qs.get("delay")) {
121+
const delay = Number(qs.get("delay"));
122+
return new Promise<Response>((resolve) => {
123+
setTimeout(() => resolve(response), delay);
124+
});
125+
}
126+
127+
return response;
29128
}
30129

31130
/**
@@ -36,31 +135,57 @@ function commonTransformers(req: RestRequest, _: any, ctx: RestContext) {
36135
* @example https://example.com?delay=1000 will respond in 1000ms.
37136
* @example https://example.com?status=301&text=thanks&delay=1000
38137
* will respond in 1000ms with statusCode 300 and the response body "thanks" as a string
138+
* @example https://example.com?stream will respond with a Server-Sent Events (SSE) stream
139+
* @example https://example.com?stream&count=5&interval=500
140+
* will send 5 SSE events with 500ms interval between each event
141+
* @example https://example.com?stream&data=hello
142+
* will send SSE events with custom message "hello"
143+
* @example https://example.com?stream&count=5&data=hello
144+
* will send each character "h", "e", "l", "l", "o" in separate events
145+
* @example https://example.com?stream&count=2&data=hello
146+
* will split "hello" into 2 parts: "hel" and "lo"
39147
*/
40148
const server = setupServer(
41-
rest.post(baseUrl, (req, res, ctx) => {
42-
// Support all the normal examples (delay, status, text, and json)
43-
const t = commonTransformers(req, res, ctx);
149+
http.post(baseUrl, async ({ request }) => {
150+
const url = new URL(request.url);
151+
const qs = url.searchParams;
44152

45-
// Echo back the request payload
46-
if (typeof req.body === "number" || typeof req.body === "string") t.push(ctx.text(String(req.body)));
47-
else t.push(ctx.json(req.body));
153+
// Check if query parameters specify response format
154+
if (
155+
qs.get("text") != null ||
156+
qs.get("json") != null ||
157+
qs.get("stream") != null ||
158+
qs.get("status") ||
159+
qs.get("delay")
160+
) {
161+
const response = createResponse(request);
162+
return response instanceof Promise ? await response : response;
163+
}
48164

49-
return res(...t);
165+
// Default behavior: Echo back the request payload for POST requests
166+
const body = await request.text();
167+
try {
168+
const jsonBody = JSON.parse(body);
169+
return HttpResponse.json(jsonBody);
170+
} catch {
171+
return HttpResponse.text(body || defaultTextMessage);
172+
}
50173
}),
51174

52-
rest.get(baseUrl, (req, res, ctx) => {
53-
return res(...commonTransformers(req, res, ctx));
175+
http.get(baseUrl, async ({ request }) => {
176+
const response = createResponse(request);
177+
return response instanceof Promise ? await response : response;
54178
}),
55179

56180
// Another duplicate route for the sole purpose of re-triggering requests on url change.
57-
rest.get(`${baseUrl}/test`, (req, res, ctx) => {
58-
return res(...commonTransformers(req, res, ctx));
181+
http.get(`${baseUrl}/test`, async ({ request }) => {
182+
const response = createResponse(request);
183+
return response instanceof Promise ? await response : response;
59184
}),
60185

61-
rest.get(`${baseUrl}/url`, (req, res, ctx) => {
62-
return res(ctx.text(req.url.toString()));
63-
})
186+
http.get(`${baseUrl}/url`, ({ request }) => {
187+
return HttpResponse.text(request.url.toString());
188+
}),
64189
);
65190

66191
beforeAll(() => server.listen());

0 commit comments

Comments
 (0)