diff --git a/src/ReTestItems.jl b/src/ReTestItems.jl index f9ab37a6..4fd25e33 100644 --- a/src/ReTestItems.jl +++ b/src/ReTestItems.jl @@ -499,8 +499,7 @@ function manage_worker( ch = Channel{TestItemResult}(1) if memory_percent() > memory_threshold_percent @warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting worker process to try to free memory." - terminate!(worker) - wait(worker) + close(worker, :high_memory) worker = robust_start_worker(proj_name, nworker_threads, worker_init_expr, ntestitems) end testitem.workerid[] = worker.pid @@ -554,8 +553,7 @@ function manage_worker( # Handle the exception if e isa TimeoutException @debugv 1 "Test item $(repr(testitem.name)) timed out. Terminating worker $worker" - terminate!(worker) - wait(worker) + close(worker, :timeout) @error "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \ Recording test error." record_timeout!(testitem, run_number, timeout) diff --git a/src/workers.jl b/src/workers.jl index 94f01521..2b116514 100644 --- a/src/workers.jl +++ b/src/workers.jl @@ -82,12 +82,12 @@ function terminate!(w::Worker, from::Symbol=:manual) empty!(w.futures) end signal = Base.SIGTERM - while true + while !process_exited(w.process) + @debug "sending signal $signal to worker $(w.pid)" kill(w.process, signal) signal = signal == Base.SIGTERM ? Base.SIGINT : Base.SIGKILL process_exited(w.process) && break sleep(0.1) - process_exited(w.process) && break end if !(w.socket.status == Base.StatusUninit || w.socket.status == Base.StatusInit || w.socket.handle === C_NULL) close(w.socket) @@ -107,8 +107,9 @@ end # gracefully terminate a worker by sending a shutdown message # and waiting for the other tasks to perform worker shutdown -function Base.close(w::Worker) +function Base.close(w::Worker, from::Symbol=:manual) if !w.terminated && isopen(w.socket) + @debug "closing worker $(w.pid) from $from" req = Request(Symbol(), :(), rand(UInt64), true) @lock w.lock begin serialize(w.socket, req) @@ -211,7 +212,7 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event) end end catch e - # @error "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace()) + @debug "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace()) terminate!(w, :redirect_worker_output) e isa EOFError || e isa Base.IOError || rethrow() finally @@ -246,7 +247,7 @@ function process_responses(w::Worker, ev::Threads.Event) end end catch e - # @error "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace()) + @debug "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace()) terminate!(w, :process_responses) e isa EOFError || e isa Base.IOError || rethrow() end diff --git a/test/workers.jl b/test/workers.jl index 40b0e802..35dac221 100644 --- a/test/workers.jl +++ b/test/workers.jl @@ -17,6 +17,8 @@ using Test @testset "clean shutdown ($w)" begin close(w) @test !process_running(w.process) + @test w.process.termsignal == Base.SIGTERM + @test w.process.exitcode == 0 @test !isopen(w.socket) @test w.terminated @test istaskstarted(w.messages) && istaskdone(w.messages)