Skip to content

Commit 230dd3c

Browse files
minor shuffling, add dummy struct
1 parent 4f7fcc7 commit 230dd3c

File tree

5 files changed

+56
-67
lines changed

5 files changed

+56
-67
lines changed

src/client/csfle/state_machine.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@ use tokio::{
1313
sync::{oneshot, Mutex},
1414
};
1515

16-
#[cfg(feature = "socks5-proxy")]
17-
use crate::options::Socks5Proxy;
1816
use crate::{
1917
bson::{rawdoc, Document, RawDocument, RawDocumentBuf},
2018
client::{csfle::options::KmsProvidersTlsOptions, options::ServerAddress, WeakClient},
2119
error::{Error, Result},
2220
operation::{raw_output::RawOutput, run_command::RunCommand},
23-
options::ReadConcern,
21+
options::{ReadConcern, Socks5Proxy},
2422
runtime::{process::Process, AsyncStream, TlsConfig},
2523
Client,
2624
Namespace,
@@ -39,7 +37,6 @@ pub(crate) struct CryptExecutor {
3937
metadata_client: Option<WeakClient>,
4038
#[cfg(feature = "azure-kms")]
4139
azure: azure::ExecutorState,
42-
#[cfg(feature = "socks5-proxy")]
4340
proxy: Option<Socks5Proxy>,
4441
}
4542

@@ -64,7 +61,6 @@ impl CryptExecutor {
6461
metadata_client: None,
6562
#[cfg(feature = "azure-kms")]
6663
azure: azure::ExecutorState::new()?,
67-
#[cfg(feature = "socks5-proxy")]
6864
proxy: None,
6965
})
7066
}
@@ -76,7 +72,7 @@ impl CryptExecutor {
7672
mongocryptd_opts: Option<MongocryptdOptions>,
7773
mongocryptd_client: Option<Client>,
7874
metadata_client: Option<WeakClient>,
79-
#[cfg(feature = "socks5-proxy")] proxy: Option<Socks5Proxy>,
75+
proxy: Option<Socks5Proxy>,
8076
) -> Result<Self> {
8177
let mongocryptd = match mongocryptd_opts {
8278
Some(opts) => Some(Mongocryptd::new(opts).await?),
@@ -188,7 +184,7 @@ impl CryptExecutor {
188184
async fn execute(
189185
kms_ctx: &mut KmsCtx<'_>,
190186
tls_options: Option<&KmsProvidersTlsOptions>,
191-
#[cfg(feature = "socks5-proxy")] proxy: Option<&Socks5Proxy>,
187+
proxy: Option<&Socks5Proxy>,
192188
) -> Result<()> {
193189
let endpoint = kms_ctx.endpoint()?;
194190
let addr = ServerAddress::parse(endpoint)?;
@@ -197,14 +193,9 @@ impl CryptExecutor {
197193
.and_then(|tls| tls.get(&provider))
198194
.cloned()
199195
.unwrap_or_default();
200-
let mut stream = AsyncStream::connect(
201-
addr,
202-
Some(&TlsConfig::new(tls_options)?),
203-
Duration::MAX,
204-
#[cfg(feature = "socks5-proxy")]
205-
proxy,
206-
)
207-
.await?;
196+
let mut stream =
197+
AsyncStream::connect(addr, Some(&TlsConfig::new(tls_options)?), proxy)
198+
.await?;
208199
stream.write_all(kms_ctx.message()?).await?;
209200
let mut buf = vec![0];
210201
while kms_ctx.bytes_needed() > 0 {

src/client/options.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,9 @@ pub struct Socks5Proxy {
430430
/// A username/password pair to authenticate to the proxy.
431431
pub authentication: Option<(String, String)>,
432432
}
433+
#[cfg(not(feature = "socks5-proxy"))]
434+
#[derive(Clone)]
435+
pub(crate) struct Socks5Proxy;
433436

434437
#[cfg(feature = "socks5-proxy")]
435438
impl Socks5Proxy {

src/cmap/establish.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,17 @@ pub(crate) mod handshake;
22

33
use std::time::{Duration, Instant};
44

5-
use self::handshake::{Handshaker, HandshakerOptions};
65
#[cfg(test)]
76
use crate::options::ClientOptions;
8-
#[cfg(feature = "socks5-proxy")]
9-
use crate::options::Socks5Proxy;
107
use crate::{
118
client::{
129
auth::Credential,
1310
options::{ServerAddress, TlsOptions},
1411
},
1512
error::{Error as MongoError, ErrorKind, Result},
1613
hello::HelloReply,
14+
options::Socks5Proxy,
15+
runtime,
1716
runtime::{stream::DEFAULT_CONNECT_TIMEOUT, AsyncStream, TlsConfig},
1817
sdam::{topology::TopologySpec, HandshakePhase},
1918
};
@@ -29,6 +28,8 @@ use super::{
2928
PoolGeneration,
3029
};
3130

31+
use handshake::{Handshaker, HandshakerOptions};
32+
3233
/// Contains the logic to establish a connection, including handshaking, authenticating, and
3334
/// potentially more.
3435
#[derive(Clone)]
@@ -41,21 +42,20 @@ pub(crate) struct ConnectionEstablisher {
4142

4243
connect_timeout: Duration,
4344

45+
proxy: Option<Socks5Proxy>,
46+
4447
#[cfg(test)]
4548
test_patch_reply: Option<fn(&mut Result<HelloReply>)>,
46-
47-
#[cfg(feature = "socks5-proxy")]
48-
proxy: Option<Socks5Proxy>,
4949
}
5050

5151
pub(crate) struct EstablisherOptions {
5252
handshake_options: HandshakerOptions,
5353
tls_options: Option<TlsOptions>,
5454
connect_timeout: Option<Duration>,
55+
#[allow(unused)]
56+
proxy: Option<Socks5Proxy>,
5557
#[cfg(test)]
5658
pub(crate) test_patch_reply: Option<fn(&mut Result<HelloReply>)>,
57-
#[cfg(feature = "socks5-proxy")]
58-
proxy: Option<Socks5Proxy>,
5959
}
6060

6161
impl From<&TopologySpec> for EstablisherOptions {
@@ -68,6 +68,8 @@ impl From<&TopologySpec> for EstablisherOptions {
6868
test_patch_reply: None,
6969
#[cfg(feature = "socks5-proxy")]
7070
proxy: spec.options.socks5_proxy.clone(),
71+
#[cfg(not(feature = "socks5-proxy"))]
72+
proxy: None,
7173
}
7274
}
7375
}
@@ -104,18 +106,17 @@ impl ConnectionEstablisher {
104106
test_patch_reply: options.test_patch_reply,
105107
#[cfg(feature = "socks5-proxy")]
106108
proxy: options.proxy,
109+
#[cfg(not(feature = "socks5-proxy"))]
110+
proxy: None,
107111
})
108112
}
109113

110114
async fn make_stream(&self, address: ServerAddress) -> Result<AsyncStream> {
111-
AsyncStream::connect(
112-
address,
113-
self.tls_config.as_ref(),
115+
runtime::timeout(
114116
self.connect_timeout,
115-
#[cfg(feature = "socks5-proxy")]
116-
self.proxy.as_ref(),
117+
AsyncStream::connect(address, self.tls_config.as_ref(), self.proxy.as_ref()),
117118
)
118-
.await
119+
.await?
119120
}
120121

121122
/// Establishes a connection.

src/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -820,6 +820,7 @@ impl ErrorKind {
820820
ErrorKind::Encryption(..) => "Encryption",
821821
ErrorKind::Custom(..) => "Custom",
822822
ErrorKind::Shutdown => "Shutdown",
823+
#[cfg(feature = "socks5-proxy")]
823824
ErrorKind::ProxyConnect { .. } => "ProxyConnect",
824825
}
825826
}

src/runtime/stream.rs

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@ use tokio::{
1111
net::TcpStream,
1212
};
1313

14-
#[cfg(feature = "socks5-proxy")]
15-
use crate::options::Socks5Proxy;
1614
use crate::{
1715
error::{Error, ErrorKind, Result},
18-
options::ServerAddress,
16+
options::{ServerAddress, Socks5Proxy},
1917
runtime,
2018
};
2119

@@ -98,47 +96,42 @@ impl AsyncStream {
9896
pub(crate) async fn connect(
9997
address: ServerAddress,
10098
tls_cfg: Option<&TlsConfig>,
101-
connect_timeout: Duration,
102-
#[cfg(feature = "socks5-proxy")] proxy: Option<&Socks5Proxy>,
99+
#[allow(unused)] proxy: Option<&Socks5Proxy>,
103100
) -> Result<Self> {
104-
let connect = async {
105-
match &address {
106-
#[allow(unused)] // port is unused when socks5-proxy is not enabled
107-
ServerAddress::Tcp { host, port } => {
108-
#[cfg(feature = "socks5-proxy")]
109-
if let Some(proxy) = proxy {
110-
let inner = proxy.connect(host.clone(), *port).await?;
111-
return match tls_cfg {
112-
Some(cfg) => {
113-
Ok(AsyncStream::Socks5Tls(tls_connect(host, inner, cfg).await?))
114-
}
115-
None => Ok(AsyncStream::Socks5(inner)),
116-
};
117-
}
118-
119-
let resolved: Vec<_> = runtime::resolve_address(&address).await?.collect();
120-
if resolved.is_empty() {
121-
return Err(ErrorKind::DnsResolve {
122-
message: format!("No DNS results for domain {address}"),
101+
match &address {
102+
#[allow(unused)] // port is unused when socks5-proxy is not enabled
103+
ServerAddress::Tcp { host, port } => {
104+
#[cfg(feature = "socks5-proxy")]
105+
if let Some(proxy) = proxy {
106+
let inner = proxy.connect(host.clone(), *port).await?;
107+
return match tls_cfg {
108+
Some(cfg) => {
109+
Ok(AsyncStream::Socks5Tls(tls_connect(host, inner, cfg).await?))
123110
}
124-
.into());
125-
}
126-
let inner = tcp_connect(resolved).await?;
111+
None => Ok(AsyncStream::Socks5(inner)),
112+
};
113+
}
127114

128-
// If there are TLS options, wrap the inner stream in an AsyncTlsStream.
129-
match tls_cfg {
130-
Some(cfg) => Ok(AsyncStream::Tls(tls_connect(host, inner, cfg).await?)),
131-
None => Ok(AsyncStream::Tcp(inner)),
115+
let resolved: Vec<_> = runtime::resolve_address(&address).await?.collect();
116+
if resolved.is_empty() {
117+
return Err(ErrorKind::DnsResolve {
118+
message: format!("No DNS results for domain {address}"),
132119
}
120+
.into());
133121
}
134-
#[cfg(unix)]
135-
ServerAddress::Unix { path } => Ok(AsyncStream::Unix(
136-
tokio::net::UnixStream::connect(path.as_path()).await?,
137-
)),
138-
}
139-
};
122+
let inner = tcp_connect(resolved).await?;
140123

141-
runtime::timeout(connect_timeout, connect).await?
124+
// If there are TLS options, wrap the inner stream in an AsyncTlsStream.
125+
match tls_cfg {
126+
Some(cfg) => Ok(AsyncStream::Tls(tls_connect(host, inner, cfg).await?)),
127+
None => Ok(AsyncStream::Tcp(inner)),
128+
}
129+
}
130+
#[cfg(unix)]
131+
ServerAddress::Unix { path } => Ok(AsyncStream::Unix(
132+
tokio::net::UnixStream::connect(path.as_path()).await?,
133+
)),
134+
}
142135
}
143136
}
144137

0 commit comments

Comments
 (0)