@@ -339,30 +339,39 @@ function init_proc(state, p, log_sink)
339
339
340
340
state. worker_loadavg[p. pid] = (0.0 , 0.0 , 0.0 )
341
341
end
342
- lock (WORKER_MONITOR_LOCK) do
343
- wid = p. pid
344
- if ! haskey (WORKER_MONITOR_TASKS, wid)
345
- t = @async begin
346
- try
347
- # Wait until this connection is terminated
348
- remotecall_fetch (sleep, wid, typemax (UInt64))
349
- catch err
350
- if err isa ProcessExitedException
342
+ if p. pid != 1
343
+ lock (WORKER_MONITOR_LOCK) do
344
+ wid = p. pid
345
+ if ! haskey (WORKER_MONITOR_TASKS, wid)
346
+ t = @async begin
347
+ try
348
+ # Wait until this connection is terminated
349
+ remotecall_fetch (sleep, wid, typemax (UInt64))
350
+ catch err
351
+ # TODO : Report other kinds of errors? IOError, etc.
352
+ # if !(err isa ProcessExitedException)
353
+ # end
354
+ finally
351
355
lock (WORKER_MONITOR_LOCK) do
352
356
d = WORKER_MONITOR_CHANS[wid]
353
357
for uid in keys (d)
354
- put! (d[uid], (wid, OSProc (wid), nothing , (ProcessExitedException (wid), nothing )))
358
+ try
359
+ put! (d[uid], (wid, OSProc (wid), nothing , (ProcessExitedException (wid), nothing )))
360
+ catch
361
+ end
355
362
end
356
363
empty! (d)
357
364
delete! (WORKER_MONITOR_CHANS, wid)
365
+ delete! (WORKER_MONITOR_TASKS, wid)
358
366
end
359
367
end
360
368
end
369
+ errormonitor_tracked (t)
370
+ WORKER_MONITOR_TASKS[wid] = t
371
+ WORKER_MONITOR_CHANS[wid] = Dict {UInt64,RemoteChannel} ()
361
372
end
362
- WORKER_MONITOR_TASKS[wid] = t
363
- WORKER_MONITOR_CHANS[wid] = Dict {UInt64,RemoteChannel} ()
373
+ WORKER_MONITOR_CHANS[wid][state. uid] = state. chan
364
374
end
365
- WORKER_MONITOR_CHANS[wid][state. uid] = state. chan
366
375
end
367
376
368
377
# Setup worker-to-scheduler channels
@@ -379,18 +388,26 @@ function init_proc(state, p, log_sink)
379
388
end
380
389
"""
    _cleanup_proc(uid, log_sink)

Worker-side teardown for the scheduler identified by `uid`.

Empties `CHUNK_CACHE`, then marks every processor state registered under `uid`
as done and notifies its `reschedule` handle, and finally empties the set of
registered states.

`log_sink` is accepted but not used by this function.
"""
function _cleanup_proc(uid, log_sink)
    empty!(CHUNK_CACHE) # FIXME: Should be keyed on uid!
    proc_states(uid) do registered
        for (_, pstate) in registered
            internal = pstate.state
            # Raise the `done` flag before notifying, so the flag is already
            # set by the time anything waiting on `reschedule` wakes up.
            internal.done[] = true
            notify(internal.reschedule)
        end
        empty!(registered)
    end
end
383
400
"""
    cleanup_proc(state, p, log_sink)

Scheduler-side teardown for the worker process `p`.

Removes this scheduler's entry (`state.uid`) from the worker's monitor channel
table, if the worker has one, then asks the worker to run `_cleanup_proc` for
`state.uid`. The whole operation is bracketed by a `:cleanup_proc` timespan
logged through a fresh `Context` built on `log_sink`.
"""
function cleanup_proc(state, p, log_sink)
    wid = p.pid
    ctx = Context(Int[]; log_sink)
    timespan_start(ctx, :cleanup_proc, wid, 0)

    # Only the monitor-table update happens while WORKER_MONITOR_LOCK is held.
    lock(WORKER_MONITOR_LOCK) do
        chans = get(WORKER_MONITOR_CHANS, wid, nothing)
        if chans !== nothing
            delete!(chans, state.uid)
        end
    end

    # The remote cleanup is requested after the lock is released, and is sent
    # whether or not the worker had an entry in WORKER_MONITOR_CHANS.
    # NOTE(review): if `remote_do` throws, `timespan_finish` below is not
    # reached -- confirm whether the timespan needs to be closed in that case.
    remote_do(_cleanup_proc, wid, state.uid, log_sink)
    timespan_finish(ctx, :cleanup_proc, wid, 0)
end
395
412
396
413
" Process-local condition variable (and lock) indicating task completion."
@@ -1096,6 +1113,7 @@ struct ProcessorInternalState
1096
1113
tasks:: Dict{Int,Task}
1097
1114
proc_occupancy:: Base.RefValue{UInt32}
1098
1115
time_pressure:: Base.RefValue{UInt64}
1116
+ done:: Base.RefValue{Bool}
1099
1117
end
1100
1118
struct ProcessorState
1101
1119
state:: ProcessorInternalState
@@ -1144,6 +1162,9 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
1144
1162
reset (istate. reschedule)
1145
1163
end
1146
1164
timespan_finish (ctx, :proc_run_wait , to_proc, nothing )
1165
+ if istate. done[]
1166
+ return
1167
+ end
1147
1168
end
1148
1169
1149
1170
# Fetch a new task to execute
@@ -1270,7 +1291,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
1270
1291
else
1271
1292
t. sticky = false
1272
1293
end
1273
- tasks[thunk_id] = errormonitor (schedule (t))
1294
+ tasks[thunk_id] = errormonitor_tracked (schedule (t))
1274
1295
proc_occupancy[] += task_occupancy
1275
1296
time_pressure[] += time_util
1276
1297
end
@@ -1283,7 +1304,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
1283
1304
else
1284
1305
proc_run_task. sticky = false
1285
1306
end
1286
- return errormonitor (schedule (proc_run_task))
1307
+ return errormonitor_tracked (schedule (proc_run_task))
1287
1308
end
1288
1309
1289
1310
"""
@@ -1307,7 +1328,8 @@ function do_tasks(to_proc, return_queue, tasks)
1307
1328
istate = ProcessorInternalState (ctx, to_proc,
1308
1329
queue_locked, reschedule,
1309
1330
Dict {Int,Task} (),
1310
- Ref (UInt32 (0 )), Ref (UInt64 (0 )))
1331
+ Ref (UInt32 (0 )), Ref (UInt64 (0 )),
1332
+ Ref (false ))
1311
1333
runner = start_processor_runner! (istate, uid, return_queue)
1312
1334
@static if VERSION < v " 1.9"
1313
1335
reschedule. waiter = runner
0 commit comments