Skip to content

Commit f1f1721

Browse files
committed
To fix the shutdown errors, we need to stop the tokio loop...
Pull Request resolved: #750 I am pretty sure our finalization errors are some combination of: 1. We schedule some work that will hold or take the GIL on a thread owned by tokio. This includes anything that holds a reference to a PyObject, because Drop on PyObject will take the GIL. 2. Py_Finalize is called from the main thread. 3. Python unloads. 4. A tokio thread tries to use the GIL and crashes. We also start at least one std::thread which runs Python; it will encounter this problem too, and it is not on the tokio loop. To avoid this, we need to shut down the tokio loop (i.e. allow all tasks to reach an await, and then cancel everything). However, `pyo3_async_runtimes::tokio` makes this nearly impossible because it requires a 'static lifetime reference to the loop, so it cannot be shut down. This diff shows how to call shutdown on the tokio loop, and I observed that it fixes the `test_proc_mesh_size` finalization issues. However, I have to avoid initializing `pyo3_async_runtimes::tokio`. This diff removes our use of `pyo3_async_runtimes::tokio`. We instead replace it with our own wrapper to get a Python future, which uses our get_tokio_runtime function. ghstack-source-id: 300750441 Differential Revision: [D79533010](https://our.internmc.facebook.com/intern/diff/D79533010/)
1 parent 35d6478 commit f1f1721

File tree

15 files changed

+154
-62
lines changed

15 files changed

+154
-62
lines changed

monarch_extension/src/code_sync.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl CodeSyncMeshClient {
191191
remote: RemoteWorkspace,
192192
auto_reload: bool,
193193
) -> PyResult<Bound<'py, PyAny>> {
194-
pyo3_async_runtimes::tokio::future_into_py(
194+
monarch_hyperactor::runtime::future_into_py(
195195
py,
196196
CodeSyncMeshClient::sync_workspace_(
197197
self.actor_mesh.clone(),
@@ -211,7 +211,7 @@ impl CodeSyncMeshClient {
211211
auto_reload: bool,
212212
) -> PyResult<Bound<'py, PyAny>> {
213213
let actor_mesh = self.actor_mesh.clone();
214-
pyo3_async_runtimes::tokio::future_into_py(
214+
monarch_hyperactor::runtime::future_into_py(
215215
py,
216216
try_join_all(workspaces.into_iter().map(|workspace| {
217217
CodeSyncMeshClient::sync_workspace_(

monarch_extension/src/simulation_tools.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use pyo3::prelude::*;
1414
#[pyfunction]
1515
#[pyo3(name = "start_event_loop")]
1616
pub fn start_simnet_event_loop(py: Python) -> PyResult<Bound<'_, PyAny>> {
17-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
17+
monarch_hyperactor::runtime::future_into_py(py, async move {
1818
simnet::start();
1919
Ok(())
2020
})
@@ -24,7 +24,7 @@ pub fn start_simnet_event_loop(py: Python) -> PyResult<Bound<'_, PyAny>> {
2424
#[pyo3(name="sleep",signature=(seconds))]
2525
pub fn py_sim_sleep<'py>(py: Python<'py>, seconds: f64) -> PyResult<Bound<'py, PyAny>> {
2626
let millis = (seconds * 1000.0).ceil() as u64;
27-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
27+
monarch_hyperactor::runtime::future_into_py(py, async move {
2828
let duration = tokio::time::Duration::from_millis(millis);
2929
SimClock.sleep(duration).await;
3030
Ok(())

monarch_extension/src/tensor_worker.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1389,7 +1389,6 @@ fn worker_main(py: Python<'_>) -> PyResult<()> {
13891389
BinaryArgs::Pipe => bootstrap_pipe(),
13901390
BinaryArgs::WorkerServer { rd, wr } => {
13911391
worker_server(
1392-
get_tokio_runtime(),
13931392
// SAFETY: Raw FD passed in from parent.
13941393
BufReader::new(File::from(unsafe { OwnedFd::from_raw_fd(rd) })),
13951394
// SAFETY: Raw FD passed in from parent.

monarch_hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ lazy_static = "1.5"
2929
monarch_types = { version = "0.0.0", path = "../monarch_types" }
3030
ndslice = { version = "0.0.0", path = "../ndslice" }
3131
nix = { version = "0.30.1", features = ["dir", "event", "hostname", "inotify", "ioctl", "mman", "mount", "net", "poll", "ptrace", "reboot", "resource", "sched", "signal", "term", "time", "user", "zerocopy"] }
32+
once_cell = "1.21"
3233
opentelemetry = "0.29"
3334
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
3435
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }

monarch_hyperactor/src/actor.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -502,26 +502,24 @@ impl Actor for PythonActor {
502502

503503
/// Create a new TaskLocals with its own asyncio event loop in a dedicated thread.
504504
fn create_task_locals() -> pyo3_async_runtimes::TaskLocals {
505-
let (tx, rx) = std::sync::mpsc::channel();
506-
let _ = std::thread::spawn(move || {
507-
Python::with_gil(|py| {
508-
let asyncio = Python::import(py, "asyncio").unwrap();
509-
let event_loop = asyncio.call_method0("new_event_loop").unwrap();
510-
asyncio
511-
.call_method1("set_event_loop", (event_loop.clone(),))
512-
.unwrap();
513-
514-
let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
515-
.copy_context(py)
516-
.unwrap();
517-
tx.send(task_locals).unwrap();
518-
if let Err(e) = event_loop.call_method0("run_forever") {
519-
eprintln!("Event loop stopped with error: {:?}", e);
520-
}
521-
let _ = event_loop.call_method0("close");
522-
});
523-
});
524-
rx.recv().unwrap()
505+
Python::with_gil(|py| {
506+
let asyncio = Python::import(py, "asyncio").unwrap();
507+
let event_loop = asyncio.call_method0("new_event_loop").unwrap();
508+
let task_locals = pyo3_async_runtimes::TaskLocals::new(event_loop.clone())
509+
.copy_context(py)
510+
.unwrap();
511+
512+
let kwargs = PyDict::new(py);
513+
let target = event_loop.getattr("run_forever").unwrap();
514+
kwargs.set_item("target", target).unwrap();
515+
let thread = py
516+
.import("threading")
517+
.unwrap()
518+
.call_method("Thread", (), Some(&kwargs))
519+
.unwrap();
520+
thread.call_method0("start").unwrap();
521+
task_locals
522+
})
525523
}
526524

527525
// [Panics in async endpoints]

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ impl PythonActorMesh {
249249

250250
fn stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
251251
let actor_mesh = self.inner.clone();
252-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
252+
crate::runtime::future_into_py(py, async move {
253253
let actor_mesh = actor_mesh
254254
.take()
255255
.await

monarch_hyperactor/src/alloc.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -318,17 +318,17 @@ impl PyRemoteProcessAllocInitializer {
318318
.call_method1("initialize_alloc", args)
319319
.map(|x| x.unbind())
320320
})?;
321-
get_tokio_runtime()
322-
.spawn_blocking(move || -> PyResult<Vec<String>> {
323-
// call the function as implemented in python
324-
Python::with_gil(|py| {
325-
let asyncio = py.import("asyncio").unwrap();
326-
let addrs = asyncio.call_method1("run", (coro,))?;
327-
let addrs: PyResult<Vec<String>> = addrs.extract();
328-
addrs
329-
})
321+
let r = get_tokio_runtime().spawn_blocking(move || -> PyResult<Vec<String>> {
322+
// call the function as implemented in python
323+
Python::with_gil(|py| {
324+
let asyncio = py.import("asyncio").unwrap();
325+
let addrs = asyncio.call_method1("run", (coro,))?;
326+
let addrs: PyResult<Vec<String>> = addrs.extract();
327+
addrs
330328
})
331-
.await
329+
});
330+
331+
r.await
332332
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
333333
}
334334

monarch_hyperactor/src/bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub fn bootstrap_main(py: Python) -> PyResult<Bound<PyAny>> {
2626
};
2727

2828
hyperactor::tracing::debug!("entering async bootstrap");
29-
pyo3_async_runtimes::tokio::future_into_py::<_, ()>(py, async move {
29+
crate::runtime::future_into_py::<_, ()>(py, async move {
3030
// SAFETY:
3131
// - Only one of these is ever created.
3232
// - This is the entry point of this program, so this will be dropped when

monarch_hyperactor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
#![allow(unsafe_op_in_unsafe_fn)]
1010
#![feature(exit_status_error)]
11+
#![feature(mapped_lock_guards)]
12+
#![feature(rwlock_downgrade)]
1113

1214
pub mod actor;
1315
pub mod actor_mesh;

monarch_hyperactor/src/mailbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ pub(super) struct PythonUndeliverablePortReceiver {
430430
impl PythonUndeliverablePortReceiver {
431431
fn recv<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
432432
let receiver = self.inner.clone();
433-
pyo3_async_runtimes::tokio::future_into_py(py, async move {
433+
crate::runtime::future_into_py(py, async move {
434434
let message = receiver
435435
.lock()
436436
.await

0 commit comments

Comments
 (0)