Skip to content

🐛 Streaming output is corrupted when multi-byte (Chinese) characters are split across chunk boundaries — incoming bytes are decoded as UTF-8 per chunk instead of being buffered. #104

@jousinse

Description

@jousinse

Bug description

Usage Scenario: Streaming Chinese Character Reception
The following are partial logs:
[Chunk 10] Event: Chunk, len=184, total=893
[Chunk 11] Event: Chunk, len=204, total=1097
[Chunk 12] Event: Chunk, len=231, total=1328
[Chunk 13] Event: Chunk, len=199, total=1527
[Chunk 14] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: incomplete utf-8 byte sequence from index 6
[Chunk 15] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: invalid utf-8 sequence of 1 bytes from index 0
[Chunk 16] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: invalid utf-8 sequence of 1 bytes from index 0
[Chunk 17] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: incomplete utf-8 byte sequence from index 3
[Chunk 18] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: invalid utf-8 sequence of 1 bytes from index 0
[Chunk 19] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: invalid utf-8 sequence of 1 bytes from index 0
[Chunk 20] Event: Chunk, len=191, total=1718
[Chunk 21] Event: Chunk, len=215, total=1933
[Chunk 22] Event: Chunk, len=236, total=2169
[Chunk 23] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: incomplete utf-8 byte sequence from index 3
[Chunk 24] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: invalid utf-8 sequence of 1 bytes from index 0
[Chunk 25] ERROR: Web stream error for model 'gemini-2.5-flash (adapter: Gemini)'.
Cause: invalid utf-8 sequence of 1 bytes from index 0
[Chunk 26] Event: Chunk, len=166, total=2335
[Chunk 27] Event: Chunk, len=198, total=2533
The relevant code follows:

/// Streams a chat completion from the configured genai model, forwarding
/// each text chunk to `tx` as soon as it arrives.
///
/// `context` is sent as the system message and `text` as the user message.
/// Both content chunks and reasoning chunks are forwarded through `tx` as
/// `Ok(String)`; empty chunks are skipped.
///
/// UTF-8 decode errors surfaced by the underlying stream are logged to
/// stderr and then skipped (`continue`) instead of aborting the stream.
/// NOTE(review): the bytes of the affected chunk are already lost by the
/// time the error reaches this layer, so skipping means the forwarded text
/// is missing those characters — the real fix belongs in the transport
/// layer, which must buffer incomplete multi-byte sequences across chunk
/// boundaries before decoding.
///
/// # Errors
/// - `SystemError::LlmError` if the stream cannot be started, or if a
///   non-UTF-8 stream error occurs (the error is also forwarded via `tx`).
/// - `SystemError::ChannelSendError` if the receiving side of `tx` closed.
pub async fn chat_with_context_stream_genai(
    &self,
    text: &str,
    context: &str,
    tx: tokio::sync::mpsc::UnboundedSender<Result<String, SystemError>>,
) -> Result<(), SystemError> {
    use futures::StreamExt;

    let chat_req =
        ChatRequest::new(vec![ChatMessage::system(context), ChatMessage::user(text)]);

    let chat_stream_response = self
        .client
        .exec_chat_stream(&self.model, chat_req, None)
        .await
        .map_err(|e| SystemError::LlmError(format!("Gemini stream error: {}", e)))?;

    let mut stream = chat_stream_response.stream;

    // Diagnostic trail: one entry per stream event, dumped to stderr if an
    // error occurs. Grows with stream length; acceptable for a single chat.
    let mut error_log: Vec<String> = Vec::new();
    let mut chunk_count = 0;
    let mut total_text_len = 0;

    // Forwards a non-empty chunk to the receiver, mapping a closed channel
    // to ChannelSendError. Shared by Chunk and ReasoningChunk handling
    // (previously duplicated inline).
    let forward = |content: &String| -> Result<(), SystemError> {
        if !content.is_empty() && tx.send(Ok(content.clone())).is_err() {
            return Err(SystemError::ChannelSendError(
                "Stream receiver closed".to_string(),
            ));
        }
        Ok(())
    };

    while let Some(stream_result) = stream.next().await {
        chunk_count += 1;

        match stream_result {
            Ok(chat_event) => {
                use genai::chat::ChatStreamEvent;

                match chat_event {
                    ChatStreamEvent::Start => {
                        error_log.push(format!("[Chunk {}] Event: Start", chunk_count));
                    }
                    ChatStreamEvent::Chunk(chunk) => {
                        total_text_len += chunk.content.len();
                        error_log.push(format!(
                            "[Chunk {}] Event: Chunk, len={}, total={}",
                            chunk_count,
                            chunk.content.len(),
                            total_text_len
                        ));
                        forward(&chunk.content)?;
                    }
                    ChatStreamEvent::ReasoningChunk(chunk) => {
                        error_log.push(format!(
                            "[Chunk {}] Event: ReasoningChunk, len={}",
                            chunk_count,
                            chunk.content.len()
                        ));
                        forward(&chunk.content)?;
                    }
                    ChatStreamEvent::ToolCallChunk(_) => {
                        error_log.push(format!("[Chunk {}] Event: ToolCallChunk", chunk_count));
                    }
                    ChatStreamEvent::End(end_data) => {
                        error_log.push(format!(
                            "[Chunk {}] Event: End, data={:?}",
                            chunk_count, end_data
                        ));
                        break;
                    }
                }
            }
            Err(e) => {
                let error_msg = e.to_string();
                error_log.push(format!("[Chunk {}] ERROR: {}", chunk_count, error_msg));

                // Dump the full event trail so decode failures can be
                // correlated with the chunks that preceded them.
                eprintln!("\n========== GENAI STREAM ERROR LOG ==========");
                eprintln!("Model: {}", self.model);
                eprintln!("Total chunks received: {}", chunk_count);
                eprintln!("Total text length: {} bytes", total_text_len);
                eprintln!("\n--- Event Log ---");
                for log in &error_log {
                    eprintln!("{}", log);
                }
                eprintln!("\n--- Error Details ---");
                eprintln!("Error type: {:?}", std::any::type_name_of_val(&e));
                eprintln!("Error message: {}", error_msg);
                eprintln!("=============================================\n");

                // Heuristic: classify decode failures by message text, since
                // the stream error type is opaque here. Matches both
                // "invalid utf-8 sequence" and "incomplete utf-8 byte
                // sequence" wordings.
                let error_msg_lower = error_msg.to_lowercase();
                let is_utf8_error = error_msg_lower.contains("utf-8")
                    || error_msg_lower.contains("utf8")
                    || (error_msg_lower.contains("invalid")
                        && error_msg_lower.contains("sequence"));

                if is_utf8_error {
                    // Skip the broken chunk and keep streaming; see the
                    // doc comment — the affected characters are lost.
                    continue;
                }

                // Any other stream error is fatal: notify the receiver,
                // then propagate.
                let error = SystemError::LlmError(format!("Stream error: {}", e));
                let _ = tx.send(Err(error.clone()));
                return Err(error);
            }
        }
    }

    eprintln!("\n========== GENAI STREAM SUMMARY ==========");
    eprintln!("Model: {}", self.model);
    eprintln!("Total chunks: {}", chunk_count);
    eprintln!("Total text length: {} bytes", total_text_len);
    eprintln!("==========================================\n");

    Ok(())
}

Adapter

No response

Model

No response

Suggested Resolution

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions