diff --git a/.github/workflows/test-make-target.yaml b/.github/workflows/test-make-target.yaml index 547e39f39a9a..a9adcd20a4f2 100644 --- a/.github/workflows/test-make-target.yaml +++ b/.github/workflows/test-make-target.yaml @@ -143,5 +143,6 @@ jobs: name: CT logs (${{ inputs.plugin }} ${{ inputs.make_target }} OTP-${{ inputs.erlang_version }} ${{ inputs.metadata_store }}${{ inputs.mixed_clusters && ' mixed' || '' }}) path: | logs/ + deps/rabbitmq_cli/logs/ # !logs/**/log_private if-no-files-found: ignore diff --git a/deps/rabbit/src/rabbit_vhosts.erl b/deps/rabbit/src/rabbit_vhosts.erl index 7bc44f4135d6..58cc00f3ffa8 100644 --- a/deps/rabbit/src/rabbit_vhosts.erl +++ b/deps/rabbit/src/rabbit_vhosts.erl @@ -127,7 +127,7 @@ start_processes_for_all(Nodes) -> -spec start_processes_for_all() -> 'ok'. start_processes_for_all() -> - start_processes_for_all(rabbit_nodes:list_reachable()). + start_processes_for_all(rabbit_nodes:list_running()). %% Same as rabbit_vhost_sup_sup:start_on_all_nodes/0. -spec start_on_all_nodes(vhost:name(), [node()]) -> 'ok'. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index d9386c137d03..82e58829422c 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1867,6 +1867,11 @@ link_target_queue_deleted(QType, Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), ok = wait_for_accepted(DTag1), + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock delivery to the target queue to do nothing. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), Mod = rabbit_queue_type, @@ -1927,6 +1932,11 @@ target_queues_deleted_accepted(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), ok = wait_for_accepted(DTag1), + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock to deliver only to q1. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), Mod = rabbit_queue_type, @@ -4073,7 +4083,7 @@ list_connections(Config) -> end, {ok, StdOut0} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "protocol"]), - Protocols0 = re:split(StdOut0, <<"\n">>, [trim]), + Protocols0 = re:split(string:trim(StdOut0), <<"\n">>, [trim]), %% Remove any whitespaces. Protocols1 = [binary:replace(Subject, <<" ">>, <<>>, [global]) || Subject <- Protocols0], Protocols = lists:sort(Protocols1), @@ -4726,6 +4736,11 @@ idle_time_out_on_server(Config) -> after 30000 -> ct:fail({missing_event, ?LINE}) end, + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock the server socket to not have received any bytes. rabbit_ct_broker_helpers:setup_meck(Config), ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), diff --git a/deps/rabbit/test/amqp_filter_sql_SUITE.erl b/deps/rabbit/test/amqp_filter_sql_SUITE.erl index 6daeb3f5f307..0168fc815c1f 100644 --- a/deps/rabbit/test/amqp_filter_sql_SUITE.erl +++ b/deps/rabbit/test/amqp_filter_sql_SUITE.erl @@ -26,6 +26,7 @@ flush/1, wait_for_credit/1, wait_for_accepts/1, + send_message/2, send_messages/3, detach_link_sync/1, end_session_sync/1, @@ -98,10 +99,10 @@ multiple_sections(Config) -> To = rabbitmq_amqp_address:exchange(<<"some exchange">>, <<"routing key">>), ReplyTo = rabbitmq_amqp_address:queue(<<"some queue">>), - ok = amqp10_client:send_msg( + ok = send_message( Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), - ok = amqp10_client:send_msg( + ok = send_message( Sender, amqp10_msg:set_headers( #{priority => 200}, @@ -125,7 +126,7 @@ multiple_sections(Config) -> <<"k3">> => true, <<"k4">> => <<"hey👋"/utf8>>}, amqp10_msg:new(<<"t2">>, <<"m2">>))))), - ok = amqp10_client:send_msg( + ok = send_message( Sender, amqp10_msg:set_properties( #{group_id => <<"my group ID">>}, @@ -222,13 +223,13 @@ filter_few_messages_from_many(Config) -> {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), ok = wait_for_credit(Sender), - ok = amqp10_client:send_msg( + ok = send_message( Sender, amqp10_msg:set_properties( #{group_id => <<"my group ID">>}, amqp10_msg:new(<<"t1">>, <<"first msg">>))), ok = send_messages(Sender, 5000, false), - ok = amqp10_client:send_msg( + ok = send_message( Sender, amqp10_msg:set_properties( #{group_id => <<"my group ID">>}, @@ -278,7 +279,7 @@ filter_few_messages_from_many(Config) -> %% We previously set drain=true for Receiver1 ok = assert_credit_exhausted(Receiver1, ?LINE), - ok = amqp10_client:send_msg( + ok = send_message( Sender, amqp10_msg:set_properties( #{group_id => <<"my group ID">>}, @@ -328,7 +329,7 @@ sql_and_bloom_filter(Config) -> {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), ok = wait_for_credit(Sender), - ok = amqp10_client:send_msg( + ok = send_message( Sender, amqp10_msg:set_message_annotations( #{<<"x-stream-filter-value">> => <<"v1">>}, diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl index 8a00be3d11dd..a42c25a7aa9a 100644 --- a/deps/rabbit/test/amqp_jms_SUITE.erl +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -52,7 +52,7 @@ groups() -> suite() -> [ - {timetrap, {minutes, 2}} + {timetrap, {minutes, 5}} ]. init_per_suite(Config) -> diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl index 3db3d621a147..df6599c4ca07 100644 --- a/deps/rabbit/test/amqp_utils.erl +++ b/deps/rabbit/test/amqp_utils.erl @@ -16,6 +16,7 @@ flush/1, wait_for_credit/1, wait_for_accepts/1, + send_message/2, send_messages/3, send_messages/4, detach_link_sync/1, end_session_sync/1, @@ -87,18 +88,10 @@ wait_for_accepts(N) -> ct:fail({missing_accepted, N}) end. -send_messages(Sender, Left, Settled) -> - send_messages(Sender, Left, Settled, <<>>). - -send_messages(_, 0, _, _) -> - ok; -send_messages(Sender, Left, Settled, BodySuffix) -> - Bin = integer_to_binary(Left), - Body = <>, - Msg = amqp10_msg:new(Bin, Body, Settled), +send_message(Sender, Msg) -> case amqp10_client:send_msg(Sender, Msg) of ok -> - send_messages(Sender, Left - 1, Settled, BodySuffix); + ok; {error, insufficient_credit} -> ok = wait_for_credit(Sender), %% The credited event we just processed could have been received some time ago, @@ -110,9 +103,21 @@ send_messages(Sender, Left, Settled, BodySuffix) -> %% but do not process the credited event in our mailbox. %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). - send_messages(Sender, Left, Settled, BodySuffix) + send_message(Sender, Msg) end. +send_messages(Sender, Left, Settled) -> + send_messages(Sender, Left, Settled, <<>>). + +send_messages(_, 0, _, _) -> + ok; +send_messages(Sender, Left, Settled, BodySuffix) -> + Bin = integer_to_binary(Left), + Body = <>, + Msg = amqp10_msg:new(Bin, Body, Settled), + ok = send_message(Sender, Msg), + send_messages(Sender, Left - 1, Settled, BodySuffix). + detach_link_sync(Link) -> ok = amqp10_client:detach_link(Link), ok = wait_for_link_detach(Link). diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl index 9cc09ceaac98..60d78e5d46df 100644 --- a/deps/rabbit/test/feature_flags_v2_SUITE.erl +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -203,7 +203,9 @@ stop_slave_node(Node) -> persistent_term:erase({?MODULE, Node}), ct:pal("- Stopping slave node `~ts`...", [Node]), - _ = peer:stop(NodePid) + Ret = catch peer:stop(NodePid), + ct:pal(" Ret = ~0p", [Ret]), + ok end. connect_nodes([FirstNode | OtherNodes] = Nodes) -> diff --git a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl index 854250543846..15ae436c656c 100644 --- a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl @@ -130,7 +130,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn1] = open_connections(Config, [0]), [Chan1] = open_channels(Conn1, 1), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(1, count_channels_in(Config, Username), ?A_TOUT), [#tracked_channel{username = Username}] = channels_in(Config, Username), ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), @@ -147,7 +147,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), - [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + ?awaitMatch([#tracked_connection{username = Username2}], connections_in(Config, Username2), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), @@ -157,7 +157,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn3] = open_connections(Config, [0]), Chans3 = [_|_] = open_channels(Conn3, 5), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), @@ -172,7 +172,7 @@ single_node_user_connection_channel_tracking(Config) -> [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans4], kill_connections([Conn4]), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), @@ -182,9 +182,11 @@ single_node_user_connection_channel_tracking(Config) -> [Conn5] = open_connections(Config, [0]), Chans5 = [_|_] = open_channels(Conn5, 7), ?awaitMatch(2, count_connections_in(Config, Username), ?A_TOUT), - [Username, Username] = - lists:map(fun (#tracked_connection{username = U}) -> U end, - connections_in(Config, Username)), + ?awaitMatch( + [Username, Username], + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username)), + ?A_TOUT), ?awaitMatch(12, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(12, tracked_user_channel_count(Config, Username), ?A_TOUT), ?awaitMatch(2, tracked_user_connection_count(Config, Username), ?A_TOUT), diff --git a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl index e4884f8eba60..30a0d5e45116 100644 --- a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl @@ -115,28 +115,42 @@ single_node_list_of_user(Config) -> [Conn1] = open_connections(Config, [{0, Username1}]), ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), close_connections([Conn1]), ?awaitMatch(0, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), [Conn2] = open_connections(Config, [{0, Username2}]), ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + ?awaitMatch( + [#tracked_connection{username = Username2}], + connections_in(Config, Username2), + ?AWAIT_TIMEOUT), [Conn3] = open_connections(Config, [{0, Username1}]), ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), [Conn4] = open_connections(Config, [{0, Username1}]), kill_connections([Conn4]), ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), [Conn5] = open_connections(Config, [{0, Username1}]), ?awaitMatch(2, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), - [Username1, Username1] = - lists:map(fun (#tracked_connection{username = U}) -> U end, - connections_in(Config, Username1)), + ?awaitMatch( + [Username1, Username1], + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username1)), + ?AWAIT_TIMEOUT), close_connections([Conn2, Conn3, Conn5]), rabbit_ct_broker_helpers:delete_user(Config, Username2), diff --git a/deps/rabbit/test/proxy_protocol_SUITE.erl b/deps/rabbit/test/proxy_protocol_SUITE.erl index 72e9e37c4b98..a3abc23602e3 100644 --- a/deps/rabbit/test/proxy_protocol_SUITE.erl +++ b/deps/rabbit/test/proxy_protocol_SUITE.erl @@ -10,6 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile(export_all). -define(TIMEOUT, 5000). @@ -65,8 +67,10 @@ proxy_protocol_v1(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. proxy_protocol_v1_tls(Config) -> @@ -80,8 +84,10 @@ proxy_protocol_v1_tls(Config) -> {ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. proxy_protocol_v2_local(Config) -> @@ -97,13 +103,22 @@ proxy_protocol_v2_local(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. connection_name() -> - Pids = pg_local:get_members(rabbit_connections), - Pid = lists:nth(1, Pids), + ?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000), + [Pid] = pg_local:get_members(rabbit_connections), {dictionary, Dict} = process_info(Pid, dictionary), {process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict), ConnectionName. + +wait_for_connection_close(Config) -> + ?awaitMatch( + [], + rabbit_ct_broker_helpers:rpc( + Config, 0, pg_local, get_members, [rabbit_connnections]), + 30000). diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index 20e0842c865a..fc2ad83aa88f 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -230,7 +230,7 @@ delivery_limit(Config) -> {_, #amqp_msg{props = #'P_basic'{headers = Headers}}} = ?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, amqp_channel:call(Ch, #'basic.get'{queue = TargetQ}), - 1000), + 30000), assert_dlx_headers(Headers, <<"delivery_limit">>, SourceQ), ?assertEqual(1, counted(messages_dead_lettered_delivery_limit_total, Config)), eventually(?_assertEqual(1, counted(messages_dead_lettered_confirmed_total, Config))). diff --git a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl index 87f51a4a62b3..1e6657649911 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl @@ -896,6 +896,7 @@ test_failed_connection_with_a_token_with_insufficient_resource_permission(Config ?assertExit({{shutdown, {server_initiated_close, 403, _}}, _}, amqp_channel:call(Ch, #'queue.declare'{queue = <<"alt-prefix.eq.1">>, exclusive = true})), + close_connection(Conn). test_failed_token_refresh_case1(Config) -> @@ -941,7 +942,7 @@ test_failed_token_refresh_case2(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 530, _}}}, _}, amqp_connection:open_channel(Conn)), - close_connection(Conn). + wait_for_connection_exit(Conn). cannot_change_username_on_refreshed_token(Config) -> Jwk = @@ -983,4 +984,14 @@ rpc_get_env(Config, Par) -> [rabbitmq_auth_backend_oauth2, Par]). rpc_get_env(Config, Par, Default) -> rpc(Config, 0, application, get_env, - [rabbitmq_auth_backend_oauth2, Par, Default]). \ No newline at end of file + [rabbitmq_auth_backend_oauth2, Par, Default]). + +wait_for_connection_exit(Conn) -> + MRef = erlang:monitor(process, Conn), + receive + {'DOWN', MRef, _Type, _Conn, Reason} -> + ct:pal("Connection ~0p exited: ~p", [Conn, Reason]), + ok + after 30000 -> + ct:fail("Connection ~0p is still up after 30 seconds", [Conn]) + end. diff --git a/deps/rabbitmq_cli/.gitignore b/deps/rabbitmq_cli/.gitignore index 43c231de0dd8..9b987ada02eb 100644 --- a/deps/rabbitmq_cli/.gitignore +++ b/deps/rabbitmq_cli/.gitignore @@ -1 +1,2 @@ /deps/ +/logs diff --git a/deps/rabbitmq_cli/Makefile b/deps/rabbitmq_cli/Makefile index 0361a898c4b0..fc86f22d0ac3 100644 --- a/deps/rabbitmq_cli/Makefile +++ b/deps/rabbitmq_cli/Makefile @@ -123,7 +123,11 @@ tests:: escript test-deps $(verbose) $(MAKE) -C ../../ install-cli $(verbose) $(MAKE) -C ../../ start-background-broker \ PLUGINS="rabbitmq_federation rabbitmq_stomp rabbitmq_stream_management amqp_client" \ - $(if $(filter khepri,$(RABBITMQ_METADATA_STORE)),,RABBITMQ_FEATURE_FLAGS="-khepri_db") + $(if $(filter khepri,$(RABBITMQ_METADATA_STORE)),,RABBITMQ_FEATURE_FLAGS="-khepri_db"); \ + rm -f logs; \ + log_dir=$$(../../sbin/rabbitmqctl eval 'io:format("~s~n", [maps:get(log_base_dir,rabbit_prelaunch:get_context())]).'); \ + log_dir=$$(echo "$$log_dir" | head -n 1); \ + ln -s "$$log_dir" logs $(gen_verbose) $(MIX_TEST) \ $(if $(RABBITMQ_METADATA_STORE),--exclude $(filter-out $(RABBITMQ_METADATA_STORE),khepri mnesia),) \ $(TEST_FILE); \ @@ -160,7 +164,7 @@ endif clean:: clean-mix clean-mix: - $(gen_verbose) rm -f $(ESCRIPT_FILE) $(LINKED_ESCRIPTS) + $(gen_verbose) rm -f $(ESCRIPT_FILE) $(LINKED_ESCRIPTS) logs $(verbose) echo y | mix clean format: diff --git a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl index 35b3a5a70e5b..641f7e8596a3 100644 --- a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl +++ b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl @@ -26,6 +26,9 @@ all() -> {group, khepri_migration} ]. +suite() -> + [{timetrap, {minutes, 5}}]. + groups() -> [ {routing_tests, [], routing_tests()}, @@ -156,7 +159,7 @@ custom_header_undefined(Config) -> Exchange = <<"my exchange">>, Queue = <<"my queue">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), #'exchange.declare_ok'{} = amqp_channel:call( Ch, #'exchange.declare' { @@ -179,7 +182,7 @@ custom_header_undefined(Config) -> ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, amqp_channel:call(Ch, #'basic.get'{queue = Queue})), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), clean_up_test_topology(Config, Exchange, [Queue]), ok. @@ -373,7 +376,7 @@ test_with_timestamp(Config, Qs) -> Qs). test_mutually_exclusive_arguments(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), process_flag(trap_exit, true), Cmd = #'exchange.declare'{ @@ -384,11 +387,11 @@ test_mutually_exclusive_arguments(Config) -> }, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_non_supported_property(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), process_flag(trap_exit, true), Cmd = #'exchange.declare'{ @@ -398,7 +401,7 @@ test_non_supported_property(Config) -> }, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. rnd() -> @@ -411,13 +414,13 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues) -> test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, ?DEFAULT_SAMPLE_COUNT). test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, IterationCount) -> - Chan = rabbit_ct_client_helpers:open_channel(Config), - #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), - CHX = <<"e">>, clean_up_test_topology(Config, CHX, Queues), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), + #'exchange.declare_ok'{} = amqp_channel:call(Chan, #'exchange.declare' { @@ -464,11 +467,11 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, Itera [Chi, Obs]), clean_up_test_topology(Config, CHX, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_binding_with_negative_routing_key(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"bind-fail">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -482,15 +485,15 @@ test_binding_with_negative_routing_key(Config) -> Cmd = #'queue.bind'{exchange = <<"bind-fail">>, routing_key = <<"-1">>}, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), - rabbit_ct_client_helpers:close_channel(Chan), - rabbit_ct_client_helpers:close_channel(Ch2), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2), ok. test_binding_with_non_numeric_routing_key(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"bind-fail">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -505,10 +508,11 @@ test_binding_with_non_numeric_routing_key(Config) -> routing_key = <<"not-a-number">>}, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2), ok. %% @@ -516,7 +520,7 @@ test_binding_with_non_numeric_routing_key(Config) -> %% test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_recovery_between_node_restarts">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -547,11 +551,11 @@ test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) -> assert_ring_consistency(Config, X), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_queue_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_queue_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -576,11 +580,11 @@ test_hash_ring_updates_when_queue_is_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, [Q]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_multiple_queues_are_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_multiple_queues_are_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -611,7 +615,7 @@ test_hash_ring_updates_when_multiple_queues_are_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure(Config) -> @@ -706,7 +710,7 @@ test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closu ok. test_hash_ring_updates_when_exchange_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_exchange_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -734,11 +738,11 @@ test_hash_ring_updates_when_exchange_is_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_queue_is_unbound(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_queue_is_unbound">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -769,11 +773,11 @@ test_hash_ring_updates_when_queue_is_unbound(Config) -> ?assertEqual(8, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -818,11 +822,11 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Co assert_ring_consistency(Config, X), clean_up_test_topology(Config, X, [Q1, Q2]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -872,14 +876,14 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted( ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, [Q1, Q2]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. %% Follows the setup described in %% https://github.com/rabbitmq/rabbitmq-server/issues/3386#issuecomment-1103929292 node_restart(Config) -> - Chan1 = rabbit_ct_client_helpers:open_channel(Config, 1), - Chan2 = rabbit_ct_client_helpers:open_channel(Config, 2), + {Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1), + {Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 2), X = atom_to_binary(?FUNCTION_NAME), #'exchange.declare_ok'{} = amqp_channel:call(Chan1, @@ -903,8 +907,8 @@ node_restart(Config) -> F(Chan1, QsNode1), F(Chan2, QsNode2), - rabbit_ct_client_helpers:close_channel(Chan1), - rabbit_ct_client_helpers:close_channel(Chan2), + rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2), rabbit_ct_broker_helpers:restart_node(Config, 1), rabbit_ct_broker_helpers:restart_node(Config, 2), @@ -942,13 +946,14 @@ count_buckets_of_exchange(Config, X) -> from_mnesia_to_khepri(Config) -> Queues = [Q1, Q2, Q3, Q4] = ?RoutingTestQs, IterationCount = ?DEFAULT_SAMPLE_COUNT, - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), - #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), CHX = <<"e">>, clean_up_test_topology(Config, CHX, Queues), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), + #'exchange.declare_ok'{} = amqp_channel:call(Chan, #'exchange.declare' { @@ -973,36 +978,32 @@ from_mnesia_to_khepri(Config) -> case rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db) of ok -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_consistent_hash_exchange_raft_based_metadata_store) of - ok -> - [amqp_channel:call(Chan, - #'basic.publish'{exchange = CHX, routing_key = rnd()}, - #amqp_msg{props = #'P_basic'{}, payload = <<>>}) - || _ <- lists:duplicate(IterationCount, const)], - amqp_channel:wait_for_confirms(Chan, 300), - timer:sleep(500), - Counts = - [begin - #'queue.declare_ok'{message_count = M} = - amqp_channel:call(Chan, #'queue.declare' {queue = Q, - exclusive = true}), - M - end || Q <- Queues], - ?assertEqual(IterationCount, lists:sum(Counts)), %% All messages got routed - %% Chi-square test - %% H0: routing keys are not evenly distributed according to weight - Expected = [IterationCount div 6, IterationCount div 6, (IterationCount div 6) * 2, (IterationCount div 6) * 2], - Obs = lists:zip(Counts, Expected), - Chi = lists:sum([((O - E) * (O - E)) / E || {O, E} <- Obs]), - ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p", - [Chi, Obs]), - clean_up_test_topology(Config, CHX, Queues), - rabbit_ct_client_helpers:close_channel(Chan), - ok; - Skip -> - Skip - end; + [amqp_channel:call(Chan, + #'basic.publish'{exchange = CHX, routing_key = rnd()}, + #amqp_msg{props = #'P_basic'{}, payload = <<>>}) + || _ <- lists:duplicate(IterationCount, const)], + amqp_channel:wait_for_confirms(Chan, 300), + timer:sleep(500), + Counts = + [begin + #'queue.declare_ok'{message_count = M} = + amqp_channel:call(Chan, #'queue.declare' {queue = Q, + exclusive = true}), + M + end || Q <- Queues], + ?assertEqual(IterationCount, lists:sum(Counts)), %% All messages got routed + %% Chi-square test + %% H0: routing keys are not evenly distributed according to weight + Expected = [IterationCount div 6, IterationCount div 6, (IterationCount div 6) * 2, (IterationCount div 6) * 2], + Obs = lists:zip(Counts, Expected), + Chi = lists:sum([((O - E) * (O - E)) / E || {O, E} <- Obs]), + ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p", + [Chi, Obs]), + clean_up_test_topology(Config, CHX, Queues), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + ok; Skip -> + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), Skip end. @@ -1010,12 +1011,12 @@ clean_up_test_topology(Config) -> clean_up_test_topology(Config, none, ?AllQs). clean_up_test_topology(Config, none, Qs) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], - rabbit_ct_client_helpers:close_channel(Ch); + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch); clean_up_test_topology(Config, X, Qs) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch, #'exchange.delete' {exchange = X}), [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], - rabbit_ct_client_helpers:close_channel(Ch). + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index 30d30e8f07ff..40bfc3f68cab 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -185,8 +185,13 @@ init_per_group(Group, Config) -> end] ++ rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), - util:enable_plugin(Config2, rabbitmq_mqtt), - Config2. + case Config2 of + _ when is_list(Config2) -> + util:enable_plugin(Config2, rabbitmq_mqtt), + Config2; + {skip, _} -> + Config2 + end. end_per_group(G, Config) when G =:= v4; @@ -1257,7 +1262,6 @@ vhost_connection_limit(Config) -> ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, <<"/">>). count_connections_per_vhost(Config) -> - NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 0), rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_connection_tracking, count_local_tracked_items_in_vhost, @@ -1282,6 +1286,7 @@ user_connection_limit(Config) -> ok = rabbit_ct_broker_helpers:set_user_limits(Config, DefaultUser, #{max_connections => 1}), {ok, C1} = connect_anonymous(Config, <<"client1">>), {ok, _} = emqtt:connect(C1), + ?awaitMatch(1, count_connections_per_vhost(Config), 30000), {ok, C2} = connect_anonymous(Config, <<"client2">>), ExpectedError = expected_connection_limit_error(Config), unlink(C2), diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl index e03f4bcfd492..1f4535e609a7 100644 --- a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl @@ -85,8 +85,13 @@ init_per_testcase(Testcase, Config) -> [fun merge_app_env/1] ++ setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), - util:enable_plugin(Config2, rabbitmq_mqtt), - Config2. + case Config2 of + _ when is_list(Config2) -> + util:enable_plugin(Config2, rabbitmq_mqtt), + Config2; + {skip, _} -> + Config2 + end. end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:run_steps(Config, diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index acc6ec95ace1..c0316a21689c 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -204,8 +204,13 @@ init_per_group(Group, Config0) -> Config, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), - util:enable_plugin(Config1, rabbitmq_mqtt), - Config1. + case Config1 of + _ when is_list(Config1) -> + util:enable_plugin(Config1, rabbitmq_mqtt), + Config1; + {skip, _} -> + Config1 + end. end_per_group(G, Config) when G =:= cluster_size_1; diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 9345e2e6e563..bcedd1cb09be 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -60,6 +60,7 @@ groups() -> build_info_product_test ]}, {detailed_metrics, [], [ + stream_pub_sub_metrics, detailed_metrics_no_families_enabled_by_default, queue_consumer_count_single_vhost_per_object_test, queue_consumer_count_all_vhosts_per_object_test, @@ -72,7 +73,6 @@ groups() -> vhost_status_metric, exchange_bindings_metric, exchange_names_metric, - stream_pub_sub_metrics, detailed_raft_metrics_test ]}, {special_chars, [], [core_metrics_special_chars]}, @@ -797,6 +797,10 @@ exchange_names_metric(Config) -> ok. stream_pub_sub_metrics(Config) -> + {_, Body0} = http_get_with_pal(Config, "/metrics", [], 200), + Metrics = parse_response(Body0), + ct:pal("Initial metrics: ~p", [Metrics]), + Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", MsgPerBatch1 = 2, {ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), diff --git a/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl index 96e414adc387..580b6ec947d5 100644 --- a/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl @@ -9,6 +9,9 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). + +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile([export_all, nowarn_export_all]). -import(rabbit_ct_broker_helpers, [rpc/5]). @@ -72,22 +75,23 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). old_to_new_on_old(Config) -> - ok = shovel(?OLD, ?NEW, ?OLD, Config). + ok = shovel(?FUNCTION_NAME, ?OLD, ?NEW, ?OLD, Config). old_to_new_on_new(Config) -> - ok = shovel(?OLD, ?NEW, ?NEW, Config). + ok = shovel(?FUNCTION_NAME, ?OLD, ?NEW, ?NEW, Config). new_to_old_on_old(Config) -> - ok = shovel(?NEW, ?OLD, ?OLD, Config). + ok = shovel(?FUNCTION_NAME, ?NEW, ?OLD, ?OLD, Config). new_to_old_on_new(Config) -> - ok = shovel(?NEW, ?OLD, ?NEW, Config). + ok = shovel(?FUNCTION_NAME, ?NEW, ?OLD, ?NEW, Config). -shovel(SrcNode, DestNode, ShovelNode, Config) -> +shovel(Caller, SrcNode, DestNode, ShovelNode, Config) -> SrcUri = shovel_test_utils:make_uri(Config, SrcNode), DestUri = shovel_test_utils:make_uri(Config, DestNode), - SrcQ = <<"my source queue">>, - DestQ = <<"my destination queue">>, + ShovelName = atom_to_binary(Caller), + SrcQ = <>, + DestQ = <>, Definition = [ {<<"src-uri">>, SrcUri}, {<<"src-protocol">>, <<"amqp10">>}, @@ -96,7 +100,6 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> {<<"dest-protocol">>, <<"amqp10">>}, {<<"dest-address">>, DestQ} ], - ShovelName = <<"my shovel">>, ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, set, [<<"/">>, <<"shovel">>, ShovelName, Definition, none]), ok = shovel_test_utils:await_shovel(Config, ShovelNode, ShovelName), @@ -125,6 +128,7 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never), Msgs = receive_messages(Receiver, NumMsgs), + ct:pal("~b messages:~n~p", [length(Msgs), Msgs]), lists:map( fun(N) -> Msg = lists:nth(N, Msgs), @@ -136,8 +140,28 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, ShovelName, none]), ExpectedQueueLen = 0, - ?assertEqual([ExpectedQueueLen], rpc(Config, ?OLD, ?MODULE, delete_queues, [])), - ?assertEqual([ExpectedQueueLen], rpc(Config, ?NEW, ?MODULE, delete_queues, [])). + ?awaitMatch( + [{_, ExpectedQueueLen}], + begin + Ret = rpc(Config, ?OLD, ?MODULE, queues_length, []), + ct:pal("Queues on old: ~p", [Ret]), + Ret + end, + 30000), + ?awaitMatch( + [{_, ExpectedQueueLen}], + begin + Ret = rpc(Config, ?NEW, ?MODULE, queues_length, []), + ct:pal("Queues on new: ~p", [Ret]), + Ret + end, + 30000), + ?assertEqual( + [ExpectedQueueLen], + rpc(Config, ?OLD, ?MODULE, delete_queues, [])), + ?assertEqual( + [ExpectedQueueLen], + rpc(Config, ?NEW, ?MODULE, delete_queues, [])). wait_for_credit(Sender) -> receive @@ -170,6 +194,13 @@ flush(Prefix) -> ok end. +queues_length() -> + [begin + #{<<"name">> := Name} = amqqueue:to_printable(Q), + [{messages, N}] = rabbit_amqqueue:info(Q, [messages]), + {Name, N} + end || Q <- rabbit_amqqueue:list()]. + delete_queues() -> [begin {ok, N} = rabbit_amqqueue:delete(Q, false, false, <<"tests">>), diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py index af643737c23c..a08e5f03f51d 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py @@ -126,7 +126,10 @@ def tearDown(self): if self.conn.is_connected(): try: self.conn.disconnect() - except: + except Exception as inst: + print(type(inst)) + print(inst.args) + print(inst) pass elapsed = time.time() - self._started_at print('{} ({}s)'.format(self.id(), round(elapsed, 2))) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt b/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt index fd2cc9d6beb1..789ce525d372 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt @@ -1,3 +1,3 @@ -stomp.py==8.1.0 -pika==1.1.0 +stomp.py==8.2.0 +pika==1.3.2 rabbitman===0.1.0 diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py index 8b15a5b89b4d..32cdd61e9621 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py @@ -20,7 +20,7 @@ def run_unittests(modules): if name.startswith("Test") and issubclass(obj, unittest.TestCase): suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj)) - ts = unittest.TextTestRunner().run(unittest.TestSuite(suite)) + ts = unittest.TextTestRunner(verbosity=10).run(unittest.TestSuite(suite)) if ts.errors or ts.failures: sys.exit(1) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py index 2aed99ec31f9..a5e783d52d75 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py @@ -11,6 +11,7 @@ import base import time import os +import test_util class TestUserGeneratedQueueName(base.BaseTest): @@ -29,6 +30,12 @@ def test_exchange_dest(self): pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) channel = connection.channel() + test_util.rabbitmqctl(['list_queues']) + test_util.rabbitmqctl(['list_connections', 'peer_host', 'peer_port', + 'protocol']) + test_util.rabbitmqctl(['list_stomp_connections', 'peer_host', + 'peer_port', 'protocol']) + # publish a message to the named queue channel.basic_publish( exchange='', @@ -36,11 +43,13 @@ def test_exchange_dest(self): body='Hello World!') # check if we receive the message from the STOMP subscription - self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertTrue(self.listener.wait(30), "initial message not received") self.assertEqual(1, len(self.listener.messages)) - self.conn.disconnect() + # self.conn.disconnect() connection.close() + while not connection.is_closed: + time.sleep(1) def test_topic_dest(self): queueName='my-user-generated-queue-name-topic' @@ -57,6 +66,12 @@ def test_topic_dest(self): pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) channel = connection.channel() + test_util.rabbitmqctl(['list_queues']) + test_util.rabbitmqctl(['list_connections', 'peer_host', 'peer_port', + 'protocol']) + test_util.rabbitmqctl(['list_stomp_connections', 'peer_host', + 'peer_port', 'protocol']) + # publish a message to the named queue channel.basic_publish( exchange='', @@ -64,11 +79,13 @@ def test_topic_dest(self): body='Hello World!') # check if we receive the message from the STOMP subscription - self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertTrue(self.listener.wait(30), "initial message not received") self.assertEqual(1, len(self.listener.messages)) - self.conn.disconnect() + # self.conn.disconnect() connection.close() + while not connection.is_closed: + time.sleep(1) if __name__ == '__main__': diff --git a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl index e6c69bc17bd1..1b9e6b0b8237 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl @@ -211,7 +211,7 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co %% the coordinator leader node will be isolated ?assertNotEqual(L#node.name, CL), - log("Stream leader and coordinator leader are on ~p", [L#node.name]), + log("Coordinator leader on: ~0p~nStream leader on: ~0p", [CL, L#node.name]), {ok, So0, C0_00} = stream_test_utils:connect(Config, CL), {ok, So1, C1_00} = stream_test_utils:connect(Config, CF1), diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile b/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile index dae43a1ad68c..994584d771a7 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile @@ -2,7 +2,8 @@ export PATH :=$(CURDIR):$(PATH) HOSTNAME := $(shell hostname) MVN_FLAGS += -Dstream.port=$(STREAM_PORT) \ -Dstream.port.tls=$(STREAM_PORT_TLS) \ - -Dmanagement.port=$(MANAGEMENT_PORT) + -Dmanagement.port=$(MANAGEMENT_PORT) \ + -Dmaven.wagon.http.retryHandler.count=5 .PHONY: tests clean