Skip to content

Commit 84d2d9d

Browse files
committed
feat(thrift): add fallback address option for shmipc
1 parent 01a614b commit 84d2d9d

File tree

4 files changed

+192
-0
lines changed

4 files changed

+192
-0
lines changed

volo-thrift/src/client/mod.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use motore::{
1717
};
1818
use pilota::thrift::TMessageType;
1919
use tokio::time::Duration;
20+
#[cfg(feature = "shmipc")]
21+
use volo::net::shmipc_fallback::ShmipcMakeTransportWithFallback;
2022
use volo::{
2123
FastStr,
2224
client::WithOptService,
@@ -495,6 +497,37 @@ impl<IL, OL, C, Req, Resp, MkT, MkC, LB> ClientBuilder<IL, OL, C, Req, Resp, MkT
495497
}
496498
}
497499

500+
#[cfg(feature = "shmipc")]
501+
/// Set the address for the client with shmipc fallback.
502+
pub fn address_with_fallback<A1: Into<Address>, A2: Into<Address>>(
503+
self,
504+
shmipc_addr: A1,
505+
fallback_addr: A2,
506+
) -> ClientBuilder<IL, OL, C, Req, Resp, ShmipcMakeTransportWithFallback, MkC, LB> {
507+
ClientBuilder {
508+
address: Some(shmipc_addr.into()),
509+
make_transport: ShmipcMakeTransportWithFallback::new(
510+
DefaultMakeTransport::default(),
511+
DefaultMakeTransport::default(),
512+
fallback_addr.into(),
513+
),
514+
config: self.config,
515+
pool: self.pool,
516+
caller_name: self.caller_name,
517+
callee_name: self.callee_name,
518+
inner_layer: self.inner_layer,
519+
outer_layer: self.outer_layer,
520+
mk_client: self.mk_client,
521+
_marker: PhantomData,
522+
make_codec: self.make_codec,
523+
mk_lb: self.mk_lb,
524+
disable_timeout_layer: self.disable_timeout_layer,
525+
enable_biz_error: self.enable_biz_error,
526+
#[cfg(feature = "multiplex")]
527+
multiplex: self.multiplex,
528+
}
529+
}
530+
498531
#[doc(hidden)]
499532
pub fn get_callee_name(&self) -> &FastStr {
500533
&self.callee_name

volo-thrift/src/server/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use tokio::{
1616
sync::Notify,
1717
};
1818
use tracing::{info, trace};
19+
#[cfg(feature = "shmipc")]
20+
use volo::net::shmipc_fallback::ShmipcAddressWithFallback;
1921
use volo::{
2022
net::{
2123
Address,
@@ -408,6 +410,38 @@ impl<S, L, Req, MkC, SP> Server<S, L, Req, MkC, SP> {
408410
Ok(())
409411
}
410412

413+
#[cfg(feature = "shmipc")]
414+
/// Run the server with shmipc and TCP fallback support.
415+
///
416+
/// The server will listen on both the shmipc address and the fallback TCP address.
417+
/// Clients can connect via either transport.
418+
pub async fn run_with_fallback<A1, A2>(
419+
self,
420+
shmipc_addr: A1,
421+
fallback_addr: A2,
422+
) -> Result<(), BoxError>
423+
where
424+
A1: Into<Address>,
425+
A2: volo::net::incoming::MakeIncoming + Send,
426+
A2::Incoming: Send,
427+
L: Layer<BoxService<ServerContext, Req, S::Response, crate::ServerError>>,
428+
MkC: MakeCodec<OwnedReadHalf, OwnedWriteHalf>,
429+
L::Service: Service<ServerContext, Req, Response = S::Response, Error = crate::ServerError>
430+
+ Send
431+
+ 'static
432+
+ Sync,
433+
S: Service<ServerContext, Req, Error = crate::ServerError> + Send + 'static + Sync,
434+
S::Response: EntryMessage + Send + 'static + Sync,
435+
Req: EntryMessage + Send + 'static,
436+
SP: SpanProvider,
437+
{
438+
self.run(ShmipcAddressWithFallback {
439+
shmipc_addr: shmipc_addr.into(),
440+
default_mi: fallback_addr,
441+
})
442+
.await
443+
}
444+
411445
#[cfg(feature = "multiplex")]
412446
/// Use multiplexing to handle multiple requests in one connection.
413447
///

volo/src/net/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ pub mod ext;
44
pub mod incoming;
55
#[cfg(feature = "shmipc")]
66
pub mod shmipc;
7+
#[cfg(feature = "shmipc")]
8+
pub mod shmipc_fallback;
79
#[cfg(feature = "__tls")]
810
#[cfg_attr(docsrs, doc(cfg(any(feature = "rustls", feature = "native-tls"))))]
911
pub mod tls;

volo/src/net/shmipc_fallback.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use std::io;
2+
3+
use futures::{FutureExt, future::Either};
4+
5+
use super::{
6+
Address, DefaultIncoming, MakeIncoming,
7+
conn::{Conn, OwnedReadHalf, OwnedWriteHalf},
8+
dial::{DefaultMakeTransport, MakeTransport},
9+
incoming::Incoming,
10+
};
11+
12+
pub struct ShmipcAddressWithFallback<MI> {
13+
pub shmipc_addr: Address,
14+
pub default_mi: MI,
15+
}
16+
17+
impl<MI, I> MakeIncoming for ShmipcAddressWithFallback<MI>
18+
where
19+
MI: MakeIncoming<Incoming = I> + Send,
20+
I: Incoming + Send,
21+
{
22+
type Incoming = ShmipcIncoming<I>;
23+
24+
async fn make_incoming(self) -> io::Result<Self::Incoming> {
25+
Ok(ShmipcIncoming {
26+
shmipc_listener: self.shmipc_addr.make_incoming().await?,
27+
default_incoming: self.default_mi.make_incoming().await?,
28+
})
29+
}
30+
}
31+
32+
#[derive(Debug)]
33+
pub struct ShmipcIncoming<I> {
34+
shmipc_listener: DefaultIncoming,
35+
default_incoming: I,
36+
}
37+
38+
impl<I> Incoming for ShmipcIncoming<I>
39+
where
40+
I: Incoming,
41+
{
42+
async fn accept(&mut self) -> io::Result<Option<Conn>> {
43+
self.try_next().await
44+
}
45+
}
46+
47+
impl<I> ShmipcIncoming<I>
48+
where
49+
I: Incoming,
50+
{
51+
async fn try_next(&mut self) -> io::Result<Option<Conn>> {
52+
let shmipc_conn = self.shmipc_listener.accept().fuse();
53+
let default_conn = self.default_incoming.accept().fuse();
54+
futures::pin_mut!(shmipc_conn, default_conn);
55+
match futures::future::select(shmipc_conn, default_conn).await {
56+
Either::Left((conn, _)) => {
57+
tracing::trace!("recv a conn from shmipc");
58+
conn
59+
}
60+
Either::Right((conn, _)) => {
61+
tracing::trace!("recv a conn from default");
62+
conn
63+
}
64+
}
65+
}
66+
}
67+
68+
#[derive(Clone, Debug)]
69+
pub struct ShmipcMakeTransportWithFallback {
70+
pub shmipc_mkt: DefaultMakeTransport,
71+
pub default_mkt: DefaultMakeTransport,
72+
pub fallback_addr: Address,
73+
}
74+
75+
impl ShmipcMakeTransportWithFallback {
76+
pub fn new(
77+
shmipc: DefaultMakeTransport,
78+
default_mkt: DefaultMakeTransport,
79+
fallback_addr: Address,
80+
) -> Self {
81+
Self {
82+
shmipc_mkt: shmipc,
83+
default_mkt,
84+
fallback_addr,
85+
}
86+
}
87+
}
88+
89+
impl MakeTransport for ShmipcMakeTransportWithFallback {
90+
type ReadHalf = OwnedReadHalf;
91+
type WriteHalf = OwnedWriteHalf;
92+
93+
async fn make_transport(
94+
&self,
95+
mut addr: Address,
96+
) -> io::Result<(Self::ReadHalf, Self::WriteHalf)> {
97+
if addr.is_shmipc() {
98+
match self.shmipc_mkt.make_transport(addr).await {
99+
Ok(ret) => return Ok(ret),
100+
Err(e) => {
101+
tracing::info!(
102+
"failed to connect to shmipc target: {e}, fallback to default target"
103+
);
104+
addr = self.fallback_addr.clone();
105+
}
106+
}
107+
}
108+
109+
self.default_mkt.make_transport(addr).await
110+
}
111+
112+
fn set_connect_timeout(&mut self, timeout: Option<std::time::Duration>) {
113+
self.default_mkt.set_connect_timeout(timeout);
114+
}
115+
116+
fn set_read_timeout(&mut self, timeout: Option<std::time::Duration>) {
117+
self.default_mkt.set_read_timeout(timeout);
118+
}
119+
120+
fn set_write_timeout(&mut self, timeout: Option<std::time::Duration>) {
121+
self.default_mkt.set_write_timeout(timeout);
122+
}
123+
}

0 commit comments

Comments
 (0)