diff --git a/docs/src/index.md b/docs/src/index.md
index 9487f9c..10c0891 100644
--- a/docs/src/index.md
+++ b/docs/src/index.md
@@ -7,7 +7,7 @@ This Julia package provides some tools to ease multithreaded and distributed pro
 
 Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools add some machinery to make some aspects of this even easier.
 
-An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) an stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`SlurmRun`](@ref)), or via HTCondor ([`HTCondorRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
+An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful, for example, to add workers to a running Julia session via manually controlled batch jobs. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`OnSlurm`](@ref)), or via HTCondor ([`OnHTCondor`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
 
 The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy.
 
@@ -107,7 +107,7 @@ using ParallelProcessingTools, Distributed
     pinthreads_auto()
 end
 
-_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
+_, n = runworkers(OnSlurm(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
 
 @wait_while maxtime=240 nprocs() < n + 1
 resources = worker_resources()
diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl
index 210fc59..394098d 100644
--- a/src/ParallelProcessingTools.jl
+++ b/src/ParallelProcessingTools.jl
@@ -40,6 +40,7 @@ include("procinit.jl")
 include("workerpool.jl")
 include("onworkers.jl")
 include("runworkers.jl")
+include("localhost.jl")
 include("slurm.jl")
 include("htcondor.jl")
 include("deprecated.jl")
diff --git a/src/htcondor.jl b/src/htcondor.jl
index 4d1618f..1423754 100644
--- a/src/htcondor.jl
+++ b/src/htcondor.jl
@@ -1,7 +1,7 @@
 # This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).
 
 """
-    HTCondorRun(;
+    OnHTCondor(;
         n::Int = 1
         condor_flags::Cmd = _default_condor_flags()
         condor_settings::Dict{String,String} = Dict{String,String}()
@@ -19,7 +19,7 @@ Condor submit script and steering `.sh` files are stored in `jobfile_dir`.
 Example:
 
 ```julia-repl
-julia> runmode = HTCondorRun(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
+julia> runmode = OnHTCondor(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
 task = runworkers(runmode)
 
 julia> runworkers(runmode)
@@ -39,7 +39,7 @@ Workers can also be started manually, use
 [`worker_start_command(runmode)`](@ref) to get the `condor_submit` start command and
 run it from a separate process or so.
 """
-@with_kw struct HTCondorRun <: DynamicAddProcsMode
+@with_kw struct OnHTCondor <: DynamicAddProcsMode
     n::Int = 1
     condor_flags::Cmd = _default_condor_flags()
     condor_settings::Dict{String,String} = Dict{String,String}()
@@ -49,12 +49,14 @@ run it from a separate process or so.
     env::Dict{String,String} = Dict{String,String}()
     redirect_output::Bool = true
 end
-export HTCondorRun
+export OnHTCondor
+
+@deprecate HTCondorRun OnHTCondor
 
 _default_condor_flags() = ``
 
 const _g_condor_nextjlstep = Base.Threads.Atomic{Int}(1)
-function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
+function worker_start_command(runmode::OnHTCondor, manager::ElasticManager)
     flags = runmode.condor_flags
     n_workers = runmode.n
     temp_name = tempname(runmode.jobfile_dir)
@@ -66,7 +68,7 @@ function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
     return `condor_submit $flags $submit_file_path`, 1, n_workers
 end
 
-function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager::ElasticManager)
+function _generate_condor_worker_script(filename, runmode::OnHTCondor, manager::ElasticManager)
     julia_flags = runmode.julia_flags
 
     request_memory = get(runmode.condor_settings, "request_memory", "2GB")
@@ -94,7 +96,7 @@ function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager:
     end
 end
 
-function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::HTCondorRun)
+function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::OnHTCondor)
     jlstep = atomic_add!(_g_condor_nextjlstep, 1)
     jobname = "julia-$(getpid())-$jlstep"
     default_dict = Dict(
@@ -117,7 +119,7 @@ function _generate_condor_submit_file(submit_file_path, worker_script_path, runm
     end
 end
 
-function runworkers(runmode::HTCondorRun, manager::ElasticManager)
+function runworkers(runmode::OnHTCondor, manager::ElasticManager)
     run_cmd, m, n = worker_start_command(runmode, manager)
     @info "Submitting HTCondor job: $run_cmd"
     process = run(run_cmd)
diff --git a/src/localhost.jl b/src/localhost.jl
new file mode 100644
index 0000000..a733905
--- /dev/null
+++ b/src/localhost.jl
@@ -0,0 +1,87 @@
+# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).
+
+"""
+    OnLocalhost(;
+        n::Int
+        julia_flags::Cmd = _default_julia_flags()
+        dir = pwd()
+        env::Dict{String,String} = Dict{String,String}()
+    ) isa DynamicAddProcsMode
+
+Mode that runs `n` worker processes on the current host.
+
+Example:
+
+```julia
+runmode = OnLocalhost(n = 4)
+task, n = runworkers(runmode)
+
+Threads.@async begin
+    wait(task)
+    @info "Worker processes have terminated."
+end
+
+@wait_while nprocs()-1 < n
+```
+
+Workers can also be started manually: use
+[`worker_start_command(runmode)`](@ref) to get the system (shell) command and
+run it from a separate process.
+""" +@with_kw struct OnLocalhost <: DynamicAddProcsMode + n::Int + julia_flags::Cmd = _default_julia_flags() + dir = pwd() + env::Dict{String,String} = Dict{String,String}() +end +export OnLocalhost + +function worker_start_command(runmode::OnLocalhost, manager::ElasticManager) + julia_flags = runmode.julia_flags + dir = runmode.dir + + jl_threads_flag = any(occursin.(Ref("--threads"), string.(julia_flags))) ? `` : `--threads=$(nthreads())` + jl_dir_flags = `-e "cd(\"$(dir)\")"` + additional_julia_flags = `$jl_threads_flag $julia_flags $jl_dir_flags` + + worker_cmd = worker_local_startcmd( + manager; + julia_flags = additional_julia_flags, + env = runmode.env + ) + return worker_cmd, runmode.n, runmode.n +end + +function runworkers(runmode::OnLocalhost, manager::ElasticManager) + start_cmd, m, n = worker_start_command(runmode, manager) + + task = Threads.@async begin + processes = Base.Process[] + for _ in 1:m + push!(processes, open(start_cmd)) + end + @wait_while any(isactive, processes) + end + + return task, n +end + + +#= +# ToDo: Add SSHWorkers or similar: + +@with_kw struct SSHWorkers <: RunProcsMode + hosts::Vector{Any} + ssd_flags::Cmd = _default_slurm_flags() + julia_flags::Cmd = _default_julia_flags() + dir = ... + env = ... + tunnel::Bool = false + multiplex::Bool = false + shell::Symbol = :posix + max_parallel::Int = 10 + enable_threaded_blas::Bool = true + topology::Symbol = :all_to_all + lazy_connections::Bool = true +end +=# \ No newline at end of file diff --git a/src/runworkers.jl b/src/runworkers.jl index e45eb7b..da232b6 100644 --- a/src/runworkers.jl +++ b/src/runworkers.jl @@ -297,84 +297,6 @@ function worker_local_startcmd( end -""" - OnLocalhost(; - n::Integer = 1 - env::Dict{String,String} = Dict{String,String}() - ) isa DynamicAddProcsMode - -Mode that runs `n` worker processes on the current host. - -Example: - -```julia -runmode = OnLocalhost(n = 4) -task, n = runworkers(runmode) - -Threads.@async begin - wait(task) - @info "SLURM workers have terminated." -end - -@wait_while nprocs()-1 < n) -``` - -Workers can also be started manually, use -[`worker_start_command(runmode)`](@ref) to get the system (shell) command and -run it from a separate process or so. -""" -@with_kw struct OnLocalhost <: DynamicAddProcsMode - n::Int - env::Dict{String,String} = Dict{String,String}() -end -export OnLocalhost - -function worker_start_command(runmode::OnLocalhost, manager::ElasticManager) - worker_nthreads = nthreads() - julia_flags = `$(_default_julia_flags()) --threads=$worker_nthreads` - worker_cmd = worker_local_startcmd( - manager; - julia_flags = julia_flags, - env = runmode.env - ) - return worker_cmd, runmode.n, runmode.n -end - -function runworkers(runmode::OnLocalhost, manager::ElasticManager) - start_cmd, m, n = worker_start_command(runmode, manager) - - task = Threads.@async begin - processes = Base.Process[] - for _ in 1:m - push!(processes, open(start_cmd)) - end - @wait_while any(isactive, processes) - end - - return task, n -end - - -#= -# ToDo: Add SSHWorkers or similar: - -@with_kw struct SSHWorkers <: RunProcsMode - hosts::Vector{Any} - ssd_flags::Cmd = _default_slurm_flags() - julia_flags::Cmd = _default_julia_flags() - dir = ... - env = ... 
-    tunnel::Bool = false
-    multiplex::Bool = false
-    shell::Symbol = :posix
-    max_parallel::Int = 10
-    enable_threaded_blas::Bool = true
-    topology::Symbol = :all_to_all
-    lazy_connections::Bool = true
-end
-=#
-
-
 """
     stopworkers()
     stopworkers(pid::Int)
diff --git a/src/slurm.jl b/src/slurm.jl
index e4dad97..1e1e794 100644
--- a/src/slurm.jl
+++ b/src/slurm.jl
@@ -1,7 +1,7 @@
 # This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).
 
 """
-    SlurmRun(;
+    OnSlurm(;
         slurm_flags::Cmd = {defaults}
         julia_flags::Cmd = {defaults}
         dir = pwd()
@@ -20,7 +20,7 @@ Workers are started with current directory set to `dir`.
 Example:
 
 ```julia
-runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
+runmode = OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
 task = runworkers(runmode)
 
 Threads.@async begin
@@ -35,19 +35,20 @@ Workers can also be started manually, use
 [`worker_start_command(runmode)`](@ref) to get the `srun` start command and
 run it from a separate process or so.
 """
-@with_kw struct SlurmRun <: DynamicAddProcsMode
+@with_kw struct OnSlurm <: DynamicAddProcsMode
     slurm_flags::Cmd = _default_slurm_flags()
     julia_flags::Cmd = _default_julia_flags()
     dir = pwd()
     env::Dict{String,String} = Dict{String,String}()
     redirect_output::Bool = true
 end
-export SlurmRun
+export OnSlurm
+@deprecate SlurmRun OnSlurm
 
 const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1)
 
-function worker_start_command(runmode::SlurmRun, manager::ElasticManager)
+function worker_start_command(runmode::OnSlurm, manager::ElasticManager)
     slurm_flags = runmode.slurm_flags
     julia_flags = runmode.julia_flags
     dir = runmode.dir
 
@@ -99,7 +100,7 @@ function _slurm_mem_per_task(tc::NamedTuple)
 end
 
 
-function runworkers(runmode::SlurmRun, manager::ElasticManager)
+function runworkers(runmode::OnSlurm, manager::ElasticManager)
     srun_cmd, m, n = worker_start_command(runmode, manager)
     @info "Starting SLURM job: $srun_cmd"
     task = Threads.@async begin
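For a quick end-to-end picture of the renamed API, here is a minimal sketch based on the docstrings above (the worker count, the `maxtime` value, and the SLURM flags in the comments are illustrative, not prescribed by this patch):

```julia
using ParallelProcessingTools, Distributed

# Start four local worker processes via the new OnLocalhost mode
# (moved from runworkers.jl to localhost.jl in this patch):
runmode = OnLocalhost(n = 4)
task, n = runworkers(runmode)

# Block until all n workers have connected, with a safety timeout:
@wait_while maxtime=60 nprocs() < n + 1

# On a batch system, only the run mode changes, e.g.:
#   runworkers(OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8`))
#   runworkers(OnHTCondor(n = 4))
# The old SlurmRun/HTCondorRun names still work via @deprecate.

stopworkers()  # shut all workers down again
```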