From 6e03073809d3d8e24b92e8e780b31121d45d2ea2 Mon Sep 17 00:00:00 2001 From: Florian Henkes Date: Thu, 31 Jul 2025 18:37:08 -0400 Subject: [PATCH 1/3] renamed OnLocalhost --> LocalRun + allow julia flags and dir for LocalRun --- src/ParallelProcessingTools.jl | 1 + src/localhost.jl | 89 ++++++++++++++++++++++++++++++++++ src/runworkers.jl | 78 ----------------------------- 3 files changed, 90 insertions(+), 78 deletions(-) create mode 100644 src/localhost.jl diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 210fc59..394098d 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -40,6 +40,7 @@ include("procinit.jl") include("workerpool.jl") include("onworkers.jl") include("runworkers.jl") +include("localhost.jl") include("slurm.jl") include("htcondor.jl") include("deprecated.jl") diff --git a/src/localhost.jl b/src/localhost.jl new file mode 100644 index 0000000..79138bf --- /dev/null +++ b/src/localhost.jl @@ -0,0 +1,89 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +""" + LocalRun(; + n::Integer = 1 + env::Dict{String,String} = Dict{String,String}() + julia_flags::Cmd = _default_julia_flags() + dir = pwd() + ) isa DynamicAddProcsMode + +Mode that runs `n` worker processes on the current host. + +Example: + +```julia +runmode = LocalRun(n = 4) +task, n = runworkers(runmode) + +Threads.@async begin + wait(task) + @info "SLURM workers have terminated." +end + +@wait_while nprocs()-1 < n) +``` + +Workers can also be started manually, use +[`worker_start_command(runmode)`](@ref) to get the system (shell) command and +run it from a separate process or so. +""" +@with_kw struct LocalRun <: DynamicAddProcsMode + n::Int + julia_flags::Cmd = _default_julia_flags() + dir = pwd() + env::Dict{String,String} = Dict{String,String}() +end +export LocalRun + +@deprecate OnLocalhost LocalRun + +function worker_start_command(runmode::LocalRun, manager::ElasticManager) + julia_flags = runmode.julia_flags + dir = runmode.dir + + jl_threads_flag = any(occursin.(Ref("--threads"), string.(julia_flags))) ? `` : `--threads=$(nthreads())` + jl_dir_flags = `-e "cd(\"$(dir)\")"` + additional_julia_flags = `$jl_threads_flag $julia_flags $jl_dir_flags` + + worker_cmd = worker_local_startcmd( + manager; + julia_flags = additional_julia_flags, + env = runmode.env + ) + return worker_cmd, runmode.n, runmode.n +end + +function runworkers(runmode::LocalRun, manager::ElasticManager) + start_cmd, m, n = worker_start_command(runmode, manager) + + task = Threads.@async begin + processes = Base.Process[] + for _ in 1:m + push!(processes, open(start_cmd)) + end + @wait_while any(isactive, processes) + end + + return task, n +end + + +#= +# ToDo: Add SSHWorkers or similar: + +@with_kw struct SSHWorkers <: RunProcsMode + hosts::Vector{Any} + ssd_flags::Cmd = _default_slurm_flags() + julia_flags::Cmd = _default_julia_flags() + dir = ... + env = ... + tunnel::Bool = false + multiplex::Bool = false + shell::Symbol = :posix + max_parallel::Int = 10 + enable_threaded_blas::Bool = true + topology::Symbol = :all_to_all + lazy_connections::Bool = true +end +=# \ No newline at end of file diff --git a/src/runworkers.jl b/src/runworkers.jl index e45eb7b..da232b6 100644 --- a/src/runworkers.jl +++ b/src/runworkers.jl @@ -297,84 +297,6 @@ function worker_local_startcmd( end -""" - OnLocalhost(; - n::Integer = 1 - env::Dict{String,String} = Dict{String,String}() - ) isa DynamicAddProcsMode - -Mode that runs `n` worker processes on the current host. - -Example: - -```julia -runmode = OnLocalhost(n = 4) -task, n = runworkers(runmode) - -Threads.@async begin - wait(task) - @info "SLURM workers have terminated." -end - -@wait_while nprocs()-1 < n) -``` - -Workers can also be started manually, use -[`worker_start_command(runmode)`](@ref) to get the system (shell) command and -run it from a separate process or so. -""" -@with_kw struct OnLocalhost <: DynamicAddProcsMode - n::Int - env::Dict{String,String} = Dict{String,String}() -end -export OnLocalhost - -function worker_start_command(runmode::OnLocalhost, manager::ElasticManager) - worker_nthreads = nthreads() - julia_flags = `$(_default_julia_flags()) --threads=$worker_nthreads` - worker_cmd = worker_local_startcmd( - manager; - julia_flags = julia_flags, - env = runmode.env - ) - return worker_cmd, runmode.n, runmode.n -end - -function runworkers(runmode::OnLocalhost, manager::ElasticManager) - start_cmd, m, n = worker_start_command(runmode, manager) - - task = Threads.@async begin - processes = Base.Process[] - for _ in 1:m - push!(processes, open(start_cmd)) - end - @wait_while any(isactive, processes) - end - - return task, n -end - - -#= -# ToDo: Add SSHWorkers or similar: - -@with_kw struct SSHWorkers <: RunProcsMode - hosts::Vector{Any} - ssd_flags::Cmd = _default_slurm_flags() - julia_flags::Cmd = _default_julia_flags() - dir = ... - env = ... - tunnel::Bool = false - multiplex::Bool = false - shell::Symbol = :posix - max_parallel::Int = 10 - enable_threaded_blas::Bool = true - topology::Symbol = :all_to_all - lazy_connections::Bool = true -end -=# - - """ stopworkers() stopworkers(pid::Int) From 6513b2c09f6c9ebe68945613c679099cfe54ff20 Mon Sep 17 00:00:00 2001 From: Florian Henkes Date: Thu, 31 Jul 2025 18:42:44 -0400 Subject: [PATCH 2/3] Update tests --- test/test_onworkers.jl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/test_onworkers.jl b/test/test_onworkers.jl index 260c3c4..7d65544 100644 --- a/test/test_onworkers.jl +++ b/test/test_onworkers.jl @@ -46,7 +46,7 @@ end @testset "onworkers" begin - runmode = OnLocalhost(n = 2) + runmode = LocalRun(n = 2) @testset "runworkers $(nameof(typeof(runmode)))" begin test_runprocs(2) do @@ -86,7 +86,7 @@ end @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(gen_mayfail(1), "bar"; tries = 2, label = "mayfail") @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(mytask, 2, "foo", maxtime = 0.5, tries = 2) - runworkers(OnLocalhost(n = 2)) + runworkers(LocalRun(n = 2)) timer = Timer(30) @wait_while nprocs() < 3 && isopen(timer) @@ -114,7 +114,7 @@ end @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(mytask, 2, "foo", time = 0.5, tries = 2) @test nprocs() == 1 - addworkers(OnLocalhost(2)) + addworkers(LocalRun(2)) @test @inferred(onworker(mytask)) == () @test @inferred(onworker(mytask, 1, "foo")) == ("foo", ) From afbc8d48ca3500c7845b4fbf793b5bb928b048e6 Mon Sep 17 00:00:00 2001 From: Florian Henkes Date: Wed, 6 Aug 2025 10:00:22 -0400 Subject: [PATCH 3/3] Renamed all *Run DynamicProcsMode to On* modes --- docs/src/index.md | 4 ++-- src/htcondor.jl | 18 ++++++++++-------- src/localhost.jl | 14 ++++++-------- src/slurm.jl | 13 +++++++------ test/test_onworkers.jl | 6 +++--- 5 files changed, 28 insertions(+), 27 deletions(-) diff --git a/docs/src/index.md b/docs/src/index.md index 9487f9c..10c0891 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -7,7 +7,7 @@ This Julia package provides some tools to ease multithreaded and distributed pro Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools add some machinery to make some aspects of this even easier. -An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) an stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`SlurmRun`](@ref)), or via HTCondor ([`HTCondorRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome). +An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) an stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`OnSlurm`](@ref)), or via HTCondor ([`OnHTCondor`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome). The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy. @@ -107,7 +107,7 @@ using ParallelProcessingTools, Distributed pinthreads_auto() end -_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`)) +_, n = runworkers(OnSlurm(slurm_flags = `--cpu-bind=cores --mem-bind=local`)) @wait_while maxtime=240 nprocs() < n + 1 resources = worker_resources() diff --git a/src/htcondor.jl b/src/htcondor.jl index 4d1618f..1423754 100644 --- a/src/htcondor.jl +++ b/src/htcondor.jl @@ -1,7 +1,7 @@ # This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). """ - HTCondorRun(; + OnHTCondor(; n::Int = 1 condor_flags::Cmd = _default_condor_flags() condor_settings::Dict{String,String} = Dict{String,String}() @@ -19,7 +19,7 @@ Condor submit script and steering `.sh` files are stored in `jobfile_dir`. Example: ```julia-repl -julia> runmode = HTCondorRun(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB")) +julia> runmode = OnHTCondor(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB")) task = runworkers(runmode) julia> runworkers(runmode) @@ -39,7 +39,7 @@ Workers can also be started manually, use [`worker_start_command(runmode)`](@ref) to get the `condor_submit` start command and run it from a separate process or so. """ -@with_kw struct HTCondorRun <: DynamicAddProcsMode +@with_kw struct OnHTCondor <: DynamicAddProcsMode n::Int = 1 condor_flags::Cmd = _default_condor_flags() condor_settings::Dict{String,String} = Dict{String,String}() @@ -49,12 +49,14 @@ run it from a separate process or so. env::Dict{String,String} = Dict{String,String}() redirect_output::Bool = true end -export HTCondorRun +export OnHTCondor + +@deprecate HTCondorRun OnHTCondor _default_condor_flags() = `` const _g_condor_nextjlstep = Base.Threads.Atomic{Int}(1) -function worker_start_command(runmode::HTCondorRun, manager::ElasticManager) +function worker_start_command(runmode::OnHTCondor, manager::ElasticManager) flags = runmode.condor_flags n_workers = runmode.n temp_name = tempname(runmode.jobfile_dir) @@ -66,7 +68,7 @@ function worker_start_command(runmode::HTCondorRun, manager::ElasticManager) return `condor_submit $flags $submit_file_path`, 1, n_workers end -function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager::ElasticManager) +function _generate_condor_worker_script(filename, runmode::OnHTCondor, manager::ElasticManager) julia_flags = runmode.julia_flags request_memory = get(runmode.condor_settings, "request_memory", "2GB") @@ -94,7 +96,7 @@ function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager: end end -function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::HTCondorRun) +function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::OnHTCondor) jlstep = atomic_add!(_g_condor_nextjlstep, 1) jobname = "julia-$(getpid())-$jlstep" default_dict = Dict( @@ -117,7 +119,7 @@ function _generate_condor_submit_file(submit_file_path, worker_script_path, runm end end -function runworkers(runmode::HTCondorRun, manager::ElasticManager) +function runworkers(runmode::OnHTCondor, manager::ElasticManager) run_cmd, m, n = worker_start_command(runmode, manager) @info "Submitting HTCondor job: $run_cmd" process = run(run_cmd) diff --git a/src/localhost.jl b/src/localhost.jl index 79138bf..a733905 100644 --- a/src/localhost.jl +++ b/src/localhost.jl @@ -1,7 +1,7 @@ # This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). """ - LocalRun(; + OnLocalhost(; n::Integer = 1 env::Dict{String,String} = Dict{String,String}() julia_flags::Cmd = _default_julia_flags() @@ -13,7 +13,7 @@ Mode that runs `n` worker processes on the current host. Example: ```julia -runmode = LocalRun(n = 4) +runmode = OnLocalhost(n = 4) task, n = runworkers(runmode) Threads.@async begin @@ -28,17 +28,15 @@ Workers can also be started manually, use [`worker_start_command(runmode)`](@ref) to get the system (shell) command and run it from a separate process or so. """ -@with_kw struct LocalRun <: DynamicAddProcsMode +@with_kw struct OnLocalhost <: DynamicAddProcsMode n::Int julia_flags::Cmd = _default_julia_flags() dir = pwd() env::Dict{String,String} = Dict{String,String}() end -export LocalRun +export OnLocalhost -@deprecate OnLocalhost LocalRun - -function worker_start_command(runmode::LocalRun, manager::ElasticManager) +function worker_start_command(runmode::OnLocalhost, manager::ElasticManager) julia_flags = runmode.julia_flags dir = runmode.dir @@ -54,7 +52,7 @@ function worker_start_command(runmode::LocalRun, manager::ElasticManager) return worker_cmd, runmode.n, runmode.n end -function runworkers(runmode::LocalRun, manager::ElasticManager) +function runworkers(runmode::OnLocalhost, manager::ElasticManager) start_cmd, m, n = worker_start_command(runmode, manager) task = Threads.@async begin diff --git a/src/slurm.jl b/src/slurm.jl index e4dad97..1e1e794 100644 --- a/src/slurm.jl +++ b/src/slurm.jl @@ -1,7 +1,7 @@ # This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). """ - SlurmRun(; + OnSlurm(; slurm_flags::Cmd = {defaults} julia_flags::Cmd = {defaults} dir = pwd() @@ -20,7 +20,7 @@ Workers are started with current directory set to `dir`. Example: ```julia -runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) +runmode = OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) task = runworkers(runmode) Threads.@async begin @@ -35,19 +35,20 @@ Workers can also be started manually, use [`worker_start_command(runmode)`](@ref) to get the `srun` start command and run it from a separate process or so. """ -@with_kw struct SlurmRun <: DynamicAddProcsMode +@with_kw struct OnSlurm <: DynamicAddProcsMode slurm_flags::Cmd = _default_slurm_flags() julia_flags::Cmd = _default_julia_flags() dir = pwd() env::Dict{String,String} = Dict{String,String}() redirect_output::Bool = true end -export SlurmRun +export OnSlurm +@deprecate SlurmRun OnSlurm const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1) -function worker_start_command(runmode::SlurmRun, manager::ElasticManager) +function worker_start_command(runmode::OnSlurm, manager::ElasticManager) slurm_flags = runmode.slurm_flags julia_flags = runmode.julia_flags dir = runmode.dir @@ -99,7 +100,7 @@ function _slurm_mem_per_task(tc::NamedTuple) end -function runworkers(runmode::SlurmRun, manager::ElasticManager) +function runworkers(runmode::OnSlurm, manager::ElasticManager) srun_cmd, m, n = worker_start_command(runmode, manager) @info "Starting SLURM job: $srun_cmd" task = Threads.@async begin diff --git a/test/test_onworkers.jl b/test/test_onworkers.jl index 7d65544..260c3c4 100644 --- a/test/test_onworkers.jl +++ b/test/test_onworkers.jl @@ -46,7 +46,7 @@ end @testset "onworkers" begin - runmode = LocalRun(n = 2) + runmode = OnLocalhost(n = 2) @testset "runworkers $(nameof(typeof(runmode)))" begin test_runprocs(2) do @@ -86,7 +86,7 @@ end @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(gen_mayfail(1), "bar"; tries = 2, label = "mayfail") @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(mytask, 2, "foo", maxtime = 0.5, tries = 2) - runworkers(LocalRun(n = 2)) + runworkers(OnLocalhost(n = 2)) timer = Timer(30) @wait_while nprocs() < 3 && isopen(timer) @@ -114,7 +114,7 @@ end @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(mytask, 2, "foo", time = 0.5, tries = 2) @test nprocs() == 1 - addworkers(LocalRun(2)) + addworkers(OnLocalhost(2)) @test @inferred(onworker(mytask)) == () @test @inferred(onworker(mytask, 1, "foo")) == ("foo", )