8
8
import sounddevice as sd
9
9
10
10
from agents import function_tool
11
- from agents .realtime import RealtimeAgent , RealtimeRunner , RealtimeSession , RealtimeSessionEvent
11
+ from agents .realtime import (
12
+ RealtimeAgent ,
13
+ RealtimePlaybackTracker ,
14
+ RealtimeRunner ,
15
+ RealtimeSession ,
16
+ RealtimeSessionEvent ,
17
+ )
18
+ from agents .realtime .model import RealtimeModelConfig
12
19
13
20
# Audio configuration
14
- CHUNK_LENGTH_S = 0.05 # 50ms
21
+ CHUNK_LENGTH_S = 0.04 # 40ms aligns with realtime defaults
15
22
SAMPLE_RATE = 24000
16
23
FORMAT = np .int16
17
24
CHANNELS = 1
@@ -49,11 +56,16 @@ def __init__(self) -> None:
49
56
self .audio_player : sd .OutputStream | None = None
50
57
self .recording = False
51
58
59
+ # Playback tracker lets the model know our real playback progress
60
+ self .playback_tracker = RealtimePlaybackTracker ()
61
+
52
62
# Audio output state for callback system
53
- self .output_queue : queue .Queue [Any ] = queue .Queue (maxsize = 10 ) # Buffer more chunks
63
+ # Store tuples: (samples_np, item_id, content_index)
64
+ self .output_queue : queue .Queue [Any ] = queue .Queue (maxsize = 100 )
54
65
self .interrupt_event = threading .Event ()
55
- self .current_audio_chunk : np .ndarray [Any , np .dtype [Any ]] | None = None
66
+ self .current_audio_chunk : tuple [ np .ndarray [Any , np .dtype [Any ]], str , int ] | None = None
56
67
self .chunk_position = 0
68
+ self .bytes_per_sample = np .dtype (FORMAT ).itemsize
57
69
58
70
def _output_callback (self , outdata , frames : int , time , status ) -> None :
59
71
"""Callback for audio output - handles continuous audio stream from server."""
@@ -92,20 +104,29 @@ def _output_callback(self, outdata, frames: int, time, status) -> None:
92
104
93
105
# Copy data from current chunk to output buffer
94
106
remaining_output = len (outdata ) - samples_filled
95
- remaining_chunk = len (self .current_audio_chunk ) - self .chunk_position
107
+ samples , item_id , content_index = self .current_audio_chunk
108
+ remaining_chunk = len (samples ) - self .chunk_position
96
109
samples_to_copy = min (remaining_output , remaining_chunk )
97
110
98
111
if samples_to_copy > 0 :
99
- chunk_data = self .current_audio_chunk [
100
- self .chunk_position : self .chunk_position + samples_to_copy
101
- ]
112
+ chunk_data = samples [self .chunk_position : self .chunk_position + samples_to_copy ]
102
113
# More efficient: direct assignment for mono audio instead of reshape
103
114
outdata [samples_filled : samples_filled + samples_to_copy , 0 ] = chunk_data
104
115
samples_filled += samples_to_copy
105
116
self .chunk_position += samples_to_copy
106
117
118
+ # Inform playback tracker about played bytes
119
+ try :
120
+ self .playback_tracker .on_play_bytes (
121
+ item_id = item_id ,
122
+ item_content_index = content_index ,
123
+ bytes = chunk_data .tobytes (),
124
+ )
125
+ except Exception :
126
+ pass
127
+
107
128
# If we've used up the entire chunk, reset for next iteration
108
- if self .chunk_position >= len (self . current_audio_chunk ):
129
+ if self .chunk_position >= len (samples ):
109
130
self .current_audio_chunk = None
110
131
self .chunk_position = 0
111
132
@@ -125,7 +146,15 @@ async def run(self) -> None:
125
146
126
147
try :
127
148
runner = RealtimeRunner (agent )
128
- async with await runner .run () as session :
149
+ # Attach playback tracker and disable server-side response interruption,
150
+ # which can truncate assistant audio when mic picks up speaker output.
151
+ model_config : RealtimeModelConfig = {
152
+ "playback_tracker" : self .playback_tracker ,
153
+ "initial_model_settings" : {
154
+ "turn_detection" : {"type" : "semantic_vad" , "interrupt_response" : False },
155
+ },
156
+ }
157
+ async with await runner .run (model_config = model_config ) as session :
129
158
self .session = session
130
159
print ("Connected. Starting audio recording..." )
131
160
@@ -170,6 +199,14 @@ async def capture_audio(self) -> None:
170
199
read_size = int (SAMPLE_RATE * CHUNK_LENGTH_S )
171
200
172
201
try :
202
+ # Simple energy-based barge-in: if user speaks while audio is playing, interrupt.
203
+ def rms_energy (samples : np .ndarray [Any , np .dtype [Any ]]) -> float :
204
+ if samples .size == 0 :
205
+ return 0.0
206
+ # Normalize int16 to [-1, 1]
207
+ x = samples .astype (np .float32 ) / 32768.0
208
+ return float (np .sqrt (np .mean (x * x )))
209
+
173
210
while self .recording :
174
211
# Check if there's enough data to read
175
212
if self .audio_stream .read_available < read_size :
@@ -182,8 +219,12 @@ async def capture_audio(self) -> None:
182
219
# Convert numpy array to bytes
183
220
audio_bytes = data .tobytes ()
184
221
185
- # Send audio to session
186
- await self .session .send_audio (audio_bytes )
222
+ # Half-duplex gating: do not send mic while assistant audio is playing
223
+ assistant_playing = (
224
+ self .current_audio_chunk is not None or not self .output_queue .empty ()
225
+ )
226
+ if not assistant_playing :
227
+ await self .session .send_audio (audio_bytes )
187
228
188
229
# Yield control back to event loop
189
230
await asyncio .sleep (0 )
@@ -212,17 +253,19 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
212
253
elif event .type == "audio_end" :
213
254
print ("Audio ended" )
214
255
elif event .type == "audio" :
215
- # Enqueue audio for callback-based playback
256
+ # Enqueue audio for callback-based playback with metadata
216
257
np_audio = np .frombuffer (event .audio .data , dtype = np .int16 )
217
258
try :
218
- self .output_queue .put_nowait (np_audio )
259
+ self .output_queue .put_nowait (( np_audio , event . item_id , event . content_index ) )
219
260
except queue .Full :
220
261
# Queue is full - only drop if we have significant backlog
221
262
# This prevents aggressive dropping that could cause choppiness
222
263
if self .output_queue .qsize () > 8 : # Keep some buffer
223
264
try :
224
265
self .output_queue .get_nowait ()
225
- self .output_queue .put_nowait (np_audio )
266
+ self .output_queue .put_nowait (
267
+ (np_audio , event .item_id , event .content_index )
268
+ )
226
269
except queue .Empty :
227
270
pass
228
271
# If queue isn't too full, just skip this chunk to avoid blocking
0 commit comments