Skip to content

Conversation

@balthazur
Copy link
Contributor

@balthazur balthazur commented Nov 11, 2025

Description

This PR adds a new way of providing frames for our webrtc streaming implemention by creating a new datachannel where we can send extracted frames from a recorded video, and are guaranteed to have them all processed in the provided quality.

Update, implemented binary chunk streaming for frame ingestion and sending back data via the datachannel. both directions:

Corresponding app PR: https://github.com/roboflow/roboflow/pull/8537

Overview

Both directions (client → server and server → client) use the same binary chunking protocol with a 12-byte header followed by payload data. This ensures:

  • ✅ Consistent protocol in both directions
  • ✅ Automatic handling of large messages (>48KB)
  • ✅ Efficient binary transmission (no base64 overhead)
  • ✅ Simple reassembly logic

Binary Message Format

┌─────────────┬───────────────┬────────────────┬──────────────┐
│  frame_id   │ chunk_index   │ total_chunks   │   payload    │
│   4 bytes   │   4 bytes     │   4 bytes      │   N bytes    │
│  (uint32)   │  (uint32)     │  (uint32)      │  (binary)    │
└─────────────┴───────────────┴────────────────┴──────────────┘

All integers are uint32 little-endian.

Header Fields:

  • frame_id: Sequential frame identifier
  • chunk_index: 0-based index of this chunk (0 for first chunk)
  • total_chunks: Total number of chunks for this frame (1 if no chunking needed)
  • payload: Chunk data (JPEG bytes or JSON UTF-8 bytes)

Payload Types:

  • Client → Server: JPEG image bytes
  • Server → Client: JSON response as UTF-8 bytes

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • This change requires a documentation update

How has this change been tested, please provide a testcase or example of how you tested the change?

See comment below

Any specific deployment considerations

For example, documentation changes, usability, usage/costs, secrets, etc.

Docs

  • Docs updated? What were the changes:

Note

Implements a binary chunked data-channel protocol for client→server frame ingestion and server→client responses, with a new use_data_channel_frames mode enabling data-only processing without a media track.

  • WebRTC data channel protocol
    • Add binary chunking helpers: create_chunked_binary_message, parse_chunked_binary_message, ChunkReassembler, send_chunked_data (48KB chunks).
    • Switch outbound responses to binary JSON via send_chunked_data (replaces plain text .send).
  • Frame ingestion via data channel
    • New use_data_channel_frames flag in WebRTCWorkerRequest and processing classes.
    • Handle upstream_frames data channel: reassemble JPEG chunks, decode with OpenCV, enqueue frames for processing.
    • Start process_frames_data_only() when using data-channel frames and no video track.
  • Processing pipeline updates
    • VideoFrameProcessor/VideoTransformTrackWithLoop accept use_data_channel_frames; manage internal frame queue and received frame IDs.
    • Data-only loop can consume frames from data channel or media track; maintains termination and error handling.
    • Continue supporting dynamic data_output and stream_output adjustments via control channel.

Written by Cursor Bugbot for commit d7e3295. This will update automatically on new commits. Configure here.

@balthazur
Copy link
Contributor Author

Run a test with this small test script and the following terminal command (you need to reference a video on your machine):

Command:

python examples/webrtc_sdk/test_dc_minimal.py \
  ~/Downloads/times_square_2025-08-10_07-02-07.mp4 \
  100

Script:

"""Minimal test - just 10 frames."""
import sys
print("Starting...", flush=True)

import asyncio
import base64
import cv2
import json
import requests
from aiortc import RTCPeerConnection, RTCSessionDescription

print("Imports done", flush=True)

VIDEO_PATH = sys.argv[1] if len(sys.argv) > 1 else "/Users/balthasar/Downloads/times_square_2025-08-10_07-02-07.mp4"
MAX_FRAMES = int(sys.argv[2]) if len(sys.argv) > 2 else 100

async def test():
    print(f"Testing with {MAX_FRAMES} frames from {VIDEO_PATH}", flush=True)
    
    pc = RTCPeerConnection()
    received = [0]
    
    inference_ch = pc.createDataChannel("inference")
    upstream_ch = pc.createDataChannel("upstream_frames")
    
    @inference_ch.on("message")
    def on_msg(msg):
        received[0] += 1
        print(f"✓ Response {received[0]}", flush=True)
    
    @upstream_ch.on("open")
    def send():
        print("Channel open, sending frames...", flush=True)
        cap = cv2.VideoCapture(VIDEO_PATH)
        for i in range(1, MAX_FRAMES + 1):
            ret, frame = cap.read()
            if not ret:
                break
            _, buf = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
            msg = json.dumps({"type": "frame", "frame_id": i, "image": base64.b64encode(buf).decode()})
            upstream_ch.send(msg)
            print(f"  Sent frame {i}", flush=True)
        cap.release()
        print(f"All {MAX_FRAMES} frames sent!", flush=True)
    
    print("Creating offer...", flush=True)
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)
    
    while pc.iceGatheringState != "complete":
        await asyncio.sleep(0.1)
    
    print("Posting to server...", flush=True)
    resp = requests.post("http://localhost:9001/initialise_webrtc_worker", json={
        "api_key": "LKgvRJqgdbCml2ONofEx",
        "workflow_configuration": {
            "type": "WorkflowConfiguration",
            "workflow_id": "custom-workflow-3",
            "workspace_name": "leandro-starter",
            "image_input_name": "image",
        },
        "webrtc_offer": {"type": pc.localDescription.type, "sdp": pc.localDescription.sdp},
        "output_mode": "data_only",
        "use_data_channel_frames": True,
        "data_output": None,
    })
    
    if resp.status_code != 200:
        print(f"ERROR: {resp.status_code} - {resp.text}", flush=True)
        return False
    
    print(f"Server responded: {resp.status_code}", flush=True)
    answer = resp.json()
    
    await pc.setRemoteDescription(RTCSessionDescription(sdp=answer["sdp"], type=answer["type"]))
    print("Session established!", flush=True)
    
    # Wait for responses (must keep connection alive!)
    timeout_seconds = 300  # 5 minutes
    for i in range(timeout_seconds):
        await asyncio.sleep(1)
        if received[0] >= MAX_FRAMES:
            print(f"All responses received after {i}s!", flush=True)
            break
        if i > 0 and i % 30 == 0:
            print(f"  Still waiting... {received[0]}/{MAX_FRAMES} after {i}s", flush=True)
    
    print(f"\nFINAL: {received[0]}/{MAX_FRAMES} responses", flush=True)
    
    # Keep connection alive a bit longer before closing
    await asyncio.sleep(2)
    await pc.close()
    return received[0] == MAX_FRAMES

if __name__ == "__main__":
    success = asyncio.run(test())
    print(f"{'✅ SUCCESS' if success else '❌ FAILED'}", flush=True)
    sys.exit(0 if success else 1)

@balthazur balthazur requested a review from lrosemberg November 11, 2025 16:13
@balthazur balthazur self-assigned this Nov 11, 2025
@balthazur balthazur marked this pull request as ready for review November 11, 2025 16:13
@balthazur balthazur marked this pull request as draft November 13, 2025 11:22
@balthazur
Copy link
Contributor Author

not ready yet

Base automatically changed from lean/webrtc-data-decoupling to main November 13, 2025 12:00
@grzegorz-roboflow grzegorz-roboflow changed the base branch from main to feat/modal-tags November 18, 2025 18:24
Base automatically changed from feat/modal-tags to main November 18, 2025 19:02
@balthazur balthazur marked this pull request as ready for review November 19, 2025 07:50
@balthazur
Copy link
Contributor Author

bugbot run

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants