Skip to content

WIP/RFC: Allow workers to have a user-given number identifier #194

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "ReTestItems"
uuid = "817f1d60-ba6b-4fd5-9520-3cf149f6a823"
version = "1.29.0"
version = "1.30.0"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand Down
42 changes: 24 additions & 18 deletions src/ReTestItems.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using .Threads: @spawn, nthreads
using Pkg: Pkg
using TestEnv
using Logging
using LoggingExtras
using LoggingExtras: LoggingExtras, @debugv

export runtests, runtestitem
export @testsetup, @testitem
Expand Down Expand Up @@ -443,43 +443,45 @@ function _runtests_in_current_env(
ti = starting[i]
@spawn begin
with_logger(original_logger) do
manage_worker($w, $proj_name, $testitems, $ti, $cfg)
manage_worker($w, $proj_name, $testitems, $ti, $cfg; worker_num=$i)
end
end
end
end
Test.TESTSET_PRINT_ENABLE[] = true # reenable printing so our `finish` prints
# Let users know if tests are done, and if all of them ran (or if we failed fast).
# Print this above the final report as there might have been other logs printed
# since a failfast-cancellation was printed, but print it ASAP after tests finish
# in case any of the recording/reporting steps have an issue.
print_completion_summary(testitems; failedfast=(cfg.failfast && is_cancelled(testitems)))
record_results!(testitems)
cfg.report && write_junit_file(proj_name, dirname(projectfile), testitems.graph.junit)
if cfg.failfast && is_cancelled(testitems)
# Let users know if not all tests ran. Print this just above the final report as
# there might have been other logs printed since the cancellation was printed.
print_failfast_summary(testitems)
end
@debugv 1 "Calling Test.finish(testitems)"
Test.finish(testitems) # print summary of total passes/failures/errors
finally
Test.TESTSET_PRINT_ENABLE[] = true
# Cleanup test setup logs
@debugv 1 "Cleaning up test setup logs"
foreach(Iterators.filter(endswith(".log"), readdir(RETESTITEMS_TEMP_FOLDER[], join=true))) do logfile
rm(logfile; force=true) # `force` to ignore error if file already cleaned up
end
@debugv 1 "Done cleaning up test setup logs"
end
@debugv 1 "DONE"
return nothing
end

# Start a new `Worker` with `nworker_threads` threads and run `worker_init_expr` on it.
# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker.
function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num=nothing)
w = Worker(; threads=nworker_threads)
i = worker_num == nothing ? "" : " $worker_num"
function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num)
w = Worker(; threads=nworker_threads, num=worker_num)
# remote_fetch here because we want to make sure the worker is all setup before starting to eval testitems
remote_fetch(w, quote
using ReTestItems, Test
Test.TESTSET_PRINT_ENABLE[] = false
const GLOBAL_TEST_CONTEXT = ReTestItems.TestContext($proj_name, $ntestitems)
GLOBAL_TEST_CONTEXT.setups_evaled = ReTestItems.TestSetupModules()
nthreads_str = $nworker_threads
@info "Starting test worker$($i) on pid = $(Libc.getpid()), with $nthreads_str threads"
num = $worker_num
@info "Starting test worker $(num) on pid=$(Libc.getpid()), with $(nthreads_str) threads"
$(worker_init_expr.args...)
nothing
end)
Expand Down Expand Up @@ -572,19 +574,21 @@ function record_test_error!(testitem, msg, elapsed_seconds::Real=0.0)
return testitem
end

# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker.
function manage_worker(
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, cfg::_Config,
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, cfg::_Config;
worker_num::Int
)
ntestitems = length(testitems.testitems)
run_number = 1
memory_threshold_percent = 100 * cfg.memory_threshold
while testitem !== nothing
ch = Channel{TestItemResult}(1)
if memory_percent() > memory_threshold_percent
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting worker process to try to free memory."
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting process for worker $worker_num to try to free memory."
terminate!(worker)
wait(worker)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems; worker_num)
end
testitem.workerid[] = worker.pid
timeout = something(testitem.timeout, cfg.testitem_timeout)
Expand Down Expand Up @@ -679,17 +683,19 @@ function manage_worker(
run_number = 1
else
run_number += 1
@info "Retrying $(repr(testitem.name)) on a new worker process. Run=$run_number."
@info "Retrying $(repr(testitem.name)) on a new worker $worker_num process. Run=$run_number."
end
# The worker was terminated, so replace it unless there are no more testitems to run
if testitem !== nothing
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems; worker_num)
end
# Now loop back around to reschedule the testitem
continue
end
end
@info "All tests on worker $worker_num completed. Closing $worker."
close(worker)
@debugv 1 "Worker $worker_num closed: $(worker)"
return nothing
end

Expand Down
4 changes: 4 additions & 0 deletions src/junit_xml.jl
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ function write_junit_file(path::AbstractString, junit::Union{JUnitTestSuites,JUn
open(path, "w") do io
write_junit_file(io, junit)
end
@debugv 1 "Done writing JUnit XML file to $(repr(path))"
return nothing
end

Expand All @@ -201,6 +202,7 @@ function write_junit_file(io::IO, junit::Union{JUnitTestSuites,JUnitTestSuite})
end

function write_junit_xml(io, junit::JUnitTestSuites)
@debugv 2 "Writing JUnit XML for testsuites $(junit.name)"
write(io, "\n<testsuites")
write_counts(io, junit.counts)
write(io, ">")
Expand All @@ -212,6 +214,7 @@ function write_junit_xml(io, junit::JUnitTestSuites)
end

function write_junit_xml(io, ts::JUnitTestSuite)
@debugv 2 "Writing JUnit XML for testsuite $(ts.name)"
write(io, "\n<testsuite name=", xml_markup(ts.name))
write_counts(io, ts.counts)
write(io, ">")
Expand Down Expand Up @@ -258,6 +261,7 @@ function write_dd_tags(io, tc::JUnitTestCase)
end

function write_junit_xml(io, tc::JUnitTestCase)
@debugv 2 "Writing JUnit XML for testcase $(tc.name)"
write(io, "\n\t<testcase name=", xml_markup(tc.name))
write_counts(io, tc.counts)
write(io, ">")
Expand Down
8 changes: 6 additions & 2 deletions src/log_capture.jl
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,13 @@ end
# So that the user is told how many test items ran (and warned if not all were run).
# We don't use loglock here, because this is only called once on the coordinator after all
# tasks running tests have stopped and we're printing the final test report.
function print_failfast_summary(t::TestItems)
function print_completion_summary(t::TestItems; failedfast::Bool)
    # Summarize how many test items ran out of the total. Use the warning
    # style when a fail-fast cancellation stopped the run early, otherwise
    # the regular info style.
    io = DEFAULT_STDOUT[]
    label, color = if failedfast
        ("[ Fail Fast: ", Base.warn_color())
    else
        ("[ Tests Completed: ", Base.info_color())
    end
    printstyled(io, label; bold=true, color=color)
    println(io, "$(t.count)/$(length(t.testitems)) test items were run.")
    return nothing
end
Expand Down
3 changes: 3 additions & 0 deletions src/testcontext.jl
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ is_cancelled(t::TestItems) = @atomic t.cancelled
###

# Record results for every top-level child of the test graph.
function record_results!(ti::TestItems)
    @debugv 1 "Recording testitem results"
    for node in ti.graph.children
        record_results!(ti.graph, node)
    end
    @debugv 1 "Done recording testitem results"
    return ti
end

function record_results!(dir::DirNode, child_dir::DirNode)
Expand Down
30 changes: 17 additions & 13 deletions src/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ Base.fetch(f::Future) = fetch(f.value)

mutable struct Worker
lock::ReentrantLock # protects the .futures field; no other fields are modified after construction
pid::Int
num::Int # user given ID
pid::Int # process ID
process::Base.Process
socket::TCPSocket
messages::Task
Expand All @@ -73,7 +74,7 @@ end
function terminate!(w::Worker, from::Symbol=:manual)
already_terminated = @atomicswap :monotonic w.terminated = true
if !already_terminated
@debug "terminating worker $(w.pid) from $from"
@debug "terminating $worker from $from"
end
wte = WorkerTerminatedException(w)
@lock w.lock begin
Expand Down Expand Up @@ -114,7 +115,7 @@ end
# Called when timeout_profile_wait is non-zero.
function trigger_profile(w::Worker, timeout_profile_wait, from::Symbol=:manual)
if !Sys.iswindows()
@debug "sending profile request to worker $(w.pid) from $from"
@debug "sending profile request to $worker from $from"
if Sys.islinux()
kill(w.process, 10) # SIGUSR1
elseif Sys.isbsd()
Expand All @@ -128,35 +129,38 @@ end
# Gracefully terminate a worker: send a shutdown message over its socket
# (if the worker is still alive) and then wait for the worker's background
# tasks to perform their shutdown.
# Fix: the `@debug` messages interpolated an undefined variable `worker`;
# the function parameter is `w`, so logging would throw `UndefVarError`
# whenever debug-level logging was enabled.
function Base.close(w::Worker)
    @debug "closing $w"
    if !w.terminated && isopen(w.socket)
        # A Request with the shutdown flag set tells the worker process to exit.
        req = Request(Symbol(), :(), rand(UInt64), true)
        @lock w.lock begin
            serialize(w.socket, req)
            flush(w.socket)
        end
    end
    @debug "waiting for $w to terminate"
    wait(w)
    return
end

# Block until all three of the worker's spawned tasks have finished.
function Base.wait(w::Worker)
    return fetch(w.process_watch) && fetch(w.messages) && fetch(w.output)
end

Base.show(io::IO, w::Worker) = print(io, "Worker(pid=$(w.pid)", w.terminated ? ", terminated=true, termsignal=$(w.process.termsignal)" : "", ")")
# Compact one-line representation; includes termination details once terminated.
function Base.show(io::IO, w::Worker)
    print(io, "Worker(num=$(w.num), pid=$(w.pid)")
    if w.terminated
        print(io, ", terminated=true, termsignal=$(w.process.termsignal)")
    end
    print(io, ")")
end

# used in testing to ensure all created workers are
# eventually cleaned up properly
const GLOBAL_CALLBACK_PER_WORKER = Ref{Any}()

function Worker(;
num::Int=rand(1:typemax(Int32)),
env::AbstractDict=ENV,
dir::String=pwd(),
threads::String="auto",
exeflags=`--threads=$threads`,
connect_timeout::Int=60,
worker_redirect_io::IO=stdout,
worker_redirect_fn=(io, pid, line)->println(io, " Worker $pid: $line")
)
worker_redirect_fn=(io, pid, line)->println(io, " Worker $num $pid: $line")
)
# below copied from Distributed.launch
env = Dict{String, String}(env)
pathsep = Sys.iswindows() ? ";" : ":"
Expand Down Expand Up @@ -192,7 +196,7 @@ function Worker(;
return Sockets.connect(parse(Int, split(port_str, ':')[2]))
end
# create worker
w = Worker(ReentrantLock(), pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false)
w = Worker(ReentrantLock(), num, pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false)
## start a task to watch for worker process termination, notify the event when the task starts
e1 = Threads.Event()
w.process_watch = Threads.@spawn watch_and_terminate!(w, $e1)
Expand Down Expand Up @@ -231,7 +235,7 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event)
end
end
catch e
# @error "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace())
# @error "Error redirecting $worker output" exception=(e, catch_backtrace())
terminate!(w, :redirect_worker_output)
e isa EOFError || e isa Base.IOError || rethrow()
finally
Expand All @@ -250,13 +254,13 @@ function process_responses(w::Worker, ev::Threads.Event)
while isopen(w.socket) && !w.terminated
# get the next Response from the worker
r = deserialize(w.socket)
@assert r isa Response "Received invalid response from worker $(w.pid): $(r)"
# println("Received response $(r) from worker $(w.pid)")
@assert r isa Response "Received invalid response from $worker: $(r)"
# println("Received response $(r) from $worker")
@lock lock begin
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)"
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from $worker"
# look up the Future for this request
fut = pop!(reqs, r.id)
@assert !isready(fut.value) "Received duplicate response for request $(r.id) from worker $(w.pid)"
@assert !isready(fut.value) "Received duplicate response for request $(r.id) from $worker"
if r.error !== nothing
# this allows rethrowing the exception from the worker to the caller
close(fut.value, r.error)
Expand All @@ -266,7 +270,7 @@ function process_responses(w::Worker, ev::Threads.Event)
end
end
catch e
# @error "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace())
# @error "Error processing responses from $worker" exception=(e, catch_backtrace())
terminate!(w, :process_responses)
e isa EOFError || e isa Base.IOError || rethrow()
end
Expand Down
46 changes: 30 additions & 16 deletions test/integrationtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,24 @@ end

# test we can call runtests manually w/ directory
@testset "manual `runtests(dir)`" begin
results = encased_testset() do
runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl"))
using IOCapture
c = IOCapture.capture() do
encased_testset(() -> runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl")))
end
results = c.value
@test n_passed(results) == 2 # NoDeps has two test files with a test each
@test contains(c.output, "[ Tests Completed: 2/2 test items were run.")
end

@testset "manual `runtests(file)`" begin
# test we can point to a file at the base of the package (not just in `src` or `test`)
results = encased_testset() do
runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl", "toplevel_tests.jl"))
using IOCapture
c = IOCapture.capture() do
encased_testset(() -> runtests(joinpath(TEST_PKG_DIR, "NoDeps.jl", "toplevel_tests.jl")))
end
results = c.value
@test n_passed(results) == 1
@test contains(c.output, "[ Tests Completed: 1/1 test items were run.")
end

@testset "`runtests(path)` auto finds testsetups" begin
Expand Down Expand Up @@ -273,20 +279,28 @@ end
nworkers = 2
@testset "runtests with nworkers = $nworkers" verbose=true begin
@testset "Pkg.test() $pkg" for pkg in TEST_PKGS
results = with_test_package(pkg) do
withenv("RETESTITEMS_NWORKERS" => nworkers) do
Pkg.test()
c = IOCapture.capture() do
with_test_package(pkg) do
withenv("RETESTITEMS_NWORKERS" => nworkers) do
Pkg.test()
end
end
end
results = c.value
@test all_passed(results)
@test contains(c.output, "[ Tests Completed")
end
@testset "Pkg.test() DontPass.jl" begin
results = with_test_package("DontPass.jl") do
withenv("RETESTITEMS_NWORKERS" => 2) do
Pkg.test()
c = IOCapture.capture() do
with_test_package("DontPass.jl") do
withenv("RETESTITEMS_NWORKERS" => 2) do
Pkg.test()
end
end
end
results = c.value
@test length(non_passes(results)) > 0
@test contains(c.output, "[ Tests Completed")
end
end

Expand Down Expand Up @@ -1003,10 +1017,10 @@ end
@testset "worker always crashes immediately" begin
file = joinpath(TEST_FILES_DIR, "_happy_tests.jl")

# We have occassionally seen the Process exist with the expected signal.
# We have occasionally seen the Process exit without the expected signal.
@assert typemin(Int32) == -2147483648
terminated_err_log_1 = r"Error: Worker\(pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)."
terminated_err_log_2 = r"Error: Worker\(pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 2/2\)."
terminated_err_log_1 = r"Error: Worker\(num=\d+, pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)."
terminated_err_log_2 = r"Error: Worker\(num=\d+, pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 2/2\)."

worker_init_expr = :(@eval ccall(:abort, Cvoid, ()))
# We don't use IOCapture for capturing logs as that seems to hang when the worker crashes.
Expand Down Expand Up @@ -1036,9 +1050,9 @@ end
@eval ccall(:abort, Cvoid, ())
end
end
# We have occassionally seen the Process exist with the expected signal.
# We have occasionally seen the Process exit without the expected signal.
@assert typemin(Int32) == -2147483648
terminated_err_log_1 = r"Error: Worker\(pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)."
terminated_err_log_1 = r"Error: Worker\(num=\d+, pid=\d+, terminated=true, termsignal=(6|-2147483648)\) terminated unexpectedly. Starting new worker process \(retry 1/2\)."
# We don't use IOCapture for capturing logs as that seems to hang when the worker crashes.
mktemp() do log_io, _
results = redirect_stdio(stdout=log_io, stderr=log_io, stdin=devnull) do
Expand Down Expand Up @@ -1198,7 +1212,7 @@ end
# monkey-patch the internal `memory_percent` function to return a fixed value, so we
# can control if we hit the `memory_threshold`.
@eval ReTestItems.memory_percent() = 83.1
expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting worker process to try to free memory."
expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting process for worker 1 to try to free memory."

# Pass `memory_threshold` keyword, and hit the memory threshold.
c1 = IOCapture.capture() do
Expand Down
Loading