Skip to content

[hyperactor] channel: allow ":" as a channel-type delimiter #798

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 39 additions & 51 deletions hyperactor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,20 +276,20 @@ pub type Port = u16;

/// The type of a channel address, used to multiplex different underlying
/// channel implementations. ChannelAddrs also have a concrete syntax:
/// the address type ("tcp" or "local"), followed by "!", and an address
/// parseable to that type. Addresses without a specified type default to
/// "tcp". For example:
/// the address type (e.g., "tcp" or "local"), followed by ":", and an address
/// parseable to that type. For example:
///
/// - `tcp!127.0.0.1:1234` - localhost port 1234 over TCP
/// - `tcp!192.168.0.1:1111` - 192.168.0.1 port 1111 over TCP
/// - `local!123` - the (in-process) local port 123
/// - `tcp:127.0.0.1:1234` - localhost port 1234 over TCP
/// - `tcp:192.168.0.1:1111` - 192.168.0.1 port 1111 over TCP
/// - `local:123` - the (in-process) local port 123
/// - `unix:/some/path` - the Unix socket at `/some/path`
///
/// Both local and TCP ports 0 are reserved to indicate "any available
/// port" when serving.
///
/// ```
/// # use hyperactor::channel::ChannelAddr;
/// let addr: ChannelAddr = "tcp!127.0.0.1:1234".parse().unwrap();
/// let addr: ChannelAddr = "tcp:127.0.0.1:1234".parse().unwrap();
/// let ChannelAddr::Tcp(socket_addr) = addr else {
/// panic!()
/// };
Expand Down Expand Up @@ -401,11 +401,11 @@ impl ChannelAddr {
impl fmt::Display for ChannelAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Tcp(addr) => write!(f, "tcp!{}", addr),
Self::MetaTls(hostname, port) => write!(f, "metatls!{}:{}", hostname, port),
Self::Local(index) => write!(f, "local!{}", index),
Self::Sim(sim_addr) => write!(f, "sim!{}", sim_addr),
Self::Unix(addr) => write!(f, "unix!{}", addr),
Self::Tcp(addr) => write!(f, "tcp:{}", addr),
Self::MetaTls(hostname, port) => write!(f, "metatls:{}:{}", hostname, port),
Self::Local(index) => write!(f, "local:{}", index),
Self::Sim(sim_addr) => write!(f, "sim:{}", sim_addr),
Self::Unix(addr) => write!(f, "unix:{}", addr),
}
}
}
Expand All @@ -414,7 +414,8 @@ impl FromStr for ChannelAddr {
type Err = anyhow::Error;

fn from_str(addr: &str) -> Result<Self, Self::Err> {
match addr.split_once('!') {
// "!" is the legacy delimiter; ":" is preferred
match addr.split_once('!').or_else(|| addr.split_once(':')) {
Some(("local", rest)) => rest
.parse::<u64>()
.map(Self::Local)
Expand Down Expand Up @@ -604,73 +605,60 @@ mod tests {
fn test_channel_addr() {
let cases_ok = vec![
(
"tcp![::1]:1234",
"tcp<DELIM>[::1]:1234",
ChannelAddr::Tcp(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
1234,
)),
),
(
"tcp!127.0.0.1:8080",
"tcp<DELIM>127.0.0.1:8080",
ChannelAddr::Tcp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
8080,
)),
),
("local!123", ChannelAddr::Local(123)),
#[cfg(target_os = "linux")]
("local<DELIM>123", ChannelAddr::Local(123)),
(
"unix!@yolo",
"unix<DELIM>@yolo",
ChannelAddr::Unix(
unix::SocketAddr::from_abstract_name("yolo")
.expect("can't make socket from abstract name"),
),
),
(
"unix!/cool/socket-path",
"unix<DELIM>/cool/socket-path",
ChannelAddr::Unix(
unix::SocketAddr::from_pathname("/cool/socket-path")
.expect("can't make socket from path"),
),
),
];

let src_ok = vec![
(
"tcp![::1]:1235",
ChannelAddr::Tcp(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
1235,
)),
),
(
"tcp!127.0.0.1:8081",
ChannelAddr::Tcp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
8081,
)),
),
("local!124", ChannelAddr::Local(124)),
];

for (raw, parsed) in cases_ok.clone() {
assert_eq!(raw.parse::<ChannelAddr>().unwrap(), parsed);
for delim in ["!", ":"] {
let raw = raw.replace("<DELIM>", delim);
assert_eq!(raw.parse::<ChannelAddr>().unwrap(), parsed);
}
}

for (raw, parsed) in cases_ok.iter().zip(src_ok.clone()).map(|(a, _)| {
(
format!("sim!{}", a.0),
ChannelAddr::Sim(SimAddr::new(a.1.clone()).unwrap()),
)
}) {
assert_eq!(raw.parse::<ChannelAddr>().unwrap(), parsed);
for (raw, parsed) in cases_ok {
for delim in ["!", ":"] {
// We don't allow mixing and matching delims
let raw = format!("sim{}{}", delim, raw.replace("<DELIM>", delim));
assert_eq!(
raw.parse::<ChannelAddr>().unwrap(),
ChannelAddr::Sim(SimAddr::new(parsed.clone()).unwrap())
);
}
}

let cases_err = vec![
("tcp!abcdef..123124", "invalid socket address syntax"),
("xxx!foo", "no such channel type: xxx"),
("tcp:abcdef..123124", "invalid socket address syntax"),
("xxx:foo", "no such channel type: xxx"),
("127.0.0.1", "no channel type specified"),
("local!abc", "invalid digit found in string"),
("local:abc", "invalid digit found in string"),
];

for (raw, error) in cases_err {
Expand Down Expand Up @@ -751,13 +739,13 @@ mod tests {

let rng = rand::thread_rng();
vec![
"tcp![::1]:0".parse().unwrap(),
"local!0".parse().unwrap(),
"tcp:[::1]:0".parse().unwrap(),
"local:0".parse().unwrap(),
#[cfg(target_os = "linux")]
"unix!".parse().unwrap(),
"unix:".parse().unwrap(),
#[cfg(target_os = "linux")]
format!(
"unix!@{}",
"unix:@{}",
rng.sample_iter(Uniform::new_inclusive('a', 'z'))
.take(10)
.collect::<String>()
Expand Down
34 changes: 17 additions & 17 deletions hyperactor/src/channel/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,8 @@ mod tests {

#[tokio::test]
async fn test_sim_basic() {
let dst_ok = vec!["tcp![::1]:1234", "tcp!127.0.0.1:8080", "local!123"];
let srcs_ok = vec!["tcp![::2]:1234", "tcp!127.0.0.2:8080", "local!124"];
let dst_ok = vec!["tcp:[::1]:1234", "tcp:127.0.0.1:8080", "local:123"];
let srcs_ok = vec!["tcp:[::2]:1234", "tcp:127.0.0.2:8080", "local:124"];

start();

Expand Down Expand Up @@ -460,42 +460,42 @@ mod tests {

#[tokio::test]
async fn test_parse_sim_addr() {
let sim_addr = "sim!unix!@dst";
let sim_addr = "sim!unix:@dst";
let result = sim_addr.parse();
assert!(result.is_ok());
let ChannelAddr::Sim(sim_addr) = result.unwrap() else {
panic!("Expected a sim address");
};
assert!(sim_addr.src().is_none());
assert_eq!(sim_addr.addr().to_string(), "unix!@dst");
assert_eq!(sim_addr.addr().to_string(), "unix:@dst");

let sim_addr = "sim!unix!@src,unix!@dst";
let sim_addr = "sim!unix:@src,unix:@dst";
let result = sim_addr.parse();
assert!(result.is_ok());
let ChannelAddr::Sim(sim_addr) = result.unwrap() else {
panic!("Expected a sim address");
};
assert!(sim_addr.src().is_some());
assert_eq!(sim_addr.addr().to_string(), "unix!@dst");
assert_eq!(sim_addr.addr().to_string(), "unix:@dst");
}

#[tokio::test]
async fn test_realtime_frontier() {
start();

tokio::time::pause();
let sim_addr = SimAddr::new("unix!@dst".parse::<ChannelAddr>().unwrap()).unwrap();
let sim_addr = SimAddr::new("unix:@dst".parse::<ChannelAddr>().unwrap()).unwrap();
let sim_addr_with_src = SimAddr::new_with_src(
"unix!@src".parse::<ChannelAddr>().unwrap(),
"unix!@dst".parse::<ChannelAddr>().unwrap(),
"unix:@src".parse::<ChannelAddr>().unwrap(),
"unix:@dst".parse::<ChannelAddr>().unwrap(),
)
.unwrap();
let (_, mut rx) = sim::serve::<()>(sim_addr.clone()).unwrap();
let tx = sim::dial::<()>(sim_addr_with_src).unwrap();
let simnet_config_yaml = r#"
edges:
- src: unix!@src
dst: unix!@dst
- src: unix:@src
dst: unix:@dst
metadata:
latency: 100
"#;
Expand Down Expand Up @@ -526,24 +526,24 @@ mod tests {
tokio::time::pause();
start();
let controller_to_dst = SimAddr::new_with_src(
"unix!@controller".parse::<ChannelAddr>().unwrap(),
"unix!@dst".parse::<ChannelAddr>().unwrap(),
"unix:@controller".parse::<ChannelAddr>().unwrap(),
"unix:@dst".parse::<ChannelAddr>().unwrap(),
)
.unwrap();
let controller_tx = sim::dial::<()>(controller_to_dst.clone()).unwrap();

let client_to_dst = SimAddr::new_with_client_src(
"unix!@client".parse::<ChannelAddr>().unwrap(),
"unix!@dst".parse::<ChannelAddr>().unwrap(),
"unix:@client".parse::<ChannelAddr>().unwrap(),
"unix:@dst".parse::<ChannelAddr>().unwrap(),
)
.unwrap();
let client_tx = sim::dial::<()>(client_to_dst).unwrap();

// 1 second of latency
let simnet_config_yaml = r#"
edges:
- src: unix!@controller
dst: unix!@dst
- src: unix:@controller
dst: unix:@dst
metadata:
latency: 1
"#;
Expand Down
4 changes: 2 additions & 2 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2709,10 +2709,10 @@ mod tests {
#[tokio::test]
async fn test_sim_client_server() {
simnet::start();
let dst_addr = SimAddr::new("local!1".parse::<ChannelAddr>().unwrap()).unwrap();
let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
let src_to_dst = ChannelAddr::Sim(
SimAddr::new_with_src(
"local!0".parse::<ChannelAddr>().unwrap(),
"local:0".parse::<ChannelAddr>().unwrap(),
dst_addr.addr().clone(),
)
.unwrap(),
Expand Down
36 changes: 18 additions & 18 deletions hyperactor/src/simnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,8 +917,8 @@ mod tests {
// Tests that we can create a simnet, config latency between two node and deliver
// the message with configured latency.
start();
let alice = "local!1".parse::<simnet::ChannelAddr>().unwrap();
let bob = "local!2".parse::<simnet::ChannelAddr>().unwrap();
let alice = "local:1".parse::<simnet::ChannelAddr>().unwrap();
let bob = "local:2".parse::<simnet::ChannelAddr>().unwrap();
let latency = Duration::from_millis(1000);
let config = NetworkConfig {
edges: vec![EdgeConfig {
Expand Down Expand Up @@ -949,7 +949,7 @@ mod tests {
.unwrap();
let records = simnet_handle().unwrap().close().await;
let expected_record = SimulatorEventRecord {
summary: "Sending message from local!1 to local!2".to_string(),
summary: "Sending message from local:1 to local:2".to_string(),
start_at: 0,
end_at: latency.as_millis() as u64,
};
Expand All @@ -960,8 +960,8 @@ mod tests {
#[tokio::test]
async fn test_simnet_debounce() {
start();
let alice = "local!1".parse::<simnet::ChannelAddr>().unwrap();
let bob = "local!2".parse::<simnet::ChannelAddr>().unwrap();
let alice = "local:1".parse::<simnet::ChannelAddr>().unwrap();
let bob = "local:2".parse::<simnet::ChannelAddr>().unwrap();

let latency = Duration::from_millis(10000);
simnet_handle()
Expand Down Expand Up @@ -1020,7 +1020,7 @@ mod tests {
// // Create a simple network of 4 nodes.
for i in 0..4 {
addresses.push(
format!("local!{}", i)
format!("local:{}", i)
.parse::<simnet::ChannelAddr>()
.unwrap(),
);
Expand Down Expand Up @@ -1083,46 +1083,46 @@ mod tests {
async fn test_read_config_from_yaml() {
let yaml = r#"
edges:
- src: local!0
dst: local!1
- src: local:0
dst: local:1
metadata:
latency: 1
- src: local!0
dst: local!2
- src: local:0
dst: local:2
metadata:
latency: 2
- src: local!1
dst: local!2
- src: local:1
dst: local:2
metadata:
latency: 3
"#;
let config = NetworkConfig::from_yaml(yaml).unwrap();
assert_eq!(config.edges.len(), 3);
assert_eq!(
config.edges[0].src,
"local!0".parse::<simnet::ChannelAddr>().unwrap()
"local:0".parse::<simnet::ChannelAddr>().unwrap()
);
assert_eq!(
config.edges[0].dst,
"local!1".parse::<simnet::ChannelAddr>().unwrap()
"local:1".parse::<simnet::ChannelAddr>().unwrap()
);
assert_eq!(config.edges[0].metadata.latency, Duration::from_secs(1));
assert_eq!(
config.edges[1].src,
"local!0".parse::<simnet::ChannelAddr>().unwrap()
"local:0".parse::<simnet::ChannelAddr>().unwrap()
);
assert_eq!(
config.edges[1].dst,
"local!2".parse::<simnet::ChannelAddr>().unwrap()
"local:2".parse::<simnet::ChannelAddr>().unwrap()
);
assert_eq!(config.edges[1].metadata.latency, Duration::from_secs(2));
assert_eq!(
config.edges[2].src,
"local!1".parse::<simnet::ChannelAddr>().unwrap()
"local:1".parse::<simnet::ChannelAddr>().unwrap()
);
assert_eq!(
config.edges[2].dst,
"local!2".parse::<simnet::ChannelAddr>().unwrap()
"local:2".parse::<simnet::ChannelAddr>().unwrap()
);
assert_eq!(config.edges[2].metadata.latency, Duration::from_secs(3));
}
Expand Down
2 changes: 1 addition & 1 deletion hyperactor_multiprocess/src/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod tests {
#[tracing_test::traced_test]
#[tokio::test]
async fn test_sim_ping_pong() {
let system_addr = "local!1".parse::<ChannelAddr>().unwrap();
let system_addr = "local:1".parse::<ChannelAddr>().unwrap();

simnet::start();

Expand Down
4 changes: 2 additions & 2 deletions hyperactor_multiprocess/src/system_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2947,7 +2947,7 @@ mod tests {
panic!("Expected sim address");
};

assert_eq!(addr.src().clone().unwrap().to_string(), "unix!@src");
assert_eq!(addr.addr().to_string(), "unix!@dst");
assert_eq!(addr.src().clone().unwrap().to_string(), "unix:@src");
assert_eq!(addr.addr().to_string(), "unix:@dst");
}
}