
Commit fd90564

Improve OnLocalhost, harmonize run-mode names. (#25)
1 parent e7e9da7 commit fd90564

File tree

docs/src/index.md
src/ParallelProcessingTools.jl
src/htcondor.jl
src/localhost.jl
src/runworkers.jl
src/slurm.jl

6 files changed: +108 -94 lines


docs/src/index.md

Lines changed: 2 additions & 2 deletions
@@ -7,7 +7,7 @@ This Julia package provides some tools to ease multithreaded and distributed pro

Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools add some machinery to make some aspects of this even easier.

-An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) an stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`SlurmRun`](@ref)), or via HTCondor ([`HTCondorRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).
+An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) an stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)), via SLURM ([`OnSlurm`](@ref)), or via HTCondor ([`OnHTCondor`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome).

The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy.

@@ -107,7 +107,7 @@ using ParallelProcessingTools, Distributed
    pinthreads_auto()
end

-_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
+_, n = runworkers(OnSlurm(slurm_flags = `--cpu-bind=cores --mem-bind=local`))
@wait_while maxtime=240 nprocs() < n + 1

resources = worker_resources()
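
The paragraph changed above describes the overall workflow these run modes plug into. A minimal sketch (not part of the commit) of that workflow, assuming the `take!`/`put!` pool interface behaves as documented:

```julia
using ParallelProcessingTools, Distributed

# Start two local workers with the renamed OnLocalhost run mode and wait for them.
_, n = runworkers(OnLocalhost(n = 2))
@wait_while maxtime=60 nprocs() < n + 1

# Take a worker pid from the default FlexWorkerPool, use it, then return it;
# onworker can be used instead to delegate work without manual bookkeeping.
pool = ppt_worker_pool()
pid = take!(pool)
try
    remotecall_fetch(myid, pid)
finally
    put!(pool, pid)
end

stopworkers()
```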

src/ParallelProcessingTools.jl

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@ include("procinit.jl")
include("workerpool.jl")
include("onworkers.jl")
include("runworkers.jl")
+include("localhost.jl")
include("slurm.jl")
include("htcondor.jl")
include("deprecated.jl")

src/htcondor.jl

Lines changed: 10 additions & 8 deletions
@@ -1,7 +1,7 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
-    HTCondorRun(;
+    OnHTCondor(;
        n::Int = 1
        condor_flags::Cmd = _default_condor_flags()
        condor_settings::Dict{String,String} = Dict{String,String}()
@@ -19,7 +19,7 @@ Condor submit script and steering `.sh` files are stored in `jobfile_dir`.
Example:

```julia-repl
-julia> runmode = HTCondorRun(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
+julia> runmode = OnHTCondor(n = 10; condor_settings=Dict("universe" => "vanilla", "+queue" => "short", "request_memory" => "4GB"))
task = runworkers(runmode)

julia> runworkers(runmode)
@@ -39,7 +39,7 @@ Workers can also be started manually, use
[`worker_start_command(runmode)`](@ref) to get the `condor_submit` start command and
run it from a separate process or so.
"""
-@with_kw struct HTCondorRun <: DynamicAddProcsMode
+@with_kw struct OnHTCondor <: DynamicAddProcsMode
    n::Int = 1
    condor_flags::Cmd = _default_condor_flags()
    condor_settings::Dict{String,String} = Dict{String,String}()
@@ -49,12 +49,14 @@ run it from a separate process or so.
    env::Dict{String,String} = Dict{String,String}()
    redirect_output::Bool = true
end
-export HTCondorRun
+export OnHTCondor
+
+@deprecate HTCondorRun OnHTCondor

_default_condor_flags() = ``
const _g_condor_nextjlstep = Base.Threads.Atomic{Int}(1)

-function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
+function worker_start_command(runmode::OnHTCondor, manager::ElasticManager)
    flags = runmode.condor_flags
    n_workers = runmode.n
    temp_name = tempname(runmode.jobfile_dir)
@@ -66,7 +68,7 @@ function worker_start_command(runmode::HTCondorRun, manager::ElasticManager)
    return `condor_submit $flags $submit_file_path`, 1, n_workers
end

-function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager::ElasticManager)
+function _generate_condor_worker_script(filename, runmode::OnHTCondor, manager::ElasticManager)
    julia_flags = runmode.julia_flags

    request_memory = get(runmode.condor_settings, "request_memory", "2GB")
@@ -94,7 +96,7 @@ function _generate_condor_worker_script(filename, runmode::HTCondorRun, manager:
    end
end

-function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::HTCondorRun)
+function _generate_condor_submit_file(submit_file_path, worker_script_path, runmode::OnHTCondor)
    jlstep = atomic_add!(_g_condor_nextjlstep, 1)
    jobname = "julia-$(getpid())-$jlstep"
    default_dict = Dict(
@@ -117,7 +119,7 @@ function _generate_condor_submit_file(submit_file_path, worker_script_path, runm
    end
end

-function runworkers(runmode::HTCondorRun, manager::ElasticManager)
+function runworkers(runmode::OnHTCondor, manager::ElasticManager)
    run_cmd, m, n = worker_start_command(runmode, manager)
    @info "Submitting HTCondor job: $run_cmd"
process = run(run_cmd)
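
The `@deprecate HTCondorRun OnHTCondor` binding added here keeps the old name callable while steering users to the new one. A minimal sketch (not part of the commit) of what that implies:

```julia
using ParallelProcessingTools

# Old name: still constructs a run mode, but emits a deprecation warning.
runmode_old = HTCondorRun()
@assert runmode_old isa OnHTCondor

# New name: no warning; settings as in the docstring example above.
runmode = OnHTCondor(n = 10, condor_settings = Dict("request_memory" => "4GB"))
```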

src/localhost.jl

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
    OnLocalhost(;
        n::Integer = 1
        env::Dict{String,String} = Dict{String,String}()
        julia_flags::Cmd = _default_julia_flags()
        dir = pwd()
    ) isa DynamicAddProcsMode

Mode that runs `n` worker processes on the current host.

Example:

```julia
runmode = OnLocalhost(n = 4)
task, n = runworkers(runmode)

Threads.@async begin
    wait(task)
    @info "SLURM workers have terminated."
end

@wait_while nprocs()-1 < n)
```

Workers can also be started manually, use
[`worker_start_command(runmode)`](@ref) to get the system (shell) command and
run it from a separate process or so.
"""
@with_kw struct OnLocalhost <: DynamicAddProcsMode
    n::Int
    julia_flags::Cmd = _default_julia_flags()
    dir = pwd()
    env::Dict{String,String} = Dict{String,String}()
end
export OnLocalhost

function worker_start_command(runmode::OnLocalhost, manager::ElasticManager)
    julia_flags = runmode.julia_flags
    # Doesn't run on Windows otherwise:
    dir = replace(runmode.dir, '\\' => '/')

    jl_threads_flag = any(occursin.(Ref("--threads"), string.(julia_flags))) ? `` : `--threads=$(nthreads())`
    jl_dir_flags = `-e "cd(\"$(dir)\")"`
    additional_julia_flags = `$jl_threads_flag $julia_flags $jl_dir_flags`

    worker_cmd = worker_local_startcmd(
        manager;
        julia_flags = additional_julia_flags,
        env = runmode.env
    )
    return worker_cmd, runmode.n, runmode.n
end

function runworkers(runmode::OnLocalhost, manager::ElasticManager)
    start_cmd, m, n = worker_start_command(runmode, manager)

    task = Threads.@async begin
        processes = Base.Process[]
        for _ in 1:m
            push!(processes, open(start_cmd))
        end
        @wait_while any(isactive, processes)
    end

    return task, n
end


#=
# ToDo: Add SSHWorkers or similar:

@with_kw struct SSHWorkers <: RunProcsMode
    hosts::Vector{Any}
    ssd_flags::Cmd = _default_slurm_flags()
    julia_flags::Cmd = _default_julia_flags()
    dir = ...
    env = ...
    tunnel::Bool = false
    multiplex::Bool = false
    shell::Symbol = :posix
    max_parallel::Int = 10
    enable_threaded_blas::Bool = true
    topology::Symbol = :all_to_all
    lazy_connections::Bool = true
end
=#
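
Compared to the old implementation in runworkers.jl (removed below), the new OnLocalhost exposes `julia_flags` and `dir`, and only appends a `--threads` flag when none was given explicitly. A short sketch (not part of the commit; flag values are illustrative):

```julia
using ParallelProcessingTools, Distributed

# Workers started with an explicit thread count and working directory;
# without the --threads flag, the parent's nthreads() would be used.
runmode = OnLocalhost(n = 2, julia_flags = `--threads=4`, dir = pwd())

task, n = runworkers(runmode)
@wait_while maxtime=60 nprocs() < n + 1
```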

src/runworkers.jl

Lines changed: 0 additions & 78 deletions
@@ -297,84 +297,6 @@ function worker_local_startcmd(
end


-"""
-    OnLocalhost(;
-        n::Integer = 1
-        env::Dict{String,String} = Dict{String,String}()
-    ) isa DynamicAddProcsMode
-
-Mode that runs `n` worker processes on the current host.
-
-Example:
-
-```julia
-runmode = OnLocalhost(n = 4)
-task, n = runworkers(runmode)
-
-Threads.@async begin
-    wait(task)
-    @info "SLURM workers have terminated."
-end
-
-@wait_while nprocs()-1 < n)
-```
-
-Workers can also be started manually, use
-[`worker_start_command(runmode)`](@ref) to get the system (shell) command and
-run it from a separate process or so.
-"""
-@with_kw struct OnLocalhost <: DynamicAddProcsMode
-    n::Int
-    env::Dict{String,String} = Dict{String,String}()
-end
-export OnLocalhost
-
-function worker_start_command(runmode::OnLocalhost, manager::ElasticManager)
-    worker_nthreads = nthreads()
-    julia_flags = `$(_default_julia_flags()) --threads=$worker_nthreads`
-    worker_cmd = worker_local_startcmd(
-        manager;
-        julia_flags = julia_flags,
-        env = runmode.env
-    )
-    return worker_cmd, runmode.n, runmode.n
-end
-
-function runworkers(runmode::OnLocalhost, manager::ElasticManager)
-    start_cmd, m, n = worker_start_command(runmode, manager)
-
-    task = Threads.@async begin
-        processes = Base.Process[]
-        for _ in 1:m
-            push!(processes, open(start_cmd))
-        end
-        @wait_while any(isactive, processes)
-    end
-
-    return task, n
-end
-
-
-#=
-# ToDo: Add SSHWorkers or similar:
-
-@with_kw struct SSHWorkers <: RunProcsMode
-    hosts::Vector{Any}
-    ssd_flags::Cmd = _default_slurm_flags()
-    julia_flags::Cmd = _default_julia_flags()
-    dir = ...
-    env = ...
-    tunnel::Bool = false
-    multiplex::Bool = false
-    shell::Symbol = :posix
-    max_parallel::Int = 10
-    enable_threaded_blas::Bool = true
-    topology::Symbol = :all_to_all
-    lazy_connections::Bool = true
-end
-=#
-
"""
    stopworkers()
    stopworkers(pid::Int)

src/slurm.jl

Lines changed: 7 additions & 6 deletions
@@ -1,7 +1,7 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
-    SlurmRun(;
+    OnSlurm(;
        slurm_flags::Cmd = {defaults}
        julia_flags::Cmd = {defaults}
        dir = pwd()
@@ -20,7 +20,7 @@ Workers are started with current directory set to `dir`.
Example:

```julia
-runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
+runmode = OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
task = runworkers(runmode)

Threads.@async begin
@@ -35,19 +35,20 @@ Workers can also be started manually, use
[`worker_start_command(runmode)`](@ref) to get the `srun` start command and
run it from a separate process or so.
"""
-@with_kw struct SlurmRun <: DynamicAddProcsMode
+@with_kw struct OnSlurm <: DynamicAddProcsMode
    slurm_flags::Cmd = _default_slurm_flags()
    julia_flags::Cmd = _default_julia_flags()
    dir = pwd()
    env::Dict{String,String} = Dict{String,String}()
    redirect_output::Bool = true
end
-export SlurmRun
+export OnSlurm

+@deprecate SlurmRun OnSlurm

const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1)

-function worker_start_command(runmode::SlurmRun, manager::ElasticManager)
+function worker_start_command(runmode::OnSlurm, manager::ElasticManager)
    slurm_flags = runmode.slurm_flags
    julia_flags = runmode.julia_flags
    dir = runmode.dir
@@ -99,7 +100,7 @@ function _slurm_mem_per_task(tc::NamedTuple)
end


-function runworkers(runmode::SlurmRun, manager::ElasticManager)
+function runworkers(runmode::OnSlurm, manager::ElasticManager)
    srun_cmd, m, n = worker_start_command(runmode, manager)
    @info "Starting SLURM job: $srun_cmd"
task = Threads.@async begin
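
As the OnSlurm docstring notes, workers can also be started manually. A hedged sketch (not part of the commit), assuming `worker_start_command(runmode)` falls back to the default cluster manager as the docstring cross-reference suggests:

```julia
using ParallelProcessingTools

runmode = OnSlurm(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)

# Returns the srun command plus how many times to run it (m) and how many
# workers (n) to expect; the command can then be submitted from a separate shell.
cmd, m, n = worker_start_command(runmode)
println(cmd)
```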
