Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,13 @@ defmodule Broadway do
when set to `:flush`, the batch the message is in is immediately delivered. When set
to `:bulk`, batch is delivered when its size or timeout is reached.
"""
],
batcher: [
type: :atom,
default: :default,
doc: """
the name of the batcher to use for the test message.
"""
]
]

Expand Down Expand Up @@ -1372,6 +1379,7 @@ defmodule Broadway do

defp test_messages(broadway, data, batch_mode, opts) when is_broadway_name(broadway) do
metadata = opts |> Keyword.fetch!(:metadata) |> Map.new()
batcher = Keyword.get(opts, :batcher, :default)

acknowledger =
Keyword.get(opts, :acknowledger, fn _data, ack_ref ->
Expand All @@ -1383,7 +1391,14 @@ defmodule Broadway do
messages =
Enum.map(data, fn data ->
ack = acknowledger.(data, {self(), ref})
%Message{data: data, acknowledger: ack, batch_mode: batch_mode, metadata: metadata}

%Message{
data: data,
acknowledger: ack,
batch_mode: batch_mode,
metadata: metadata,
batcher: batcher
}
end)

:ok = push_messages(broadway, messages)
Expand Down
33 changes: 33 additions & 0 deletions test/broadway_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,39 @@ defmodule BroadwayTest do
assert_receive {:ack, ^ref, [%{status: :ok}, %{status: :ok}], []}
end

test "successful messages are marked as :ok with specific batcher name" do
broadway_name = new_unique_name()

handle_message = fn message, _ ->
case message.data do
:fail -> Message.failed(message, "Failed message")
_ -> message
end
end

handle_batch = fn _, batch, _, _ ->
send(self(), {:batch_handled, batch})
batch
end

context = %{
handle_message: handle_message,
handle_batch: handle_batch
}

{:ok, _broadway} =
Broadway.start_link(CustomHandlers,
name: broadway_name,
context: context,
producer: [module: {ManualProducer, []}],
processors: [default: [concurrency: 1, min_demand: 1, max_demand: 2]],
batchers: [default: [batch_size: 2], custom_batcher: [batch_size: 2]]
)

ref = Broadway.test_batch(broadway_name, [1, 2], batcher: :custom_batcher)
assert_receive {:ack, ^ref, [%{data: 1, status: :ok}, %{data: 2, status: :ok}], []}
end

test "failed messages are marked as {:failed, reason}", %{broadway: broadway} do
ref = Broadway.test_message(broadway, :fail)
assert_receive {:ack, ^ref, _, [%{status: {:failed, "Failed message"}}]}
Expand Down