Skip to content

Commit 3f0c358

Browse files
committed
Apply improvements from package ElasticClusterManager to CustomClusterManagers
1 parent 809eed2 commit 3f0c358

File tree

1 file changed

+33
-7
lines changed

1 file changed

+33
-7
lines changed

src/custom_cluster_managers.jl

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,22 @@ import Pkg
1414
using Distributed: launch, manage, kill, init_worker, connect
1515
# ==================================================================
1616

17+
export ElasticManager, elastic_worker
18+
1719

1820
# The master process listens on a well-known port
1921
# Launched workers connect to the master and redirect their STDOUTs to the same
2022
# Workers can join and leave the cluster on demand.
2123

22-
export ElasticManager, elastic_worker
23-
2424
const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN
2525

26+
@static if Base.VERSION >= v"1.7-"
27+
# Base.errormonitor() is only available in Julia 1.7+
28+
my_errormonitor(t) = Base.errormonitor(t)
29+
else
30+
my_errormonitor(t) = nothing
31+
end
32+
2633
struct ElasticManager <: Distributed.ClusterManager
2734
active::Dict{Int, Distributed.WorkerConfig} # active workers
2835
pending::Channel{Sockets.TCPSocket} # to be added workers
@@ -47,20 +54,23 @@ struct ElasticManager <: Distributed.ClusterManager
4754
error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.")
4855
end
4956
end
50-
57+
5158
l_sock = Distributed.listen(addr, port)
5259

5360
lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), manage_callback, printing_kwargs)
5461

55-
@async begin
62+
t1 = @async begin
5663
while true
5764
let s = Sockets.accept(l_sock)
58-
@async process_worker_conn(lman, s)
65+
t2 = @async process_worker_conn(lman, s)
66+
my_errormonitor(t2)
5967
end
6068
end
6169
end
70+
my_errormonitor(t1)
6271

63-
@async process_pending_connections(lman)
72+
t3 = @async process_pending_connections(lman)
73+
my_errormonitor(t3)
6474

6575
lman
6676
end
@@ -153,7 +163,7 @@ function Base.show(io::IO, mgr::ElasticManager)
153163

154164
println(iob, " Worker connect command : ")
155165
print(iob, " ", get_connect_cmd(mgr; mgr.printing_kwargs...))
156-
166+
157167
print(io, String(take!(iob)))
158168
end
159169

@@ -176,5 +186,21 @@ function elastic_worker(
176186
Distributed.start_worker(c, cookie)
177187
end
178188

189+
function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=())
190+
ip = string(em.sockname[1])
191+
port = convert(Int,em.sockname[2])
192+
cookie = Distributed.cluster_cookie()
193+
exename = absolute_exename ? joinpath(Sys.BINDIR, Base.julia_exename()) : "julia"
194+
project = same_project ? ("--project=$(Pkg.API.Context().env.project_file)",) : ()
195+
196+
join([
197+
exename,
198+
exeflags...,
199+
project...,
200+
"-e 'import ElasticClusterManager; ElasticClusterManager.elastic_worker(\"$cookie\",\"$ip\",$port)'"
201+
]," ")
202+
203+
end
204+
179205

180206
end # module CustomClusterManagers

0 commit comments

Comments
 (0)