Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/pipecat/processors/frame_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ async def __start(self, frame: StartFrame):
self._interruption_strategies = frame.interruption_strategies
self._report_only_initial_ttfb = frame.report_only_initial_ttfb

# Create process task now that we know the direct mode setting
self.__create_process_task()

async def __cancel(self, frame: CancelFrame):
Expand Down Expand Up @@ -886,14 +887,21 @@ async def __input_frame_task_handler(self):

(frame, direction, callback) = await self.__input_queue.get()

if isinstance(frame, SystemFrame):
if isinstance(frame, SystemFrame) or self._enable_direct_mode:
await self.__process_frame(frame, direction, callback)
elif self.__process_queue:
await self.__process_queue.put((frame, direction, callback))
else:
raise RuntimeError(
f"{self}: __process_queue is None when processing frame {frame.name}"
)
# Create process task lazily if it doesn't exist yet
if not hasattr(self, '_FrameProcessor__process_queue') or not self.__process_queue:
logger.debug(f"{self}: Creating process task lazily, direct_mode={self._enable_direct_mode}")
self.__create_process_task()

if hasattr(self, '_FrameProcessor__process_queue') and self.__process_queue:
await self.__process_queue.put((frame, direction, callback))
else:
logger.error(f"{self}: __process_queue still None after creation attempt, direct_mode={self._enable_direct_mode}, frame={frame.name}")
raise RuntimeError(
f"{self}: __process_queue is None when processing frame {frame.name}"
)

self.__input_queue.task_done()

Expand Down