diff --git a/Project.toml b/Project.toml index 1b7f821f..72e619ef 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "ReTestItems" uuid = "817f1d60-ba6b-4fd5-9520-3cf149f6a823" -version = "1.29.0" +version = "1.30.0" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" diff --git a/src/ReTestItems.jl b/src/ReTestItems.jl index 82d67792..ea778d5c 100644 --- a/src/ReTestItems.jl +++ b/src/ReTestItems.jl @@ -7,7 +7,7 @@ using .Threads: @spawn, nthreads using Pkg: Pkg using TestEnv using Logging -using LoggingExtras +using LoggingExtras: LoggingExtras, @debugv export runtests, runtestitem export @testsetup, @testitem @@ -443,35 +443,36 @@ function _runtests_in_current_env( ti = starting[i] @spawn begin with_logger(original_logger) do - manage_worker($w, $proj_name, $testitems, $ti, $cfg) + manage_worker($w, $proj_name, $testitems, $ti, $cfg; worker_num=$i) end end end end Test.TESTSET_PRINT_ENABLE[] = true # reenable printing so our `finish` prints + # Let users know if tests are done, and if all of them ran (or if we failed fast). + # Print this above the final report as there might have been other logs printed + # since a failfast-cancellation was printed, but print it ASAP after tests finish + # in case any of the recording/reporting steps have an issue. + print_completion_summary(testitems; failedfast=(cfg.failfast && is_cancelled(testitems))) record_results!(testitems) cfg.report && write_junit_file(proj_name, dirname(projectfile), testitems.graph.junit) - if cfg.failfast && is_cancelled(testitems) - # Let users know if not all tests ran. Print this just above the final report as - # there might have been other logs printed since the cancellation was printed. 
- print_failfast_summary(testitems) - end + @debugv 1 "Calling Test.finish(testitems)" Test.finish(testitems) # print summary of total passes/failures/errors finally Test.TESTSET_PRINT_ENABLE[] = true - # Cleanup test setup logs + @debugv 1 "Cleaning up test setup logs" foreach(Iterators.filter(endswith(".log"), readdir(RETESTITEMS_TEMP_FOLDER[], join=true))) do logfile rm(logfile; force=true) # `force` to ignore error if file already cleaned up end + @debugv 1 "Done cleaning up test setup logs" end + @debugv 1 "DONE" return nothing end # Start a new `Worker` with `nworker_threads` threads and run `worker_init_expr` on it. -# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker. -function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num=nothing) - w = Worker(; threads=nworker_threads) - i = worker_num == nothing ? "" : " $worker_num" +function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num) + w = Worker(; threads=nworker_threads, num=worker_num) # remote_fetch here because we want to make sure the worker is all setup before starting to eval testitems remote_fetch(w, quote using ReTestItems, Test @@ -479,7 +480,8 @@ function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr const GLOBAL_TEST_CONTEXT = ReTestItems.TestContext($proj_name, $ntestitems) GLOBAL_TEST_CONTEXT.setups_evaled = ReTestItems.TestSetupModules() nthreads_str = $nworker_threads - @info "Starting test worker$($i) on pid = $(Libc.getpid()), with $nthreads_str threads" + num = $worker_num + @info "Starting test worker $(num) on pid=$(Libc.getpid()), with $(nthreads_str) threads" $(worker_init_expr.args...) nothing end) @@ -572,8 +574,10 @@ function record_test_error!(testitem, msg, elapsed_seconds::Real=0.0) return testitem end +# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker. 
function manage_worker( - worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, cfg::_Config, + worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, cfg::_Config; + worker_num::Int ) ntestitems = length(testitems.testitems) run_number = 1 @@ -581,10 +585,10 @@ function manage_worker( while testitem !== nothing ch = Channel{TestItemResult}(1) if memory_percent() > memory_threshold_percent - @warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting worker process to try to free memory." + @warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting process for worker $worker_num to try to free memory." terminate!(worker) wait(worker) - worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems) + worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems; worker_num) end testitem.workerid[] = worker.pid timeout = something(testitem.timeout, cfg.testitem_timeout) @@ -679,17 +683,19 @@ function manage_worker( run_number = 1 else run_number += 1 - @info "Retrying $(repr(testitem.name)) on a new worker process. Run=$run_number." + @info "Retrying $(repr(testitem.name)) on a new worker $worker_num process. Run=$run_number." end # The worker was terminated, so replace it unless there are no more testitems to run if testitem !== nothing - worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems) + worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems; worker_num) end # Now loop back around to reschedule the testitem continue end end + @info "All tests on worker $worker_num completed. Closing $worker." 
close(worker) + @debugv 1 "Worker $worker_num closed: $(worker)" return nothing end diff --git a/src/junit_xml.jl b/src/junit_xml.jl index fefeb062..1f0e5607 100644 --- a/src/junit_xml.jl +++ b/src/junit_xml.jl @@ -191,6 +191,7 @@ function write_junit_file(path::AbstractString, junit::Union{JUnitTestSuites,JUn open(path, "w") do io write_junit_file(io, junit) end + @debugv 1 "Done writing JUnit XML file to $(repr(path))" return nothing end @@ -201,6 +202,7 @@ function write_junit_file(io::IO, junit::Union{JUnitTestSuites,JUnitTestSuite}) end function write_junit_xml(io, junit::JUnitTestSuites) + @debugv 2 "Writing JUnit XML for testsuites $(junit.name)" write(io, "\n") @@ -212,6 +214,7 @@ function write_junit_xml(io, junit::JUnitTestSuites) end function write_junit_xml(io, ts::JUnitTestSuite) + @debugv 2 "Writing JUnit XML for testsuite $(ts.name)" write(io, "\n") @@ -258,6 +261,7 @@ function write_dd_tags(io, tc::JUnitTestCase) end function write_junit_xml(io, tc::JUnitTestCase) + @debugv 2 "Writing JUnit XML for testcase $(tc.name)" write(io, "\n\t") diff --git a/src/log_capture.jl b/src/log_capture.jl index 8c2fbb57..12dd14cc 100644 --- a/src/log_capture.jl +++ b/src/log_capture.jl @@ -310,9 +310,13 @@ end # So that the user is warned that not all tests were run. # We don't use loglock here, because this is only called once on the coordinator after all # tasks running tests have stopped and we're printing the final test report. 
-function print_failfast_summary(t::TestItems) +function print_completion_summary(t::TestItems; failedfast::Bool) io = DEFAULT_STDOUT[] - printstyled(io, "[ Fail Fast: "; bold=true, color=Base.warn_color()) + if failedfast + printstyled(io, "[ Fail Fast: "; bold=true, color=Base.warn_color()) + else + printstyled(io, "[ Tests Completed: "; bold=true, color=Base.info_color()) + end println(io, "$(t.count)/$(length(t.testitems)) test items were run.") return nothing end diff --git a/src/testcontext.jl b/src/testcontext.jl index fdd574e5..9fdae14e 100644 --- a/src/testcontext.jl +++ b/src/testcontext.jl @@ -108,9 +108,12 @@ is_cancelled(t::TestItems) = @atomic t.cancelled ### function record_results!(ti::TestItems) + @debugv 1 "Recording testitem results" foreach(ti.graph.children) do child record_results!(ti.graph, child) end + @debugv 1 "Done recording testitem results" + return ti end function record_results!(dir::DirNode, child_dir::DirNode) diff --git a/src/workers.jl b/src/workers.jl index ebaac6f3..6271e14a 100644 @@ -52,7 +52,8 @@ Base.fetch(f::Future) = fetch(f.value) mutable struct Worker lock::ReentrantLock # protects the .futures field; no other fields are modified after construction - pid::Int + num::Int # user given ID + pid::Int # process ID process::Base.Process socket::TCPSocket messages::Task @@ -73,7 +74,7 @@ end function terminate!(w::Worker, from::Symbol=:manual) already_terminated = @atomicswap :monotonic w.terminated = true if !already_terminated - @debug "terminating worker $(w.pid) from $from" + @debug "terminating $w from $from" end wte = WorkerTerminatedException(w) @lock w.lock begin @@ -114,7 +115,7 @@ end # Called when timeout_profile_wait is non-zero.
function trigger_profile(w::Worker, timeout_profile_wait, from::Symbol=:manual) if !Sys.iswindows() - @debug "sending profile request to worker $(w.pid) from $from" + @debug "sending profile request to $w from $from" if Sys.islinux() kill(w.process, 10) # SIGUSR1 elseif Sys.isbsd() @@ -128,6 +129,7 @@ end # gracefully terminate a worker by sending a shutdown message # and waiting for the other tasks to perform worker shutdown function Base.close(w::Worker) + @debug "closing $w" if !w.terminated && isopen(w.socket) req = Request(Symbol(), :(), rand(UInt64), true) @lock w.lock begin @@ -135,6 +137,7 @@ function Base.close(w::Worker) flush(w.socket) end end + @debug "waiting for $w to terminate" wait(w) return end @@ -142,21 +145,22 @@ end # wait until our spawned tasks have all finished Base.wait(w::Worker) = fetch(w.process_watch) && fetch(w.messages) && fetch(w.output) -Base.show(io::IO, w::Worker) = print(io, "Worker(pid=$(w.pid)", w.terminated ? ", terminated=true, termsignal=$(w.process.termsignal)" : "", ")") +Base.show(io::IO, w::Worker) = print(io, "Worker(num=$(w.num), pid=$(w.pid)", w.terminated ? ", terminated=true, termsignal=$(w.process.termsignal)" : "", ")") # used in testing to ensure all created workers are # eventually cleaned up properly const GLOBAL_CALLBACK_PER_WORKER = Ref{Any}() function Worker(; + num::Int=rand(1:typemax(Int32)), env::AbstractDict=ENV, dir::String=pwd(), threads::String="auto", exeflags=`--threads=$threads`, connect_timeout::Int=60, worker_redirect_io::IO=stdout, - worker_redirect_fn=(io, pid, line)->println(io, " Worker $pid: $line") - ) + worker_redirect_fn=(io, pid, line)->println(io, " Worker $num $pid: $line") +) # below copied from Distributed.launch env = Dict{String, String}(env) pathsep = Sys.iswindows() ?
";" : ":" @@ -192,7 +196,7 @@ function Worker(; return Sockets.connect(parse(Int, split(port_str, ':')[2])) end # create worker - w = Worker(ReentrantLock(), pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false) + w = Worker(ReentrantLock(), num, pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false) ## start a task to watch for worker process termination, notify the event when the task starts e1 = Threads.Event() w.process_watch = Threads.@spawn watch_and_terminate!(w, $e1) @@ -231,7 +235,7 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event) end end catch e - # @error "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace()) + # @error "Error redirecting $w output" exception=(e, catch_backtrace()) terminate!(w, :redirect_worker_output) e isa EOFError || e isa Base.IOError || rethrow() finally @@ -250,13 +254,13 @@ function process_responses(w::Worker, ev::Threads.Event) while isopen(w.socket) && !w.terminated # get the next Response from the worker r = deserialize(w.socket) - @assert r isa Response "Received invalid response from worker $(w.pid): $(r)" - # println("Received response $(r) from worker $(w.pid)") + @assert r isa Response "Received invalid response from $w: $(r)" + # println("Received response $(r) from $w") @lock lock begin - @assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)" + @assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from $w" # look up the Future for this request fut = pop!(reqs, r.id) - @assert !isready(fut.value) "Received duplicate response for request $(r.id) from worker $(w.pid)" + @assert !isready(fut.value) "Received duplicate response for request $(r.id) from $w" if r.error !== nothing # this allows rethrowing the exception from the worker to the caller close(fut.value, r.error) else @@ -266,7 +270,7 @@ function
process_responses(w::Worker, ev::Threads.Event) end end catch e - # @error "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace()) + # @error "Error processing responses from $worker" exception=(e, catch_backtrace()) terminate!(w, :process_responses) e isa EOFError || e isa Base.IOError || rethrow() end diff --git a/test/integrationtests.jl b/test/integrationtests.jl index f4c7cc3c..f49b150d 100644 --- a/test/integrationtests.jl +++ b/test/integrationtests.jl @@ -45,18 +45,24 @@ end # test we can call runtests manually w/ directory @testset "manual `runtests(dir)`" begin - results = encased_testset() do - runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl")) + using IOCapture + c = IOCapture.capture() do + encased_testset(() -> runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl"))) end + results = c.value @test n_passed(results) == 2 # NoDeps has two test files with a test each + @test contains(c.output, "[ Tests Completed: 2/2 test items were run.") end @testset "manual `runtests(file)`" begin # test we can point to a file at the base of the package (not just in `src` or `test`) - results = encased_testset() do - runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl", "toplevel_tests.jl")) + using IOCapture + c = IOCapture.capture() do + encased_testset(() -> runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl", "toplevel_tests.jl"))) end + results = c.value @test n_passed(results) == 1 + @test contains(c.output, "[ Tests Completed: 1/1 test items were run.") end @testset "`runtests(path)` auto finds testsetups" begin @@ -273,20 +279,28 @@ end nworkers = 2 @testset "runtests with nworkers = $nworkers" verbose=true begin @testset "Pkg.test() $pkg" for pkg in TEST_PKGS - results = with_test_package(pkg) do - withenv("RETESTITEMS_NWORKERS" => nworkers) do - Pkg.test() + c = IOCapture.capture() do + with_test_package(pkg) do + withenv("RETESTITEMS_NWORKERS" => nworkers) do + Pkg.test() + end end end + results = c.value @test all_passed(results) + @test contains(c.output, "[ 
Tests Completed") end @testset "Pkg.test() DontPass.jl" begin - results = with_test_package("DontPass.jl") do - withenv("RETESTITEMS_NWORKERS" => 2) do - Pkg.test() + c = IOCapture.capture() do + with_test_package("DontPass.jl") do + withenv("RETESTITEMS_NWORKERS" => 2) do + Pkg.test() + end end end + results = c.value @test length(non_passes(results)) > 0 + @test contains(c.output, "[ Tests Completed") end end @@ -1003,10 +1017,10 @@ end @testset "worker always crashes immediately" begin file = joinpath(TEST_FILES_DIR, "_happy_tests.jl") - # We have occassionally seen the Process exist with the expected signal. + # We have occassionally seen the Process exit without the expected signal. @assert typemin(Int32) == -2147483648 - terminated_err_log_1 = r"Error: Worker\(pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)." - terminated_err_log_2 = r"Error: Worker\(pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 2/2\)." + terminated_err_log_1 = r"Error: Worker\(num=\d+, pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)." + terminated_err_log_2 = r"Error: Worker\(num=\d+, pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 2/2\)." worker_init_expr = :(@eval ccall(:abort, Cvoid, ())) # We don't use IOCapture for capturing logs as that seems to hang when the worker crashes. @@ -1036,9 +1050,9 @@ end @eval ccall(:abort, Cvoid, ()) end end - # We have occassionally seen the Process exist with the expected signal. + # We have occassionally seen the Process exit without the expected signal. @assert typemin(Int32) == -2147483648 - terminated_err_log_1 = r"Error: Worker\(pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)." 
+ terminated_err_log_1 = r"Error: Worker\(num=\d+, pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)." # We don't use IOCapture for capturing logs as that seems to hang when the worker crashes. mktemp() do log_io, _ results = redirect_stdio(stdout=log_io, stderr=log_io, stdin=devnull) do @@ -1198,7 +1212,7 @@ end # monkey-patch the internal `memory_percent` function to return a fixed value, so we # can control if we hit the `memory_threshold`. @eval ReTestItems.memory_percent() = 83.1 - expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting worker process to try to free memory." + expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting process for worker 1 to try to free memory." # Pass `memory_threshold` keyword, and hit the memory threshold. c1 = IOCapture.capture() do