Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ mod signal_stream;

pub use region::RegionUrlProvider;

#[cfg(feature = "signal-client-tokio")]
pub use tokio_tungstenite::Connector;
#[cfg(not(feature = "signal-client-tokio"))]
pub enum Connector {}

pub type SignalEmitter = mpsc::UnboundedSender<SignalEvent>;
pub type SignalEvents = mpsc::UnboundedReceiver<SignalEvent>;
pub type SignalResult<T> = Result<T, SignalError>;
Expand Down Expand Up @@ -84,11 +89,12 @@ impl Default for SignalSdkOptions {
}
}

#[derive(Debug, Clone)]
#[derive(Clone)]
#[non_exhaustive]
pub struct SignalOptions {
pub auto_subscribe: bool,
pub adaptive_stream: bool,
pub connector: Option<Connector>,
pub sdk_options: SignalSdkOptions,
}

Expand All @@ -97,11 +103,23 @@ impl Default for SignalOptions {
Self {
auto_subscribe: true,
adaptive_stream: false,
connector: None,
sdk_options: SignalSdkOptions::default(),
}
}
}

impl Debug for SignalOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SignalClient")
.field("auto_subscribe", &self.auto_subscribe)
.field("adaptive_stream", &self.adaptive_stream)
.field("connector", &self.connector.is_some())
.field("sdk_options", &self.sdk_options)
.finish()
}
}

pub enum SignalEvent {
/// Received a message from the server
Message(Box<proto::signal_response::Message>),
Expand Down Expand Up @@ -250,17 +268,18 @@ impl SignalInner {
let lk_url = get_livekit_url(url, &options)?;

// Try to connect to the SignalClient
let (stream, mut events) = match SignalStream::connect(lk_url.clone(), token).await {
Ok(stream) => stream,
Err(err) => {
if let SignalError::TokenFormat = err {
let (stream, mut events) =
match SignalStream::connect(lk_url.clone(), token, options.connector.clone()).await {
Ok(stream) => stream,
Err(err) => {
if let SignalError::TokenFormat = err {
return Err(err);
}
// Connection failed, try to retrieve more informations
Self::validate(lk_url).await?;
return Err(err);
}
// Connection failed, try to retrieve more informations
Self::validate(lk_url).await?;
return Err(err);
}
};
};

let join_response = get_join_response(&mut events).await?;

Expand Down Expand Up @@ -322,7 +341,8 @@ impl SignalInner {
let mut lk_url = get_livekit_url(&self.url, &self.options).unwrap();
lk_url.query_pairs_mut().append_pair("reconnect", "1").append_pair("sid", sid);

let (new_stream, mut events) = SignalStream::connect(lk_url, &token).await?;
let (new_stream, mut events) =
SignalStream::connect(lk_url, &token, self.options.connector.clone()).await?;
let reconnect_response = get_reconnect_response(&mut events).await?;
*stream = Some(new_stream);

Expand Down
17 changes: 11 additions & 6 deletions livekit-api/src/signal_client/signal_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ use tokio::{

#[cfg(feature = "signal-client-tokio")]
use tokio_tungstenite::{
connect_async,
client_async_with_config, connect_async_tls_with_config,
tungstenite::client::IntoClientRequest,
tungstenite::error::ProtocolError,
tungstenite::http::{header::AUTHORIZATION, HeaderValue},
tungstenite::{Error as WsError, Message},
MaybeTlsStream, WebSocketStream,
Connector, MaybeTlsStream, WebSocketStream,
};

#[cfg(feature = "__signal-client-async-compatible")]
Expand Down Expand Up @@ -88,6 +88,7 @@ impl SignalStream {
pub async fn connect(
url: url::Url,
token: &str,
tls_connector: Option<Connector>,
) -> SignalResult<(Self, mpsc::UnboundedReceiver<Box<proto::signal_response::Message>>)> {
log::info!("connecting to {}", url);
let mut request = url.clone().into_client_request()?;
Expand Down Expand Up @@ -288,17 +289,18 @@ impl SignalStream {
};

// Now perform WebSocket handshake over the established connection
let (ws_stream, _) =
tokio_tungstenite::client_async_with_config(request, stream, None).await?;
let (ws_stream, _) = client_async_with_config(request, stream, None).await?;
ws_stream
} else {
// No proxy specified, connect directly
let (ws_stream, _) = connect_async(request).await?;
let (ws_stream, _) =
connect_async_tls_with_config(request, None, false, tls_connector).await?;
ws_stream
}
} else {
// Non-tokio build or no proxy - connect directly
let (ws_stream, _) = connect_async(request).await?;
let (ws_stream, _) =
connect_async_tls_with_config(request, None, false, tls_connector).await?;
ws_stream
};

Expand All @@ -307,6 +309,9 @@ impl SignalStream {

#[cfg(not(feature = "signal-client-tokio"))]
let (ws_stream, _) = connect_async(request).await?;
#[cfg(not(feature = "signal-client-tokio"))]
let _ = tls_connector;

let (ws_writer, ws_reader) = ws_stream.split();

let (emitter, events) = mpsc::unbounded_channel();
Expand Down
27 changes: 23 additions & 4 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl From<RoomSdkOptions> for SignalSdkOptions {
}
}

#[derive(Debug, Clone)]
#[derive(Clone)]
#[non_exhaustive]
pub struct RoomOptions {
pub auto_subscribe: bool,
Expand All @@ -361,8 +361,24 @@ pub struct RoomOptions {
pub rtc_config: RtcConfiguration,
pub join_retries: u32,
pub sdk_options: RoomSdkOptions,
pub signal_options: SignalOptions,
pub connector: Option<livekit_api::signal_client::Connector>,
}
impl Debug for RoomOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RoomOptions")
.field("auto_subscribe", &self.auto_subscribe)
.field("adaptive_stream", &self.adaptive_stream)
.field("dynacast", &self.dynacast)
.field("e2ee", &self.e2ee)
.field("rtc_config", &self.rtc_config)
.field("join_retries", &self.join_retries)
.field("sdk_options", &self.sdk_options)
.field("signal_options", &self.signal_options)
// Exclude connector field as it's not Debug
.finish()
}
}

impl Default for RoomOptions {
fn default() -> Self {
Self {
Expand All @@ -381,6 +397,8 @@ impl Default for RoomOptions {
},
join_retries: 3,
sdk_options: RoomSdkOptions::default(),
signal_options: SignalOptions::default(),
connector: None,
}
}
}
Expand Down Expand Up @@ -474,6 +492,7 @@ impl Room {
signal_options.sdk_options = options.sdk_options.clone().into();
signal_options.auto_subscribe = options.auto_subscribe;
signal_options.adaptive_stream = options.adaptive_stream;
signal_options.connector = options.connector.clone();
let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
url,
token,
Expand Down Expand Up @@ -1120,7 +1139,7 @@ impl RoomSession {
}

async fn send_sync_state(self: &Arc<Self>) {
let auto_subscribe = self.options.auto_subscribe;
let auto_subscribe = self.options.signal_options.auto_subscribe;
let session = self.rtc_engine.session();

if session.subscriber().peer_connection().current_local_description().is_none() {
Expand Down Expand Up @@ -1596,7 +1615,7 @@ impl RoomSession {
name,
metadata,
attributes,
self.options.auto_subscribe,
self.options.signal_options.auto_subscribe,
);

participant.on_track_published({
Expand Down