Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions src/ReTestItems.jl
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,11 @@ function _runtests_in_current_env(
@info "Starting test workers"
workers = Vector{Worker}(undef, nworkers)
ntestitems = length(testitems.testitems)
ctx = TestContext(proj_name, ntestitems)
@sync for i in 1:nworkers
@spawn begin
with_logger(original_logger) do
$workers[$i] = robust_start_worker($proj_name, $(cfg.nworker_threads), $(cfg.worker_init_expr), $ntestitems; worker_num=$i)
$workers[$i] = robust_start_worker(ctx, $(cfg.nworker_threads), $(cfg.worker_init_expr); worker_num=$i)
end
end
end
Expand All @@ -461,9 +462,10 @@ function _runtests_in_current_env(
starting = get_starting_testitems(testitems, nworkers)
@sync for (i, w) in enumerate(workers)
ti = starting[i]
ti === nothing && continue
@spawn begin
with_logger(original_logger) do
manage_worker($w, $proj_name, $testitems, $ti, $cfg; worker_num=$i)
manage_worker($w, ctx, $testitems, $ti, $cfg; worker_num=$i)
end
end
end
Expand Down Expand Up @@ -497,14 +499,15 @@ end

# Start a new `Worker` with `nworker_threads` threads and run `worker_init_expr` on it.
# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker.
function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num=nothing)
# function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num=nothing)
function start_worker(ctx::TestContext, nworker_threads::String, worker_init_expr::Expr; worker_num=nothing)
w = Worker(; threads=nworker_threads)
i = worker_num == nothing ? "" : " $worker_num"
# Use `remote_fetch` here because we want to make sure the worker is fully set up before starting to eval testitems
remote_fetch(w, quote
using ReTestItems, Test
Test.TESTSET_PRINT_ENABLE[] = false
const GLOBAL_TEST_CONTEXT = ReTestItems.TestContext($proj_name, $ntestitems)
const GLOBAL_TEST_CONTEXT = $ctx
GLOBAL_TEST_CONTEXT.setups_evaled = ReTestItems.TestSetupModules()
nthreads_str = $nworker_threads
@info "Starting test worker$($i) on pid = $(Libc.getpid()), with $nthreads_str threads"
Expand Down Expand Up @@ -600,9 +603,17 @@ function record_test_error!(testitem, msg, elapsed_seconds::Real=0.0)
return testitem
end

# Terminate `worker` because memory usage is above `memory_threshold_percent`, wait for it
# to exit, and return a freshly started replacement worker.
#
# - `memory_threshold_percent`: the threshold that was exceeded; only used in the log message.
# - `ctx`, `cfg`: forwarded to `robust_start_worker` (`cfg.nworker_threads` and
#   `cfg.worker_init_expr` configure the new worker).
# - `worker_num`: used in the log message and forwarded to `robust_start_worker`.
# - `worker` (required keyword): the worker process to terminate. It must be passed in
#   explicitly; this function has no other way to know which worker to replace.
#
# The replacement is only *returned*: rebinding a variable inside this function cannot change
# the caller's binding, so callers must write
# `worker = _restart_for_memory(threshold, ctx, cfg, worker_num; worker)`.
# NOTE(review): `terminate!(worker, :memory_threshold)` assumes a two-argument `terminate!`
# method that accepts a reason symbol -- confirm it exists.
function _restart_for_memory(
    memory_threshold_percent::Number, ctx::TestContext, cfg::_Config, worker_num::Int;
    worker::Worker,
)
    @warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting process for worker $worker_num to try to free memory."
    terminate!(worker, :memory_threshold)
    # Wait for the old process to be gone before starting its replacement.
    wait(worker)
    return robust_start_worker(ctx, cfg.nworker_threads, cfg.worker_init_expr; worker_num)
end

# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker.
function manage_worker(
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, cfg::_Config;
worker::Worker, ctx::TestContext, testitems::TestItems, testitem::TestItem, cfg::_Config;
worker_num::Int
)
ntestitems = length(testitems.testitems)
Expand All @@ -611,10 +622,7 @@ function manage_worker(
while testitem !== nothing
ch = Channel{TestItemResult}(1)
if memory_percent() > memory_threshold_percent
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting process for worker $worker_num to try to free memory."
terminate!(worker)
wait(worker)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems)
_restart_for_memory(memory_threshold_percent, ctx, cfg, worker_num)
end
testitem.workerid[] = worker.pid
timeout = something(testitem.timeout, cfg.testitem_timeout)
Expand Down Expand Up @@ -713,7 +721,7 @@ function manage_worker(
end
# The worker was terminated, so replace it unless there are no more testitems to run
if testitem !== nothing
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems)
worker = robust_start_worker(ctx, cfg.nworker_threads, cfg.worker_init_expr)
end
# Now loop back around to reschedule the testitem
continue
Expand Down
Loading