6868 LLMUserAggregatorParams ,
6969)
7070from pipecat .processors .frame_processor import FrameDirection , FrameProcessor
71- from pipecat .utils .string import concatenate_aggregated_text , match_endofsentence
71+ from pipecat .utils .string import concatenate_aggregated_text
72+ from pipecat .utils .text .base_text_aggregator import BaseTextAggregator
73+ from pipecat .utils .text .simple_text_aggregator import SimpleTextAggregator
7274from pipecat .utils .time import time_now_iso8601
7375
7476
@@ -589,7 +591,9 @@ def __init__(
589591 self ._function_calls_in_progress : Dict [str , Optional [FunctionCallInProgressFrame ]] = {}
590592 self ._context_updated_tasks : Set [asyncio .Task ] = set ()
591593
592- self ._llm_aggregation : str = ""
594+ self ._llm_text_aggregator : BaseTextAggregator = (
595+ self ._params .llm_text_aggregator or SimpleTextAggregator ()
596+ )
593597 self ._skip_tts : Optional [bool ] = None
594598
595599 @property
@@ -813,8 +817,6 @@ async def _handle_llm_start(self, frame: LLMFullResponseStartFrame):
813817
814818 async def _handle_llm_text (self , frame : LLMTextFrame ):
815819 await self ._handle_text (frame )
816- if self ._skip_tts or frame .skip_tts :
817- self ._llm_aggregation += frame .text
818820 await self ._maybe_push_llm_aggregation (frame )
819821
820822 async def _handle_llm_end (self , frame : LLMFullResponseEndFrame ):
@@ -825,30 +827,28 @@ async def _handle_llm_end(self, frame: LLMFullResponseEndFrame):
825827 async def _maybe_push_llm_aggregation (
826828 self , frame : LLMFullResponseStartFrame | LLMTextFrame | LLMFullResponseEndFrame
827829 ):
828- should_push = False
830+ aggregate = None
831+ should_reset_aggregator = False
829832 if self ._skip_tts and not frame .skip_tts :
830833 # if the skip_tts flag switches, to false, push the current aggregation
831- should_push = True
834+ aggregate = self ._llm_text_aggregator .text
835+ should_reset_aggregator = True
832836 self ._skip_tts = frame .skip_tts
833837 if self ._skip_tts :
834838 if self ._skip_tts and isinstance (frame , LLMFullResponseEndFrame ):
835839 # on end frame, always push the aggregation
836- should_push = True
837- elif len (self ._llm_aggregation ) > 0 and match_endofsentence (self ._llm_aggregation ):
838- # push aggregation on end of sentence
839- should_push = True
840-
841- if not should_push :
842- return
840+ aggregate = self ._llm_text_aggregator .text
841+ should_reset_aggregator = True
842+ elif isinstance (frame , LLMTextFrame ):
843+ aggregate = await self ._llm_text_aggregator .aggregate (frame .text )
843844
844- text = self ._llm_aggregation .lstrip ("\n " )
845- if not text .strip ():
846- # don't push empty text
845+ if not aggregate :
847846 return
848847
849- llm_frame = AggregatedLLMTextFrame (text = text , aggregated_by = "sentence" )
848+ llm_frame = AggregatedLLMTextFrame (text = aggregate . text , aggregated_by = aggregate . type )
850849 await self .push_frame (llm_frame )
851- self ._llm_aggregation = ""
850+ if should_reset_aggregator :
851+ await self ._llm_text_aggregator .reset ()
852852
853853 async def _handle_text (self , frame : TextFrame ):
854854 if not self ._started or not frame .append_to_context :
0 commit comments