Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion livekit-agents/livekit/agents/voice/speech_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,17 @@ async def wait_for_playout(self) -> None:
"To wait for the assistant’s spoken response prior to running this tool, use `RunContext.wait_for_playout()` instead."
)

await asyncio.shield(self._done_fut)
# Wait for whichever resolves first: full playout (_done_fut) or
# interruption (_interrupt_fut). Without the interrupt branch,
# callers of wait_for_playout() block until INTERRUPTION_TIMEOUT
# force-kills them when the user interrupts during playout.
done, _ = await asyncio.wait(
[
asyncio.ensure_future(asyncio.shield(self._done_fut)),
asyncio.ensure_future(asyncio.shield(self._interrupt_fut)),
],
return_when=asyncio.FIRST_COMPLETED,
)
Comment on lines +186 to +192
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Pending wrapper tasks from asyncio.wait are never cancelled, causing task leaks

Each call to wait_for_playout() creates two wrapper tasks via asyncio.ensure_future(asyncio.shield(...)). When asyncio.wait returns after the first future completes, the second wrapper task is left pending and never cancelled or cleaned up. In the common case (speech completes normally without interruption), the shield(self._interrupt_fut) wrapper task will never complete because _interrupt_fut is never resolved for non-interrupted speech — it is only set in _cancel() at speech_handle.py:226. These orphaned tasks accumulate over the lifetime of the process and produce Task was destroyed but it is pending! warnings when garbage collected.

Contrast with the established cleanup pattern in the same file

The wait_if_not_interrupted method at livekit-agents/livekit/agents/voice/speech_handle.py:211-219 follows the correct pattern: after asyncio.wait, it explicitly cancels and awaits the pending wrapper future. The new code in wait_for_playout should do the same for its pending wrapper tasks.

Suggested change
done, _ = await asyncio.wait(
[
asyncio.ensure_future(asyncio.shield(self._done_fut)),
asyncio.ensure_future(asyncio.shield(self._interrupt_fut)),
],
return_when=asyncio.FIRST_COMPLETED,
)
shield_done = asyncio.ensure_future(asyncio.shield(self._done_fut))
shield_interrupt = asyncio.ensure_future(asyncio.shield(self._interrupt_fut))
_, pending = await asyncio.wait(
[shield_done, shield_interrupt],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
with contextlib.suppress(asyncio.CancelledError):
task.cancel()
await task
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


def __await__(self) -> Generator[None, None, SpeechHandle]:
async def _await_impl() -> SpeechHandle:
Expand Down
Loading