diff --git a/lib/broadway.ex b/lib/broadway.ex index 1dd951f..7933031 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -1268,6 +1268,13 @@ defmodule Broadway do when set to `:flush`, the batch the message is in is immediately delivered. When set to `:bulk`, batch is delivered when its size or timeout is reached. """ + ], + batcher: [ + type: :atom, + default: :default, + doc: """ + the name of the batcher to use for the test message. + """ ] ] @@ -1372,6 +1379,7 @@ defmodule Broadway do defp test_messages(broadway, data, batch_mode, opts) when is_broadway_name(broadway) do metadata = opts |> Keyword.fetch!(:metadata) |> Map.new() + batcher = Keyword.get(opts, :batcher, :default) acknowledger = Keyword.get(opts, :acknowledger, fn _data, ack_ref -> @@ -1383,7 +1391,14 @@ defmodule Broadway do messages = Enum.map(data, fn data -> ack = acknowledger.(data, {self(), ref}) - %Message{data: data, acknowledger: ack, batch_mode: batch_mode, metadata: metadata} + + %Message{ + data: data, + acknowledger: ack, + batch_mode: batch_mode, + metadata: metadata, + batcher: batcher + } end) :ok = push_messages(broadway, messages) diff --git a/test/broadway_test.exs b/test/broadway_test.exs index 36f8553..5ae0f96 100644 --- a/test/broadway_test.exs +++ b/test/broadway_test.exs @@ -791,6 +791,39 @@ defmodule BroadwayTest do assert_receive {:ack, ^ref, [%{status: :ok}, %{status: :ok}], []} end + test "successful messages are marked as :ok with specific batcher name" do + broadway_name = new_unique_name() + + handle_message = fn message, _ -> + case message.data do + :fail -> Message.failed(message, "Failed message") + _ -> message + end + end + + handle_batch = fn _, batch, _, _ -> + send(self(), {:batch_handled, batch}) + batch + end + + context = %{ + handle_message: handle_message, + handle_batch: handle_batch + } + + {:ok, _broadway} = + Broadway.start_link(CustomHandlers, + name: broadway_name, + context: context, + producer: [module: {ManualProducer, []}], + processors: [default: [concurrency: 1, min_demand: 1, max_demand: 2]], + batchers: [default: [batch_size: 2], custom_batcher: [batch_size: 2]] + ) + + ref = Broadway.test_batch(broadway_name, [1, 2], batcher: :custom_batcher) + assert_receive {:ack, ^ref, [%{data: 1, status: :ok}, %{data: 2, status: :ok}], []} + end + test "failed messages are marked as {:failed, reason}", %{broadway: broadway} do ref = Broadway.test_message(broadway, :fail) assert_receive {:ack, ^ref, _, [%{status: {:failed, "Failed message"}}]}