6767 LLMUserAggregatorParams ,
6868)
6969from pipecat .processors .frame_processor import FrameDirection , FrameProcessor
70- from pipecat .utils .string import match_endofsentence
70+ from pipecat .utils .text .base_text_aggregator import BaseTextAggregator
71+ from pipecat .utils .text .simple_text_aggregator import SimpleTextAggregator
7172from pipecat .utils .time import time_now_iso8601
7273
7374
@@ -568,7 +569,9 @@ def __init__(
568569 self ._function_calls_in_progress : Dict [str , Optional [FunctionCallInProgressFrame ]] = {}
569570 self ._context_updated_tasks : Set [asyncio .Task ] = set ()
570571
571- self ._llm_aggregation : str = ""
572+ self ._llm_text_aggregator : BaseTextAggregator = (
573+ self ._params .llm_text_aggregator or SimpleTextAggregator ()
574+ )
572575 self ._skip_tts : Optional [bool ] = None
573576
574577 @property
@@ -803,8 +806,6 @@ async def _handle_llm_start(self, frame: LLMFullResponseStartFrame):
803806
804807 async def _handle_llm_text (self , frame : LLMTextFrame ):
805808 await self ._handle_text (frame )
806- if self ._skip_tts or frame .skip_tts :
807- self ._llm_aggregation += frame .text
808809 await self ._maybe_push_llm_aggregation (frame )
809810
810811 async def _handle_llm_end (self , frame : LLMFullResponseEndFrame ):
@@ -815,30 +816,28 @@ async def _handle_llm_end(self, frame: LLMFullResponseEndFrame):
815816 async def _maybe_push_llm_aggregation (
816817 self , frame : LLMFullResponseStartFrame | LLMTextFrame | LLMFullResponseEndFrame
817818 ):
818- should_push = False
819+ aggregate = None
820+ should_reset_aggregator = False
819821 if self ._skip_tts and not frame .skip_tts :
820822 # if the skip_tts flag switches, to false, push the current aggregation
821- should_push = True
823+ aggregate = self ._llm_text_aggregator .text
824+ should_reset_aggregator = True
822825 self ._skip_tts = frame .skip_tts
823826 if self ._skip_tts :
824827 if self ._skip_tts and isinstance (frame , LLMFullResponseEndFrame ):
825828 # on end frame, always push the aggregation
826- should_push = True
827- elif len (self ._llm_aggregation ) > 0 and match_endofsentence (self ._llm_aggregation ):
828- # push aggregation on end of sentence
829- should_push = True
830-
831- if not should_push :
832- return
829+ aggregate = self ._llm_text_aggregator .text
830+ should_reset_aggregator = True
831+ elif isinstance (frame , LLMTextFrame ):
832+ aggregate = await self ._llm_text_aggregator .aggregate (frame .text )
833833
834- text = self ._llm_aggregation .lstrip ("\n " )
835- if not text .strip ():
836- # don't push empty text
834+ if not aggregate :
837835 return
838836
839- llm_frame = AggregatedLLMTextFrame (text = text , aggregated_by = "sentence" )
837+ llm_frame = AggregatedLLMTextFrame (text = aggregate . text , aggregated_by = aggregate . type )
840838 await self .push_frame (llm_frame )
841- self ._llm_aggregation = ""
839+ if should_reset_aggregator :
840+ await self ._llm_text_aggregator .reset ()
842841
843842 async def _handle_text (self , frame : TextFrame ):
844843 if not self ._started or not frame .append_to_context :
0 commit comments