Skip to content

Commit dbba990

Browse files
longcw authored and theomonnom committed
text input from datastream for v1.0 (#1521)
1 parent 4289429 commit dbba990

File tree

4 files changed

+49
-8
lines changed

4 files changed

+49
-8
lines changed

livekit-agents/livekit/agents/llm/realtime.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class InputSpeechStartedEvent:
1818

1919
@dataclass
2020
class InputSpeechStoppedEvent:
21-
pass
21+
user_transcription_enabled: bool
2222

2323

2424
@dataclass

livekit-agents/livekit/agents/pipeline/pipeline_agent.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,12 @@ def generate_reply(
255255
allow_interruptions=allow_interruptions,
256256
)
257257

258+
def interrupt(self) -> None:
259+
if self._activity is None:
260+
raise ValueError("PipelineAgent isn't running")
261+
262+
self._activity.interrupt()
263+
258264
def update_task(self, task: AgentTask) -> None:
259265
self._agent_task = task
260266

livekit-agents/livekit/agents/pipeline/room_io.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
@dataclass(frozen=True)
1919
class RoomInputOptions:
20+
text_enabled: bool = True
21+
"""Whether to subscribe to text input"""
2022
audio_enabled: bool = True
2123
"""Whether to subscribe to audio"""
2224
video_enabled: bool = False
@@ -48,6 +50,7 @@ class RoomOutputOptions:
4850
DEFAULT_ROOM_INPUT_OPTIONS = RoomInputOptions()
4951
DEFAULT_ROOM_OUTPUT_OPTIONS = RoomOutputOptions()
5052
LK_PUBLISH_FOR_ATTR = "lk.publish_for"
53+
LK_TEXT_INPUT_TOPIC = "lk.room_text_input"
5154

5255

5356
class BaseStreamHandle:
@@ -226,6 +229,7 @@ def __init__(
226229
"""
227230
self._options = options
228231
self._room = room
232+
self._agent: Optional["PipelineAgent"] = None
229233
self._tasks: set[asyncio.Task] = set()
230234

231235
# target participant
@@ -263,6 +267,12 @@ def __init__(
263267
for participant in self._room.remote_participants.values():
264268
self._on_participant_connected(participant)
265269

270+
# text input from datastream
271+
if options.text_enabled:
272+
self._room.register_text_stream_handler(
273+
LK_TEXT_INPUT_TOPIC, self._on_text_input
274+
)
275+
266276
@property
267277
def audio(self) -> AsyncIterator[rtc.AudioFrame] | None:
268278
if not self._audio_handle:
@@ -287,7 +297,9 @@ async def start(self, agent: Optional["PipelineAgent"] = None) -> None:
287297
# link to the first connected participant if not set
288298
self.set_participant(participant.identity)
289299

290-
if not agent:
300+
# TODO(long): should we force the agent to be set or provide a set_agent method?
301+
self._agent = agent
302+
if not self._agent:
291303
return
292304

293305
agent.input.audio = self.audio
@@ -399,6 +411,28 @@ async def _capture_text():
399411
self._tasks.add(task)
400412
task.add_done_callback(self._tasks.discard)
401413

414+
def _on_text_input(
415+
self, reader: rtc.TextStreamReader, participant_identity: str
416+
) -> None:
417+
if participant_identity != self._participant_identity:
418+
return
419+
420+
async def _read_text():
421+
if not self._agent:
422+
return
423+
424+
text = await reader.read_all()
425+
logger.debug(
426+
"received text input",
427+
extra={"text": text, "participant": self._participant_identity},
428+
)
429+
self._agent.interrupt()
430+
self._agent.generate_reply(user_input=text)
431+
432+
task = asyncio.create_task(_read_text())
433+
self._tasks.add(task)
434+
task.add_done_callback(self._tasks.discard)
435+
402436
async def aclose(self) -> None:
403437
self._room.off("participant_connected", self._on_participant_connected)
404438
self._room.off("participant_disconnected", self._on_participant_disconnected)

livekit-agents/livekit/agents/pipeline/task_activity.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -319,14 +319,15 @@ def _on_input_speech_started(self, _: llm.InputSpeechStartedEvent) -> None:
319319
log_event("input_speech_started")
320320
self.interrupt() # input_speech_started is also interrupting on the serverside realtime session
321321

322-
def _on_input_speech_stopped(self, _: llm.InputSpeechStoppedEvent) -> None:
322+
def _on_input_speech_stopped(self, ev: llm.InputSpeechStoppedEvent) -> None:
323323
log_event("input_speech_stopped")
324-
self.on_interim_transcript(
325-
stt.SpeechEvent(
326-
stt.SpeechEventType.INTERIM_TRANSCRIPT,
327-
alternatives=[stt.SpeechData(text="", language="")],
324+
if ev.user_transcription_enabled:
325+
self.on_interim_transcript(
326+
stt.SpeechEvent(
327+
stt.SpeechEventType.INTERIM_TRANSCRIPT,
328+
alternatives=[stt.SpeechData(text="", language="")],
329+
)
328330
)
329-
)
330331

331332
def _on_input_audio_transcription_completed(
332333
self, ev: llm.InputTranscriptionCompleted

0 commit comments

Comments
 (0)