Skip to content

Commit 492f377

Browse files
ConradIrwinBe-ing
authored andcommitted
allow passing in TLS connector
We've been through a number of different possible rustls connectors, and we're currently using one (rustls-platform-verifier) that doesn't have feature flags in livekit or tungstenite yet. Instead of requiring that every transitive dependency supports the rustls feature flags we need, it's much easier to pass only the __rustls-tls feature flag and pass the Connector that we create with our configuration down the stack.
1 parent 83bd7c7 commit 492f377

File tree

3 files changed

+65
-20
lines changed

3 files changed

+65
-20
lines changed

livekit-api/src/signal_client/mod.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ mod signal_stream;
4242

4343
pub use region::RegionUrlProvider;
4444

45+
#[cfg(feature = "signal-client-tokio")]
46+
pub use tokio_tungstenite::Connector;
47+
#[cfg(not(feature = "signal-client-tokio"))]
48+
pub enum Connector {}
49+
4550
pub type SignalEmitter = mpsc::UnboundedSender<SignalEvent>;
4651
pub type SignalEvents = mpsc::UnboundedReceiver<SignalEvent>;
4752
pub type SignalResult<T> = Result<T, SignalError>;
@@ -84,11 +89,12 @@ impl Default for SignalSdkOptions {
8489
}
8590
}
8691

87-
#[derive(Debug, Clone)]
92+
#[derive(Clone)]
8893
#[non_exhaustive]
8994
pub struct SignalOptions {
9095
pub auto_subscribe: bool,
9196
pub adaptive_stream: bool,
97+
pub connector: Option<Connector>,
9298
pub sdk_options: SignalSdkOptions,
9399
}
94100

@@ -97,11 +103,23 @@ impl Default for SignalOptions {
97103
Self {
98104
auto_subscribe: true,
99105
adaptive_stream: false,
106+
connector: None,
100107
sdk_options: SignalSdkOptions::default(),
101108
}
102109
}
103110
}
104111

112+
impl Debug for SignalOptions {
113+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114+
f.debug_struct("SignalClient")
115+
.field("auto_subscribe", &self.auto_subscribe)
116+
.field("adaptive_stream", &self.adaptive_stream)
117+
.field("connector", &self.connector.is_some())
118+
.field("sdk_options", &self.sdk_options)
119+
.finish()
120+
}
121+
}
122+
105123
pub enum SignalEvent {
106124
/// Received a message from the server
107125
Message(Box<proto::signal_response::Message>),
@@ -250,17 +268,18 @@ impl SignalInner {
250268
let lk_url = get_livekit_url(url, &options)?;
251269

252270
// Try to connect to the SignalClient
253-
let (stream, mut events) = match SignalStream::connect(lk_url.clone(), token).await {
254-
Ok(stream) => stream,
255-
Err(err) => {
256-
if let SignalError::TokenFormat = err {
271+
let (stream, mut events) =
272+
match SignalStream::connect(lk_url.clone(), token, options.connector.clone()).await {
273+
Ok(stream) => stream,
274+
Err(err) => {
275+
if let SignalError::TokenFormat = err {
276+
return Err(err);
277+
}
278+
// Connection failed, try to retrieve more informations
279+
Self::validate(lk_url).await?;
257280
return Err(err);
258281
}
259-
// Connection failed, try to retrieve more informations
260-
Self::validate(lk_url).await?;
261-
return Err(err);
262-
}
263-
};
282+
};
264283

265284
let join_response = get_join_response(&mut events).await?;
266285

@@ -322,7 +341,8 @@ impl SignalInner {
322341
let mut lk_url = get_livekit_url(&self.url, &self.options).unwrap();
323342
lk_url.query_pairs_mut().append_pair("reconnect", "1").append_pair("sid", sid);
324343

325-
let (new_stream, mut events) = SignalStream::connect(lk_url, &token).await?;
344+
let (new_stream, mut events) =
345+
SignalStream::connect(lk_url, &token, self.options.connector.clone()).await?;
326346
let reconnect_response = get_reconnect_response(&mut events).await?;
327347
*stream = Some(new_stream);
328348

livekit-api/src/signal_client/signal_stream.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use tokio::sync::{mpsc, oneshot};
2525

2626
#[cfg(feature = "signal-client-tokio")]
2727
use base64;
28+
use tokio_tungstenite::Connector;
2829

2930
#[cfg(feature = "signal-client-tokio")]
3031
use tokio::{
@@ -34,7 +35,7 @@ use tokio::{
3435

3536
#[cfg(feature = "signal-client-tokio")]
3637
use tokio_tungstenite::{
37-
connect_async,
38+
client_async_with_config, connect_async_tls_with_config,
3839
tungstenite::client::IntoClientRequest,
3940
tungstenite::error::ProtocolError,
4041
tungstenite::http::{header::AUTHORIZATION, HeaderValue},
@@ -88,6 +89,7 @@ impl SignalStream {
8889
pub async fn connect(
8990
url: url::Url,
9091
token: &str,
92+
tls_connector: Option<Connector>,
9193
) -> SignalResult<(Self, mpsc::UnboundedReceiver<Box<proto::signal_response::Message>>)> {
9294
log::info!("connecting to {}", url);
9395
let mut request = url.clone().into_client_request()?;
@@ -288,17 +290,18 @@ impl SignalStream {
288290
};
289291

290292
// Now perform WebSocket handshake over the established connection
291-
let (ws_stream, _) =
292-
tokio_tungstenite::client_async_with_config(request, stream, None).await?;
293+
let (ws_stream, _) = client_async_with_config(request, stream, None).await?;
293294
ws_stream
294295
} else {
295296
// No proxy specified, connect directly
296-
let (ws_stream, _) = connect_async(request).await?;
297+
let (ws_stream, _) =
298+
connect_async_tls_with_config(request, None, false, tls_connector).await?;
297299
ws_stream
298300
}
299301
} else {
300302
// Non-tokio build or no proxy - connect directly
301-
let (ws_stream, _) = connect_async(request).await?;
303+
let (ws_stream, _) =
304+
connect_async_tls_with_config(request, None, false, tls_connector).await?;
302305
ws_stream
303306
};
304307

@@ -307,6 +310,9 @@ impl SignalStream {
307310

308311
#[cfg(not(feature = "signal-client-tokio"))]
309312
let (ws_stream, _) = connect_async(request).await?;
313+
#[cfg(not(feature = "signal-client-tokio"))]
314+
let _ = tls_connector;
315+
310316
let (ws_writer, ws_reader) = ws_stream.split();
311317

312318
let (emitter, events) = mpsc::unbounded_channel();

livekit/src/room/mod.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ impl From<RoomSdkOptions> for SignalSdkOptions {
348348
}
349349
}
350350

351-
#[derive(Debug, Clone)]
351+
#[derive(Clone)]
352352
#[non_exhaustive]
353353
pub struct RoomOptions {
354354
pub auto_subscribe: bool,
@@ -361,8 +361,24 @@ pub struct RoomOptions {
361361
pub rtc_config: RtcConfiguration,
362362
pub join_retries: u32,
363363
pub sdk_options: RoomSdkOptions,
364+
pub signal_options: SignalOptions,
365+
pub connector: Option<livekit_api::signal_client::Connector>,
366+
}
367+
impl Debug for RoomOptions {
368+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369+
f.debug_struct("RoomOptions")
370+
.field("auto_subscribe", &self.auto_subscribe)
371+
.field("adaptive_stream", &self.adaptive_stream)
372+
.field("dynacast", &self.dynacast)
373+
.field("e2ee", &self.e2ee)
374+
.field("rtc_config", &self.rtc_config)
375+
.field("join_retries", &self.join_retries)
376+
.field("sdk_options", &self.sdk_options)
377+
.field("signal_options", &self.signal_options)
378+
// Exclude connector field as it's not Debug
379+
.finish()
380+
}
364381
}
365-
366382
impl Default for RoomOptions {
367383
fn default() -> Self {
368384
Self {
@@ -381,6 +397,8 @@ impl Default for RoomOptions {
381397
},
382398
join_retries: 3,
383399
sdk_options: RoomSdkOptions::default(),
400+
signal_options: SignalOptions::default(),
401+
connector: None,
384402
}
385403
}
386404
}
@@ -474,6 +492,7 @@ impl Room {
474492
signal_options.sdk_options = options.sdk_options.clone().into();
475493
signal_options.auto_subscribe = options.auto_subscribe;
476494
signal_options.adaptive_stream = options.adaptive_stream;
495+
signal_options.connector = options.connector.clone();
477496
let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
478497
url,
479498
token,
@@ -1120,7 +1139,7 @@ impl RoomSession {
11201139
}
11211140

11221141
async fn send_sync_state(self: &Arc<Self>) {
1123-
let auto_subscribe = self.options.auto_subscribe;
1142+
let auto_subscribe = self.options.signal_options.auto_subscribe;
11241143
let session = self.rtc_engine.session();
11251144

11261145
if session.subscriber().peer_connection().current_local_description().is_none() {
@@ -1596,7 +1615,7 @@ impl RoomSession {
15961615
name,
15971616
metadata,
15981617
attributes,
1599-
self.options.auto_subscribe,
1618+
self.options.signal_options.auto_subscribe,
16001619
);
16011620

16021621
participant.on_track_published({

0 commit comments

Comments
 (0)