Skip to content

Commit 4f15eca

Browse files
kjnilssonmergify[bot]
authored andcommitted
Streams: make at-most-once dead lettering to work
Previously osiris did not support uncorrelated writes which means we could not use a "stateless" queue type delivery and these were silently dropped. This had the impact that at-most-once dead letter was not possible where the dead letter target is a stream. This change bumps the osiris version that has the required API to allow for uncorrelated writes (osiris:write/2). Currently there is no feature flag to control this as osiris writer processes just logs and drops any messages they don't understand. (cherry picked from commit e7d7f6f)
1 parent 08aac6b commit 4f15eca

File tree

4 files changed

+47
-9
lines changed

4 files changed

+47
-9
lines changed

MODULE.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ erlang_package.hex_package(
247247

248248
erlang_package.git_package(
249249
repository = "rabbitmq/osiris",
250-
tag = "v1.4.3",
250+
tag = "v1.5.0",
251251
)
252252

253253
erlang_package.hex_package(

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
148148
PLT_APPS += mnesia
149149

150150
dep_syslog = git https://github.com/schlagert/syslog 4.0.0
151-
dep_osiris = git https://github.com/rabbitmq/osiris v1.4.3
151+
dep_osiris = git https://github.com/rabbitmq/osiris v1.5.0
152152
dep_systemd = hex 0.6.1
153153
dep_seshat = hex 0.4.0
154154

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -365,13 +365,11 @@ credit(QName, CTag, Credit, Drain, #stream_client{readers = Readers0,
365365
{State#stream_client{readers = Readers}, [{send_credit_reply, length(Msgs)},
366366
{deliver, CTag, true, Msgs}] ++ Actions}.
367367

368-
deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
368+
deliver(QSs, #delivery{message = Msg, confirm = Confirm} = Delivery) ->
369369
lists:foldl(
370-
fun({_Q, stateless}, {Qs, Actions}) ->
371-
%% TODO what do we do with stateless?
372-
%% QRef = amqqueue:get_pid(Q),
373-
%% ok = rabbit_fifo_client:untracked_enqueue(
374-
%% [QRef], Delivery#delivery.message),
370+
fun({Q, stateless}, {Qs, Actions}) ->
371+
LeaderPid = amqqueue:get_pid(Q),
372+
ok = osiris:write(LeaderPid, msg_to_iodata(Msg)),
375373
{Qs, Actions};
376374
({Q, S0}, {Qs, Actions}) ->
377375
{S, As} = deliver(Confirm, Delivery, S0),

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ all_tests() ->
124124
update_retention_policy,
125125
queue_info,
126126
tracking_status,
127-
restart_stream
127+
restart_stream,
128+
dead_letter_target
128129
].
129130

130131
%% -------------------------------------------------------------------
@@ -2329,6 +2330,45 @@ purge(Config) ->
23292330
amqp_channel:call(Ch, #'queue.purge'{queue = Q})),
23302331
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
23312332

2333+
dead_letter_target(Config) ->
2334+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2335+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
2336+
Q = ?config(queue_name, Config),
2337+
?assertEqual({'queue.declare_ok', Q, 0, 0},
2338+
declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
2339+
2340+
SourceQ = <<Q/binary, "_source">>,
2341+
?assertEqual({'queue.declare_ok', SourceQ, 0, 0},
2342+
declare(Ch, SourceQ, [{<<"x-queue-type">>, longstr, <<"classic">>},
2343+
{<<"x-dead-letter-exchange">>, longstr, <<>>},
2344+
{<<"x-dead-letter-routing-key">>, longstr, Q}
2345+
])),
2346+
2347+
publish_confirm(Ch, SourceQ, [<<"msg">>]),
2348+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
2349+
qos(Ch1, 1, false),
2350+
CTag = <<"ctag">>,
2351+
amqp_channel:subscribe(Ch1,
2352+
#'basic.consume'{queue = SourceQ,
2353+
no_ack = false,
2354+
consumer_tag = CTag},
2355+
self()),
2356+
receive
2357+
#'basic.consume_ok'{consumer_tag = CTag} ->
2358+
ok
2359+
after 5000 ->
2360+
exit(basic_consume_ok_timeout)
2361+
end,
2362+
receive
2363+
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
2364+
ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag,
2365+
requeue =false,
2366+
multiple = false}),
2367+
quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]])
2368+
after 5000 ->
2369+
exit(timeout)
2370+
end,
2371+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
23322372
%%----------------------------------------------------------------------------
23332373

23342374
delete_queues(Qs) when is_list(Qs) ->

0 commit comments

Comments
 (0)