@@ -949,31 +949,34 @@ def shutdown(self) -> None:
949949 Shutdown the process group. This will kill the underlying process and
950950 close all pipes.
951951
952+ We close the pipes by sending a _PIPE_CLOSE message from the writing end (local)
953+ to the reading end (remote). The remote end will then exit it's recv loop and we
954+ will join the thread or process.
955+
952956 This is a no-op if the process group is already shutdown.
953957
954958 ProcessGroup can be reconfigured after shutdown.
955959 """
956- # Close the future pipe first
960+ # Close the future pipe
957961 if self ._future_pipe is not None :
958- # close future thread
959962 self ._future_pipe .send ((- 1 , _PIPE_CLOSE , None , None ))
960963 assert self ._future_pipe is not None
961964 self ._future_pipe .close ()
962965 self ._future_pipe = None
963- # Join the future thread after closing its pipe
966+ # Join the future thread
964967 if self ._future_thread is not None :
965968 self ._future_thread .join (timeout = 10.0 )
966969 assert self ._future_thread is not None
967970 if self ._future_thread .is_alive ():
968971 raise RuntimeError ("Future thread did not exit" )
969972 self ._future_thread = None
970- # Close the request pipe to signal the worker process to exit
973+ # Close the process pipe
971974 if self ._pipe is not None :
972975 self ._pipe .send ((_PIPE_CLOSE ,))
973976 assert self ._pipe is not None
974977 self ._pipe .close ()
975978 self ._pipe = None
976- # Terminate the worker process after closing its pipe
979+ # Join the process
977980 if self ._p is not None :
978981 self ._p .join (timeout = 10.0 )
979982 assert self ._p is not None
@@ -997,9 +1000,8 @@ def configure(self, store_addr: str, rank: int, world_size: int) -> None:
9971000 | | Pipe 2 | |
9981001 +-------------------+ +-------------------+
9991002
1000- Main Process: Maintains self._futures
1001- Worker Process: Handles tasks, communicates with Future Thread.
1002- Future Thread: Manages asynchronous tasks, updates self._futures.
1003+ Worker Process: Executes the collective operations.
1004+ Future Thread: Executes the user defined future callbacks.
10031005 """
10041006
10051007 self ._world_size = world_size
0 commit comments