-
Notifications
You must be signed in to change notification settings - Fork 173
Single Peer Connection #919
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
de1dae8
4e909c6
161f5d8
9149d2e
10862f9
8cffacc
53d7c3f
bbb7d8b
1b9d016
48b0a0f
8cd0335
8735aae
4810ec3
a54ffb3
2b98cf8
1069328
a55f7f5
3fc6556
a5aa3f5
78fb374
dc828be
0b6e733
505a594
39702c0
7b2130f
fb20e80
cecef05
cc9f0b7
b64d296
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| minor type="added" "Single peer connection mode via `RoomOptions.singlePeerConnection`. Requires LiveKit Cloud or LiveKit OSS >= 1.9.2." |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| .DS_Store | ||
| /.build | ||
| .build | ||
| .claude | ||
| /Packages | ||
| /*.xcodeproj | ||
| .swiftpm/ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,16 +44,11 @@ extension Room { | |
| publisherDataChannel.reset() | ||
| subscriberDataChannel.reset() | ||
|
|
||
| let (subscriber, publisher) = _state.read { ($0.subscriber, $0.publisher) } | ||
|
|
||
| // Close transports | ||
| await publisher?.close() | ||
| await subscriber?.close() | ||
| await _state.transport?.close() | ||
|
|
||
| // Reset publish state | ||
| _state.mutate { | ||
| $0.subscriber = nil | ||
| $0.publisher = nil | ||
| $0.transport = nil | ||
| $0.hasPublished = false | ||
| } | ||
| } | ||
|
|
@@ -75,7 +70,10 @@ extension Room { | |
|
|
||
| func send(dataPacket packet: Livekit_DataPacket) async throws { | ||
| func ensurePublisherConnected() async throws { | ||
| guard _state.isSubscriberPrimary else { return } | ||
| // Only needed when subscriber is primary in dual PC mode | ||
| guard case .subscriberPrimary = _state.transport else { | ||
| return | ||
| } | ||
|
|
||
| let publisher = try requirePublisher() | ||
|
|
||
|
|
@@ -91,7 +89,7 @@ extension Room { | |
| try await ensurePublisherConnected() | ||
|
|
||
| // At this point publisher should be .connected and dc should be .open | ||
| if await !(_state.publisher?.isConnected ?? false) { | ||
| if await !(_state.transport?.publisher.isConnected ?? false) { | ||
| log("publisher is not .connected", .error) | ||
| } | ||
|
|
||
|
|
@@ -116,7 +114,7 @@ extension Room { | |
|
|
||
| extension Room { | ||
| // swiftlint:disable:next function_body_length | ||
| func configureTransports(connectResponse: SignalClient.ConnectResponse) async throws { | ||
| func configureTransports(connectResponse: SignalClient.ConnectResponse, singlePeerConnection: Bool) async throws { | ||
| func makeConfiguration() -> LKRTCConfiguration { | ||
| let connectOptions = _state.connectOptions | ||
|
|
||
|
|
@@ -147,23 +145,20 @@ extension Room { | |
| if case let .join(joinResponse) = connectResponse { | ||
| log("Configuring transports with JOIN response...") | ||
|
|
||
| guard _state.subscriber == nil, _state.publisher == nil else { | ||
| guard _state.transport == nil else { | ||
| log("Transports are already configured") | ||
| return | ||
| } | ||
|
|
||
| // protocol v3 | ||
| let isSubscriberPrimary = joinResponse.subscriberPrimary | ||
| log("subscriberPrimary: \(joinResponse.subscriberPrimary)") | ||
|
|
||
| let subscriber = try Transport(config: rtcConfiguration, | ||
| target: .subscriber, | ||
| primary: isSubscriberPrimary, | ||
| delegate: self) | ||
| let isSinglePC = singlePeerConnection | ||
| let isSubscriberPrimary = isSinglePC ? false : joinResponse.subscriberPrimary | ||
| log("subscriberPrimary: \(isSubscriberPrimary), singlePeerConnection: \(isSinglePC)") | ||
|
|
||
| // Publisher always created; is primary in single PC mode | ||
| let publisher = try Transport(config: rtcConfiguration, | ||
| target: .publisher, | ||
| primary: !isSubscriberPrimary, | ||
| primary: isSinglePC || !isSubscriberPrimary, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. curiously, what does primary mean here ? why primary is true when isSinglePC is true ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it maps the inverse relationship between the func transport(_ transport: Transport, didUpdateState pcState: LKRTCPeerConnectionState) {
log("target: \(transport.target), connectionState: \(pcState.description)")
// primary connected
if transport.isPrimary {
if pcState.isConnected {
primaryTransportConnectedCompleter.resume(returning: ()) |
||
| singlePCMode: isSinglePC, | ||
| delegate: self) | ||
|
|
||
| await publisher.set { [weak self] offer, offerId in | ||
|
|
@@ -186,23 +181,29 @@ extension Room { | |
| log("dataChannel.\(String(describing: reliableDataChannel?.label)) : \(String(describing: reliableDataChannel?.channelId))") | ||
| log("dataChannel.\(String(describing: lossyDataChannel?.label)) : \(String(describing: lossyDataChannel?.channelId))") | ||
|
|
||
| _state.mutate { | ||
| $0.subscriber = subscriber | ||
| $0.publisher = publisher | ||
| $0.isSubscriberPrimary = isSubscriberPrimary | ||
| let subscriber = isSinglePC ? nil : try Transport(config: rtcConfiguration, | ||
| target: .subscriber, | ||
| primary: isSubscriberPrimary, | ||
| delegate: self) | ||
|
|
||
| let transport: TransportMode = if let subscriber, isSubscriberPrimary { | ||
| .subscriberPrimary(publisher: publisher, subscriber: subscriber) | ||
| } else if let subscriber { | ||
| .publisherPrimary(publisher: publisher, subscriber: subscriber) | ||
| } else { | ||
| .publisherOnly(publisher: publisher) | ||
| } | ||
| _state.mutate { $0.transport = transport } | ||
|
|
||
| log("[Connect] Fast publish enabled: \(joinResponse.fastPublish ? "true" : "false")") | ||
| if !isSubscriberPrimary || joinResponse.fastPublish { | ||
| // lazy negotiation for protocol v3+ | ||
| if isSinglePC || !isSubscriberPrimary || joinResponse.fastPublish { | ||
| // In single PC mode or when publisher is primary, negotiate immediately | ||
| try await publisherShouldNegotiate() | ||
| } | ||
|
|
||
| } else if case let .reconnect(reconnectResponse) = connectResponse { | ||
| log("[Connect] Configuring transports with RECONNECT response...") | ||
| let (subscriber, publisher) = _state.read { ($0.subscriber, $0.publisher) } | ||
| try await subscriber?.set(configuration: rtcConfiguration) | ||
| try await publisher?.set(configuration: rtcConfiguration) | ||
| try await _state.transport?.set(configuration: rtcConfiguration) | ||
| publisherDataChannel.retryReliable(lastSequence: reconnectResponse.lastMessageSeq) | ||
| } | ||
| } | ||
|
|
@@ -249,16 +250,32 @@ public enum StartReconnectReason: Sendable { | |
| extension Room { | ||
| // full connect sequence, doesn't update connection state | ||
| func fullConnectSequence(_ url: URL, _ token: String) async throws { | ||
| let connectResponse = try await signalClient.connect(url, | ||
| var singlePC = _state.roomOptions.singlePeerConnection | ||
|
|
||
| let connectResponse: SignalClient.ConnectResponse | ||
| do { | ||
| connectResponse = try await signalClient.connect(url, | ||
| token, | ||
| connectOptions: _state.connectOptions, | ||
| reconnectMode: _state.isReconnectingWithMode, | ||
| adaptiveStream: _state.roomOptions.adaptiveStream) | ||
| adaptiveStream: _state.roomOptions.adaptiveStream, | ||
| singlePeerConnection: singlePC) | ||
| } catch let error as LiveKitError where error.type == .serviceNotFound && singlePC { | ||
| log("v1 RTC path not supported, retrying with legacy path", .warning) | ||
| singlePC = false | ||
| connectResponse = try await signalClient.connect(url, | ||
| token, | ||
| connectOptions: _state.connectOptions, | ||
| reconnectMode: _state.isReconnectingWithMode, | ||
| adaptiveStream: _state.roomOptions.adaptiveStream, | ||
| singlePeerConnection: false) | ||
| } | ||
|
|
||
| // Check cancellation after WebSocket connected | ||
| try Task.checkCancellation() | ||
|
|
||
| _state.mutate { $0.connectStopwatch.split(label: "signal") } | ||
| try await configureTransports(connectResponse: connectResponse) | ||
| try await configureTransports(connectResponse: connectResponse, singlePeerConnection: singlePC) | ||
| // Check cancellation after configuring transports | ||
| try Task.checkCancellation() | ||
|
|
||
|
|
@@ -288,8 +305,8 @@ extension Room { | |
| throw LiveKitError(.invalidState) | ||
| } | ||
|
|
||
| guard _state.subscriber != nil, _state.publisher != nil else { | ||
| log("[Connect] Publisher or subscriber is nil", .error) | ||
| guard _state.transport != nil else { | ||
| log("[Connect] Transport is nil", .error) | ||
| throw LiveKitError(.invalidState) | ||
| } | ||
|
|
||
|
|
@@ -308,16 +325,19 @@ extension Room { | |
| @Sendable func quickReconnectSequence() async throws { | ||
| log("[Connect] Starting .quick reconnect sequence...") | ||
|
|
||
| let singlePC = await !signalClient.useV0SignalPath | ||
| let connectResponse = try await signalClient.connect(url, | ||
| token, | ||
| connectOptions: _state.connectOptions, | ||
| reconnectMode: _state.isReconnectingWithMode, | ||
| participantSid: localParticipant.sid, | ||
| adaptiveStream: _state.roomOptions.adaptiveStream) | ||
| adaptiveStream: _state.roomOptions.adaptiveStream, | ||
| singlePeerConnection: singlePC) | ||
| try Task.checkCancellation() | ||
|
|
||
| // Update configuration | ||
| try await configureTransports(connectResponse: connectResponse) | ||
| try await configureTransports(connectResponse: connectResponse, | ||
| singlePeerConnection: singlePC) | ||
| try Task.checkCancellation() | ||
|
|
||
| // Resume after configuring transports... | ||
|
|
@@ -337,9 +357,9 @@ extension Room { | |
| // send SyncState before offer | ||
| try await sendSyncState() | ||
|
|
||
| await _state.subscriber?.setIsRestartingIce() | ||
| await _state.transport?.setSubscriberRestartingIce() | ||
|
|
||
| if let publisher = _state.publisher, _state.hasPublished { | ||
| if let publisher = _state.transport?.publisher, _state.hasPublished { | ||
| // Only if published, wait for publisher to connect... | ||
| log("[Connect] Waiting for publisher to connect...") | ||
| try await publisher.createAndSendOffer(iceRestart: true) | ||
|
|
@@ -473,13 +493,12 @@ extension Room { | |
|
|
||
| extension Room { | ||
| func sendSyncState() async throws { | ||
| guard let subscriber = _state.subscriber else { | ||
| log("Subscriber is nil", .error) | ||
| guard let transport = _state.transport else { | ||
| log("Transport is nil", .error) | ||
| return | ||
| } | ||
|
|
||
| let previousAnswer = await subscriber.localDescription | ||
| let previousOffer = await subscriber.remoteDescription | ||
| let (previousAnswer, previousOffer) = await transport.syncStateDescriptions() | ||
|
|
||
| // 1. autosubscribe on, so subscribed tracks = all tracks - unsub tracks, | ||
| // in this case, we send unsub tracks, so server add all tracks to this | ||
|
|
@@ -517,7 +536,7 @@ extension Room { | |
|
|
||
| extension Room { | ||
| func requirePublisher() throws -> Transport { | ||
| guard let publisher = _state.publisher else { | ||
| guard let publisher = _state.transport?.publisher else { | ||
| log("Publisher is nil", .error) | ||
| throw LiveKitError(.invalidState, message: "Publisher is nil") | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the log intentional ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's .debug and it existed before:
log("subscriberPrimary: \(joinResponse.subscriberPrimary)")