diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 0b55082aab..805433223d 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -702,6 +702,7 @@ async def __start(self, frame: StartFrame): self._interruption_strategies = frame.interruption_strategies self._report_only_initial_ttfb = frame.report_only_initial_ttfb + # Create process task now that we know the direct mode setting self.__create_process_task() async def __cancel(self, frame: CancelFrame): @@ -886,14 +887,21 @@ async def __input_frame_task_handler(self): (frame, direction, callback) = await self.__input_queue.get() - if isinstance(frame, SystemFrame): + if isinstance(frame, SystemFrame) or self._enable_direct_mode: await self.__process_frame(frame, direction, callback) - elif self.__process_queue: - await self.__process_queue.put((frame, direction, callback)) else: - raise RuntimeError( - f"{self}: __process_queue is None when processing frame {frame.name}" - ) + # Create process task lazily if it doesn't exist yet + if not hasattr(self, '_FrameProcessor__process_queue') or not self.__process_queue: + logger.debug(f"{self}: Creating process task lazily, direct_mode={self._enable_direct_mode}") + self.__create_process_task() + + if hasattr(self, '_FrameProcessor__process_queue') and self.__process_queue: + await self.__process_queue.put((frame, direction, callback)) + else: + logger.error(f"{self}: __process_queue still None after creation attempt, direct_mode={self._enable_direct_mode}, frame={frame.name}") + raise RuntimeError( + f"{self}: __process_queue is None when processing frame {frame.name}" + ) self.__input_queue.task_done()