
Commit 34929b9

Improve SimpleTextAggregator to handle multi-sentence chunks

1 parent: e83ac82

File tree: 4 files changed, +92 -20 lines

- CHANGELOG.md
- src/pipecat/services/tts_service.py
- src/pipecat/utils/text/simple_text_aggregator.py
- tests/test_simple_text_aggregator.py

CHANGELOG.md

Lines changed: 7 additions & 0 deletions

@@ -18,6 +18,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- Improved `SimpleTextAggregator` and `TTSService` to properly handle buffered
+  sentences at the end of LLM responses. Previously, when an LLM response ended,
+  any complete sentences remaining in the aggregator's buffer would be sent to
+  TTS as one large chunk. Now these sentences are flushed individually, providing
+  better interruption points. Added `flush_next_sentence()` method to
+  `SimpleTextAggregator` to extract buffered sentences without adding new text.
+
 - Added Hindi support for Rime TTS services.
 
 - Updated `GeminiTTSService` to use Google Cloud Text-to-Speech streaming API
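To make the changelog entry above concrete, here is a minimal sketch of the drain behavior it describes, at the aggregator level. The import path is inferred from the file location shown further down (src/pipecat/utils/text/simple_text_aggregator.py) and the printed values mirror the expectations in the updated tests; treat it as an illustration rather than code from this commit.

```python
import asyncio

# Import path inferred from src/pipecat/utils/text/simple_text_aggregator.py.
from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator


async def main():
    aggregator = SimpleTextAggregator()

    # A single LLM chunk with several complete sentences plus an unfinished tail.
    first = await aggregator.aggregate("Hello! I am Doug. Nice to meet you! So wh")
    print(repr(first))  # 'Hello!'

    # New in this commit: drain the remaining complete sentences one at a time,
    # so each one can become its own TTS push (and its own interruption point).
    sentence = await aggregator.flush_next_sentence()
    while sentence:
        print(repr(sentence))  # ' I am Doug.', then ' Nice to meet you!'
        sentence = await aggregator.flush_next_sentence()

    # Whatever is left is an incomplete sentence; TTSService now pushes it as-is.
    print(repr(aggregator.text))  # ' So wh'


asyncio.run(main())
```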

src/pipecat/services/tts_service.py

Lines changed: 12 additions & 2 deletions

@@ -368,10 +368,20 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
             # pause to avoid audio overlapping.
             await self._maybe_pause_frame_processing()
 
-            sentence = self._text_aggregator.text
+            # Flush any remaining complete sentences from the aggregator.
+            # This ensures all buffered sentences are sent to TTS individually.
+            text = await self._text_aggregator.flush_next_sentence()
+            while text:
+                await self._push_tts_frames(text)
+                text = await self._text_aggregator.flush_next_sentence()
+
+            # Send any remaining incomplete text
+            remaining_text = self._text_aggregator.text
+            if remaining_text:
+                await self._push_tts_frames(remaining_text)
+
             await self._text_aggregator.reset()
             self._processing_text = False
-            await self._push_tts_frames(sentence)
             if isinstance(frame, LLMFullResponseEndFrame):
                 if self._push_text_frames:
                     await self.push_frame(frame, direction)
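The hunk above replaces the old single push of `self._text_aggregator.text` with a per-sentence drain followed by the incomplete remainder. Below is a small standalone sketch of that control flow; `push_tts()` is a hypothetical stand-in for `TTSService._push_tts_frames()` that simply records what would be spoken, so this is an illustration of the pattern, not Pipecat code.

```python
import asyncio

from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator

# Records what would have been sent to TTS; a hypothetical stand-in for
# TTSService._push_tts_frames().
pushed: list[str] = []


async def push_tts(text: str):
    pushed.append(text)


async def end_of_response(aggregator: SimpleTextAggregator):
    # Mirrors the new end-of-response handling: flush buffered complete
    # sentences individually, then push any incomplete remainder, then reset.
    text = await aggregator.flush_next_sentence()
    while text:
        await push_tts(text)
        text = await aggregator.flush_next_sentence()
    if aggregator.text:
        await push_tts(aggregator.text)
    await aggregator.reset()


async def main():
    aggregator = SimpleTextAggregator()

    # While streaming, aggregate() hands back the first complete sentence,
    # which the service would push right away.
    first = await aggregator.aggregate("Sure! One moment. Let me check")
    if first:
        await push_tts(first)

    # The LLM response ends here; drain what is still buffered.
    await end_of_response(aggregator)

    # Previously the buffered ' One moment. Let me check' would have gone out
    # as one chunk; now each complete sentence is its own push.
    print(pushed)  # ['Sure!', ' One moment.', ' Let me check']


asyncio.run(main())
```

Each element of `pushed` corresponds to one `_push_tts_frames()` call, which is what gives the pipeline a clean point to interrupt between sentences.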

src/pipecat/utils/text/simple_text_aggregator.py

Lines changed: 39 additions & 12 deletions

@@ -41,30 +41,57 @@ def text(self) -> str:
         """
         return self._text
 
+    def _extract_next_sentence(self) -> Optional[str]:
+        """Extract the next complete sentence from the buffer.
+
+        Returns:
+            The first complete sentence if a sentence boundary is found,
+            or None if the buffer is empty or contains only incomplete text.
+        """
+        eos_end_marker = match_endofsentence(self._text)
+        if eos_end_marker:
+            # Extract the first complete sentence
+            sentence = self._text[:eos_end_marker]
+            # Remove it from buffer
+            self._text = self._text[eos_end_marker:]
+            return sentence
+
+        return None
+
     async def aggregate(self, text: str) -> Optional[str]:
-        """Aggregate text and return completed sentences.
+        """Aggregate text and return the first completed sentence.
 
-        Adds the new text to the buffer and checks for end-of-sentence markers.
-        When a sentence boundary is found, returns the completed sentence and
-        removes it from the buffer.
+        Adds the new text to the buffer and checks for sentence boundaries.
+        When a sentence boundary is found, returns the first completed sentence
+        and removes it from the buffer. Subsequent calls (even with empty strings)
+        will return additional complete sentences if they exist in the buffer.
+
+        This handles varying input patterns from different LLM providers:
+        - Word-by-word tokens (e.g., 'Hello', '!', ' I', ' am', ' Doug.')
+        - Chunks with one or more sentences (e.g., 'Hello! I am Doug. Nice to meet you!')
 
         Args:
            text: New text to add to the aggregation buffer.
 
         Returns:
-            A complete sentence if an end-of-sentence marker is found,
+            The first complete sentence if a sentence boundary is found,
             or None if more text is needed to complete a sentence.
         """
-        result: Optional[str] = None
-
         self._text += text
+        return self._extract_next_sentence()
 
-        eos_end_marker = match_endofsentence(self._text)
-        if eos_end_marker:
-            result = self._text[:eos_end_marker]
-            self._text = self._text[eos_end_marker:]
+    async def flush_next_sentence(self) -> Optional[str]:
+        """Retrieve the next complete sentence from the buffer without adding new text.
+
+        This method extracts the next complete sentence from the internal buffer
+        without requiring new input text. It's useful for draining multiple
+        complete sentences that were received in a single chunk.
 
-        return result
+        Returns:
+            The next complete sentence if one exists in the buffer, or None if
+            the buffer is empty or contains only incomplete text.
+        """
+        return self._extract_next_sentence()
 
     async def handle_interruption(self):
         """Handle interruptions by clearing the text buffer.

tests/test_simple_text_aggregator.py

Lines changed: 34 additions & 6 deletions

@@ -19,11 +19,39 @@ async def test_reset_aggregations(self):
         await self.aggregator.reset()
         assert self.aggregator.text == ""
 
-    async def test_simple_sentence(self):
-        assert await self.aggregator.aggregate("Hello ") == None
-        assert await self.aggregator.aggregate("Pipecat!") == "Hello Pipecat!"
+    async def test_word_by_word(self):
+        """Test word-by-word token aggregation (e.g., OpenAI)."""
+        assert await self.aggregator.aggregate("Hello") == None
+        assert await self.aggregator.aggregate("!") == "Hello!"
+        assert await self.aggregator.aggregate(" I") == None
+        assert await self.aggregator.aggregate(" am") == None
+        assert await self.aggregator.aggregate(" Doug.") == " I am Doug."
         assert self.aggregator.text == ""
 
-    async def test_multiple_sentences(self):
-        assert await self.aggregator.aggregate("Hello Pipecat! How are ") == "Hello Pipecat!"
-        assert await self.aggregator.aggregate("you?") == " How are you?"
+    async def test_chunks_with_partial_sentences(self):
+        """Test chunks with partial sentences."""
+        assert await self.aggregator.aggregate("Hey!") == "Hey!"
+        assert await self.aggregator.aggregate(" Nice to meet you! So") == " Nice to meet you!"
+        assert self.aggregator.text == " So"
+        assert await self.aggregator.aggregate(" what") == None
+        assert await self.aggregator.aggregate("'d you like?") == " So what'd you like?"
+
+    async def test_multi_sentence_chunk(self):
+        """Test chunks with multiple complete sentences."""
+        result = await self.aggregator.aggregate("Hello! I am Doug. Nice to meet you!")
+        assert result == "Hello!"
+        # Drain remaining sentences
+        assert await self.aggregator.flush_next_sentence() == " I am Doug."
+        assert await self.aggregator.flush_next_sentence() == " Nice to meet you!"
+        assert await self.aggregator.flush_next_sentence() == None
+        assert self.aggregator.text == ""
+
+    async def test_flush_next_sentence_with_incomplete(self):
+        """Test flush_next_sentence with incomplete sentence in buffer."""
+        assert await self.aggregator.aggregate("Hello! I am") == "Hello!"
+        assert await self.aggregator.flush_next_sentence() == None
+        assert self.aggregator.text == " I am"
+
+    async def test_flush_next_sentence_empty_buffer(self):
+        """Test flush_next_sentence with empty buffer."""
+        assert await self.aggregator.flush_next_sentence() == None
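The hunk shows only the test bodies; the class declaration and the fixture that creates `self.aggregator` sit outside the diff. A minimal harness that would make these tests runnable might look like the sketch below, where the class name and `asyncSetUp()` are assumptions rather than code from the repository. Under those assumptions the file runs with `python -m unittest`.

```python
import unittest

from pipecat.utils.text.simple_text_aggregator import SimpleTextAggregator


class TestSimpleTextAggregator(unittest.IsolatedAsyncioTestCase):
    # Assumed fixture: the diff never shows how self.aggregator is created.
    async def asyncSetUp(self):
        self.aggregator = SimpleTextAggregator()

    async def test_multi_sentence_chunk(self):
        """Test chunks with multiple complete sentences (body copied from the diff above)."""
        result = await self.aggregator.aggregate("Hello! I am Doug. Nice to meet you!")
        assert result == "Hello!"
        assert await self.aggregator.flush_next_sentence() == " I am Doug."
        assert await self.aggregator.flush_next_sentence() == " Nice to meet you!"
        assert await self.aggregator.flush_next_sentence() == None
        assert self.aggregator.text == ""


if __name__ == "__main__":
    unittest.main()
```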
