Improved OnLocalhost #25

Open · wants to merge 3 commits into main
4 changes: 2 additions & 2 deletions docs/src/index.md
@@ -7,7 +7,7 @@ This Julia package provides some tools to ease multithreaded and distributed pro

Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools adds some machinery to make some aspects of this even easier.

An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful, for example, to add workers to a running Julia session via manually controlled batch jobs. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`SlurmRun`](@ref)), or via HTCondor ([`HTCondorRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) and stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)); this can be useful, for example, to add workers to a running Julia session via manually controlled batch jobs. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`OnSlurm`](@ref)), or via HTCondor ([`OnHTCondor`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).

The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy.
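For orientation, the two paragraphs above correspond roughly to the following workflow. This is only a sketch, assuming the documented behavior of `runworkers`, `ppt_worker_pool`, `onworker` and `stopworkers`; exact return values and keyword arguments may differ.

```julia
using ParallelProcessingTools, Distributed

# Start four local worker processes and wait until they have connected:
_, n = runworkers(OnLocalhost(n = 4))
@wait_while maxtime=60 nprocs() < n + 1

# The elastic cluster manager adds them to the dynamic FlexWorkerPool:
pool = ppt_worker_pool()

# Either send work through the pool without managing workers explicitly ...
result = onworker(() -> sum(rand(10^6)))

# ... or take a worker, use it, and put it back:
pid = take!(pool)
try
    remotecall_fetch(myid, pid)
finally
    put!(pool, pid)
end

stopworkers()
```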

@@ -107,7 +107,7 @@ using ParallelProcessingTools, Distributed
pinthreads_auto()
end

_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
_, n = runworkers(OnSlurm(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
@wait_while maxtime=240 nprocs() < n + 1

resources = worker_resources()
1 change: 1 addition & 0 deletions src/ParallelProcessingTools.jl
@@ -40,6 +40,7 @@ include("procinit.jl")
include("workerpool.jl")
include("onworkers.jl")
include("runworkers.jl")
include("localhost.jl")
include("slurm.jl")
include("htcondor.jl")
include("deprecated.jl")
18 changes: 10 additions & 8 deletions src/htcondor.jl
@@ -1,7 +1,7 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
HTCondorRun(;
OnHTCondor(;
n::Int = 1
condor_flags::Cmd = _default_condor_flags()
condor_settings::Dict{String,String} = Dict{String,String}()
@@ -19,7 +19,7 @@ Condor submit script and steering `.sh` files are stored in `jobfile_dir`.
Example:

```julia-repl
julia> runmode = HTCondorRun(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
julia> runmode = OnHTCondor(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
task = runworkers(runmode)

julia> runworkers(runmode)
@@ -39,7 +39,7 @@ Workers can also be started manually, use
[`worker_start_command(runmode)`](@ref) to get the `condor_submit` start command and
run it from a separate process or so.
"""
@with_kw struct HTCondorRun <: DynamicAddProcsMode
@with_kw struct OnHTCondor <: DynamicAddProcsMode
n::Int = 1
condor_flags::Cmd = _default_condor_flags()
condor_settings::Dict{String,String} = Dict{String,String}()
@@ -49,12 +49,14 @@ run it from a separate process or so.
env::Dict{String,String} = Dict{String,String}()
redirect_output::Bool = true
end
export HTCondorRun
export OnHTCondor

@deprecate HTCondorRun OnHTCondor

_default_condor_flags() = ``
const _g_condor_nextjlstep = Base.Threads.Atomic{Int}(1)

function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
function worker_start_command(runmode::OnHTCondor, manager::ElasticManager)
flags = runmode.condor_flags
n_workers = runmode.n
temp_name = tempname(runmode.jobfile_dir)
@@ -66,7 +68,7 @@ function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
return `condor_submit $flags $submit_file_path`, 1, n_workers
end

function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager::ElasticManager)
function _generate_condor_worker_script(filename, runmode::OnHTCondor, manager::ElasticManager)
julia_flags = runmode.julia_flags

request_memory = get(runmode.condor_settings, "request_memory", "2GB")
@@ -94,7 +96,7 @@ function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager:
end
end

function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::HTCondorRun)
function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::OnHTCondor)
jlstep = atomic_add!(_g_condor_nextjlstep, 1)
jobname = "julia-$(getpid())-$jlstep"
default_dict = Dict(
@@ -117,7 +119,7 @@ function _generate_condor_submit_file(submit_file_path, worker_script_path, runm
end
end

function runworkers(runmode::HTCondorRun, manager::ElasticManager)
function runworkers(runmode::OnHTCondor, manager::ElasticManager)
run_cmd, m, n = worker_start_command(runmode, manager)
@info "Submitting HTCondor job: $run_cmd"
process = run(run_cmd)
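A brief usage sketch for the renamed HTCondor mode. The `n` and `condor_settings` values are copied from the docstring example above; the last line assumes that `@deprecate HTCondorRun OnHTCondor` forwards old constructor calls with a deprecation warning, as `Base.@deprecate` normally does for simple renames.

```julia
using ParallelProcessingTools

# New name introduced in this file:
runmode = OnHTCondor(
    n = 10,
    condor_settings = Dict(
        "universe"       => "vanilla",
        "+queue"         => "short",
        "request_memory" => "4GB",
    ),
)
task = runworkers(runmode)   # submits the HTCondor job, as in the docstring example

# Old name: assumed to keep working via @deprecate, but with a warning.
legacy = HTCondorRun(n = 2)
```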
87 changes: 87 additions & 0 deletions src/localhost.jl
@@ -0,0 +1,87 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
OnLocalhost(;
n::Integer = 1
env::Dict{String,String} = Dict{String,String}()
julia_flags::Cmd = _default_julia_flags()
dir = pwd()
) isa DynamicAddProcsMode

Mode that runs `n` worker processes on the current host.

Example:

```julia
runmode = OnLocalhost(n = 4)
task, n = runworkers(runmode)

Threads.@async begin
wait(task)
@info "SLURM workers have terminated."
end

@wait_while nprocs()-1 < n
```

Workers can also be started manually, use
[`worker_start_command(runmode)`](@ref) to get the system (shell) command and
run it from a separate process or so.
"""
@with_kw struct OnLocalhost <: DynamicAddProcsMode
n::Int
julia_flags::Cmd = _default_julia_flags()
dir = pwd()
env::Dict{String,String} = Dict{String,String}()
end
export OnLocalhost

function worker_start_command(runmode::OnLocalhost, manager::ElasticManager)
julia_flags = runmode.julia_flags
dir = runmode.dir

jl_threads_flag = any(occursin.(Ref("--threads"), string.(julia_flags))) ? `` : `--threads=$(nthreads())`
jl_dir_flags = `-e "cd(\"$(dir)\")"`
additional_julia_flags = `$jl_threads_flag $julia_flags $jl_dir_flags`

worker_cmd = worker_local_startcmd(
manager;
julia_flags = additional_julia_flags,
env = runmode.env
)
return worker_cmd, runmode.n, runmode.n
end

function runworkers(runmode::OnLocalhost, manager::ElasticManager)
start_cmd, m, n = worker_start_command(runmode, manager)

task = Threads.@async begin
processes = Base.Process[]
for _ in 1:m
push!(processes, open(start_cmd))
end
@wait_while any(isactive, processes)
end

return task, n
end


#=
# ToDo: Add SSHWorkers or similar:

@with_kw struct SSHWorkers <: RunProcsMode
hosts::Vector{Any}
ssd_flags::Cmd = _default_slurm_flags()
julia_flags::Cmd = _default_julia_flags()
dir = ...
env = ...
tunnel::Bool = false
multiplex::Bool = false
shell::Symbol = :posix
max_parallel::Int = 10
enable_threaded_blas::Bool = true
topology::Symbol = :all_to_all
lazy_connections::Bool = true
end
=#
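A short sketch of the new `OnLocalhost` fields added in this file (`julia_flags` and `dir`). The flag values and the directory are placeholders, not requirements.

```julia
using ParallelProcessingTools, Distributed

runmode = OnLocalhost(
    n = 4,
    julia_flags = `--threads=2 --heap-size-hint=4G`,  # example flags only
    dir = "/tmp/ppt-workers",                         # hypothetical working directory
)

task, n = runworkers(runmode)
@wait_while maxtime=60 nprocs() < n + 1

Threads.@async begin
    wait(task)
    @info "Local workers have terminated."
end
```

Because `--threads=2` is already present in `julia_flags`, the `worker_start_command` implementation above will not append its default `--threads=$(nthreads())` flag.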
78 changes: 0 additions & 78 deletions src/runworkers.jl
@@ -297,84 +297,6 @@ function worker_local_startcmd(
end


"""
OnLocalhost(;
n::Integer = 1
env::Dict{String,String} = Dict{String,String}()
) isa DynamicAddProcsMode

Mode that runs `n` worker processes on the current host.

Example:

```julia
runmode = OnLocalhost(n = 4)
task, n = runworkers(runmode)

Threads.@async begin
wait(task)
@info "SLURM workers have terminated."
end

@wait_while nprocs()-1 < n)
```

Workers can also be started manually, use
[`worker_start_command(runmode)`](@ref) to get the system (shell) command and
run it from a separate process or so.
"""
@with_kw struct OnLocalhost <: DynamicAddProcsMode
n::Int
env::Dict{String,String} = Dict{String,String}()
end
export OnLocalhost

function worker_start_command(runmode::OnLocalhost, manager::ElasticManager)
worker_nthreads = nthreads()
julia_flags = `$(_default_julia_flags()) --threads=$worker_nthreads`
worker_cmd = worker_local_startcmd(
manager;
julia_flags = julia_flags,
env = runmode.env
)
return worker_cmd, runmode.n, runmode.n
end

function runworkers(runmode::OnLocalhost, manager::ElasticManager)
start_cmd, m, n = worker_start_command(runmode, manager)

task = Threads.@async begin
processes = Base.Process[]
for _ in 1:m
push!(processes, open(start_cmd))
end
@wait_while any(isactive, processes)
end

return task, n
end


#=
# ToDo: Add SSHWorkers or similar:

@with_kw struct SSHWorkers <: RunProcsMode
hosts::Vector{Any}
ssd_flags::Cmd = _default_slurm_flags()
julia_flags::Cmd = _default_julia_flags()
dir = ...
env = ...
tunnel::Bool = false
multiplex::Bool = false
shell::Symbol = :posix
max_parallel::Int = 10
enable_threaded_blas::Bool = true
topology::Symbol = :all_to_all
lazy_connections::Bool = true
end
=#


"""
stopworkers()
stopworkers(pid::Int)
13 changes: 7 additions & 6 deletions src/slurm.jl
@@ -1,7 +1,7 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
SlurmRun(;
OnSlurm(;
slurm_flags::Cmd = {defaults}
julia_flags::Cmd = {defaults}
dir = pwd()
@@ -20,7 +20,7 @@ Workers are started with current directory set to `dir`.
Example:

```julia
runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
runmode = OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
task = runworkers(runmode)

Threads.@async begin
@@ -35,19 +35,20 @@ Workers can also be started manually, use
[`worker_start_command(runmode)`](@ref) to get the `srun` start command and
run it from a separate process or so.
"""
@with_kw struct SlurmRun <: DynamicAddProcsMode
@with_kw struct OnSlurm <: DynamicAddProcsMode
slurm_flags::Cmd = _default_slurm_flags()
julia_flags::Cmd = _default_julia_flags()
dir = pwd()
env::Dict{String,String} = Dict{String,String}()
redirect_output::Bool = true
end
export SlurmRun
export OnSlurm

@deprecate SlurmRun OnSlurm

const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1)

function worker_start_command(runmode::SlurmRun, manager::ElasticManager)
function worker_start_command(runmode::OnSlurm, manager::ElasticManager)
slurm_flags = runmode.slurm_flags
julia_flags = runmode.julia_flags
dir = runmode.dir
@@ -99,7 +100,7 @@ function _slurm_mem_per_task(tc::NamedTuple)
end


function runworkers(runmode::SlurmRun, manager::ElasticManager)
function runworkers(runmode::OnSlurm, manager::ElasticManager)
srun_cmd, m, n = worker_start_command(runmode, manager)
@info "Starting SLURM job: $srun_cmd"
task = Threads.@async begin
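A sketch of starting SLURM workers manually with the renamed mode, as the docstring above suggests. It assumes a one-argument `worker_start_command(runmode)` method that falls back to the default cluster manager; the method shown in this diff takes an explicit `ElasticManager`. The last line assumes `@deprecate SlurmRun OnSlurm` forwards old constructor calls with a warning.

```julia
using ParallelProcessingTools

runmode = OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)

# Returns the srun command plus the expected number of processes and workers:
srun_cmd, m, n = worker_start_command(runmode)

# Run `srun_cmd` from a separate shell or batch script; the workers will
# connect back to this session's elastic cluster manager.
println(srun_cmd)

# Old name, kept as a deprecated alias:
legacy = SlurmRun(slurm_flags = `--ntasks=2`)
```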