Skip to content

Commit d9c7d50

Browse files
pablorfb-metafacebook-github-bot
authored andcommitted
Create hyperactor_mesh config file (#758)
Summary: Split config for hyperactor and hyperactor_mesh Rollback Plan: Differential Revision: D79599694
1 parent 24d475c commit d9c7d50

File tree

4 files changed

+222
-37
lines changed

4 files changed

+222
-37
lines changed

hyperactor/src/config.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@ declare_attrs! {
4646

4747
/// Timeout used by proc mesh for stopping an actor.
4848
pub attr STOP_ACTOR_TIMEOUT: Duration = Duration::from_secs(1);
49-
50-
/// Heartbeat interval for remote allocator
51-
pub attr REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
5249
}
5350

5451
/// Load configuration from environment variables
@@ -90,13 +87,6 @@ pub fn from_env() -> Attrs {
9087
}
9188
}
9289

93-
// Load remote allocator heartbeat interval
94-
if let Ok(val) = env::var("HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL_SECS") {
95-
if let Ok(parsed) = val.parse::<u64>() {
96-
config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL] = Duration::from_secs(parsed);
97-
}
98-
}
99-
10090
config
10191
}
10292

@@ -132,9 +122,6 @@ pub fn merge(config: &mut Attrs, other: &Attrs) {
132122
if other.contains_key(SPLIT_MAX_BUFFER_SIZE) {
133123
config[SPLIT_MAX_BUFFER_SIZE] = other[SPLIT_MAX_BUFFER_SIZE];
134124
}
135-
if other.contains_key(REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL) {
136-
config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL] = other[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL];
137-
}
138125
}
139126

140127
/// Global configuration functions
@@ -180,6 +167,16 @@ pub mod global {
180167
static MUTEX: LazyLock<std::sync::Mutex<()>> = LazyLock::new(|| std::sync::Mutex::new(()));
181168
ConfigLock {
182169
_guard: MUTEX.lock().unwrap(),
170+
config: CONFIG.clone(),
171+
}
172+
}
173+
174+
/// Create a new ConfigLock with a specific config.
175+
pub fn new(config: Arc<RwLock<Attrs>>) -> ConfigLock {
176+
static MUTEX: LazyLock<std::sync::Mutex<()>> = LazyLock::new(|| std::sync::Mutex::new(()));
177+
ConfigLock {
178+
_guard: MUTEX.lock().unwrap(),
179+
config,
183180
}
184181
}
185182

@@ -235,6 +232,7 @@ pub mod global {
235232
/// this ConfigLock, ensuring proper synchronization.
236233
pub struct ConfigLock {
237234
_guard: std::sync::MutexGuard<'static, ()>,
235+
config: Arc<RwLock<Attrs>>,
238236
}
239237

240238
impl ConfigLock {
@@ -256,7 +254,7 @@ pub mod global {
256254
value: T,
257255
) -> ConfigValueGuard<'a, T> {
258256
let orig = {
259-
let mut config = CONFIG.write().unwrap();
257+
let mut config = self.config.write().unwrap();
260258
let orig = config.take_value(key);
261259
config.set(key, value);
262260
orig
@@ -265,6 +263,7 @@ pub mod global {
265263
ConfigValueGuard {
266264
key,
267265
orig,
266+
config: self.config.clone(),
268267
_phantom: PhantomData,
269268
}
270269
}
@@ -274,13 +273,14 @@ pub mod global {
274273
pub struct ConfigValueGuard<'a, T: 'static> {
275274
key: crate::attrs::Key<T>,
276275
orig: Option<Box<dyn crate::attrs::SerializableValue>>,
276+
config: Arc<RwLock<Attrs>>,
277277
// This is here so we can hold onto a 'a lifetime.
278278
_phantom: PhantomData<&'a ()>,
279279
}
280280

281281
impl<T: 'static> Drop for ConfigValueGuard<'_, T> {
282282
fn drop(&mut self) {
283-
let mut config = CONFIG.write().unwrap();
283+
let mut config = self.config.write().unwrap();
284284
if let Some(orig) = self.orig.take() {
285285
config.restore_value(self.key, orig);
286286
} else {
@@ -305,10 +305,6 @@ mod tests {
305305
);
306306
assert_eq!(config[MESSAGE_ACK_EVERY_N_MESSAGES], 1000);
307307
assert_eq!(config[SPLIT_MAX_BUFFER_SIZE], 5);
308-
assert_eq!(
309-
config[REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL],
310-
Duration::from_secs(5)
311-
);
312308
}
313309

314310
#[test]

hyperactor_mesh/src/alloc/remoteprocess.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use hyperactor::channel::TxStatus;
3232
use hyperactor::clock;
3333
use hyperactor::clock::Clock;
3434
use hyperactor::clock::RealClock;
35-
use hyperactor::config;
3635
use hyperactor::mailbox::DialMailboxRouter;
3736
use hyperactor::mailbox::MailboxServer;
3837
use hyperactor::mailbox::monitored_return_handle;
@@ -59,6 +58,8 @@ use crate::alloc::AllocatorError;
5958
use crate::alloc::ProcState;
6059
use crate::alloc::ProcStopReason;
6160
use crate::alloc::ProcessAllocator;
61+
use crate::config;
62+
use crate::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL;
6263

6364
/// Control messages sent from remote process allocator to local allocator.
6465
#[derive(Debug, Clone, Serialize, Deserialize, Named)]
@@ -407,7 +408,7 @@ impl RemoteProcessAllocator {
407408
}
408409
}
409410
}
410-
_ = RealClock.sleep(config::global::get(config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL)) => {
411+
_ = RealClock.sleep(crate::config::global::get(REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL)) => {
411412
tracing::trace!("sending heartbeat");
412413
tx.post(RemoteProcessProcStateMessage::HeartBeat);
413414
}
@@ -1208,9 +1209,9 @@ mod test {
12081209

12091210
#[timed_test::async_timed_test(timeout_secs = 5)]
12101211
async fn test_simple() {
1211-
let config = hyperactor::config::global::lock();
1212+
let config = crate::config::global::lock();
12121213
let _guard = config.override_key(
1213-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
1214+
REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
12141215
Duration::from_millis(100),
12151216
);
12161217
hyperactor_telemetry::initialize_logging(ClockKind::default());
@@ -1350,9 +1351,9 @@ mod test {
13501351

13511352
#[timed_test::async_timed_test(timeout_secs = 15)]
13521353
async fn test_normal_stop() {
1353-
let config = hyperactor::config::global::lock();
1354+
let config = crate::config::global::lock();
13541355
let _guard = config.override_key(
1355-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
1356+
REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
13561357
Duration::from_millis(100),
13571358
);
13581359
hyperactor_telemetry::initialize_logging(ClockKind::default());
@@ -1428,9 +1429,9 @@ mod test {
14281429

14291430
#[timed_test::async_timed_test(timeout_secs = 15)]
14301431
async fn test_realloc() {
1431-
let config = hyperactor::config::global::lock();
1432+
let config = crate::config::global::lock();
14321433
let _guard = config.override_key(
1433-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
1434+
REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
14341435
Duration::from_millis(100),
14351436
);
14361437
hyperactor_telemetry::initialize_logging(ClockKind::default());
@@ -1554,7 +1555,7 @@ mod test {
15541555
Duration::from_secs(1),
15551556
);
15561557
let _guard2 = config.override_key(
1557-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
1558+
REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
15581559
Duration::from_millis(100),
15591560
);
15601561

@@ -1640,10 +1641,8 @@ mod test {
16401641
#[timed_test::async_timed_test(timeout_secs = 15)]
16411642
async fn test_inner_alloc_failure() {
16421643
let config = hyperactor::config::global::lock();
1643-
let _guard = config.override_key(
1644-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
1645-
Duration::from_secs(60),
1646-
);
1644+
let _guard =
1645+
config.override_key(REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL, Duration::from_secs(60));
16471646
hyperactor_telemetry::initialize_logging(ClockKind::default());
16481647
let serve_addr = ChannelAddr::any(ChannelTransport::Unix);
16491648
let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix);
@@ -1727,6 +1726,7 @@ mod test {
17271726
mod test_alloc {
17281727
use std::os::unix::process::ExitStatusExt;
17291728

1729+
use REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL;
17301730
use hyperactor::clock::ClockKind;
17311731
use hyperactor::config;
17321732
use ndslice::shape;
@@ -1745,7 +1745,7 @@ mod test_alloc {
17451745
Duration::from_secs(1),
17461746
);
17471747
let _guard2 = config.override_key(
1748-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
1748+
REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
17491749
Duration::from_millis(100),
17501750
);
17511751
hyperactor_telemetry::initialize_logging(ClockKind::default());
@@ -1870,7 +1870,7 @@ mod test_alloc {
18701870
Duration::from_secs(1),
18711871
);
18721872
let _guard2 = config.override_key(
1873-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
1873+
REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
18741874
Duration::from_millis(100),
18751875
);
18761876
hyperactor_telemetry::initialize_logging(ClockKind::default());
@@ -1946,7 +1946,7 @@ mod test_alloc {
19461946
tracing::info!("aborting task1 allocator");
19471947
task1_allocator_handle.abort();
19481948
RealClock
1949-
.sleep(config::global::get(config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL) * 2)
1949+
.sleep(crate::config::global::get(REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL) * 2)
19501950
.await;
19511951
for _ in 0..spec.shape.slice().len() / 2 {
19521952
let proc_state = alloc.next().await.unwrap();
@@ -1970,7 +1970,7 @@ mod test_alloc {
19701970
tracing::info!("aborting task2 allocator");
19711971
task2_allocator_handle.abort();
19721972
RealClock
1973-
.sleep(config::global::get(config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL) * 2)
1973+
.sleep(config::global::get(REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL) * 2)
19741974
.await;
19751975
for _ in 0..spec.shape.slice().len() / 2 {
19761976
let proc_state = alloc.next().await.unwrap();
@@ -1998,7 +1998,7 @@ mod test_alloc {
19981998
}
19991999
let config = hyperactor::config::global::lock();
20002000
let _guard = config.override_key(
2001-
hyperactor::config::REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
2001+
REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL,
20022002
Duration::from_millis(100),
20032003
);
20042004
hyperactor_telemetry::initialize_logging(ClockKind::default());

0 commit comments

Comments
 (0)