Skip to content

Commit 0c48a48

Browse files
committed
To fix the shutdown errors, we need to stop the tokio loop...
I am pretty sure our finalization errors are some combo of: 1. schedule some work that will hold or take GIL on a thread owned by tokio. This includes anything that holds a reference to a PyObject, because Drop on PyObject will take the GIL. 2. call Py_Finalize from the main thread 3. Python unloads 4. tokio thread tries to use GIL and crashes. We also start at least one std::thread which runs python that will encounter this problem and not be on the tokio loop. To avoid this, we need to shutdown the tokio loop (i.e. allow all tasks to reach an await, and then cancel everything). However, pyo3_async_runtime makes this nearly impossible because it requires a 'static lifetime reference to the loop, so it cannot be shutdown. This diff shows how to call shutdown on the tokio loop, and I observed it able to fix the `test_proc_mesh_size` finalization issues. However, I have to avoid initializing pyo3_async_runtime. One way to make this shippable would be to remove our usein of pyo3_async_runtime entirely. We primarily need it for future_into_py but we can just reimplement it directly ourself. Differential Revision: [D79533010](https://our.internmc.facebook.com/intern/diff/D79533010/) ghstack-source-id: 300628159 Pull Request resolved: #750
1 parent 3e93c3f commit 0c48a48

File tree

15 files changed

+154
-62
lines changed

15 files changed

+154
-62
lines changed

monarch_extension/src/code_sync.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl CodeSyncMeshClient {
191191
remote: RemoteWorkspace,
192192
auto_reload: bool,
193193
) -> PyResult<Bound<'py, PyAny>> {
194-
pyo3_async_runtimes::tokio::future_into_py(
194+
monarch_hyperactor::runtime::future_into_py(
195195
py,
196196
CodeSyncMeshClient::sync_workspace_(
197197
self.actor_mesh.clone(),
@@ -211,7 +211,7 @@ impl CodeSyncMeshClient {
211211
auto_reload: bool,
212212
) -> PyResult<Bound<'py, PyAny>> {
213213
let actor_mesh = self.actor_mesh.clone();
214-
pyo3_async_runtimes::tokio::future_into_py(
214+
monarch_hyperactor::runtime::future_into_py(
215215
py,
216216
try_join_all(workspaces.into_iter().map(|workspace| {
217217
CodeSyncMeshClient::sync_workspace_(

monarch_extension/src/simulation_tools.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use pyo3::prelude::*;
1414
#[pyfunction]
1515
#[pyo3(name = "start_event_loop")]
1616
pub fn start_simnet_event_loop(py: Python) -> PyResult<Bound<'_, PyAny>> {
17-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
17+
monarch_hyperactor::runtime::future_into_py(py, async move {
1818
simnet::start();
1919
Ok(())
2020
})
@@ -24,7 +24,7 @@ pub fn start_simnet_event_loop(py: Python) -> PyResult<Bound<'_, PyAny>> {
2424
#[pyo3(name="sleep",signature=(seconds))]
2525
pub fn py_sim_sleep<'py>(py: Python<'py>, seconds: f64) -> PyResult<Bound<'py, PyAny>> {
2626
let millis = (seconds * 1000.0).ceil() as u64;
27-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
27+
monarch_hyperactor::runtime::future_into_py(py, async move {
2828
let duration = tokio::time::Duration::from_millis(millis);
2929
SimClock.sleep(duration).await;
3030
Ok(())

monarch_extension/src/tensor_worker.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1389,7 +1389,6 @@ fn worker_main(py: Python<'_>) -> PyResult<()> {
13891389
BinaryArgs::Pipe => bootstrap_pipe(),
13901390
BinaryArgs::WorkerServer { rd, wr } => {
13911391
worker_server(
1392-
get_tokio_runtime(),
13931392
// SAFETY: Raw FD passed in from parent.
13941393
BufReader::new(File::from(unsafe { OwnedFd::from_raw_fd(rd) })),
13951394
// SAFETY: Raw FD passed in from parent.

monarch_hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ lazy_static = "1.5"
2929
monarch_types = { version = "0.0.0", path = "../monarch_types" }
3030
ndslice = { version = "0.0.0", path = "../ndslice" }
3131
nix = { version = "0.29.0", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }
32+
once_cell = "1.21"
3233
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
3334
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
3435
serde = { version = "1.0.219", features = ["derive", "rc"] }

monarch_hyperactor/src/actor.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -503,26 +503,24 @@ impl Actor for PythonActor {
503503

504504
/// Create a new TaskLocals with its own asyncio event loop in a dedicated thread.
505505
fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
506-
let (tx, rx) = std::sync::mpsc::channel();
507-
let _ = std::thread::spawn(move || {
508-
Python::with_gil(|py| {
509-
let asyncio = Python::import(py, "asyncio").unwrap();
510-
let event_loop = asyncio.call_method0("new_event_loop").unwrap();
511-
asyncio
512-
.call_method1("set_event_loop", (event_loop.clone(),))
513-
.unwrap();
514-
515-
let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
516-
.copy_context(py)
517-
.unwrap();
518-
tx.send(task_locals).unwrap();
519-
if let Err(e) = event_loop.call_method0("run_forever") {
520-
eprintln!("Event loop stopped with error: {:?}", e);
521-
}
522-
let _ = event_loop.call_method0("close");
523-
});
524-
});
525-
rx.recv().unwrap()
506+
Python::with_gil(|py| {
507+
let asyncio = Python::import(py, "asyncio").unwrap();
508+
let event_loop = asyncio.call_method0("new_event_loop").unwrap();
509+
let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
510+
.copy_context(py)
511+
.unwrap();
512+
513+
let kwargs = PyDict::new(py);
514+
let target = event_loop.getattr("run_forever").unwrap();
515+
kwargs.set_item("target", target).unwrap();
516+
let thread = py
517+
.import("threading")
518+
.unwrap()
519+
.call_method("Thread", (), Some(&kwargs))
520+
.unwrap();
521+
thread.call_method0("start").unwrap();
522+
task_locals
523+
})
526524
}
527525

528526
// [Panics in async endpoints]

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ impl PythonActorMesh {
245245

246246
fn stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
247247
let actor_mesh = self.inner.clone();
248-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
248+
crate::runtime::future_into_py(py, async move {
249249
let actor_mesh = actor_mesh
250250
.take()
251251
.await

monarch_hyperactor/src/alloc.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -319,17 +319,17 @@ impl PyRemoteProcessAllocInitializer {
319319
.call_method1("initialize_alloc", args)
320320
.map(|x| x.unbind())
321321
})?;
322-
get_tokio_runtime()
323-
.spawn_blocking(move || -> PyResult<Vec<String>> {
324-
// call the function as implemented in python
325-
Python::with_gil(|py| {
326-
let asyncio = py.import("asyncio").unwrap();
327-
let addrs = asyncio.call_method1("run", (coro,))?;
328-
let addrs: PyResult<Vec<String>> = addrs.extract();
329-
addrs
330-
})
322+
let r = get_tokio_runtime().spawn_blocking(move || -> PyResult<Vec<String>> {
323+
// call the function as implemented in python
324+
Python::with_gil(|py| {
325+
let asyncio = py.import("asyncio").unwrap();
326+
let addrs = asyncio.call_method1("run", (coro,))?;
327+
let addrs: PyResult<Vec<String>> = addrs.extract();
328+
addrs
331329
})
332-
.await
330+
});
331+
332+
r.await
333333
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
334334
}
335335

monarch_hyperactor/src/bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
2626
};
2727

2828
hyperactor::tracing::debug!("entering async bootstrap");
29-
pyo3_async_runtimes::tokio::future_into_py::<_, ()>(py, async move {
29+
crate::runtime::future_into_py::<_, ()>(py, async move {
3030
// SAFETY:
3131
// - Only one of these is ever created.
3232
// - This is the entry point of this program, so this will be dropped when

monarch_hyperactor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
#![allow(unsafe_op_in_unsafe_fn)]
1010
#![feature(exit_status_error)]
11+
#![feature(mapped_lock_guards)]
12+
#![feature(rwlock_downgrade)]
1113

1214
pub mod actor;
1315
pub mod actor_mesh;

monarch_hyperactor/src/mailbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ pub(super) struct PythonUndeliverablePortReceiver {
430430
impl PythonUndeliverablePortReceiver {
431431
fn recv<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
432432
let receiver = self.inner.clone();
433-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
433+
crate::runtime::future_into_py(py, async move {
434434
let message = receiver
435435
.lock()
436436
.await

0 commit comments

Comments
 (0)