Skip to content

Pre-connect audio buffer #830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ios/Classes/AudioRenderer.swift
2 changes: 2 additions & 0 deletions lib/livekit_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ library livekit_client;

export 'src/constants.dart';
export 'src/core/room.dart';
export 'src/core/room_preconnect.dart';
export 'src/data_stream/stream_reader.dart';
export 'src/data_stream/stream_writer.dart';
export 'src/e2ee/e2ee_manager.dart';
Expand All @@ -34,6 +35,7 @@ export 'src/options.dart';
export 'src/participant/local.dart';
export 'src/participant/participant.dart';
export 'src/participant/remote.dart';
export 'src/preconnect/pre_connect_audio_buffer.dart';
export 'src/publication/local.dart';
export 'src/publication/remote.dart';
export 'src/publication/track_publication.dart';
Expand Down
4 changes: 4 additions & 0 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import '../options.dart';
import '../participant/local.dart';
import '../participant/participant.dart';
import '../participant/remote.dart';
import '../preconnect/pre_connect_audio_buffer.dart';
import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../support/disposable.dart';
Expand Down Expand Up @@ -135,6 +136,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {

final Map<String, TextStreamHandler> _textStreamHandlers = {};

@internal
late final preConnectAudioBuffer = PreConnectAudioBuffer(this);

// for testing
@internal
Map<String, RpcRequestHandler> get rpcHandlers => _rpcHandlers;
Expand Down
26 changes: 26 additions & 0 deletions lib/src/core/room_preconnect.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import 'dart:async';

import '../logger.dart';
import '../preconnect/pre_connect_audio_buffer.dart';
import 'room.dart';

extension RoomPreConnect on Room {
/// Wrap an async operation while a pre-connect audio buffer records.
/// Stops and flushes on error.
Future<T> withPreConnectAudio<T>(
Future<T> Function() operation, {
Duration timeout = const Duration(seconds: 10),
PreConnectOnError? onError,
}) async {
await preConnectAudioBuffer.startRecording(timeout: timeout);
try {
final result = await operation();
return result;
} catch (error) {
logger.warning('[Preconnect] operation failed with error: $error');
rethrow;
} finally {
await preConnectAudioBuffer.reset();
}
}
}
9 changes: 8 additions & 1 deletion lib/src/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,17 @@ class AudioPublishOptions extends PublishOptions {
/// max audio bitrate
final int audioBitrate;

/// Mark this audio as originating from a pre-connect buffer.
/// Used to populate protobuf audioFeatures (TF_PRECONNECT_BUFFER).
final bool preConnect;

const AudioPublishOptions({
super.name,
super.stream,
this.dtx = true,
this.red = true,
this.audioBitrate = AudioPreset.music,
this.preConnect = false,
});

AudioPublishOptions copyWith({
Expand All @@ -340,18 +345,20 @@ class AudioPublishOptions extends PublishOptions {
String? name,
String? stream,
bool? red,
bool? preConnect,
}) =>
AudioPublishOptions(
dtx: dtx ?? this.dtx,
audioBitrate: audioBitrate ?? this.audioBitrate,
name: name ?? this.name,
stream: stream ?? this.stream,
red: red ?? this.red,
preConnect: preConnect ?? this.preConnect,
);

@override
String toString() =>
'${runtimeType}(dtx: ${dtx}, audioBitrate: ${audioBitrate}, red: ${red})';
'${runtimeType}(dtx: ${dtx}, audioBitrate: ${audioBitrate}, red: ${red}, preConnect: ${preConnect})';
}

final backupCodecs = ['vp8', 'h264'];
Expand Down
8 changes: 8 additions & 0 deletions lib/src/participant/local.dart
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
encryption: room.roomOptions.lkEncryptionType,
);

// Populate audio features (e.g., TF_NO_DTX, TF_PRECONNECT_BUFFER)
req.audioFeatures.addAll([
if (!publishOptions.dtx)
lk_models.AudioTrackFeature.TF_NO_DTX,
if (publishOptions.preConnect)
lk_models.AudioTrackFeature.TF_PRECONNECT_BUFFER,
]);

Future<lk_models.TrackInfo> negotiate() async {
track.transceiver = await room.engine
.createTransceiverRTCRtpSender(track, publishOptions!, encodings);
Expand Down
205 changes: 205 additions & 0 deletions lib/src/preconnect/pre_connect_audio_buffer.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2025 LiveKit, Inc.
// Lightweight pre-connect audio buffer (scaffold). Captures bytes externally
// and uploads via byte stream once an agent is ready.

import 'dart:async';
import 'dart:developer';
import 'dart:typed_data';

import 'package:flutter/services.dart';
import 'package:livekit_client/livekit_client.dart';
import 'package:uuid/uuid.dart';

import '../support/native.dart';

typedef PreConnectOnError = void Function(Object error);

class AudioFrame {
final List<Int16List> data;
final int sampleRate;
final int channelCount;
final int frameLength;
final String format;

AudioFrame({
required this.data,
required this.sampleRate,
required this.channelCount,
required this.frameLength,
required this.format,
});

factory AudioFrame.fromMap(Map<String, dynamic> map) => AudioFrame(
data: (map['data'] as List<dynamic>)
.map<Int16List>((channel) => (channel as List).map<int>((e) => e as int).toList() as Int16List)
.toList(),
sampleRate: (map['sampleRate'] as int),
channelCount: (map['channelCount'] as int),
frameLength: (map['frameLength'] as int),
format: map['format'] as String,
);
}

class PreConnectAudioBuffer {
static const String dataTopic = 'lk.agent.pre-connect-audio-buffer';

static const int defaultMaxSize = 10 * 1024 * 1024; // 10MB
static const int defaultSampleRate = 24000; // Hz

// Reference to the room
final Room _room;

// Internal states
bool _isRecording = false;
bool _isSent = false;
String? _rendererId;

LocalAudioTrack? _localTrack;
EventChannel? _eventChannel;
StreamSubscription? _streamSubscription;

final PreConnectOnError? _onError;
final int _sampleRate;

final BytesBuilder _bytes = BytesBuilder(copy: false);
Timer? _timeoutTimer;
CancelListenFunc? _participantStateListener;
CancelListenFunc? _remoteSubscribedListener;

PreConnectAudioBuffer(
this._room, {
PreConnectOnError? onError,
int sampleRate = defaultSampleRate,
}) : _onError = onError,
_sampleRate = sampleRate;

// Getters
bool get isRecording => _isRecording;
int get bufferedSize => _bytes.length;

Future<void> startRecording({
Duration timeout = const Duration(seconds: 10),
}) async {
if (_isRecording) {
logger.warning('Already recording');
return;
}
_isRecording = true;

_localTrack = await LocalAudioTrack.create();
print('localTrack: ${_localTrack!.mediaStreamTrack.id}');

final rendererId = Uuid().v4();
logger.info('Starting audio renderer with rendererId: $rendererId');

final result = await Native.startAudioRenderer(
trackId: _localTrack!.mediaStreamTrack.id!,
rendererId: rendererId,
);

_rendererId = rendererId;

logger.info('startAudioRenderer result: $result');

_eventChannel = EventChannel('io.livekit.audio.renderer/channel-$rendererId');
_streamSubscription = _eventChannel?.receiveBroadcastStream().listen((event) {
try {
// logger.info('event: $event');
// {sampleRate: 32000, format: int16, frameLength: 320, channelCount: 1}
final dataChannels = event['data'] as List<dynamic>;
final monoData = dataChannels[0].cast<int>();
_bytes.add(monoData);
log('bufferedSize: ${_bytes.length}');
} catch (e) {
logger.warning('Error parsing event: $e');
}
});

// Listen for agent readiness; when active, attempt to send buffer once.
_participantStateListener = _room.events.on<ParticipantStateUpdatedEvent>((event) async {
if (event.participant.kind == ParticipantKind.AGENT && event.state == ParticipantState.active) {
logger.info('Agent is active: ${event.participant.identity}');
try {
await sendAudioData(agents: [event.participant.identity]);
} catch (e) {
_onError?.call(e);
} finally {
await reset();
}
}
});

_remoteSubscribedListener = _room.events.on<LocalTrackSubscribedEvent>((event) async {
logger.info('Remote track subscribed: ${event.trackSid}');
await stopRecording();
});
}

Future<void> stopRecording() async {
if (!_isRecording) {
logger.warning('Not recording');
return;
}
_isRecording = false;

// Cancel the stream subscription.
await _streamSubscription?.cancel();
_streamSubscription = null;

// Dispose the event channel.
_eventChannel = null;

final rendererId = _rendererId;
if (rendererId == null) {
logger.warning('No rendererId');
return;
}

await Native.stopAudioRenderer(
rendererId: rendererId,
);

_rendererId = null;
}

// Clean-up & reset for re-use
Future<void> reset() async {
await stopRecording();
_timeoutTimer?.cancel();
_participantStateListener?.call();
_participantStateListener = null;
_remoteSubscribedListener?.call();
_remoteSubscribedListener = null;
_bytes.clear();
_localTrack = null;
}

Future<void> sendAudioData({
required List<String> agents,
String topic = dataTopic,
}) async {
if (_isSent) return;
if (agents.isEmpty) return;

final data = _bytes.takeBytes();
if (data.length <= 1024) {
throw StateError('Audio data too small to send (${data.length} bytes)');
}
_isSent = true;

final streamOptions = StreamBytesOptions(
topic: topic,
attributes: {
'sampleRate': '$_sampleRate',
'channels': '1',
'trackId': _localTrack!.mediaStreamTrack.id!,
},
destinationIdentities: agents,
);

final writer = await _room.localParticipant!.streamBytes(streamOptions);
await writer.write(data);
await writer.close();
logger.info('[preconnect] sent ${(data.length / 1024).toStringAsFixed(1)}KB of audio to ${agents.length} agent(s)');
}
}
36 changes: 36 additions & 0 deletions lib/src/support/native.dart
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,42 @@ class Native {
}
}

@internal
static Future<bool> startAudioRenderer({
required String trackId,
required String rendererId,
}) async {
try {
final result = await channel.invokeMethod<bool>(
'startAudioRenderer',
<String, dynamic>{
'trackId': trackId,
'rendererId': rendererId,
},
);
return result == true;
} catch (error) {
logger.warning('startAudioRenderer did throw $error');
return false;
}
}

@internal
static Future<void> stopAudioRenderer({
required String rendererId,
}) async {
try {
await channel.invokeMethod<void>(
'stopAudioRenderer',
<String, dynamic>{
'rendererId': rendererId,
},
);
} catch (error) {
logger.warning('stopAudioRenderer did throw $error');
}
}

/// Returns OS's version as a string
/// Currently only for iOS, macOS
@internal
Expand Down
1 change: 1 addition & 0 deletions macos/Classes/AudioRenderer.swift
8 changes: 4 additions & 4 deletions pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ packages:
dependency: "direct main"
description:
name: dart_webrtc
sha256: "5b76fd85ac95d6f5dee3e7d7de8d4b51bfbec1dc73804647c6aebb52d6297116"
sha256: a2ae542cdadc21359022adedc26138fa3487cc3b3547c24ff4f556681869e28c
url: "https://pub.dev"
source: hosted
version: "1.5.3+hotfix.2"
version: "1.5.3+hotfix.4"
dbus:
dependency: transitive
description:
Expand Down Expand Up @@ -220,10 +220,10 @@ packages:
dependency: "direct main"
description:
name: flutter_webrtc
sha256: dd47ca103b5b6217771e6277882674276d9621bbf9eb23da3c03898b507844e3
sha256: "69095ba39b83da3de48286dfc0769aa8e9f10491f70058dc8d8ecc960ef7a260"
url: "https://pub.dev"
source: hosted
version: "0.14.1"
version: "1.0.0"
glob:
dependency: transitive
description:
Expand Down
Loading
Loading