Skip to content

Commit 2b4f573

Browse files
authored
Cleanup exit messages on Task.async_stream (#5612)
1 parent 7cf4369 commit 2b4f573

File tree

4 files changed

+108
-70
lines changed

4 files changed

+108
-70
lines changed

lib/elixir/lib/task.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,10 @@ defmodule Task do
287287
concurrently on each item in `enumerable`.
288288
289289
Each item will be appended to the given `args` and processed by its
290-
own task. The tasks will be linked to the current process similar to
291-
`async/3`.
290+
own task. The tasks will be linked to an intermediate process that is
291+
then linked to the current process. This means a failure in a task
292+
terminates the current process and a failure in the current process
293+
terminates all tasks.
292294
293295
When streamed, each task will emit `{:ok, val}` upon successful
294296
completion or `{:exit, val}` if the caller is trapping exits. Results

lib/elixir/lib/task/supervised.ex

Lines changed: 74 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -149,99 +149,103 @@ defmodule Task.Supervised do
149149
parent = self()
150150

151151
# Start a process responsible for translating down messages.
152-
{monitor_pid, monitor_ref} = spawn_monitor(fn -> stream_monitor(parent) end)
152+
{:trap_exit, trap_exit} =
153+
Process.info(self(), :trap_exit)
154+
{monitor_pid, monitor_ref} =
155+
Process.spawn(fn -> stream_monitor(parent, mfa, spawn, trap_exit) end, [:link, :monitor])
156+
153157
send(monitor_pid, {parent, monitor_ref})
154158

155159
stream_reduce(acc, max_concurrency, 0, 0, %{}, next,
156-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
160+
reducer, monitor_pid, monitor_ref, timeout)
157161
end
158162

159-
defp stream_reduce({:halt, acc}, _max, _spawned, _delivered, waiting, next,
160-
_reducer, _mfa, _spawn, monitor_pid, monitor_ref, timeout) do
163+
defp stream_reduce({:halt, acc}, _max, _spawned, _delivered, _waiting, next,
164+
_reducer, monitor_pid, monitor_ref, timeout) do
165+
stream_close(monitor_pid, monitor_ref, timeout)
161166
is_function(next) && next.({:halt, []})
162-
stream_close(waiting, monitor_pid, monitor_ref, timeout)
163167
{:halted, acc}
164168
end
165169

166170
defp stream_reduce({:suspend, acc}, max, spawned, delivered, waiting, next,
167-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
171+
reducer, monitor_pid, monitor_ref, timeout) do
168172
{:suspended, acc, &stream_reduce(&1, max, spawned, delivered, waiting, next,
169-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)}
173+
reducer, monitor_pid, monitor_ref, timeout)}
170174
end
171175

172176
# All spawned, all delivered, next is done.
173-
defp stream_reduce({:cont, acc}, _max, spawned, delivered, waiting, next,
174-
_reducer, _mfa, _spawn, monitor_pid, monitor_ref, timeout)
177+
defp stream_reduce({:cont, acc}, _max, spawned, delivered, _waiting, next,
178+
_reducer, monitor_pid, monitor_ref, timeout)
175179
when spawned == delivered and next == :done do
176-
stream_close(waiting, monitor_pid, monitor_ref, timeout)
180+
stream_close(monitor_pid, monitor_ref, timeout)
177181
{:done, acc}
178182
end
179183

180-
# No more tasks to spawn because max == 0 or next is done.
184+
# No more tasks to spawned because max == 0 or next is done.
181185
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
182-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
186+
reducer, monitor_pid, monitor_ref, timeout)
183187
when max == 0
184188
when next == :done do
185189
receive do
186190
{{^monitor_ref, position}, value} ->
187191
%{^position => {pid, :running}} = waiting
188192
waiting = Map.put(waiting, position, {pid, {:ok, value}})
189193
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
190-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
194+
reducer, monitor_pid, monitor_ref, timeout)
191195
{:down, {^monitor_ref, position}, reason} ->
192196
waiting =
193197
case waiting do
194198
%{^position => {_, {:ok, _} = ok}} -> Map.put(waiting, position, {nil, ok})
195199
%{^position => {_, :running}} -> Map.put(waiting, position, {nil, {:exit, reason}})
196200
end
197201
stream_deliver({:cont, acc}, max + 1, spawned, delivered, waiting, next,
198-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
202+
reducer, monitor_pid, monitor_ref, timeout)
199203
{:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
200-
stream_cleanup_inbox(monitor_ref)
204+
stream_cleanup_inbox(monitor_pid, monitor_ref)
201205
exit({reason, {__MODULE__, :stream, [timeout]}})
202206
after
203207
timeout ->
204-
stream_close(waiting, monitor_pid, monitor_ref, timeout)
208+
stream_close(monitor_pid, monitor_ref, timeout)
205209
exit({:timeout, {__MODULE__, :stream, [timeout]}})
206210
end
207211
end
208212

209213
defp stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
210-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
214+
reducer, monitor_pid, monitor_ref, timeout) do
211215
try do
212216
next.({:cont, []})
213217
catch
214218
kind, reason ->
215219
stacktrace = System.stacktrace
216-
stream_close(waiting, monitor_pid, monitor_ref, timeout)
220+
stream_close(monitor_pid, monitor_ref, timeout)
217221
:erlang.raise(kind, reason, stacktrace)
218222
else
219223
{:suspended, [value], next} ->
220-
waiting = stream_spawn(value, spawned, waiting, mfa, spawn, monitor_pid, monitor_ref)
224+
waiting = stream_spawn(value, spawned, waiting, monitor_pid, monitor_ref, timeout)
221225
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, next,
222-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
226+
reducer, monitor_pid, monitor_ref, timeout)
223227
{_, [value]} ->
224-
waiting = stream_spawn(value, spawned, waiting, mfa, spawn, monitor_pid, monitor_ref)
228+
waiting = stream_spawn(value, spawned, waiting, monitor_pid, monitor_ref, timeout)
225229
stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, :done,
226-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
230+
reducer, monitor_pid, monitor_ref, timeout)
227231
{_, []} ->
228232
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, :done,
229-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
233+
reducer, monitor_pid, monitor_ref, timeout)
230234
end
231235
end
232236

233237
defp stream_deliver({:suspend, acc}, max, spawned, delivered, waiting, next,
234-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
238+
reducer, monitor_pid, monitor_ref, timeout) do
235239
{:suspended, acc, &stream_deliver(&1, max, spawned, delivered, waiting, next,
236-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)}
240+
reducer, monitor_pid, monitor_ref, timeout)}
237241
end
238242
defp stream_deliver({:halt, acc}, max, spawned, delivered, waiting, next,
239-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
243+
reducer, monitor_pid, monitor_ref, timeout) do
240244
stream_reduce({:halt, acc}, max, spawned, delivered, waiting, next,
241-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
245+
reducer, monitor_pid, monitor_ref, timeout)
242246
end
243247
defp stream_deliver({:cont, acc}, max, spawned, delivered, waiting, next,
244-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout) do
248+
reducer, monitor_pid, monitor_ref, timeout) do
245249
case waiting do
246250
%{^delivered => {nil, reply}} ->
247251
try do
@@ -250,38 +254,44 @@ defmodule Task.Supervised do
250254
kind, reason ->
251255
stacktrace = System.stacktrace
252256
is_function(next) && next.({:halt, []})
253-
stream_close(waiting, monitor_pid, monitor_ref, timeout)
257+
stream_close(monitor_pid, monitor_ref, timeout)
254258
:erlang.raise(kind, reason, stacktrace)
255259
else
256260
pair ->
257261
stream_deliver(pair, max, spawned, delivered + 1, Map.delete(waiting, delivered), next,
258-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
262+
reducer, monitor_pid, monitor_ref, timeout)
259263
end
260264
%{} ->
261265
stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next,
262-
reducer, mfa, spawn, monitor_pid, monitor_ref, timeout)
266+
reducer, monitor_pid, monitor_ref, timeout)
263267
end
264268
end
265269

266-
defp stream_close(waiting, monitor_pid, monitor_ref, timeout) do
267-
for {_, {pid, _}} <- waiting do
268-
pid && Process.unlink(pid)
269-
end
270-
send(monitor_pid, {:down, monitor_ref})
270+
defp stream_close(monitor_pid, monitor_ref, timeout) do
271+
send(monitor_pid, {:stop, monitor_ref})
271272
receive do
272-
{:DOWN, ^monitor_ref, _, _, {:shutdown, ^monitor_ref}} ->
273+
{:DOWN, ^monitor_ref, _, _, :normal} ->
274+
stream_cleanup_inbox(monitor_pid, monitor_ref)
273275
:ok
274276
{:DOWN, ^monitor_ref, _, _, reason} ->
277+
stream_cleanup_inbox(monitor_pid, monitor_ref)
275278
exit({reason, {__MODULE__, :stream, [timeout]}})
276279
end
277-
stream_cleanup_inbox(monitor_ref)
280+
end
281+
282+
defp stream_cleanup_inbox(monitor_pid, monitor_ref) do
283+
receive do
284+
{:EXIT, ^monitor_pid, _} -> stream_cleanup_inbox(monitor_ref)
285+
after
286+
0 -> stream_cleanup_inbox(monitor_ref)
287+
end
278288
end
279289

280290
defp stream_cleanup_inbox(monitor_ref) do
281291
receive do
282292
{{^monitor_ref, _}, _} ->
283293
stream_cleanup_inbox(monitor_ref)
284-
{:DOWN, {^monitor_ref, _}, _} ->
294+
{:down, {^monitor_ref, _}, _} ->
285295
stream_cleanup_inbox(monitor_ref)
286296
after
287297
0 ->
@@ -292,38 +302,47 @@ defmodule Task.Supervised do
292302
defp stream_mfa({mod, fun, args}, arg), do: {mod, fun, [arg | args]}
293303
defp stream_mfa(fun, arg), do: {:erlang, :apply, [fun, [arg]]}
294304

295-
defp stream_spawn(value, spawned, waiting, mfa, spawn, monitor_pid, monitor_ref) do
296-
owner = self()
297-
{type, pid} = spawn.(owner, stream_mfa(mfa, value))
298-
send(monitor_pid, {:up, owner, monitor_ref, spawned, type, pid})
299-
Map.put(waiting, spawned, {pid, :running})
305+
defp stream_spawn(value, spawned, waiting, monitor_pid, monitor_ref, timeout) do
306+
send(monitor_pid, {:spawn, spawned, value})
307+
308+
receive do
309+
{:spawned, {^monitor_ref, ^spawned}, pid} ->
310+
send(pid, {self(), {monitor_ref, spawned}})
311+
Map.put(waiting, spawned, {pid, :running})
312+
{:DOWN, ^monitor_ref, _, ^monitor_pid, reason} ->
313+
stream_cleanup_inbox(monitor_pid, monitor_ref)
314+
exit({reason, {__MODULE__, :stream, [timeout]}})
315+
end
300316
end
301317

302-
defp stream_monitor(parent_pid) do
318+
defp stream_monitor(parent_pid, mfa, spawn, trap_exit) do
319+
Process.flag(:trap_exit, trap_exit)
303320
parent_ref = Process.monitor(parent_pid)
304321
receive do
305322
{^parent_pid, monitor_ref} ->
306-
stream_monitor(parent_pid, parent_ref, monitor_ref, %{})
323+
stream_monitor(parent_pid, parent_ref, mfa, spawn, monitor_ref, %{})
307324
{:DOWN, ^parent_ref, _, _, reason} ->
308325
exit(reason)
309326
end
310327
end
311328

312-
defp stream_monitor(parent_pid, parent_ref, monitor_ref, counters) do
329+
defp stream_monitor(parent_pid, parent_ref, mfa, spawn, monitor_ref, counters) do
313330
receive do
314-
{:up, owner, ^monitor_ref, counter, type, pid} ->
331+
{:spawn, counter, value} ->
332+
{type, pid} = spawn.(parent_pid, stream_mfa(mfa, value))
315333
ref = Process.monitor(pid)
316-
send(pid, {owner, {monitor_ref, counter}})
334+
send(parent_pid, {:spawned, {monitor_ref, counter}, pid})
317335
counters = Map.put(counters, ref, {counter, type, pid})
318-
stream_monitor(parent_pid, parent_ref, monitor_ref, counters)
319-
{:down, ^monitor_ref} ->
336+
stream_monitor(parent_pid, parent_ref, mfa, spawn, monitor_ref, counters)
337+
{:stop, ^monitor_ref} ->
338+
Process.flag(:trap_exit, true)
320339
for {ref, {_counter, _, pid}} <- counters do
321340
Process.exit(pid, :kill)
322341
receive do
323342
{:DOWN, ^ref, _, _, _} -> :ok
324343
end
325344
end
326-
exit({:shutdown, monitor_ref})
345+
exit(:normal)
327346
{:DOWN, ^parent_ref, _, _, reason} ->
328347
for {_ref, {_counter, :link, pid}} <- counters do
329348
Process.exit(pid, reason)
@@ -332,7 +351,9 @@ defmodule Task.Supervised do
332351
{:DOWN, ref, _, _, reason} ->
333352
{{counter, _, _}, counters} = Map.pop(counters, ref)
334353
send(parent_pid, {:down, {monitor_ref, counter}, reason})
335-
stream_monitor(parent_pid, parent_ref, monitor_ref, counters)
354+
stream_monitor(parent_pid, parent_ref, mfa, spawn, monitor_ref, counters)
355+
{:EXIT, _, _} ->
356+
stream_monitor(parent_pid, parent_ref, mfa, spawn, monitor_ref, counters)
336357
end
337358
end
338359
end

lib/elixir/test/elixir/task_test.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ defmodule TaskTest do
492492
Process.flag(:trap_exit, true)
493493
assert 1..4 |> Task.async_stream(&exit/1, @opts) |> Enum.to_list ==
494494
[exit: 1, exit: 2, exit: 3, exit: 4]
495+
refute_received {:EXIT, _, _}
495496
end
496497

497498
test "shuts down unused tasks" do

lib/logger/test/logger/translator_test.exs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,21 @@ defmodule Logger.TranslatorTest do
150150
"""s
151151
end
152152

153+
test "translates Task async_stream crashes with neighbour" do
154+
fun = fn -> Task.async_stream([:oops], :erlang, :error, []) |> Enum.to_list() end
155+
{:ok, pid} = Task.start(__MODULE__, :task, [self(), fun])
156+
157+
assert capture_log(:debug, fn ->
158+
ref = Process.monitor(pid)
159+
send(pid, :go)
160+
receive do: ({:DOWN, ^ref, _, _, _} -> :ok)
161+
end) =~ ~r"""
162+
Neighbours:
163+
#{inspect pid}
164+
Initial Call: Logger\.TranslatorTest\.task/2
165+
"""
166+
end
167+
153168
test "translates Task undef module crash" do
154169
assert capture_log(fn ->
155170
{:ok, pid} = Task.start(:module_does_not_exist, :undef, [])
@@ -244,12 +259,25 @@ defmodule Logger.TranslatorTest do
244259
"""
245260
end
246261

262+
test "translates Process crashes" do
263+
assert capture_log(:info, fn ->
264+
{_, ref} = spawn_monitor(fn() -> raise "oops" end)
265+
receive do: ({:DOWN, ^ref, _, _, _} -> :ok)
266+
# Even though the monitor has been received the emulator may not have
267+
# sent the message to the error logger
268+
Process.sleep(200)
269+
end) =~ ~r"""
270+
\[error\] Process #PID<\d+\.\d+\.\d+>\ raised an exception
271+
\*\* \(RuntimeError\) oops
272+
"""
273+
end
274+
247275
test "translates :proc_lib crashes" do
248276
{:ok, pid} = Task.start_link(__MODULE__, :task, [self()])
249277

250278
assert capture_log(:info, fn ->
251279
ref = Process.monitor(pid)
252-
send(pid, :message)
280+
253281
send(pid, :go)
254282
receive do: ({:DOWN, ^ref, _, _, _} -> :ok)
255283
end) =~ ~r"""
@@ -262,19 +290,6 @@ defmodule Logger.TranslatorTest do
262290
"""s
263291
end
264292

265-
test "translates Process crashes" do
266-
assert capture_log(:info, fn ->
267-
{_, ref} = spawn_monitor(fn() -> raise "oops" end)
268-
receive do: ({:DOWN, ^ref, _, _, _} -> :ok)
269-
# Even though the monitor has been received the emulator may not have
270-
# sent the message to the error logger
271-
Process.sleep(200)
272-
end) =~ ~r"""
273-
\[error\] Process #PID<\d+\.\d+\.\d+>\ raised an exception
274-
\*\* \(RuntimeError\) oops
275-
"""
276-
end
277-
278293
test "translates :proc_lib crashes with name" do
279294
{:ok, pid} = Task.start_link(__MODULE__, :task,
280295
[self(), fn() ->
@@ -648,5 +663,4 @@ defmodule Logger.TranslatorTest do
648663
:proc_lib.init_ack({:ok, self()})
649664
receive do: ({:EXIT, _, _} -> exit(:stop))
650665
end
651-
652666
end

0 commit comments

Comments
 (0)