@@ -43,6 +43,8 @@ struct Response
4343 # if true, worker is shutting down, so we can stop listening to it.
4444 shutdown:: Bool
4545end
46+ Response (a, b, c) = Response (a, b, c, false )
47+ shutdown_response () = Response (nothing , nothing , rand (UInt64), true )
4648is_shutdown (r:: Response ) = r. shutdown
4749
4850# simple Future that coordinator can wait on until a Response comes back for a Request
@@ -96,6 +98,7 @@ function terminate!(w::Worker, from::Symbol=:manual)
9698 if ! (w. socket. status == Base. StatusUninit || w. socket. status == Base. StatusInit || w. socket. handle === C_NULL )
9799 close (w. socket)
98100 end
101+ @debug " Done cleaning up after terminating worker $(w. pid) from $from "
99102 return
100103end
101104
@@ -210,6 +213,8 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event)
210213 try
211214 notify (ev) # notify we've started
212215 while ! process_exited (proc) && ! w. terminated
216+ Core. println (getpid (proc))
217+ Core. println (process_exited (proc))
213218 line = readline (proc)
214219 if ! isempty (line)
215220 fn (io, w. pid, line)
@@ -237,7 +242,14 @@ function process_responses(w::Worker, ev::Threads.Event)
237242 # get the next Response from the worker
238243 r = deserialize (w. socket)
239244 @assert r isa Response " Received invalid response from worker $(w. pid) : $(r) "
240- is_shutdown (r) && break
245+ if is_shutdown (r)
246+ @debug " Received shutdown response from worker $(w. pid) . Waiting for shutdown of $(w. process) "
247+ # TODO (PR): SOMEHOW this wait(p) is not getting interrupted when p dies as a zombie <defunct> process.
248+ wait (w. process)
249+ @debug " shutdown"
250+ terminate! (w, :process_responses )
251+ break
252+ end
241253 # println("Received response $(r) from worker $(w.pid)")
242254 @lock lock begin
243255 @assert haskey (reqs, r. id) " Received response for unknown request $(r. id) from worker $(w. pid) "
@@ -295,6 +307,7 @@ function startworker()
295307 serve_requests (accept (sock))
296308 finally
297309 close (sock)
310+ @debug " Shutting down worker $(getpid ()) "
298311 exit (0 )
299312 end
300313end
@@ -326,7 +339,7 @@ function serve_requests(io)
326339 @assert req isa Request
327340 if is_shutdown (req)
328341 @debug " Received shutdown request on worker $(getpid ()) "
329- resp = Response ( nothing , nothing , rand (UInt64), true )
342+ resp = shutdown_response ( )
330343 @lock iolock begin
331344 # println("sending response: $(resp)")
332345 serialize (io, resp)
0 commit comments