Skip to content

Commit 35ef780

Browse files
committed
wip: add feature flag and put RaUids in nodes
1 parent 93101d4 commit 35ef780

File tree

3 files changed

+74
-37
lines changed

3 files changed

+74
-37
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,10 @@
211211
stability => stable,
212212
depends_on => ['rabbitmq_4.0.0']
213213
}}).
214+
215+
-rabbit_feature_flag(
216+
{'track_qq_members_uids',
217+
#{desc => "Track queue members UIDs in the metadata store",
218+
stability => stable,
219+
depends_on => []
220+
}}).

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,14 @@ start_cluster(Q) ->
278278
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
279279
|| Node <- [LeaderNode | FollowerNodes]]),
280280
NewQ0 = amqqueue:set_pid(Q, LeaderId),
281-
NewQ1 = amqqueue:set_type_state(NewQ0,
282-
#{nodes => [LeaderNode | FollowerNodes],
283-
uids => UIDs}),
281+
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
282+
false ->
283+
amqqueue:set_type_state(NewQ0,
284+
#{nodes => [LeaderNode | FollowerNodes]});
285+
true ->
286+
amqqueue:set_type_state(NewQ0,
287+
#{nodes => UIDs})
288+
end,
284289

285290
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
286291
rabbit_fifo, version, [],
@@ -725,7 +730,7 @@ repair_amqqueue_nodes(Q0) ->
725730
{Name, _} = amqqueue:get_pid(Q0),
726731
Members = ra_leaderboard:lookup_members(Name),
727732
RaNodes = [N || {_, N} <- Members],
728-
#{nodes := Nodes} = amqqueue:get_type_state(Q0),
733+
Nodes = get_nodes(Q0),
729734
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
730735
true ->
731736
%% up to date
@@ -734,7 +739,18 @@ repair_amqqueue_nodes(Q0) ->
734739
%% update amqqueue record
735740
Fun = fun (Q) ->
736741
TS0 = amqqueue:get_type_state(Q),
737-
TS = TS0#{nodes => RaNodes},
742+
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids)
743+
andalso has_uuid_tracking(TS0)
744+
of
745+
false ->
746+
TS0#{nodes => RaNodes};
747+
true ->
748+
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
749+
[?RA_SYSTEM, Name],
750+
?RPC_TIMEOUT)}
751+
|| N <- RaNodes]),
752+
TS0#{nodes => RaUids}
753+
end,
738754
amqqueue:set_type_state(Q, TS)
739755
end,
740756
_ = rabbit_amqqueue:update(QName, Fun),
@@ -795,10 +811,9 @@ recover(_Vhost, Queues) ->
795811
QName = amqqueue:get_name(Q0),
796812
MutConf = make_mutable_config(Q0),
797813
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
798-
QTypeState0 = amqqueue:get_type_state(Q0),
799-
RaUIds = maps:get(uids, QTypeState0, undefined),
800-
QTypeState = case RaUIds of
801-
undefined ->
814+
#{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0),
815+
QTypeState = case Nodes of
816+
List when is_list(List) ->
802817
%% Queue is not aware of node to uid mapping, do nothing
803818
QTypeState0;
804819
#{node() := RaUId} ->
@@ -809,7 +824,7 @@ recover(_Vhost, Queues) ->
809824
%% does not match the one returned by ra_directory, regen uid
810825
maybe_delete_data_dir(RaUId),
811826
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
812-
QTypeState0#{uids := RaUIds#{node() => NewRaUId}}
827+
QTypeState0#{nodes := Nodes#{node() => NewRaUId}}
813828
end,
814829
Q = amqqueue:set_type_state(Q0, QTypeState),
815830
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
@@ -1412,21 +1427,20 @@ do_add_member(Q0, Node, Membership, Timeout)
14121427
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
14131428
ServerId = {RaName, Node},
14141429
Members = members(Q0),
1415-
QTypeState0 = amqqueue:get_type_state(Q0),
1416-
RaUIds = maps:get(uids, QTypeState0, undefined),
1417-
QTypeState = case RaUIds of
1418-
undefined ->
1419-
%% Queue is not aware of node to uid mapping, do nothing
1420-
QTypeState0;
1421-
#{Node := _} ->
1422-
%% Queue is aware and uid for targeted node exists, do nothing
1423-
QTypeState0;
1424-
_ ->
1425-
%% Queue is aware but current node has no UId, regen uid
1426-
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1427-
QTypeState0#{uids := RaUIds#{Node => NewRaUId}}
1428-
end,
1429-
Q = amqqueue:set_type_state(Q0, QTypeState),
1430+
QTypeState0 = #{nodes := _Nodes}= amqqueue:get_type_state(Q0),
1431+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1432+
%QTypeState = case Nodes of
1433+
% L when is_list(L) ->
1434+
% %% Queue is not aware of node to uid mapping, just add the new node
1435+
% QTypeState0#{nodes := lists:usort([Node | Nodes])};
1436+
% #{Node := _} ->
1437+
% %% Queue is aware and uid for targeted node exists, do nothing
1438+
% QTypeState0;
1439+
% _ ->
1440+
% %% Queue is aware but current node has no UId, regen uid
1441+
% QTypeState0#{nodes := Nodes#{Node => NewRaUId}}
1442+
%end,
1443+
Q = amqqueue:set_type_state(Q0, QTypeState0),
14301444
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
14311445
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
14321446
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1442,8 +1456,12 @@ do_add_member(Q0, Node, Membership, Timeout)
14421456
{ok, {RaIndex, RaTerm}, Leader} ->
14431457
Fun = fun(Q1) ->
14441458
Q2 = update_type_state(
1445-
Q1, fun(#{nodes := Nodes} = Ts) ->
1446-
Ts#{nodes => lists:usort([Node | Nodes])}
1459+
Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) ->
1460+
Ts#{nodes => lists:usort([Node | NodesList])};
1461+
(#{nodes := #{Node := _} = _NodesMap} = Ts) ->
1462+
Ts;
1463+
(#{nodes := NodesMap} = Ts) when is_map(NodesMap) ->
1464+
Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)}
14471465
end),
14481466
amqqueue:set_pid(Q2, Leader)
14491467
end,
@@ -1516,12 +1534,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
15161534
Fun = fun(Q1) ->
15171535
update_type_state(
15181536
Q1,
1519-
fun(#{nodes := Nodes,
1520-
uids := UIds} = Ts) ->
1521-
Ts#{nodes => lists:delete(Node, Nodes),
1522-
uids => maps:remove(Node, UIds)};
1523-
(#{nodes := Nodes} = Ts) ->
1524-
Ts#{nodes => lists:delete(Node, Nodes)}
1537+
fun(#{nodes := Nodes} = Ts) when is_list(Nodes) ->
1538+
Ts#{nodes => lists:delete(Node, Nodes)};
1539+
(#{nodes := Nodes} = Ts) when is_map(Nodes) ->
1540+
Ts#{nodes => maps:remove(Node, Nodes)}
15251541
end)
15261542
end,
15271543
_ = rabbit_amqqueue:update(QName, Fun),
@@ -2067,7 +2083,12 @@ make_mutable_config(Q) ->
20672083

20682084
get_nodes(Q) when ?is_amqqueue(Q) ->
20692085
#{nodes := Nodes} = amqqueue:get_type_state(Q),
2070-
Nodes.
2086+
case Nodes of
2087+
List when is_list(List) ->
2088+
List;
2089+
Map when is_map(Map) ->
2090+
maps:keys(Map)
2091+
end.
20712092

20722093
get_connected_nodes(Q) when ?is_amqqueue(Q) ->
20732094
ErlangNodes = [node() | nodes()],
@@ -2423,3 +2444,8 @@ queue_vm_stats_sups() ->
24232444
queue_vm_ets() ->
24242445
{[quorum_ets],
24252446
[[ra_log_ets]]}.
2447+
2448+
has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) ->
2449+
true;
2450+
has_uuid_tracking(_QTypeState) ->
2451+
false.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2862,10 +2862,14 @@ add_member_2(Config) ->
28622862
{<<"x-quorum-initial-group-size">>, long, 1}])),
28632863
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
28642864
[<<"/">>, QQ, Server0, 5000])),
2865-
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
2866-
[rabbit_misc:r(<<"/">>, queue, QQ)]),
2865+
#{online := Onlines} = ?awaitMatch(#{online := [_One, _Two]},
2866+
maps:from_list(rpc:call(Server0,
2867+
rabbit_quorum_queue,
2868+
infos,
2869+
[rabbit_misc:r(<<"/">>, queue, QQ)])),
2870+
3000),
28672871
Servers = lists:sort([Server0, Server1]),
2868-
?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
2872+
?assertEqual(Servers, lists:sort(Onlines)).
28692873

28702874
delete_member_not_running(Config) ->
28712875
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)