Skip to content

Commit 1b852c6

Browse files
committed
[hyperactor] channel: allow ":" as a channel-type delimiter
We currently use "!" to delimit between channel type and its address. The main issue with "!" is that it is not shell-friendly, as Bourne shells will expand "!" into a history entry. With this change, we allow ":" to be used instead. With the previous change (requiring channel types to be specified), we can parse these two cases unambiguously. The ChannelAddr `Display` implementation is adjusted to emit ":"-delimited channel addresses, too. It is the new preferred syntax. Differential Revision: [D79846852](https://our.internmc.facebook.com/intern/diff/D79846852/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D79846852/)! ghstack-source-id: 301540556 Pull Request resolved: #798
1 parent d4b2ff0 commit 1b852c6

File tree

5 files changed

+77
-89
lines changed

5 files changed

+77
-89
lines changed

hyperactor/src/channel.rs

Lines changed: 39 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -276,20 +276,20 @@ pub type Port = u16;
276276

277277
/// The type of a channel address, used to multiplex different underlying
278278
/// channel implementations. ChannelAddrs also have a concrete syntax:
279-
/// the address type ("tcp" or "local"), followed by "!", and an address
280-
/// parseable to that type. Addresses without a specified type default to
281-
/// "tcp". For example:
279+
/// the address type (e.g., "tcp" or "local"), followed by ":", and an address
280+
/// parseable to that type. For example:
282281
///
283-
/// - `tcp!127.0.0.1:1234` - localhost port 1234 over TCP
284-
/// - `tcp!192.168.0.1:1111` - 192.168.0.1 port 1111 over TCP
285-
/// - `local!123` - the (in-process) local port 123
282+
/// - `tcp:127.0.0.1:1234` - localhost port 1234 over TCP
283+
/// - `tcp:192.168.0.1:1111` - 192.168.0.1 port 1111 over TCP
284+
/// - `local:123` - the (in-process) local port 123
285+
/// - `unix:/some/path` - the Unix socket at `/some/path`
286286
///
287287
/// Both local and TCP ports 0 are reserved to indicate "any available
288288
/// port" when serving.
289289
///
290290
/// ```
291291
/// # use hyperactor::channel::ChannelAddr;
292-
/// let addr: ChannelAddr = "tcp!127.0.0.1:1234".parse().unwrap();
292+
/// let addr: ChannelAddr = "tcp:127.0.0.1:1234".parse().unwrap();
293293
/// let ChannelAddr::Tcp(socket_addr) = addr else {
294294
/// panic!()
295295
/// };
@@ -401,11 +401,11 @@ impl ChannelAddr {
401401
impl fmt::Display for ChannelAddr {
402402
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403403
match self {
404-
Self::Tcp(addr) => write!(f, "tcp!{}", addr),
405-
Self::MetaTls(hostname, port) => write!(f, "metatls!{}:{}", hostname, port),
406-
Self::Local(index) => write!(f, "local!{}", index),
407-
Self::Sim(sim_addr) => write!(f, "sim!{}", sim_addr),
408-
Self::Unix(addr) => write!(f, "unix!{}", addr),
404+
Self::Tcp(addr) => write!(f, "tcp:{}", addr),
405+
Self::MetaTls(hostname, port) => write!(f, "metatls:{}:{}", hostname, port),
406+
Self::Local(index) => write!(f, "local:{}", index),
407+
Self::Sim(sim_addr) => write!(f, "sim:{}", sim_addr),
408+
Self::Unix(addr) => write!(f, "unix:{}", addr),
409409
}
410410
}
411411
}
@@ -414,7 +414,8 @@ impl FromStr for ChannelAddr {
414414
type Err = anyhow::Error;
415415

416416
fn from_str(addr: &str) -> Result<Self, Self::Err> {
417-
match addr.split_once('!') {
417+
// "!" is the legacy delimiter; ":" is preferred
418+
match addr.split_once('!').or_else(|| addr.split_once(':')) {
418419
Some(("local", rest)) => rest
419420
.parse::<u64>()
420421
.map(Self::Local)
@@ -604,73 +605,60 @@ mod tests {
604605
fn test_channel_addr() {
605606
let cases_ok = vec![
606607
(
607-
"tcp![::1]:1234",
608+
"tcp<DELIM>[::1]:1234",
608609
ChannelAddr::Tcp(SocketAddr::new(
609610
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
610611
1234,
611612
)),
612613
),
613614
(
614-
"tcp!127.0.0.1:8080",
615+
"tcp<DELIM>127.0.0.1:8080",
615616
ChannelAddr::Tcp(SocketAddr::new(
616617
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
617618
8080,
618619
)),
619620
),
620-
("local!123", ChannelAddr::Local(123)),
621621
#[cfg(target_os = "linux")]
622+
("local<DELIM>123", ChannelAddr::Local(123)),
622623
(
623-
"unix!@yolo",
624+
"unix<DELIM>@yolo",
624625
ChannelAddr::Unix(
625626
unix::SocketAddr::from_abstract_name("yolo")
626627
.expect("can't make socket from abstract name"),
627628
),
628629
),
629630
(
630-
"unix!/cool/socket-path",
631+
"unix<DELIM>/cool/socket-path",
631632
ChannelAddr::Unix(
632633
unix::SocketAddr::from_pathname("/cool/socket-path")
633634
.expect("can't make socket from path"),
634635
),
635636
),
636637
];
637638

638-
let src_ok = vec![
639-
(
640-
"tcp![::1]:1235",
641-
ChannelAddr::Tcp(SocketAddr::new(
642-
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
643-
1235,
644-
)),
645-
),
646-
(
647-
"tcp!127.0.0.1:8081",
648-
ChannelAddr::Tcp(SocketAddr::new(
649-
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
650-
8081,
651-
)),
652-
),
653-
("local!124", ChannelAddr::Local(124)),
654-
];
655-
656639
for (raw, parsed) in cases_ok.clone() {
657-
assert_eq!(raw.parse::<ChannelAddr>().unwrap(), parsed);
640+
for delim in ["!", ":"] {
641+
let raw = raw.replace("<DELIM>", delim);
642+
assert_eq!(raw.parse::<ChannelAddr>().unwrap(), parsed);
643+
}
658644
}
659645

660-
for (raw, parsed) in cases_ok.iter().zip(src_ok.clone()).map(|(a, _)| {
661-
(
662-
format!("sim!{}", a.0),
663-
ChannelAddr::Sim(SimAddr::new(a.1.clone()).unwrap()),
664-
)
665-
}) {
666-
assert_eq!(raw.parse::<ChannelAddr>().unwrap(), parsed);
646+
for (raw, parsed) in cases_ok {
647+
for delim in ["!", ":"] {
648+
// We don't allow mixing and matching delims
649+
let raw = format!("sim{}{}", delim, raw.replace("<DELIM>", delim));
650+
assert_eq!(
651+
raw.parse::<ChannelAddr>().unwrap(),
652+
ChannelAddr::Sim(SimAddr::new(parsed.clone()).unwrap())
653+
);
654+
}
667655
}
668656

669657
let cases_err = vec![
670-
("tcp!abcdef..123124", "invalid socket address syntax"),
671-
("xxx!foo", "no such channel type: xxx"),
658+
("tcp:abcdef..123124", "invalid socket address syntax"),
659+
("xxx:foo", "no such channel type: xxx"),
672660
("127.0.0.1", "no channel type specified"),
673-
("local!abc", "invalid digit found in string"),
661+
("local:abc", "invalid digit found in string"),
674662
];
675663

676664
for (raw, error) in cases_err {
@@ -751,13 +739,13 @@ mod tests {
751739

752740
let rng = rand::thread_rng();
753741
vec![
754-
"tcp![::1]:0".parse().unwrap(),
755-
"local!0".parse().unwrap(),
742+
"tcp:[::1]:0".parse().unwrap(),
743+
"local:0".parse().unwrap(),
756744
#[cfg(target_os = "linux")]
757-
"unix!".parse().unwrap(),
745+
"unix:".parse().unwrap(),
758746
#[cfg(target_os = "linux")]
759747
format!(
760-
"unix!@{}",
748+
"unix:@{}",
761749
rng.sample_iter(Uniform::new_inclusive('a', 'z'))
762750
.take(10)
763751
.collect::<String>()

hyperactor/src/channel/sim.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -419,8 +419,8 @@ mod tests {
419419

420420
#[tokio::test]
421421
async fn test_sim_basic() {
422-
let dst_ok = vec!["tcp![::1]:1234", "tcp!127.0.0.1:8080", "local!123"];
423-
let srcs_ok = vec!["tcp![::2]:1234", "tcp!127.0.0.2:8080", "local!124"];
422+
let dst_ok = vec!["tcp:[::1]:1234", "tcp:127.0.0.1:8080", "local:123"];
423+
let srcs_ok = vec!["tcp:[::2]:1234", "tcp:127.0.0.2:8080", "local:124"];
424424

425425
start();
426426

@@ -460,42 +460,42 @@ mod tests {
460460

461461
#[tokio::test]
462462
async fn test_parse_sim_addr() {
463-
let sim_addr = "sim!unix!@dst";
463+
let sim_addr = "sim!unix:@dst";
464464
let result = sim_addr.parse();
465465
assert!(result.is_ok());
466466
let ChannelAddr::Sim(sim_addr) = result.unwrap() else {
467467
panic!("Expected a sim address");
468468
};
469469
assert!(sim_addr.src().is_none());
470-
assert_eq!(sim_addr.addr().to_string(), "unix!@dst");
470+
assert_eq!(sim_addr.addr().to_string(), "unix:@dst");
471471

472-
let sim_addr = "sim!unix!@src,unix!@dst";
472+
let sim_addr = "sim!unix:@src,unix:@dst";
473473
let result = sim_addr.parse();
474474
assert!(result.is_ok());
475475
let ChannelAddr::Sim(sim_addr) = result.unwrap() else {
476476
panic!("Expected a sim address");
477477
};
478478
assert!(sim_addr.src().is_some());
479-
assert_eq!(sim_addr.addr().to_string(), "unix!@dst");
479+
assert_eq!(sim_addr.addr().to_string(), "unix:@dst");
480480
}
481481

482482
#[tokio::test]
483483
async fn test_realtime_frontier() {
484484
start();
485485

486486
tokio::time::pause();
487-
let sim_addr = SimAddr::new("unix!@dst".parse::<ChannelAddr>().unwrap()).unwrap();
487+
let sim_addr = SimAddr::new("unix:@dst".parse::<ChannelAddr>().unwrap()).unwrap();
488488
let sim_addr_with_src = SimAddr::new_with_src(
489-
"unix!@src".parse::<ChannelAddr>().unwrap(),
490-
"unix!@dst".parse::<ChannelAddr>().unwrap(),
489+
"unix:@src".parse::<ChannelAddr>().unwrap(),
490+
"unix:@dst".parse::<ChannelAddr>().unwrap(),
491491
)
492492
.unwrap();
493493
let (_, mut rx) = sim::serve::<()>(sim_addr.clone()).unwrap();
494494
let tx = sim::dial::<()>(sim_addr_with_src).unwrap();
495495
let simnet_config_yaml = r#"
496496
edges:
497-
- src: unix!@src
498-
dst: unix!@dst
497+
- src: unix:@src
498+
dst: unix:@dst
499499
metadata:
500500
latency: 100
501501
"#;
@@ -526,24 +526,24 @@ mod tests {
526526
tokio::time::pause();
527527
start();
528528
let controller_to_dst = SimAddr::new_with_src(
529-
"unix!@controller".parse::<ChannelAddr>().unwrap(),
530-
"unix!@dst".parse::<ChannelAddr>().unwrap(),
529+
"unix:@controller".parse::<ChannelAddr>().unwrap(),
530+
"unix:@dst".parse::<ChannelAddr>().unwrap(),
531531
)
532532
.unwrap();
533533
let controller_tx = sim::dial::<()>(controller_to_dst.clone()).unwrap();
534534

535535
let client_to_dst = SimAddr::new_with_client_src(
536-
"unix!@client".parse::<ChannelAddr>().unwrap(),
537-
"unix!@dst".parse::<ChannelAddr>().unwrap(),
536+
"unix:@client".parse::<ChannelAddr>().unwrap(),
537+
"unix:@dst".parse::<ChannelAddr>().unwrap(),
538538
)
539539
.unwrap();
540540
let client_tx = sim::dial::<()>(client_to_dst).unwrap();
541541

542542
// 1 second of latency
543543
let simnet_config_yaml = r#"
544544
edges:
545-
- src: unix!@controller
546-
dst: unix!@dst
545+
- src: unix:@controller
546+
dst: unix:@dst
547547
metadata:
548548
latency: 1
549549
"#;

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2639,10 +2639,10 @@ mod tests {
26392639
#[tokio::test]
26402640
async fn test_sim_client_server() {
26412641
simnet::start();
2642-
let dst_addr = SimAddr::new("local!1".parse::<ChannelAddr>().unwrap()).unwrap();
2642+
let dst_addr = SimAddr::new("local:1".parse::<ChannelAddr>().unwrap()).unwrap();
26432643
let src_to_dst = ChannelAddr::Sim(
26442644
SimAddr::new_with_src(
2645-
"local!0".parse::<ChannelAddr>().unwrap(),
2645+
"local:0".parse::<ChannelAddr>().unwrap(),
26462646
dst_addr.addr().clone(),
26472647
)
26482648
.unwrap(),

hyperactor/src/simnet.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -917,8 +917,8 @@ mod tests {
917917
// Tests that we can create a simnet, config latency between two node and deliver
918918
// the message with configured latency.
919919
start();
920-
let alice = "local!1".parse::<simnet::ChannelAddr>().unwrap();
921-
let bob = "local!2".parse::<simnet::ChannelAddr>().unwrap();
920+
let alice = "local:1".parse::<simnet::ChannelAddr>().unwrap();
921+
let bob = "local:2".parse::<simnet::ChannelAddr>().unwrap();
922922
let latency = Duration::from_millis(1000);
923923
let config = NetworkConfig {
924924
edges: vec![EdgeConfig {
@@ -949,7 +949,7 @@ mod tests {
949949
.unwrap();
950950
let records = simnet_handle().unwrap().close().await;
951951
let expected_record = SimulatorEventRecord {
952-
summary: "Sending message from local!1 to local!2".to_string(),
952+
summary: "Sending message from local:1 to local:2".to_string(),
953953
start_at: 0,
954954
end_at: latency.as_millis() as u64,
955955
};
@@ -960,8 +960,8 @@ mod tests {
960960
#[tokio::test]
961961
async fn test_simnet_debounce() {
962962
start();
963-
let alice = "local!1".parse::<simnet::ChannelAddr>().unwrap();
964-
let bob = "local!2".parse::<simnet::ChannelAddr>().unwrap();
963+
let alice = "local:1".parse::<simnet::ChannelAddr>().unwrap();
964+
let bob = "local:2".parse::<simnet::ChannelAddr>().unwrap();
965965

966966
let latency = Duration::from_millis(10000);
967967
simnet_handle()
@@ -1020,7 +1020,7 @@ mod tests {
10201020
// // Create a simple network of 4 nodes.
10211021
for i in 0..4 {
10221022
addresses.push(
1023-
format!("local!{}", i)
1023+
format!("local:{}", i)
10241024
.parse::<simnet::ChannelAddr>()
10251025
.unwrap(),
10261026
);
@@ -1083,46 +1083,46 @@ mod tests {
10831083
async fn test_read_config_from_yaml() {
10841084
let yaml = r#"
10851085
edges:
1086-
- src: local!0
1087-
dst: local!1
1086+
- src: local:0
1087+
dst: local:1
10881088
metadata:
10891089
latency: 1
1090-
- src: local!0
1091-
dst: local!2
1090+
- src: local:0
1091+
dst: local:2
10921092
metadata:
10931093
latency: 2
1094-
- src: local!1
1095-
dst: local!2
1094+
- src: local:1
1095+
dst: local:2
10961096
metadata:
10971097
latency: 3
10981098
"#;
10991099
let config = NetworkConfig::from_yaml(yaml).unwrap();
11001100
assert_eq!(config.edges.len(), 3);
11011101
assert_eq!(
11021102
config.edges[0].src,
1103-
"local!0".parse::<simnet::ChannelAddr>().unwrap()
1103+
"local:0".parse::<simnet::ChannelAddr>().unwrap()
11041104
);
11051105
assert_eq!(
11061106
config.edges[0].dst,
1107-
"local!1".parse::<simnet::ChannelAddr>().unwrap()
1107+
"local:1".parse::<simnet::ChannelAddr>().unwrap()
11081108
);
11091109
assert_eq!(config.edges[0].metadata.latency, Duration::from_secs(1));
11101110
assert_eq!(
11111111
config.edges[1].src,
1112-
"local!0".parse::<simnet::ChannelAddr>().unwrap()
1112+
"local:0".parse::<simnet::ChannelAddr>().unwrap()
11131113
);
11141114
assert_eq!(
11151115
config.edges[1].dst,
1116-
"local!2".parse::<simnet::ChannelAddr>().unwrap()
1116+
"local:2".parse::<simnet::ChannelAddr>().unwrap()
11171117
);
11181118
assert_eq!(config.edges[1].metadata.latency, Duration::from_secs(2));
11191119
assert_eq!(
11201120
config.edges[2].src,
1121-
"local!1".parse::<simnet::ChannelAddr>().unwrap()
1121+
"local:1".parse::<simnet::ChannelAddr>().unwrap()
11221122
);
11231123
assert_eq!(
11241124
config.edges[2].dst,
1125-
"local!2".parse::<simnet::ChannelAddr>().unwrap()
1125+
"local:2".parse::<simnet::ChannelAddr>().unwrap()
11261126
);
11271127
assert_eq!(config.edges[2].metadata.latency, Duration::from_secs(3));
11281128
}

hyperactor_multiprocess/src/ping_pong.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ mod tests {
3333
#[tracing_test::traced_test]
3434
#[tokio::test]
3535
async fn test_sim_ping_pong() {
36-
let system_addr = "local!1".parse::<ChannelAddr>().unwrap();
36+
let system_addr = "local:1".parse::<ChannelAddr>().unwrap();
3737

3838
simnet::start();
3939

0 commit comments

Comments
 (0)