Skip to content

Xrequest自定义配置的时候无法关闭streams的锁 #1649

@mhfe123

Description

@mhfe123

重现步骤

// 这是使用fetch的方案
const [provider] = React.useState(
      new CustomProvider<CustomMessage, CustomInput, CustomOutput>({
        request: XRequest<CustomInput, CustomOutput>(
          url,
          {
            manual: true,
            headers: {
              Accept: "text/event-stream",
              "access-token": localStorage.getItem("access-token") || "",
            },
            callbacks: {
              onSuccess: () => {
                setIsSending(false);
                console.log("onSuccess");
              },
              onError: (error) => {
                setIsSending(false);
                console.error("onError", error);
              },
              onUpdate: () => {
                console.log("onUpdate");
              },
            },
            // 自定义流转换器
            transformStream: new TransformStream<string, CustomOutput>({
              transform(chunk, controller) {
                console.log("transformStream", chunk);
                controller.enqueue({ data: chunk });
              },
            }),
            fetch: async (url, options) => {
              const response = await fetch(url, options);
              if (!response.body) {
                throw new Error("Response body is null");
              }
              const sseChunks: string[] = [];
              const rawStream = response.body;
              const textStream = rawStream.pipeThrough(
                new TextDecoderStream("utf-8")
              );
              const lineStream = textStream.pipeThrough(
                new TransformStream({
                  transform(chunk, controller) {
                    // SSE流按换行符分割,保证每次拿到完整的一行SSE数据
                    chunk
                      .split("\n")
                      .forEach((line) => controller.enqueue(line));
                  },
                })
              );

              // ✅ 最终:读取管道处理后的「新流」,无锁冲突!
              const reader = lineStream.getReader();
              const reSSELine = /^(data|event|retry|id)(\s*)/; // 匹配SSE合法字段名
              try {
                while (true) {
                  const { done, value } = await reader.read();
                  if (done) break;

                  let sseLine = value.trim();
                  // ✅ 核心兼容逻辑:修复SSE行格式,补上缺失的冒号
                  if (sseLine && !sseLine.includes(":")) {
                    // 情况1:有字段名但无冒号 → 补上冒号
                    if (reSSELine.test(sseLine)) {
                      sseLine = sseLine.replace(reSSELine, "$1:");
                    }
                    // 情况2:纯文本无字段名 → 补上 data: 前缀+冒号
                    else {
                      sseLine = `data: ${sseLine}`;
                    }
                  }
                  // ✅ 情况3:替换中文全角冒号为英文半角冒号
                  sseLine = sseLine.replace(/:/g, ":");

                  // 解析业务数据
                  if (sseLine.startsWith("data:")) {
                    const jsonStr = sseLine.slice(5).trim();
                    sseChunks.push(jsonStr);
                    // const data = JSON.parse(jsonStr);
                    // console.log("解析后的业务数据:", data);
                  }
                }
              } catch (err) {
                console.error("SSE读取异常:", err);
                setIsSending(false);
                if (reader) reader.releaseLock();
              } finally {
                setIsSending(false);
                console.log("SSE读取完毕");
                if (reader && !reader.closed) reader.releaseLock();
              }
              return new Response(
                new ReadableStream({
                  start(controller) {
                    for (const chunk of sseChunks) {
                      controller.enqueue(new TextEncoder().encode(chunk));
                    }
                    controller.close();
                  },
                }),
                {
                  headers: {
                    "Content-Type": "text/event-stream",
                  },
                }
              );
            },
          }
        ),
      })
    );
// 这是没有用fetch的方案
new CustomProvider<CustomMessage, CustomInput, CustomOutput>({
      request: XRequest<CustomInput, CustomOutput>(
         url,
        {
          manual: true,
          headers: {
            Accept: "text/event-stream",
            "access-token": localStorage.getItem("access-token") || "",
          },
          callbacks: {
            onSuccess: () => {
              console.log("onSuccess");
            },
            onError: (error) => {
              console.error("onError", error);
            },
            onUpdate: () => {
              console.log("onUpdate");
            },
          },
          // 自定义流转换器:解析完整SSE格式数据,兼容Windows和Unix换行符
          transformStream: new TransformStream<string, CustomOutput>({
            transform(chunk, controller) {
              // 处理SSE事件:event: metadata\ndata: {...}\nid: 0\n\n
              // 统一转换Windows换行符(\r\n)为Unix换行符(\n),并移除单独的\r字符
              const normalizedChunk = chunk
                .replace(/\r\n/g, "\n")
                .replace(/\r/g, "");

              // 分割成独立的SSE事件(每个事件以\n\n结束)
              const events = normalizedChunk
                .split("\n\n")
                .filter((event) => event.trim());

              for (const event of events) {
                // 提取data字段内容
                const dataLines = event
                  .split("\n")
                  .filter((line) => line.startsWith("data: "))
                  .map((line) => line.replace("data: ", ""))
                  .join("");

                if (dataLines) {
                  try {
                    // 解析JSON数据
                    const jsonData = JSON.parse(dataLines);
                    // 转换为CustomOutput格式
                    controller.enqueue({ data: JSON.stringify(jsonData) });
                  } catch (error) {
                    // 如果解析失败,直接使用原始数据
                    controller.enqueue({ data: dataLines });
                  }
                } else {
                  console.log("No data field in event:", event);
                  // 如果没有data字段,则直接使用原始数据
                  controller.enqueue({ data: event });
                }
              }
            },
          }),
        }
      ),
    });

当前行为

我在使用Xrequest做自定义的封装的时候,因为我当前使用的后端数据可能会有点不标准返回内容,需要我做一个手动处理,我在Xrequest中只使用transformStream的时候会造成第二次输入请求的时候流被锁定的问题,如果配合fetch使用的话又会导致它要等接口请求完成以后才会进行输出,有没有什么好的解决方案
报错信息:onError TypeError: Failed to execute 'getReader' on 'ReadableStream': ReadableStreamDefaultReader constructor can only accept readable streams that are not yet locked to a reader

预期行为

因为我看demo中只有单独Xrequest的场景,没有实际Provider中的场景,不是很清楚是我写法上有问题还是什么原因导致的

上下文

No response

版本

2.1.3

您在哪些浏览器上遇到了这个问题?

No response

Metadata

Metadata

Assignees

Labels

documentationImprovements or additions to documentation

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions