feat: WebSocket streaming with VAD, backpressure and Multilingual Support #29
OmPradnya wants to merge 1 commit into susiai:master
Conversation
Pull request overview
This PR introduces a real-time Speech-to-Text (STT) streaming capability via a new WebSocket endpoint (/stt/stream) in the Flask service, alongside queue backpressure controls, worker-pool configurability, and updates intended to improve transcript validity filtering (including multilingual support).
Changes:
- Add a `/stt/stream` WebSocket streaming flow with session tracking and server→client transcript events (plus basic VAD and queue-overload signaling).
- Introduce bounded queue ingestion (`try_enqueue`) with overflow policies and configurable worker pool sizing via environment variables.
- Add local regression / manual test scripts for HTTP endpoints and WebSocket streaming.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 21 comments.
| File | Description |
|---|---|
| flask/transcribe_server.py | Adds worker-pool startup, bounded queue ingestion/backpressure, WS route wiring, locking changes, validity filter updates, and latency logging. |
| flask/streaming_stt_ws.py | Implements the /stt/stream WebSocket handler, protocol parsing, VAD for JSON audio, and queue-overload signaling. |
| flask/websocket_manager.py | Adds session registry and per-session outbound queues used to decouple worker threads from WS I/O. |
| flask/stt_ingest.py | Adds normalized enqueue tuples with monotonic timestamps and bounded-queue overflow handling. |
| flask/stt_config.py | Adds env-driven configuration for workers, queue size, and overflow policy. |
| flask/stress_ingest_test.py | Adds a small stress helper to exercise queue overflow behavior. |
| test/run_tests.py | Adds a regression test runner covering HTTP endpoints and the WS streaming endpoint. |
| test/test_stt_stream_client.py | Adds a manual WS client for streaming audio (fake or WAV-based). |
| requirements.txt | Adds server-side WebSocket dependencies (flask-sock, simple-websocket). |
```python
p = argparse.ArgumentParser()
p.add_argument("--base", default="http://127.0.0.1:5040")
p.add_argument("--requests", type=int, default=30)
```
This helper defaults to port 5040 (--base default), but transcribe_server.py now runs on 5055 by default. As written, running it without --base will hit the wrong port. Update the default (and the example in the docstring) to match the server port or read it from env.
```python
sentences = request.args.get('sentences', default='false') == 'true'
if sentences == 'true':
    t = merge_and_split_transcripts(t)
```
Same sentences flag bug as above: sentences is a boolean but is compared to 'true', so sentence merging never runs. Use if sentences: (and ensure merge_and_split_transcripts handles the dict shape in transcriptd).
```python
sentences = request.args.get('sentences', default='false') == 'true'
if sentences == 'true':
    t = merge_and_split_transcripts(t)
```
Same sentences flag bug: sentences is computed as a boolean then compared to 'true', so merge/split is never applied. Use if sentences:.
```python
list = {k: v for k, v in t.items() if int(fromid) <= int(k) <= int(untilid)}
return jsonify(list)
```
Avoid using list as a variable name since it shadows Python’s built-in list type, making debugging and type tooling harder. Rename to something like transcripts_filtered or items.
Suggested change:
```python
transcripts_filtered = {k: v for k, v in t.items() if int(fromid) <= int(k) <= int(untilid)}
return jsonify(transcripts_filtered)
```
```python
if msg_type == "ping":
    out = session_out_queue(session_id)
    if out:
        out.put_nowait(json.dumps({"type": "pong", "session_id": session_id}))
```
out.put_nowait(...) can raise queue.Full when the per-session outbound queue is saturated (e.g., client spams pings or is slow to read). Right now that exception will bubble up and terminate the WS handler loop. Catch queue_mod.Full here (similar to _notify_queue_overload) to avoid disconnecting clients due to backpressure.
Suggested change:
```python
try:
    out.put_nowait(json.dumps({"type": "pong", "session_id": session_id}))
except queue_mod.Full:
    pass
```
```python
sentences = request.args.get('sentences', default='false') == 'true'
if sentences == 'true':
    t = merge_and_split_transcripts(t)
```
Same sentences flag bug: sentences is computed as a boolean then compared to 'true'. This prevents sentence mode from running. Use if sentences:.
```python
sentences = request.args.get('sentences', default='false') == 'true'
if sentences == 'true':
    t = merge_and_split_transcripts(t)
```
Same sentences flag bug: sentences is a boolean but compared to 'true', so sentence mode never triggers. Use if sentences:.
```python
sentences = request.args.get('sentences', default='false') == 'true'
if sentences == 'true':
    t = merge_and_split_transcripts(t)
```
Same sentences flag bug: sentences is a boolean but compared to 'true', so the flag is ignored. Use if sentences:.
```python
transcripts = transcriptd[tenant_id]
old_chunks = [chunk_id for chunk_id in transcripts if int(chunk_id) < two_hours_ago]
for chunk_id in old_chunks:
```
clean_old_transcripts() assumes chunk_id is always numeric (int(chunk_id)), but chunk IDs come from client input (including WebSocket) and can be non-numeric. A non-numeric key will raise ValueError and break cleanup (and potentially other endpoints that do int(k)). Consider validating chunk_id at ingest time, or filtering with chunk_id.isdigit() before converting.
Suggested change:
```python
old_chunks = [
    chunk_id
    for chunk_id in transcripts
    if (isinstance(chunk_id, int) or (isinstance(chunk_id, str) and chunk_id.isdigit()))
    and int(chunk_id) < two_hours_ago
]
```
```python
# Step 2 — send set_chunk control message
await ws.send(json.dumps({
    "type": "set_chunk",
    "chunk_id": WS_CHUNK_ID,
    "tenant_id": TENANT_ID
}))
await asyncio.sleep(0.2)

# Step 3 — send binary PCM chunks
for i in range(0, len(pcm), chunk_size):
    await ws.send(pcm[i:i + chunk_size])
    await asyncio.sleep(0.1)
```
The WS test never sends a finalize_chunk control message. Since the server only guarantees a final transcript on finalize, this test can be flaky (it relies on an interim emit_stream_update being produced and not dropped). Consider sending {"type":"finalize_chunk","chunk_id":...} after audio is sent and waiting specifically for is_final=true for that chunk.
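The reviewer's suggestion (send `finalize_chunk` after the last audio frame, then wait specifically for `is_final=true` for that chunk) can be sketched with two small helpers. The helper names are illustrative; only the message fields `type`, `chunk_id`, and `is_final` come from the review comment:

```python
import json

def build_finalize(chunk_id: str) -> str:
    """Build the finalize_chunk control message suggested by the review."""
    return json.dumps({"type": "finalize_chunk", "chunk_id": chunk_id})

def is_final_for(raw: str, chunk_id: str) -> bool:
    """True when a server event is the final transcript for this chunk."""
    msg = json.loads(raw)
    return msg.get("chunk_id") == chunk_id and msg.get("is_final") is True

# A client loop would send build_finalize(chunk_id) after the last PCM frame,
# then keep reading events until is_final_for(event, chunk_id) returns True,
# instead of relying on an interim emit_stream_update that may be dropped.
```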
Summary:
This PR adds a Speech-to-Text WebSocket streaming endpoint (`/stt/stream`), along with production-readiness improvements to the existing HTTP pipeline.
This PR adds:
1) WebSocket streaming endpoint `/stt/stream`: the client opens a WebSocket connection and receives transcript events as Whisper completes each job (no polling needed).
   Protocol: both JSON (base64) and raw binary (int16 LE PCM) audio are supported.
2) VAD: before enqueueing audio to Whisper, energy-based silence detection was added in `streaming_stt_ws.py`, which reduces Whisper hallucinations on silence and saves CPU on noisy live-event audio.
3) Backpressure and configuration:
   - `stt_config.py`: `STT_NUM_WORKERS` (default 1 for CPU safety), `STT_MAX_QUEUE_SIZE` (default 1000), `STT_QUEUE_OVERFLOW_POLICY` (`reject` or `drop_oldest`)
   - `stt_ingest.py`: `try_enqueue()` with a bounded queue and overflow handling; a monotonic timestamp is stamped at enqueue for latency tracking
   - `transcribe_server.py`: global `_transcript_lock` and `_dequeue_coalesce_lock` replace per-call `threading.Lock()` (the previous code created a new lock per call, providing no protection)

NOTE FOR CPU:
`STT_NUM_WORKERS=1` is kept as the default for local use, to avoid KV cache thread collisions. It can be set to 2+ when using a GPU.

Overflow behavior:
- `POST /transcribe`: rejected jobs now return `"status": "rejected"` with `"error": "queue_full"` instead of being silently dropped
- `{"type": "error", "code": "queue_overload"}` is sent to the client when the queue is full
- the `drop_oldest` policy is available via env var for live-streaming scenarios where recency matters more than completeness
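The energy-based silence detection described above can be sketched roughly as follows. The function name `has_speech` and the mean-absolute-amplitude metric are illustrative assumptions; only the default threshold of 200 comes from this PR:

```python
import array

VAD_ENERGY_THRESHOLD = 200  # default threshold mentioned in the PR

def has_speech(pcm_bytes: bytes, threshold: int = VAD_ENERGY_THRESHOLD) -> bool:
    """Return True when the chunk's mean absolute amplitude reaches the
    threshold. Input is raw int16 PCM (little-endian/native assumed)."""
    samples = array.array("h")
    samples.frombytes(pcm_bytes[: len(pcm_bytes) // 2 * 2])  # drop odd byte
    if not samples:
        return False  # empty chunk: treat as silence
    energy = sum(abs(s) for s in samples) / len(samples)
    return energy >= threshold
```

Chunks failing this check can be dropped before `try_enqueue`, which is what saves CPU and avoids hallucinated transcripts on silence.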
The previous filter blocked legitimate speech such as "you" (exact match) and "thank you" (substring).
Now only known Whisper hallucinations are blocked:
`{"thanks for watching", "click, click", "click click", "cough cough"}`
Exact-match forbidden strings reduced to:
`{"eh.", "bye.", "it's fine"}`
ASCII check removed — multilingual transcription now works correctly.

Every Whisper job logs:
`stt_latency_ms= chunk_id= session_id= qsize= workers=`
What was NOT changed
- All existing HTTP endpoints (`/transcribe`, `/get_transcript`, `/get_first_transcript`, `/pop_first_transcript`, `/get_latest_transcript`, `/pop_latest_transcript`, `/delete_transcript`, `/list_transcripts`, `/transcripts_size`)
- Existing environment variables (`WHISPER_SERVER_USE`, `WHISPER_MODEL`, `WHISPER_SERVER`)

New dependencies
Added to `requirements.txt`.

Environment variables (new)
| Variable | Default | Description |
|---|---|---|
| `STT_NUM_WORKERS` | 1 | Worker threads for Whisper |
| `STT_MAX_QUEUE_SIZE` | 1000 | Max queued jobs |
| `STT_QUEUE_OVERFLOW_POLICY` | `reject` | `reject` or `drop_oldest` |

Detailed changes in `transcribe_server.py`
This was the only file modified. All other files in this PR are new.
Imports added
- `flask_sock.Sock` (WebSocket support)
- `from websocket_manager import emit_stream_update`
- `from stt_config import STT_NUM_WORKERS, STT_MAX_QUEUE_SIZE, STT_QUEUE_OVERFLOW_POLICY`
- `from stt_ingest import try_enqueue`

Thread safety — fixed critical bug
Before:
This created a brand-new lock object on every call to `process_audio`, meaning it never actually synchronized anything across threads.
After:
Queue — unbounded → bounded with overflow policy
Before:
After:
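A sketch of what a bounded `try_enqueue()` with the two overflow policies might look like; the exact signature and tuple shape in the PR may differ, but the monotonic timestamp at enqueue and the `reject`/`drop_oldest` policies come from the description above:

```python
import queue
import time

STT_MAX_QUEUE_SIZE = 1000
STT_QUEUE_OVERFLOW_POLICY = "reject"  # or "drop_oldest"

jobs = queue.Queue(maxsize=STT_MAX_QUEUE_SIZE)

def try_enqueue(job, q=jobs, policy=STT_QUEUE_OVERFLOW_POLICY):
    """Bounded enqueue; stamps a monotonic timestamp for latency tracking."""
    item = (time.monotonic(), job)
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        if policy == "drop_oldest":
            try:
                q.get_nowait()  # evict the oldest queued job
            except queue.Empty:
                pass
            try:
                q.put_nowait(item)
                return True
            except queue.Full:
                return False
        return False  # "reject": caller reports status=rejected / queue_full
```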
Worker pool — single hardcoded thread → configurable
Before:
After:
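A minimal sketch of an env-configurable worker pool replacing the single hardcoded thread; the helper names (`start_workers`, `_worker_loop`) are illustrative, only the `STT_NUM_WORKERS` variable comes from the PR:

```python
import os
import queue
import threading

STT_NUM_WORKERS = int(os.environ.get("STT_NUM_WORKERS", "1"))

def _worker_loop(jobs, handle):
    """Drain the shared job queue until a None sentinel arrives."""
    while True:
        job = jobs.get()
        if job is None:
            break
        handle(job)
        jobs.task_done()

def start_workers(jobs, handle, n=STT_NUM_WORKERS):
    """Spawn n daemon worker threads sharing one job queue."""
    threads = []
    for i in range(n):
        t = threading.Thread(target=_worker_loop, args=(jobs, handle),
                             name=f"stt-worker-{i}", daemon=True)
        t.start()
        threads.append(t)
    return threads
```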
Whisper server branch — fixed silent crash
Before: when `use_whisper_server=true` and `qsize > 20`, the code posted to HTTP but never set `result`, then immediately tried `result['text']` → `UnboundLocalError` crash.
After: the HTTP server path always builds `result` with a `text` field before accessing it, for both load regimes.
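The shape of this fix can be sketched as follows; `post_fn`, the `fast` flag, and the threshold parameter are placeholders for the PR's actual HTTP call, which is not shown here. The point is that `result` is assigned, with a `text` key, on every path:

```python
def transcribe_via_http(post_fn, audio, qsize, busy_threshold=20):
    """Always return a dict with a 'text' key, regardless of load regime.

    The pre-fix code only assigned `result` on one branch, so reading
    result['text'] on the other branch raised UnboundLocalError.
    """
    if qsize > busy_threshold:
        resp = post_fn(audio, fast=True)   # degraded path under load
    else:
        resp = post_fn(audio, fast=False)  # normal path
    result = resp if isinstance(resp, dict) else {}
    result.setdefault("text", "")          # guarantee the 'text' field
    return result
```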
is_valid() — multilingual and accuracy fix
Before: blocked "you" (exact), "thank you" (substring), and all non-Latin scripts (ASCII-only check).
After: only blocks known Whisper hallucinations. The ASCII check is removed, so multilingual transcription works correctly.
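Based on the blocklists quoted in this PR, the new filter plausibly looks like the sketch below; the substring-vs-exact split and the lowercase/strip normalization are assumptions, only the set contents come from the PR description:

```python
# Phrases blocked anywhere in the transcript (known Whisper hallucinations).
SUBSTRING_BLOCKLIST = {"thanks for watching", "click, click",
                       "click click", "cough cough"}
# Strings blocked only on exact match.
EXACT_BLOCKLIST = {"eh.", "bye.", "it's fine"}

def is_valid(text: str) -> bool:
    """Keep everything except known Whisper hallucinations.

    No ASCII check: non-Latin scripts (Hindi, Marathi, ...) pass through.
    """
    t = text.strip().lower()
    if not t:
        return False
    if t in EXACT_BLOCKLIST:
        return False
    return not any(phrase in t for phrase in SUBSTRING_BLOCKLIST)
```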
WebSocket endpoint added
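Since the endpoint accepts both JSON control frames and raw binary PCM, the frame dispatch inside the handler might be sketched as follows; `parse_frame` and the `audio`/`data` field names are illustrative assumptions, only the two transports (JSON base64 and int16 LE PCM) come from the PR:

```python
import base64
import json

def parse_frame(frame):
    """Classify an incoming WS frame per the PR's protocol:
    - bytes: raw int16 LE PCM audio
    - str:   JSON control message; 'audio' messages may carry base64 PCM
    """
    if isinstance(frame, (bytes, bytearray)):
        return ("pcm", bytes(frame))
    msg = json.loads(frame)
    if msg.get("type") == "audio" and "data" in msg:
        return ("pcm", base64.b64decode(msg["data"]))
    return ("control", msg)
```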
Latency logging added
The timestamp is captured at enqueue in `stt_ingest.py`, so queue wait time is included in the measurement.
debug mode and port
Before: `app.run(host='0.0.0.0', port=5055, debug=True)`
After: `app.run(host='0.0.0.0', port=5055, debug=False)`

Real-Time STT Architecture
A complete WebSocket-based real-time STT pipeline is implemented.
Audio is streamed from client to server continuously over a persistent
WebSocket connection, processed by Whisper, and transcripts are pushed
back to the client instantly — no polling required.
Current status:
The system is real-time capable; inference latency is the only bottleneck, and it is purely a hardware constraint, not an architectural one.
Pipeline built:
Client streams audio → Server receives instantly →
Whisper processes → Result pushed back to client immediately
TESTS
I checked all the endpoints; nothing has broken the core logic and all the HTTP endpoints are working. I also created an additional test file, `run_tests.py`, with 9 test cases. The test file was run before the feature was added, where the 8th test (for the new WebSocket feature) failed; after the feature was added, all 9 test cases passed.
Another Test Method:
To reproduce:
`python susi_translator/test/test_stt_stream_client.py --url ws://127.0.0.1:5055/stt/stream --wav test_audio.wav`
where test_audio.wav is your audio file.
Requirements: 16kHz mono 16-bit WAV file.
Convert any audio with:
`ffmpeg -i input.wav -ar 16000 -ac 1 -sample_fmt s16 output.wav`
(Port may differ.)
Noise Conditions
Tested against real-world noise conditions:
- sudden loud sounds (a sneeze)
- rustling a wrapper in front of the mic
- natural speech in the presence of background noise
In all three conditions the pipeline transcribed accurate output, with no degradation in quality. Noise robustness confirmed.
Multilingual Support - Actually Verified
Removing the ASCII-only check in `is_valid()` unblocks Whisper's native multilingual capability. Tested and verified (output is from actual logs):

For Hindi audio, the output was: अपका नाम क्या है?
For Marathi audio, the output was: Hello तुज़ा नाओ काय आहे?

Both transcribed correctly with VALID status in the server logs.
The previous ASCII filter was silently dropping all non-Latin output.
@Orbiter, I would appreciate your review, especially on:
- the VAD threshold (currently 200)
- the `STT_NUM_WORKERS` default for production.
Should I proceed with documentation next?