Skip to content

Commit 72a48e9

Browse files
Reconcile QQ node dead during delete and redeclare
Co-authored-by: Péter Gömöri <[email protected]>
1 parent fb9f048 commit 72a48e9

File tree

2 files changed

+219
-23
lines changed

2 files changed

+219
-23
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,12 @@ start_cluster(Q) ->
275275
{LeaderNode, FollowerNodes} =
276276
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
277277
LeaderId = {RaName, LeaderNode},
278+
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
279+
|| Node <- [LeaderNode | FollowerNodes]]),
278280
NewQ0 = amqqueue:set_pid(Q, LeaderId),
279281
NewQ1 = amqqueue:set_type_state(NewQ0,
280-
#{nodes => [LeaderNode | FollowerNodes]}),
282+
#{nodes => [LeaderNode | FollowerNodes],
283+
uids => UIDs}),
281284

282285
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
283286
rabbit_fifo, version, [],
@@ -791,6 +794,24 @@ recover(_Vhost, Queues) ->
791794
ServerId = {Name, node()},
792795
QName = amqqueue:get_name(Q0),
793796
MutConf = make_mutable_config(Q0),
797+
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
798+
QTypeState0 = amqqueue:get_type_state(Q0),
799+
RaUIds = maps:get(uids, QTypeState0, undefined),
800+
QTypeState = case RaUIds of
801+
undefined ->
802+
%% Queue is not aware of node to uid mapping, do nothing
803+
QTypeState0;
804+
#{node() := RaUId} ->
805+
%% Queue is aware and uid for current node is correct, do nothing
806+
QTypeState0;
807+
_ ->
808+
%% Queue is aware but either current node has no UId or it
809+
%% does not match the one returned by ra_directory, regen uid
810+
maybe_delete_data_dir(RaUId),
811+
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
812+
QTypeState0#{uids := RaUIds#{node() => NewRaUId}}
813+
end,
814+
Q = amqqueue:set_type_state(Q0, QTypeState),
794815
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
795816
ok ->
796817
% queue was restarted, good
@@ -803,7 +824,7 @@ recover(_Vhost, Queues) ->
803824
[rabbit_misc:rs(QName), Err1]),
804825
% queue was never started on this node
805826
% so needs to be started from scratch.
806-
case start_server(make_ra_conf(Q0, ServerId)) of
827+
case start_server(make_ra_conf(Q, ServerId)) of
807828
ok -> ok;
808829
Err2 ->
809830
rabbit_log:warning("recover: quorum queue ~w could not"
@@ -825,8 +846,7 @@ recover(_Vhost, Queues) ->
825846
%% present in the rabbit_queue table and not just in
826847
%% rabbit_durable_queue
827848
%% So many code paths are dependent on this.
828-
ok = rabbit_db_queue:set_dirty(Q0),
829-
Q = Q0,
849+
ok = rabbit_db_queue:set_dirty(Q),
830850
case Res of
831851
ok ->
832852
{[Q | R0], F0};
@@ -1207,12 +1227,17 @@ cleanup_data_dir() ->
12071227
maybe_delete_data_dir(UId) ->
12081228
_ = ra_directory:unregister_name(?RA_SYSTEM, UId),
12091229
Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
1210-
{ok, Config} = ra_log:read_config(Dir),
1211-
case maps:get(machine, Config) of
1212-
{module, rabbit_fifo, _} ->
1213-
ra_lib:recursive_delete(Dir);
1214-
_ ->
1215-
ok
1230+
case filelib:is_dir(Dir) of
1231+
false ->
1232+
ok;
1233+
true ->
1234+
{ok, Config} = ra_log:read_config(Dir),
1235+
case maps:get(machine, Config) of
1236+
{module, rabbit_fifo, _} ->
1237+
ra_lib:recursive_delete(Dir);
1238+
_ ->
1239+
ok
1240+
end
12161241
end.
12171242

12181243
policy_changed(Q) ->
@@ -1378,16 +1403,30 @@ add_member(Q, Node, Membership) ->
13781403
do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT).
13791404

13801405

1381-
do_add_member(Q, Node, Membership, Timeout)
1382-
when ?is_amqqueue(Q) andalso
1383-
?amqqueue_is_quorum(Q) andalso
1406+
do_add_member(Q0, Node, Membership, Timeout)
1407+
when ?is_amqqueue(Q0) andalso
1408+
?amqqueue_is_quorum(Q0) andalso
13841409
is_atom(Node) ->
1385-
{RaName, _} = amqqueue:get_pid(Q),
1386-
QName = amqqueue:get_name(Q),
1410+
{RaName, _} = amqqueue:get_pid(Q0),
1411+
QName = amqqueue:get_name(Q0),
13871412
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
13881413
ServerId = {RaName, Node},
1389-
Members = members(Q),
1390-
1414+
Members = members(Q0),
1415+
QTypeState0 = amqqueue:get_type_state(Q0),
1416+
RaUIds = maps:get(uids, QTypeState0, undefined),
1417+
QTypeState = case RaUIds of
1418+
undefined ->
1419+
%% Queue is not aware of node to uid mapping, do nothing
1420+
QTypeState0;
1421+
#{Node := _} ->
1422+
%% Queue is aware and uid for targeted node exists, do nothing
1423+
QTypeState0;
1424+
_ ->
1425+
%% Queue is aware but current node has no UId, regen uid
1426+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1427+
QTypeState0#{uids := RaUIds#{Node => NewRaUId}}
1428+
end,
1429+
Q = amqqueue:set_type_state(Q0, QTypeState),
13911430
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
13921431
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
13931432
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1477,7 +1516,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
14771516
Fun = fun(Q1) ->
14781517
update_type_state(
14791518
Q1,
1480-
fun(#{nodes := Nodes} = Ts) ->
1519+
fun(#{nodes := Nodes,
1520+
uids := UIds} = Ts) ->
1521+
Ts#{nodes => lists:delete(Node, Nodes),
1522+
uids => maps:remove(Node, UIds)};
1523+
(#{nodes := Nodes} = Ts) ->
14811524
Ts#{nodes => lists:delete(Node, Nodes)}
14821525
end)
14831526
end,
@@ -1986,7 +2029,15 @@ make_ra_conf(Q, ServerId, TickTimeout,
19862029
QName = amqqueue:get_name(Q),
19872030
RaMachine = ra_machine(Q),
19882031
[{ClusterName, _} | _] = Members = members(Q),
1989-
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
2032+
{_, Node} = ServerId,
2033+
UId = case amqqueue:get_type_state(Q) of
2034+
#{uids := #{Node := Id}} ->
2035+
Id;
2036+
_ ->
2037+
%% Queue was declared on an older version of RabbitMQ
2038+
%% and does not have the node to uid mappings
2039+
ra:new_uid(ra_lib:to_binary(ClusterName))
2040+
end,
19902041
FName = rabbit_misc:rs(QName),
19912042
Formatter = {?MODULE, format_ra_event, [QName]},
19922043
LogCfg = #{uid => UId,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 149 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,9 @@ groups() ->
105105
force_checkpoint,
106106
policy_repair,
107107
gh_12635,
108-
replica_states
108+
replica_states,
109+
restart_after_queue_reincarnation,
110+
no_messages_after_queue_reincarnation
109111
]
110112
++ all_tests()},
111113
{cluster_size_5, [], [start_queue,
@@ -2802,15 +2804,21 @@ add_member_wrong_type(Config) ->
28022804
[<<"/">>, SQ, Server, voter, 5000])).
28032805

28042806
add_member_already_a_member(Config) ->
2805-
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2807+
[Server, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
28062808
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
28072809
QQ = ?config(queue_name, Config),
28082810
?assertEqual({'queue.declare_ok', QQ, 0, 0},
28092811
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2812+
R1 = rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}]),
28102813
%% idempotent by design
28112814
?assertEqual(ok,
28122815
rpc:call(Server, rabbit_quorum_queue, add_member,
2813-
[<<"/">>, QQ, Server, voter, 5000])).
2816+
[<<"/">>, QQ, Server, voter, 5000])),
2817+
?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])),
2818+
?assertEqual(ok,
2819+
rpc:call(Server, rabbit_quorum_queue, add_member,
2820+
[<<"/">>, QQ, Server2, voter, 5000])),
2821+
?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])).
28142822

28152823
add_member_not_found(Config) ->
28162824
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -4880,6 +4888,140 @@ replica_states(Config) ->
48804888
end
48814889
end, Result2).
48824890

4891+
% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/discussions/13131
4892+
restart_after_queue_reincarnation(Config) ->
4893+
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4894+
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
4895+
QName = <<"QQ">>,
4896+
4897+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4898+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4899+
4900+
[Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
4901+
VHost = amqqueue:get_vhost(Q),
4902+
4903+
MessagesPublished = 1000,
4904+
publish_many(Ch, QName, MessagesPublished),
4905+
4906+
%% Trigger a snapshot by purging the queue.
4907+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, purge, [Q]),
4908+
4909+
%% Stop S3
4910+
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3),
4911+
?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)),
4912+
4913+
%% Delete and re-declare queue with the same name.
4914+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]),
4915+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4916+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4917+
4918+
% Now S3 should have the old queue state, and S1 and S2 a new one.
4919+
St1 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]),
4920+
Status0 = [{proplists:get_value(<<"Node Name">>, S), S} || S <- St1],
4921+
S3_Status1 = proplists:get_value(S3, Status0),
4922+
Others_Status1 = [V || {_K, V} <- proplists:delete(S3, Status0)],
4923+
4924+
S3_LastLogIndex = proplists:get_value(<<"Last Log Index">>, S3_Status1),
4925+
S3_LastWritten = proplists:get_value(<<"Last Written">>, S3_Status1),
4926+
S3_LastApplied = proplists:get_value(<<"Last Applied">>, S3_Status1),
4927+
S3_CommitIndex = proplists:get_value(<<"Commit Index">>, S3_Status1),
4928+
S3_Term = proplists:get_value(<<"Term">>, S3_Status1),
4929+
4930+
?assertEqual(noproc, proplists:get_value(<<"Raft State">>, S3_Status1)),
4931+
?assertEqual(unknown, proplists:get_value(<<"Membership">>, S3_Status1)),
4932+
[begin
4933+
?assert(S3_LastLogIndex > proplists:get_value(<<"Last Log Index">>, O)),
4934+
?assert(S3_LastWritten > proplists:get_value(<<"Last Written">>, O)),
4935+
?assert(S3_LastApplied > proplists:get_value(<<"Last Applied">>, O)),
4936+
?assert(S3_CommitIndex > proplists:get_value(<<"Commit Index">>, O)),
4937+
?assertEqual(S3_Term, proplists:get_value(<<"Term">>, O))
4938+
end || O <- Others_Status1],
4939+
4940+
%% Bumping term in online nodes
4941+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]),
4942+
4943+
%% Restart S3
4944+
?assertEqual(ok, rabbit_control_helper:command(start_app, S3)),
4945+
4946+
timer:sleep(1000),
4947+
4948+
%% Now all three nodes should have the new state.
4949+
Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]),
4950+
% They are either leader or follower.
4951+
?assert(
4952+
lists:all(
4953+
fun(NodeStatus) ->
4954+
NodeRaftState = proplists:get_value(<<"Raft State">>, NodeStatus),
4955+
lists:member(NodeRaftState, [leader, follower])
4956+
end, Status2)),
4957+
% Remove "Node Name" and "Raft State" from the status.
4958+
Status3 = [NE1, NE2, NE3]= [
4959+
begin
4960+
R = proplists:delete(<<"Node Name">>, NodeEntry),
4961+
proplists:delete(<<"Raft State">>, R)
4962+
end || NodeEntry <- Status2],
4963+
% Check all other properties have same value on all nodes.
4964+
ct:pal("Status3: ~tp", [Status3]),
4965+
[
4966+
begin
4967+
?assertEqual(V, proplists:get_value(K, NE2)),
4968+
?assertEqual(V, proplists:get_value(K, NE3))
4969+
end || {K, V} <- NE1
4970+
].
4971+
4972+
% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/issues/12366
4973+
no_messages_after_queue_reincarnation(Config) ->
4974+
[S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
4975+
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
4976+
QName = <<"QQ">>,
4977+
4978+
?assertEqual({'queue.declare_ok', QName, 0, 0},
4979+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
4980+
4981+
[Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []),
4982+
4983+
publish(Ch, QName, <<"msg1">>),
4984+
publish(Ch, QName, <<"msg2">>),
4985+
4986+
%% Stop S3
4987+
rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3),
4988+
?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)),
4989+
4990+
qos(Ch, 1, false),
4991+
subscribe(Ch, QName, false, <<"tag0">>, [], 500),
4992+
DeliveryTag = receive
4993+
{#'basic.deliver'{delivery_tag = DT}, #amqp_msg{}} ->
4994+
receive
4995+
{#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} ->
4996+
ct:fail("did not expect the second one")
4997+
after 500 ->
4998+
DT
4999+
end
5000+
after 500 ->
5001+
ct:fail("Expected some delivery, but got none")
5002+
end,
5003+
5004+
%% Delete and re-declare queue with the same name.
5005+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]),
5006+
?assertEqual({'queue.declare_ok', QName, 0, 0},
5007+
declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
5008+
5009+
%% Bumping term in online nodes
5010+
rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]),
5011+
5012+
%% Restart S3
5013+
?assertEqual(ok, rabbit_control_helper:command(start_app, S3)),
5014+
5015+
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
5016+
multiple = false}),
5017+
%% No message should be delivered after reincarnation
5018+
receive
5019+
{#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} ->
5020+
ct:fail("Expected no deliveries, but got one")
5021+
after 500 ->
5022+
ok
5023+
end.
5024+
48835025
%%----------------------------------------------------------------------------
48845026

48855027
same_elements(L1, L2)
@@ -4949,7 +5091,10 @@ consume_empty(Ch, Queue, NoAck) ->
49495091
subscribe(Ch, Queue, NoAck) ->
49505092
subscribe(Ch, Queue, NoAck, <<"ctag">>, []).
49515093

5094+
49525095
subscribe(Ch, Queue, NoAck, Tag, Args) ->
5096+
subscribe(Ch, Queue, NoAck, Tag, Args, ?TIMEOUT).
5097+
subscribe(Ch, Queue, NoAck, Tag, Args, Timeout) ->
49535098
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
49545099
no_ack = NoAck,
49555100
arguments = Args,
@@ -4958,7 +5103,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) ->
49585103
receive
49595104
#'basic.consume_ok'{consumer_tag = Tag} ->
49605105
ok
4961-
after ?TIMEOUT ->
5106+
after Timeout ->
49625107
flush(100),
49635108
exit(subscribe_timeout)
49645109
end.

0 commit comments

Comments
 (0)