From 72a48e98d7f19b9bb8b1d195b0b63d4762b613b7 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Wed, 16 Jul 2025 12:25:56 +0200 Subject: [PATCH 1/4] Reconcile QQ node dead during delete and redeclare MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Péter Gömöri --- deps/rabbit/src/rabbit_quorum_queue.erl | 89 +++++++++++--- deps/rabbit/test/quorum_queue_SUITE.erl | 153 +++++++++++++++++++++++- 2 files changed, 219 insertions(+), 23 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 4e192df874f5..286067a83c86 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -275,9 +275,12 @@ start_cluster(Q) -> {LeaderNode, FollowerNodes} = rabbit_queue_location:select_leader_and_followers(Q, QuorumSize), LeaderId = {RaName, LeaderNode}, + UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))} + || Node <- [LeaderNode | FollowerNodes]]), NewQ0 = amqqueue:set_pid(Q, LeaderId), NewQ1 = amqqueue:set_type_state(NewQ0, - #{nodes => [LeaderNode | FollowerNodes]}), + #{nodes => [LeaderNode | FollowerNodes], + uids => UIDs}), Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes, rabbit_fifo, version, [], @@ -791,6 +794,24 @@ recover(_Vhost, Queues) -> ServerId = {Name, node()}, QName = amqqueue:get_name(Q0), MutConf = make_mutable_config(Q0), + RaUId = ra_directory:uid_of(?RA_SYSTEM, Name), + QTypeState0 = amqqueue:get_type_state(Q0), + RaUIds = maps:get(uids, QTypeState0, undefined), + QTypeState = case RaUIds of + undefined -> + %% Queue is not aware of node to uid mapping, do nothing + QTypeState0; + #{node() := RaUId} -> + %% Queue is aware and uid for current node is correct, do nothing + QTypeState0; + _ -> + %% Queue is aware but either current node has no UId or it + %% does not match the one returned by ra_directory, regen uid + maybe_delete_data_dir(RaUId), + NewRaUId = ra:new_uid(ra_lib:to_binary(Name)), + QTypeState0#{uids := RaUIds#{node() => NewRaUId}} + end, + Q = amqqueue:set_type_state(Q0, QTypeState), Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of ok -> % queue was restarted, good @@ -803,7 +824,7 @@ recover(_Vhost, Queues) -> [rabbit_misc:rs(QName), Err1]), % queue was never started on this node % so needs to be started from scratch. - case start_server(make_ra_conf(Q0, ServerId)) of + case start_server(make_ra_conf(Q, ServerId)) of ok -> ok; Err2 -> rabbit_log:warning("recover: quorum queue ~w could not" @@ -825,8 +846,7 @@ recover(_Vhost, Queues) -> %% present in the rabbit_queue table and not just in %% rabbit_durable_queue %% So many code paths are dependent on this. - ok = rabbit_db_queue:set_dirty(Q0), - Q = Q0, + ok = rabbit_db_queue:set_dirty(Q), case Res of ok -> {[Q | R0], F0}; @@ -1207,12 +1227,17 @@ cleanup_data_dir() -> maybe_delete_data_dir(UId) -> _ = ra_directory:unregister_name(?RA_SYSTEM, UId), Dir = ra_env:server_data_dir(?RA_SYSTEM, UId), - {ok, Config} = ra_log:read_config(Dir), - case maps:get(machine, Config) of - {module, rabbit_fifo, _} -> - ra_lib:recursive_delete(Dir); - _ -> - ok + case filelib:is_dir(Dir) of + false -> + ok; + true -> + {ok, Config} = ra_log:read_config(Dir), + case maps:get(machine, Config) of + {module, rabbit_fifo, _} -> + ra_lib:recursive_delete(Dir); + _ -> + ok + end end. policy_changed(Q) -> @@ -1378,16 +1403,30 @@ add_member(Q, Node, Membership) -> do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT). -do_add_member(Q, Node, Membership, Timeout) - when ?is_amqqueue(Q) andalso - ?amqqueue_is_quorum(Q) andalso +do_add_member(Q0, Node, Membership, Timeout) + when ?is_amqqueue(Q0) andalso + ?amqqueue_is_quorum(Q0) andalso is_atom(Node) -> - {RaName, _} = amqqueue:get_pid(Q), - QName = amqqueue:get_name(Q), + {RaName, _} = amqqueue:get_pid(Q0), + QName = amqqueue:get_name(Q0), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, - Members = members(Q), - + Members = members(Q0), + QTypeState0 = amqqueue:get_type_state(Q0), + RaUIds = maps:get(uids, QTypeState0, undefined), + QTypeState = case RaUIds of + undefined -> + %% Queue is not aware of node to uid mapping, do nothing + QTypeState0; + #{Node := _} -> + %% Queue is aware and uid for targeted node exists, do nothing + QTypeState0; + _ -> + %% Queue is aware but current node has no UId, regen uid + NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)), + QTypeState0#{uids := RaUIds#{Node => NewRaUId}} + end, + Q = amqqueue:set_type_state(Q0, QTypeState), MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity), Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), case ra:start_server(?RA_SYSTEM, Conf) of @@ -1477,7 +1516,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> Fun = fun(Q1) -> update_type_state( Q1, - fun(#{nodes := Nodes} = Ts) -> + fun(#{nodes := Nodes, + uids := UIds} = Ts) -> + Ts#{nodes => lists:delete(Node, Nodes), + uids => maps:remove(Node, UIds)}; + (#{nodes := Nodes} = Ts) -> Ts#{nodes => lists:delete(Node, Nodes)} end) end, @@ -1986,7 +2029,15 @@ make_ra_conf(Q, ServerId, TickTimeout, QName = amqqueue:get_name(Q), RaMachine = ra_machine(Q), [{ClusterName, _} | _] = Members = members(Q), - UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + {_, Node} = ServerId, + UId = case amqqueue:get_type_state(Q) of + #{uids := #{Node := Id}} -> + Id; + _ -> + %% Queue was declared on an older version of RabbitMQ + %% and does not have the node to uid mappings + ra:new_uid(ra_lib:to_binary(ClusterName)) + end, FName = rabbit_misc:rs(QName), Formatter = {?MODULE, format_ra_event, [QName]}, LogCfg = #{uid => UId, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2ae9f23d4060..a87391f18ebf 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -105,7 +105,9 @@ groups() -> force_checkpoint, policy_repair, gh_12635, - replica_states + replica_states, + restart_after_queue_reincarnation, + no_messages_after_queue_reincarnation ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -2802,15 +2804,21 @@ add_member_wrong_type(Config) -> [<<"/">>, SQ, Server, voter, 5000])). add_member_already_a_member(Config) -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + R1 = rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}]), %% idempotent by design ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server, voter, 5000])). + [<<"/">>, QQ, Server, voter, 5000])), + ?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])), + ?assertEqual(ok, + rpc:call(Server, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, Server2, voter, 5000])), + ?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])). add_member_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -4880,6 +4888,140 @@ replica_states(Config) -> end end, Result2). +% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/discussions/13131 +restart_after_queue_reincarnation(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QName = <<"QQ">>, + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + VHost = amqqueue:get_vhost(Q), + + MessagesPublished = 1000, + publish_many(Ch, QName, MessagesPublished), + + %% Trigger a snapshot by purging the queue. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, purge, [Q]), + + %% Stop S3 + rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3), + ?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)), + + %% Delete and re-declare queue with the same name. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + % Now S3 should have the old queue state, and S1 and S2 a new one. + St1 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + Status0 = [{proplists:get_value(<<"Node Name">>, S), S} || S <- St1], + S3_Status1 = proplists:get_value(S3, Status0), + Others_Status1 = [V || {_K, V} <- proplists:delete(S3, Status0)], + + S3_LastLogIndex = proplists:get_value(<<"Last Log Index">>, S3_Status1), + S3_LastWritten = proplists:get_value(<<"Last Written">>, S3_Status1), + S3_LastApplied = proplists:get_value(<<"Last Applied">>, S3_Status1), + S3_CommitIndex = proplists:get_value(<<"Commit Index">>, S3_Status1), + S3_Term = proplists:get_value(<<"Term">>, S3_Status1), + + ?assertEqual(noproc, proplists:get_value(<<"Raft State">>, S3_Status1)), + ?assertEqual(unknown, proplists:get_value(<<"Membership">>, S3_Status1)), + [begin + ?assert(S3_LastLogIndex > proplists:get_value(<<"Last Log Index">>, O)), + ?assert(S3_LastWritten > proplists:get_value(<<"Last Written">>, O)), + ?assert(S3_LastApplied > proplists:get_value(<<"Last Applied">>, O)), + ?assert(S3_CommitIndex > proplists:get_value(<<"Commit Index">>, O)), + ?assertEqual(S3_Term, proplists:get_value(<<"Term">>, O)) + end || O <- Others_Status1], + + %% Bumping term in online nodes + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]), + + %% Restart S3 + ?assertEqual(ok, rabbit_control_helper:command(start_app, S3)), + + timer:sleep(1000), + + %% Now all three nodes should have the new state. + Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + % They are either leader or follower. + ?assert( + lists:all( + fun(NodeStatus) -> + NodeRaftState = proplists:get_value(<<"Raft State">>, NodeStatus), + lists:member(NodeRaftState, [leader, follower]) + end, Status2)), + % Remove "Node Name" and "Raft State" from the status. + Status3 = [NE1, NE2, NE3]= [ + begin + R = proplists:delete(<<"Node Name">>, NodeEntry), + proplists:delete(<<"Raft State">>, R) + end || NodeEntry <- Status2], + % Check all other properties have same value on all nodes. + ct:pal("Status3: ~tp", [Status3]), + [ + begin + ?assertEqual(V, proplists:get_value(K, NE2)), + ?assertEqual(V, proplists:get_value(K, NE3)) + end || {K, V} <- NE1 + ]. + +% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/issues/12366 +no_messages_after_queue_reincarnation(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QName = <<"QQ">>, + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + + publish(Ch, QName, <<"msg1">>), + publish(Ch, QName, <<"msg2">>), + + %% Stop S3 + rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3), + ?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)), + + qos(Ch, 1, false), + subscribe(Ch, QName, false, <<"tag0">>, [], 500), + DeliveryTag = receive + {#'basic.deliver'{delivery_tag = DT}, #amqp_msg{}} -> + receive + {#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} -> + ct:fail("did not expect the second one") + after 500 -> + DT + end + after 500 -> + ct:fail("Expected some delivery, but got none") + end, + + %% Delete and re-declare queue with the same name. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Bumping term in online nodes + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]), + + %% Restart S3 + ?assertEqual(ok, rabbit_control_helper:command(start_app, S3)), + + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + %% No message should be delivered after reincarnation + receive + {#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} -> + ct:fail("Expected no deliveries, but got one") + after 500 -> + ok + end. + %%---------------------------------------------------------------------------- same_elements(L1, L2) @@ -4949,7 +5091,10 @@ consume_empty(Ch, Queue, NoAck) -> subscribe(Ch, Queue, NoAck) -> subscribe(Ch, Queue, NoAck, <<"ctag">>, []). + subscribe(Ch, Queue, NoAck, Tag, Args) -> + subscribe(Ch, Queue, NoAck, Tag, Args, ?TIMEOUT). +subscribe(Ch, Queue, NoAck, Tag, Args, Timeout) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, arguments = Args, @@ -4958,7 +5103,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) -> receive #'basic.consume_ok'{consumer_tag = Tag} -> ok - after ?TIMEOUT -> + after Timeout -> flush(100), exit(subscribe_timeout) end. From 93101d4a4f0083ed444f19b3dcf28969cee85ff7 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 24 Jul 2025 09:38:54 +0200 Subject: [PATCH 2/4] Move get_nodes from amqqueue to rabbit_amqqueue We are moving the functionality of getting the nodes/members of an amqqueue from the `amqqueue` module to `rabbit_amqqueue`. This goes in the line of previous PRs work towards reducing direct access to the `QueueTypeState`, such as https://github.com/rabbitmq/rabbitmq-server/pull/13905. Also, we will need to discretize different formats of the `nodes` entry in the `QueueTypeState`, to support both the previous one as a list of nodes and the new one as a map of nodes to Ra UIds. Doing so in a module such as `amqqueue`, which feels like an accessor module around the `amqqueue` record, doesn't feel right. --- deps/rabbit/src/amqqueue.erl | 10 ---------- deps/rabbit/src/rabbit_amqp_management.erl | 2 +- deps/rabbit/src/rabbit_amqqueue.erl | 14 +++++++------ .../rabbit/src/rabbit_jms_selector_parser.erl | 17 ++-------------- deps/rabbit/src/rabbit_queue_location.erl | 2 +- deps/rabbit/src/rabbit_quorum_queue.erl | 2 +- deps/rabbit/src/rabbit_stream_coordinator.erl | 4 ++-- deps/rabbit/test/quorum_queue_SUITE.erl | 20 +++++++++---------- ...etheus_rabbitmq_core_metrics_collector.erl | 3 +-- 9 files changed, 26 insertions(+), 48 deletions(-) diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index 8bf5a2345f19..9f5db9f0b2ee 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -29,7 +29,6 @@ % exclusive_owner get_exclusive_owner/1, get_leader_node/1, - get_nodes/1, % name (#resource) get_name/1, set_name/2, @@ -394,15 +393,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader; get_leader_node(#amqqueue{pid = none}) -> none; get_leader_node(#amqqueue{pid = Pid}) -> node(Pid). --spec get_nodes(amqqueue_v2()) -> [node(),...]. - -get_nodes(Q) -> - case amqqueue:get_type_state(Q) of - #{nodes := Nodes} -> - Nodes; - _ -> - [get_leader_node(Q)] - end. % operator_policy diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 7e7fb84da6fa..8ae21342374a 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -464,7 +464,7 @@ encode_queue(Q, NumMsgs, NumConsumers) -> {Leader :: node() | none, Replicas :: [node(),...]}. queue_topology(Q) -> Leader = amqqueue:get_leader_node(Q), - Replicas = amqqueue:get_nodes(Q), + Replicas = rabbit_amqqueue:get_nodes(Q), {Leader, Replicas}. decode_exchange({map, KVList}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index c2f954ccbfdc..55f83d36a76e 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -23,6 +23,7 @@ -export([list/0, list_durable/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). +-export([get_nodes/1]). -export([count/0]). -export([list_down/1, list_down/2, list_all/1, count/1, list_names/0, list_names/1, list_local_names/0, @@ -1233,6 +1234,12 @@ list() -> count() -> rabbit_db_queue:count(). +-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...]. + +get_nodes(Q) -> + [{members, Nodes}] = info(Q, [members]), + Nodes. + -spec list_names() -> [name()]. list_names() -> @@ -2042,12 +2049,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) ). get_quorum_nodes(Q) -> - case amqqueue:get_type_state(Q) of - #{nodes := Nodes} -> - Nodes; - _ -> - [] - end. + rabbit_amqqueue:get_nodes(Q). -spec prepend_extra_bcc(Qs) -> Qs when Qs :: [amqqueue:amqqueue() | diff --git a/deps/rabbit/src/rabbit_jms_selector_parser.erl b/deps/rabbit/src/rabbit_jms_selector_parser.erl index 8a62cc841b5d..d3c3d382e544 100644 --- a/deps/rabbit/src/rabbit_jms_selector_parser.erl +++ b/deps/rabbit/src/rabbit_jms_selector_parser.erl @@ -1,6 +1,4 @@ --file("rabbit_jms_selector_parser.yrl", 0). -module(rabbit_jms_selector_parser). --file("rabbit_jms_selector_parser.erl", 3). -export([parse/1, parse_and_scan/1, format_error/1]). -file("rabbit_jms_selector_parser.yrl", 122). @@ -26,9 +24,7 @@ process_escape_char({string, Line, Value}) -> %% %% %CopyrightBegin% %% -%% SPDX-License-Identifier: Apache-2.0 -%% -%% Copyright Ericsson AB 1996-2025. All Rights Reserved. +%% Copyright Ericsson AB 1996-2021. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -50,16 +46,10 @@ process_escape_char({string, Line, Value}) -> -type yecc_ret() :: {'error', _} | {'ok', _}. --ifdef (YECC_PARSE_DOC). --doc ?YECC_PARSE_DOC. --endif. -spec parse(Tokens :: list()) -> yecc_ret(). parse(Tokens) -> yeccpars0(Tokens, {no_func, no_location}, 0, [], []). --ifdef (YECC_PARSE_AND_SCAN_DOC). --doc ?YECC_PARSE_AND_SCAN_DOC. --endif. -spec parse_and_scan({function() | {atom(), atom()}, [_]} | {atom(), atom(), [_]}) -> yecc_ret(). parse_and_scan({F, A}) -> @@ -68,9 +58,6 @@ parse_and_scan({M, F, A}) -> Arity = length(A), yeccpars0([], {{fun M:F/Arity, A}, no_location}, 0, [], []). --ifdef (YECC_FORMAT_ERROR_DOC). --doc ?YECC_FORMAT_ERROR_DOC. --endif. -spec format_error(any()) -> [char() | list()]. format_error(Message) -> case io_lib:deep_char_list(Message) of @@ -212,7 +199,7 @@ yecctoken2string1(Other) -> --file("rabbit_jms_selector_parser.erl", 215). +-file("rabbit_jms_selector_parser.erl", 202). -dialyzer({nowarn_function, yeccpars2/7}). -compile({nowarn_unused_function, yeccpars2/7}). diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl index 0f204f97347e..4b63c2b99d2f 100644 --- a/deps/rabbit/src/rabbit_queue_location.erl +++ b/deps/rabbit/src/rabbit_queue_location.erl @@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) -> Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]), Queues = GetQueues(), Counters = lists:foldl(fun(Q, Acc) -> - #{nodes := Nodes} = amqqueue:get_type_state(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), lists:foldl(fun(N, A) when is_map_key(N, A) -> maps:update_with(N, fun(C) -> C+1 end, A); diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 286067a83c86..c26e9da1f485 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -2176,7 +2176,7 @@ force_checkpoint_on_queue(QName) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]), - Nodes = amqqueue:get_nodes(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), _ = [ra:cast_aux_command({RaName, Node}, force_checkpoint) || Node <- Nodes], ok; diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 9b25d8f23203..b16f96b73c6f 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -152,8 +152,8 @@ stop() -> new_stream(Q, LeaderNode) when ?is_amqqueue(Q) andalso is_atom(LeaderNode) -> - #{name := StreamId, - nodes := Nodes} = amqqueue:get_type_state(Q), + #{name := StreamId} = amqqueue:get_type_state(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), %% assertion leader is in nodes configuration true = lists:member(LeaderNode, Nodes), process_command({new_stream, StreamId, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index a87391f18ebf..3fa402510b54 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1310,7 +1310,7 @@ force_shrink_member_to_current_member(Config) -> wait_for_messages_ready([Server0], RaName, 3), {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + Nodes0 = rabbit_amqqueue:get_nodes(Q0), ?assertEqual(3, length(Nodes0)), rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, @@ -1319,7 +1319,7 @@ force_shrink_member_to_current_member(Config) -> wait_for_messages_ready([Server0], RaName, 3), {ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes1} = amqqueue:get_type_state(Q1), + Nodes1 = rabbit_amqqueue:get_nodes(Q1), ?assertEqual(1, length(Nodes1)), %% grow queues back to all nodes @@ -1327,7 +1327,7 @@ force_shrink_member_to_current_member(Config) -> wait_for_messages_ready([Server0], RaName, 3), {ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes2} = amqqueue:get_type_state(Q2), + Nodes2 = rabbit_amqqueue:get_nodes(Q2), ?assertEqual(3, length(Nodes2)) end. @@ -1354,7 +1354,7 @@ force_all_queues_shrink_member_to_current_member(Config) -> rabbit_ct_client_helpers:publish(Ch, Q, 3), wait_for_messages_ready([Server0], RaName, 3), {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + Nodes0 = rabbit_amqqueue:get_nodes(Q0), ?assertEqual(3, length(Nodes0)) end || Q <- QQs], @@ -1365,7 +1365,7 @@ force_all_queues_shrink_member_to_current_member(Config) -> RaName = ra_name(Q), wait_for_messages_ready([Server0], RaName, 3), {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + Nodes0 = rabbit_amqqueue:get_nodes(Q0), ?assertEqual(1, length(Nodes0)) end || Q <- QQs], @@ -1376,7 +1376,7 @@ force_all_queues_shrink_member_to_current_member(Config) -> RaName = ra_name(Q), wait_for_messages_ready([Server0], RaName, 3), {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + Nodes0 = rabbit_amqqueue:get_nodes(Q0), ?assertEqual(3, length(Nodes0)) end || Q <- QQs] end. @@ -1420,7 +1420,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + Nodes0 = rabbit_amqqueue:get_nodes(Q0), ?assertEqual(3, length(Nodes0)) end || Q <- QQs, VHost <- VHosts], @@ -1432,7 +1432,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + Nodes0 = rabbit_amqqueue:get_nodes(Q0), case VHost of VHost1 -> ?assertEqual(3, length(Nodes0)); VHost2 -> ?assertEqual(1, length(Nodes0)) @@ -1447,7 +1447,7 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), + Nodes0 = rabbit_amqqueue:get_nodes(Q0), ?assertEqual(3, length(Nodes0)) end || Q <- QQs, VHost <- VHosts] end. @@ -2955,7 +2955,7 @@ delete_member_member_already_deleted(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server2])), {ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes} = amqqueue:get_type_state(Q), + Nodes = rabbit_amqqueue:get_nodes(Q), ?assertEqual(1, length(Nodes)), ok. diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 1e1b00b23aa9..f0f00e21c3b1 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -457,8 +457,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) -> true -> Acc; false -> Type = amqqueue:get_type(Q), - TypeState = amqqueue:get_type_state(Q), - Members = maps:get(nodes, TypeState, []), + Members = rabbit_amqqueue:get_nodes(Q), case membership(amqqueue:get_pid(Q), Members) of not_a_member -> Acc; From 35ef780efdd4fbf7da2f1bb7432f81841993f963 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Fri, 1 Aug 2025 10:43:53 +0200 Subject: [PATCH 3/4] wip: add feature flag and put RaUids in `nodes` --- deps/rabbit/src/rabbit_core_ff.erl | 7 ++ deps/rabbit/src/rabbit_quorum_queue.erl | 94 ++++++++++++++++--------- deps/rabbit/test/quorum_queue_SUITE.erl | 10 ++- 3 files changed, 74 insertions(+), 37 deletions(-) diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index fc255f6a4b0b..263a51699ffe 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -211,3 +211,10 @@ stability => stable, depends_on => ['rabbitmq_4.0.0'] }}). + +-rabbit_feature_flag( + {'track_qq_members_uids', + #{desc => "Track queue members UIDs in the metadata store", + stability => stable, + depends_on => [] + }}). diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index c26e9da1f485..ac0e28ef0ce0 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -278,9 +278,14 @@ start_cluster(Q) -> UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))} || Node <- [LeaderNode | FollowerNodes]]), NewQ0 = amqqueue:set_pid(Q, LeaderId), - NewQ1 = amqqueue:set_type_state(NewQ0, - #{nodes => [LeaderNode | FollowerNodes], - uids => UIDs}), + NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of + false -> + amqqueue:set_type_state(NewQ0, + #{nodes => [LeaderNode | FollowerNodes]}); + true -> + amqqueue:set_type_state(NewQ0, + #{nodes => UIDs}) + end, Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes, rabbit_fifo, version, [], @@ -725,7 +730,7 @@ repair_amqqueue_nodes(Q0) -> {Name, _} = amqqueue:get_pid(Q0), Members = ra_leaderboard:lookup_members(Name), RaNodes = [N || {_, N} <- Members], - #{nodes := Nodes} = amqqueue:get_type_state(Q0), + Nodes = get_nodes(Q0), case lists:sort(RaNodes) =:= lists:sort(Nodes) of true -> %% up to date @@ -734,7 +739,18 @@ repair_amqqueue_nodes(Q0) -> %% update amqqueue record Fun = fun (Q) -> TS0 = amqqueue:get_type_state(Q), - TS = TS0#{nodes => RaNodes}, + TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) + andalso has_uuid_tracking(TS0) + of + false -> + TS0#{nodes => RaNodes}; + true -> + RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of, + [?RA_SYSTEM, Name], + ?RPC_TIMEOUT)} + || N <- RaNodes]), + TS0#{nodes => RaUids} + end, amqqueue:set_type_state(Q, TS) end, _ = rabbit_amqqueue:update(QName, Fun), @@ -795,10 +811,9 @@ recover(_Vhost, Queues) -> QName = amqqueue:get_name(Q0), MutConf = make_mutable_config(Q0), RaUId = ra_directory:uid_of(?RA_SYSTEM, Name), - QTypeState0 = amqqueue:get_type_state(Q0), - RaUIds = maps:get(uids, QTypeState0, undefined), - QTypeState = case RaUIds of - undefined -> + #{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0), + QTypeState = case Nodes of + List when is_list(List) -> %% Queue is not aware of node to uid mapping, do nothing QTypeState0; #{node() := RaUId} -> @@ -809,7 +824,7 @@ recover(_Vhost, Queues) -> %% does not match the one returned by ra_directory, regen uid maybe_delete_data_dir(RaUId), NewRaUId = ra:new_uid(ra_lib:to_binary(Name)), - QTypeState0#{uids := RaUIds#{node() => NewRaUId}} + QTypeState0#{nodes := Nodes#{node() => NewRaUId}} end, Q = amqqueue:set_type_state(Q0, QTypeState), Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of @@ -1412,21 +1427,20 @@ do_add_member(Q0, Node, Membership, Timeout) %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, Members = members(Q0), - QTypeState0 = amqqueue:get_type_state(Q0), - RaUIds = maps:get(uids, QTypeState0, undefined), - QTypeState = case RaUIds of - undefined -> - %% Queue is not aware of node to uid mapping, do nothing - QTypeState0; - #{Node := _} -> - %% Queue is aware and uid for targeted node exists, do nothing - QTypeState0; - _ -> - %% Queue is aware but current node has no UId, regen uid - NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)), - QTypeState0#{uids := RaUIds#{Node => NewRaUId}} - end, - Q = amqqueue:set_type_state(Q0, QTypeState), + QTypeState0 = #{nodes := _Nodes}= amqqueue:get_type_state(Q0), + NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)), + %QTypeState = case Nodes of + % L when is_list(L) -> + % %% Queue is not aware of node to uid mapping, just add the new node + % QTypeState0#{nodes := lists:usort([Node | Nodes])}; + % #{Node := _} -> + % %% Queue is aware and uid for targeted node exists, do nothing + % QTypeState0; + % _ -> + % %% Queue is aware but current node has no UId, regen uid + % QTypeState0#{nodes := Nodes#{Node => NewRaUId}} + %end, + Q = amqqueue:set_type_state(Q0, QTypeState0), MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity), Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), case ra:start_server(?RA_SYSTEM, Conf) of @@ -1442,8 +1456,12 @@ do_add_member(Q0, Node, Membership, Timeout) {ok, {RaIndex, RaTerm}, Leader} -> Fun = fun(Q1) -> Q2 = update_type_state( - Q1, fun(#{nodes := Nodes} = Ts) -> - Ts#{nodes => lists:usort([Node | Nodes])} + Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) -> + Ts#{nodes => lists:usort([Node | NodesList])}; + (#{nodes := #{Node := _} = _NodesMap} = Ts) -> + Ts; + (#{nodes := NodesMap} = Ts) when is_map(NodesMap) -> + Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)} end), amqqueue:set_pid(Q2, Leader) end, @@ -1516,12 +1534,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> Fun = fun(Q1) -> update_type_state( Q1, - fun(#{nodes := Nodes, - uids := UIds} = Ts) -> - Ts#{nodes => lists:delete(Node, Nodes), - uids => maps:remove(Node, UIds)}; - (#{nodes := Nodes} = Ts) -> - Ts#{nodes => lists:delete(Node, Nodes)} + fun(#{nodes := Nodes} = Ts) when is_list(Nodes) -> + Ts#{nodes => lists:delete(Node, Nodes)}; + (#{nodes := Nodes} = Ts) when is_map(Nodes) -> + Ts#{nodes => maps:remove(Node, Nodes)} end) end, _ = rabbit_amqqueue:update(QName, Fun), @@ -2067,7 +2083,12 @@ make_mutable_config(Q) -> get_nodes(Q) when ?is_amqqueue(Q) -> #{nodes := Nodes} = amqqueue:get_type_state(Q), - Nodes. + case Nodes of + List when is_list(List) -> + List; + Map when is_map(Map) -> + maps:keys(Map) + end. get_connected_nodes(Q) when ?is_amqqueue(Q) -> ErlangNodes = [node() | nodes()], @@ -2423,3 +2444,8 @@ queue_vm_stats_sups() -> queue_vm_ets() -> {[quorum_ets], [[ra_log_ets]]}. + +has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) -> + true; +has_uuid_tracking(_QTypeState) -> + false. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 3fa402510b54..a50e8152a433 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -2862,10 +2862,14 @@ add_member_2(Config) -> {<<"x-quorum-initial-group-size">>, long, 1}])), ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member, [<<"/">>, QQ, Server0, 5000])), - Info = rpc:call(Server0, rabbit_quorum_queue, infos, - [rabbit_misc:r(<<"/">>, queue, QQ)]), + #{online := Onlines} = ?awaitMatch(#{online := [_One, _Two]}, + maps:from_list(rpc:call(Server0, + rabbit_quorum_queue, + infos, + [rabbit_misc:r(<<"/">>, queue, QQ)])), + 3000), Servers = lists:sort([Server0, Server1]), - ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))). + ?assertEqual(Servers, lists:sort(Onlines)). delete_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), From f48962d18020b20cb44290f216d74ad3735cebca Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Wed, 6 Aug 2025 14:18:41 +0200 Subject: [PATCH 4/4] Update nodes entry on make_ra_conf --- deps/rabbit/src/rabbit_quorum_queue.erl | 30 ++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index ac0e28ef0ce0..cfad17c7059f 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -1427,20 +1427,20 @@ do_add_member(Q0, Node, Membership, Timeout) %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, Members = members(Q0), - QTypeState0 = #{nodes := _Nodes}= amqqueue:get_type_state(Q0), + QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0), NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)), - %QTypeState = case Nodes of - % L when is_list(L) -> - % %% Queue is not aware of node to uid mapping, just add the new node - % QTypeState0#{nodes := lists:usort([Node | Nodes])}; - % #{Node := _} -> - % %% Queue is aware and uid for targeted node exists, do nothing - % QTypeState0; - % _ -> - % %% Queue is aware but current node has no UId, regen uid - % QTypeState0#{nodes := Nodes#{Node => NewRaUId}} - %end, - Q = amqqueue:set_type_state(Q0, QTypeState0), + QTypeState = case Nodes of + L when is_list(L) -> + %% Queue is not aware of node to uid mapping, just add the new node + QTypeState0#{nodes => lists:usort([Node | Nodes])}; + #{Node := _} -> + %% Queue is aware and uid for targeted node exists, do nothing + QTypeState0; + _ -> + %% Queue is aware but current node has no UId, regen uid + QTypeState0#{nodes => Nodes#{Node => NewRaUId}} + end, + Q = amqqueue:set_type_state(Q0, QTypeState), MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity), Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), case ra:start_server(?RA_SYSTEM, Conf) of @@ -1458,7 +1458,7 @@ do_add_member(Q0, Node, Membership, Timeout) Q2 = update_type_state( Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) -> Ts#{nodes => lists:usort([Node | NodesList])}; - (#{nodes := #{Node := _} = _NodesMap} = Ts) -> + (#{nodes := #{Node := _}} = Ts) -> Ts; (#{nodes := NodesMap} = Ts) when is_map(NodesMap) -> Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)} @@ -2047,7 +2047,7 @@ make_ra_conf(Q, ServerId, TickTimeout, [{ClusterName, _} | _] = Members = members(Q), {_, Node} = ServerId, UId = case amqqueue:get_type_state(Q) of - #{uids := #{Node := Id}} -> + #{nodes := #{Node := Id}} -> Id; _ -> %% Queue was declared on an older version of RabbitMQ