Skip to content

Commit a93f20d

Browse files
kjnilssonmergify[bot]
authored andcommitted
Quorum queues: avoid potential crash when returning message.
Returns reaching a Ra member that used to be leader but now has stepped down would cause that follower to crash and restart. This commit avoids this scenario as well as giving the return commands a good chance of being resent to the new leader in a timeley manner. (see the Ra release for this). (cherry picked from commit 01f6d0f)
1 parent 1c98761 commit a93f20d

File tree

4 files changed

+36
-5
lines changed

4 files changed

+36
-5
lines changed

MODULE.bazel

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ erlang_package.hex_package(
267267
erlang_package.hex_package(
268268
name = "ra",
269269
build_file = "@rabbitmq-server//bazel:BUILD.ra",
270-
sha256 = "321353d6f25bf3ac629698999a5d402308d21bf236bd68a6f4e92bee095f2e5f",
271-
version = "2.5.0",
270+
sha256 = "f5479a68c66bfee4ce96dcca81e4457cc99e4ab06bd350b83a0e5ac2ff29c78a",
271+
version = "2.5.1-pre.1",
272272
)
273273

274274
erlang_package.hex_package(

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1003,8 +1003,8 @@ handle_aux(leader, _, garbage_collection, Aux, Log, MacState) ->
10031003
{no_reply, force_eval_gc(Log, MacState, Aux), Log};
10041004
handle_aux(follower, _, garbage_collection, Aux, Log, MacState) ->
10051005
{no_reply, force_eval_gc(Log, MacState, Aux), Log};
1006-
handle_aux(leader, cast, {#return{msg_ids = MsgIds,
1007-
consumer_id = ConsumerId}, Corr, Pid},
1006+
handle_aux(_RaftState, cast, {#return{msg_ids = MsgIds,
1007+
consumer_id = ConsumerId}, Corr, Pid},
10081008
Aux0, Log0, #?MODULE{cfg = #cfg{delivery_limit = undefined},
10091009
consumers = Consumers}) ->
10101010
case Consumers of

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ groups() ->
6969
metrics_cleanup_on_leadership_takeover,
7070
metrics_cleanup_on_leader_crash,
7171
consume_in_minority,
72+
reject_after_leader_transfer,
7273
shrink_all,
7374
rebalance,
7475
file_handle_reservations,
@@ -907,6 +908,36 @@ consume_in_minority(Config) ->
907908
ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
908909
ok.
909910

911+
reject_after_leader_transfer(Config) ->
912+
[Server0, Server1, Server2] =
913+
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
914+
915+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
916+
QQ = ?config(queue_name, Config),
917+
RaName = binary_to_atom(<<"%2F_", QQ/binary>>, utf8),
918+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
919+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
920+
publish(Ch, QQ),
921+
922+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
923+
{#'basic.get_ok'{delivery_tag = Tag}, #amqp_msg{}} =
924+
amqp_channel:call(Ch2, #'basic.get'{queue = QQ, no_ack = false}),
925+
926+
ServerId1 = {RaName, Server1},
927+
ct:pal("transfser leadership ~p",
928+
[rabbit_ct_broker_helpers:rpc(Config, 0, ra,
929+
transfer_leadership, [ServerId1, ServerId1])]),
930+
ok = amqp_channel:call(Ch2, #'basic.reject'{delivery_tag = Tag,
931+
requeue = true}),
932+
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
933+
934+
{#'basic.get_ok'{delivery_tag = Tag2}, #amqp_msg{}} =
935+
amqp_channel:call(Ch2, #'basic.get'{queue = QQ, no_ack = false}),
936+
937+
ok = amqp_channel:call(Ch2, #'basic.reject'{delivery_tag = Tag2,
938+
requeue = true}),
939+
ok.
940+
910941
shrink_all(Config) ->
911942
[Server0, Server1, Server2] =
912943
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ dep_cowboy = hex 2.8.0
115115
dep_cowlib = hex 2.12.1
116116
dep_looking_glass = git https://github.com/rabbitmq/looking_glass.git master
117117
dep_prometheus = hex 4.10.0
118-
dep_ra = hex 2.5.0
118+
dep_ra = hex 2.5.1-pre.1
119119
dep_ranch = hex 2.1.0
120120
dep_recon = hex 2.5.3
121121
dep_redbug = hex 2.0.7

0 commit comments

Comments
 (0)