Skip to content

Commit d003266

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Remove AllocAssignedAddr (#1565)
Summary: Pull Request resolved: #1565 Address comment: https://www.internalfb.com/diff/D84614744?dst_version_fbid=1184641230485837&transaction_fbid=1876636203270171 Reviewed By: shayne-fletcher Differential Revision: D84818594 fbshipit-source-id: 28fe479ca9cf6c9836c67928805db0e3e90f0b39
1 parent dbe40ec commit d003266

File tree

3 files changed

+87
-98
lines changed

3 files changed

+87
-98
lines changed

hyperactor_mesh/src/alloc.rs

Lines changed: 64 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,8 @@ pub trait Alloc {
343343
}
344344

345345
/// The address that should be used to serve the client's router.
346-
fn client_router_addr(&self) -> AllocAssignedAddr {
347-
AllocAssignedAddr(ChannelAddr::any(self.transport()))
346+
fn client_router_addr(&self) -> ChannelAddr {
347+
ChannelAddr::any(self.transport())
348348
}
349349
}
350350

@@ -479,92 +479,80 @@ impl<A: ?Sized + Send + Alloc> AllocExt for A {
479479
}
480480
}
481481

482-
/// A new type to indicate this addr is assigned by alloc.
483-
#[derive(Debug, Clone, Serialize, Deserialize)]
484-
pub struct AllocAssignedAddr(ChannelAddr);
485-
486-
impl AllocAssignedAddr {
487-
pub(crate) fn new(addr: ChannelAddr) -> AllocAssignedAddr {
488-
AllocAssignedAddr(addr)
482+
/// If addr is Tcp or Metatls, use its IP address or hostname to create
483+
/// a new addr with port unspecified.
484+
///
485+
/// for other types of addr, return "any" address.
486+
pub(crate) fn with_unspecified_port_or_any(addr: &ChannelAddr) -> ChannelAddr {
487+
match addr {
488+
ChannelAddr::Tcp(socket) => {
489+
let mut new_socket = socket.clone();
490+
new_socket.set_port(0);
491+
ChannelAddr::Tcp(new_socket)
492+
}
493+
ChannelAddr::MetaTls(MetaTlsAddr::Socket(socket)) => {
494+
let mut new_socket = socket.clone();
495+
new_socket.set_port(0);
496+
ChannelAddr::MetaTls(MetaTlsAddr::Socket(new_socket))
497+
}
498+
ChannelAddr::MetaTls(MetaTlsAddr::Host { hostname, port: _ }) => {
499+
ChannelAddr::MetaTls(MetaTlsAddr::Host {
500+
hostname: hostname.clone(),
501+
port: 0,
502+
})
503+
}
504+
_ => addr.transport().any(),
489505
}
506+
}
490507

491-
/// If addr is Tcp or Metatls, use its IP address or hostname to create
492-
/// a new addr with port unspecified.
493-
///
494-
/// for other types of addr, return "any" address.
495-
pub(crate) fn with_unspecified_port_or_any(addr: &ChannelAddr) -> AllocAssignedAddr {
496-
let new_addr = match addr {
497-
ChannelAddr::Tcp(socket) => {
498-
let mut new_socket = socket.clone();
499-
new_socket.set_port(0);
500-
ChannelAddr::Tcp(new_socket)
501-
}
502-
ChannelAddr::MetaTls(MetaTlsAddr::Socket(socket)) => {
503-
let mut new_socket = socket.clone();
504-
new_socket.set_port(0);
505-
ChannelAddr::MetaTls(MetaTlsAddr::Socket(new_socket))
506-
}
507-
ChannelAddr::MetaTls(MetaTlsAddr::Host { hostname, port: _ }) => {
508-
ChannelAddr::MetaTls(MetaTlsAddr::Host {
509-
hostname: hostname.clone(),
510-
port: 0,
511-
})
512-
}
513-
_ => addr.transport().any(),
508+
pub(crate) fn serve_with_config<M: RemoteMessage>(
509+
mut serve_addr: ChannelAddr,
510+
) -> anyhow::Result<(ChannelAddr, ChannelRx<M>)> {
511+
fn set_as_inaddr_any(original: &mut SocketAddr) {
512+
let inaddr_any: IpAddr = match &original {
513+
SocketAddr::V4(_) => Ipv4Addr::UNSPECIFIED.into(),
514+
SocketAddr::V6(_) => Ipv6Addr::UNSPECIFIED.into(),
514515
};
515-
AllocAssignedAddr(new_addr)
516+
original.set_ip(inaddr_any);
516517
}
517518

518-
pub(crate) fn serve_with_config<M: RemoteMessage>(
519-
self,
520-
) -> anyhow::Result<(ChannelAddr, ChannelRx<M>)> {
521-
fn set_as_inaddr_any(original: &mut SocketAddr) {
522-
let inaddr_any: IpAddr = match &original {
523-
SocketAddr::V4(_) => Ipv4Addr::UNSPECIFIED.into(),
524-
SocketAddr::V6(_) => Ipv6Addr::UNSPECIFIED.into(),
525-
};
526-
original.set_ip(inaddr_any);
527-
}
528-
529-
let use_inaddr_any = config::global::get(REMOTE_ALLOC_BIND_TO_INADDR_ANY);
530-
let mut bind_to = self.0;
531-
let mut original_ip: Option<IpAddr> = None;
532-
match &mut bind_to {
533-
ChannelAddr::Tcp(socket) => {
534-
original_ip = Some(socket.ip().clone());
535-
if use_inaddr_any {
536-
set_as_inaddr_any(socket);
537-
tracing::debug!("binding {} to INADDR_ANY", original_ip.as_ref().unwrap(),);
538-
}
539-
if socket.port() == 0 {
540-
socket.set_port(next_allowed_port(socket.ip().clone())?);
541-
}
519+
let use_inaddr_any = config::global::get(REMOTE_ALLOC_BIND_TO_INADDR_ANY);
520+
let mut original_ip: Option<IpAddr> = None;
521+
match &mut serve_addr {
522+
ChannelAddr::Tcp(socket) => {
523+
original_ip = Some(socket.ip().clone());
524+
if use_inaddr_any {
525+
set_as_inaddr_any(socket);
526+
tracing::debug!("binding {} to INADDR_ANY", original_ip.as_ref().unwrap(),);
542527
}
543-
_ => {
544-
if use_inaddr_any {
545-
tracing::debug!(
546-
"can only bind to INADDR_ANY for TCP; got transport {}, addr {}",
547-
bind_to.transport(),
548-
bind_to
549-
);
550-
}
528+
if socket.port() == 0 {
529+
socket.set_port(next_allowed_port(socket.ip().clone())?);
551530
}
552-
};
531+
}
532+
_ => {
533+
if use_inaddr_any {
534+
tracing::debug!(
535+
"can only bind to INADDR_ANY for TCP; got transport {}, addr {}",
536+
serve_addr.transport(),
537+
serve_addr
538+
);
539+
}
540+
}
541+
};
553542

554-
let (mut bound, rx) = channel::serve(bind_to)?;
543+
let (mut bound, rx) = channel::serve(serve_addr)?;
555544

556-
// Restore the original IP address if we used INADDR_ANY.
557-
match &mut bound {
558-
ChannelAddr::Tcp(socket) => {
559-
if use_inaddr_any {
560-
socket.set_ip(original_ip.unwrap());
561-
}
545+
// Restore the original IP address if we used INADDR_ANY.
546+
match &mut bound {
547+
ChannelAddr::Tcp(socket) => {
548+
if use_inaddr_any {
549+
socket.set_ip(original_ip.unwrap());
562550
}
563-
_ => (),
564551
}
565-
566-
Ok((bound, rx))
552+
_ => (),
567553
}
554+
555+
Ok((bound, rx))
568556
}
569557

570558
enum AllowedPorts {

hyperactor_mesh/src/alloc/remoteprocess.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ use tokio_stream::wrappers::WatchStream;
5959
use tokio_util::sync::CancellationToken;
6060

6161
use crate::alloc::Alloc;
62-
use crate::alloc::AllocAssignedAddr;
6362
use crate::alloc::AllocConstraints;
6463
use crate::alloc::AllocSpec;
6564
use crate::alloc::Allocator;
@@ -70,6 +69,8 @@ use crate::alloc::ProcessAllocator;
7069
use crate::alloc::REMOTE_ALLOC_BOOTSTRAP_ADDR;
7170
use crate::alloc::process::CLIENT_TRACE_ID_LABEL;
7271
use crate::alloc::process::ClientContext;
72+
use crate::alloc::serve_with_config;
73+
use crate::alloc::with_unspecified_port_or_any;
7374
use crate::shortuuid::ShortUuid;
7475

7576
/// Control messages sent from remote process allocator to local allocator.
@@ -91,7 +92,7 @@ pub enum RemoteProcessAllocatorMessage {
9192
/// the client_context will go to the message header instead
9293
client_context: Option<ClientContext>,
9394
/// The address allocator should use for its forwarder.
94-
forwarder_addr: AllocAssignedAddr,
95+
forwarder_addr: ChannelAddr,
9596
},
9697
/// Stop allocation.
9798
Stop,
@@ -317,11 +318,11 @@ impl RemoteProcessAllocator {
317318
bootstrap_addr: ChannelAddr,
318319
hosts: Vec<String>,
319320
cancel_token: CancellationToken,
320-
forwarder_addr: AllocAssignedAddr,
321+
forwarder_addr: ChannelAddr,
321322
) {
322323
tracing::info!("handle allocation request, bootstrap_addr: {bootstrap_addr}");
323324
// start proc message forwarder
324-
let (forwarder_addr, forwarder_rx) = match forwarder_addr.serve_with_config() {
325+
let (forwarder_addr, forwarder_rx) = match serve_with_config(forwarder_addr) {
325326
Ok(v) => v,
326327
Err(e) => {
327328
tracing::error!("failed to to bootstrap forwarder actor: {}", e);
@@ -626,11 +627,11 @@ impl RemoteProcessAlloc {
626627
initializer: impl RemoteProcessAllocInitializer + Send + Sync + 'static,
627628
) -> Result<Self, anyhow::Error> {
628629
let alloc_serve_addr = match config::global::try_get_cloned(REMOTE_ALLOC_BOOTSTRAP_ADDR) {
629-
Some(addr_str) => AllocAssignedAddr::new(addr_str.parse()?),
630-
None => AllocAssignedAddr::new(ChannelAddr::any(spec.transport.clone())),
630+
Some(addr_str) => addr_str.parse()?,
631+
None => ChannelAddr::any(spec.transport.clone()),
631632
};
632633

633-
let (bootstrap_addr, rx) = alloc_serve_addr.serve_with_config()?;
634+
let (bootstrap_addr, rx) = serve_with_config(alloc_serve_addr)?;
634635

635636
tracing::info!(
636637
"starting alloc for {} on: {}",
@@ -825,7 +826,7 @@ impl RemoteProcessAlloc {
825826
// its host's private IP address, while its known addres to
826827
// alloc is a public IP address. In some environment, that
827828
// could lead to port unreachable error.
828-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&remote_addr),
829+
forwarder_addr: with_unspecified_port_or_any(&remote_addr),
829830
};
830831
tracing::info!(
831832
name = message.as_ref(),
@@ -1208,8 +1209,8 @@ impl Alloc for RemoteProcessAlloc {
12081209
/// one could lead to port unreachable error.
12091210
///
12101211
/// For other channel types, this method still uses ChannelAddr::any.
1211-
fn client_router_addr(&self) -> AllocAssignedAddr {
1212-
AllocAssignedAddr::with_unspecified_port_or_any(&self.bootstrap_addr)
1212+
fn client_router_addr(&self) -> ChannelAddr {
1213+
with_unspecified_port_or_any(&self.bootstrap_addr)
12131214
}
12141215
}
12151216

@@ -1236,6 +1237,7 @@ mod test {
12361237
use crate::alloc::MockAllocWrapper;
12371238
use crate::alloc::MockAllocator;
12381239
use crate::alloc::ProcStopReason;
1240+
use crate::alloc::with_unspecified_port_or_any;
12391241
use crate::proc_mesh::mesh_agent::ProcMeshAgent;
12401242

12411243
async fn read_all_created(rx: &mut ChannelRx<RemoteProcessProcStateMessage>, alloc_len: usize) {
@@ -1372,7 +1374,7 @@ mod test {
13721374
bootstrap_addr,
13731375
hosts: vec![],
13741376
client_context: None,
1375-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
1377+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
13761378
})
13771379
.await
13781380
.unwrap();
@@ -1526,7 +1528,7 @@ mod test {
15261528
bootstrap_addr,
15271529
hosts: vec![],
15281530
client_context: None,
1529-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
1531+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
15301532
})
15311533
.await
15321534
.unwrap();
@@ -1631,7 +1633,7 @@ mod test {
16311633
bootstrap_addr: bootstrap_addr.clone(),
16321634
hosts: vec![],
16331635
client_context: None,
1634-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
1636+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
16351637
})
16361638
.await
16371639
.unwrap();
@@ -1656,7 +1658,7 @@ mod test {
16561658
bootstrap_addr,
16571659
hosts: vec![],
16581660
client_context: None,
1659-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
1661+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
16601662
})
16611663
.await
16621664
.unwrap();
@@ -1754,7 +1756,7 @@ mod test {
17541756
bootstrap_addr,
17551757
hosts: vec![],
17561758
client_context: None,
1757-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
1759+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
17581760
})
17591761
.await
17601762
.unwrap();
@@ -1846,7 +1848,7 @@ mod test {
18461848
bootstrap_addr,
18471849
hosts: vec![],
18481850
client_context: None,
1849-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
1851+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
18501852
})
18511853
.await
18521854
.unwrap();
@@ -1941,7 +1943,7 @@ mod test {
19411943
client_context: Some(ClientContext {
19421944
trace_id: test_trace_id.to_string(),
19431945
}),
1944-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
1946+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
19451947
})
19461948
.await
19471949
.unwrap();
@@ -2016,7 +2018,7 @@ mod test {
20162018
bootstrap_addr,
20172019
hosts: vec![],
20182020
client_context: None,
2019-
forwarder_addr: AllocAssignedAddr::with_unspecified_port_or_any(&tx.addr()),
2021+
forwarder_addr: with_unspecified_port_or_any(&tx.addr()),
20202022
})
20212023
.await
20222024
.unwrap();

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use crate::alloc::AllocatedProc;
6868
use crate::alloc::AllocatorError;
6969
use crate::alloc::ProcState;
7070
use crate::alloc::ProcStopReason;
71+
use crate::alloc::serve_with_config;
7172
use crate::assign::Ranks;
7273
use crate::comm::CommActorMode;
7374
use crate::proc_mesh::mesh_agent::GspawnResult;
@@ -379,10 +380,8 @@ impl ProcMesh {
379380
);
380381

381382
// Ensure that the router is served so that agents may reach us.
382-
let (router_channel_addr, router_rx) = alloc
383-
.client_router_addr()
384-
.serve_with_config()
385-
.map_err(AllocatorError::Other)?;
383+
let (router_channel_addr, router_rx) =
384+
serve_with_config(alloc.client_router_addr()).map_err(AllocatorError::Other)?;
386385
router.serve(router_rx);
387386
tracing::info!("router channel started listening on addr: {router_channel_addr}");
388387

0 commit comments

Comments
 (0)