diff --git a/.github/workflows/test-make-target.yaml b/.github/workflows/test-make-target.yaml index 9724962ae366..499f03338f8f 100644 --- a/.github/workflows/test-make-target.yaml +++ b/.github/workflows/test-make-target.yaml @@ -119,5 +119,6 @@ jobs: name: CT logs (${{ inputs.plugin }} ${{ inputs.make_target }} OTP-${{ inputs.erlang_version }} ${{ inputs.metadata_store }}${{ inputs.mixed_clusters && ' mixed' || '' }}) path: | logs/ + deps/rabbitmq_cli/logs/ # !logs/**/log_private if-no-files-found: ignore diff --git a/deps/rabbit/src/rabbit_vhosts.erl b/deps/rabbit/src/rabbit_vhosts.erl index dc1734c9a96d..26aff7571c16 100644 --- a/deps/rabbit/src/rabbit_vhosts.erl +++ b/deps/rabbit/src/rabbit_vhosts.erl @@ -124,7 +124,7 @@ start_processes_for_all(Nodes) -> -spec start_processes_for_all() -> 'ok'. start_processes_for_all() -> - start_processes_for_all(rabbit_nodes:list_reachable()). + start_processes_for_all(rabbit_nodes:list_running()). %% Same as rabbit_vhost_sup_sup:start_on_all_nodes/0. -spec start_on_all_nodes(vhost:name(), [node()]) -> 'ok'. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 1699517f470d..7d72e297cae9 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -1773,6 +1773,11 @@ link_target_queue_deleted(QType, Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), ok = wait_for_accepted(DTag1), + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock delivery to the target queue to do nothing. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), Mod = rabbit_queue_type, @@ -1833,6 +1838,11 @@ target_queues_deleted_accepted(Config) -> ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)), ok = wait_for_accepted(DTag1), + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock to deliver only to q1. rabbit_ct_broker_helpers:setup_meck(Config, [?MODULE]), Mod = rabbit_queue_type, @@ -3979,7 +3989,7 @@ list_connections(Config) -> end, {ok, StdOut0} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "protocol"]), - Protocols0 = re:split(StdOut0, <<"\n">>, [trim]), + Protocols0 = re:split(string:trim(StdOut0), <<"\n">>, [trim]), %% Remove any whitespaces. Protocols1 = [binary:replace(Subject, <<" ">>, <<>>, [global]) || Subject <- Protocols0], Protocols = lists:sort(Protocols1), @@ -4630,6 +4640,11 @@ idle_time_out_on_server(Config) -> after 30000 -> ct:fail({missing_event, ?LINE}) end, + %% Load test module on the broker: we reference an anonymous function + %% from it during the configuration of meck. + [_ | _] = rabbit_ct_broker_helpers:rpc( + Config, ?MODULE, module_info, []), + %% Mock the server socket to not have received any bytes. rabbit_ct_broker_helpers:setup_meck(Config), ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), diff --git a/deps/rabbit/test/amqp_filter_sql_SUITE.erl b/deps/rabbit/test/amqp_filter_sql_SUITE.erl new file mode 100644 index 000000000000..0168fc815c1f --- /dev/null +++ b/deps/rabbit/test/amqp_filter_sql_SUITE.erl @@ -0,0 +1,492 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% Test suite for SQL expressions filtering from a stream. +-module(amqp_filter_sql_SUITE). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_client/include/amqp10_client.hrl"). +-include_lib("amqp10_common/include/amqp10_filter.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). + +-compile([nowarn_export_all, + export_all]). + +-import(rabbit_ct_broker_helpers, + [rpc/4]). +-import(rabbit_ct_helpers, + [eventually/1]). +-import(amqp_utils, + [init/1, + connection_config/1, + flush/1, + wait_for_credit/1, + wait_for_accepts/1, + send_message/2, + send_messages/3, + detach_link_sync/1, + end_session_sync/1, + close_connection_sync/1]). + +all() -> + [ + {group, cluster_size_1} + ]. + +groups() -> + [ + {cluster_size_1, [shuffle], + [ + multiple_sections, + filter_few_messages_from_many, + sql_and_bloom_filter, + invalid_filter + ]} + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:merge_app_env( + Config, {rabbit, [{stream_tick_interval, 1000}]}). + +end_per_suite(Config) -> + Config. + +init_per_group(_Group, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodename_suffix, Suffix}]), + rabbit_ct_helpers:run_setup_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + %% Assert that every testcase cleaned up. + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), + %% Wait for sessions to terminate before starting the next test case. + eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, []))), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +multiple_sections(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Now = os:system_time(millisecond), + To = rabbitmq_amqp_address:exchange(<<"some exchange">>, <<"routing key">>), + ReplyTo = rabbitmq_amqp_address:queue(<<"some queue">>), + + ok = send_message( + Sender, + amqp10_msg:new(<<"t1">>, <<"m1">>)), + ok = send_message( + Sender, + amqp10_msg:set_headers( + #{priority => 200}, + amqp10_msg:set_properties( + #{message_id => {ulong, 999}, + user_id => <<"guest">>, + to => To, + subject => <<"🐇"/utf8>>, + reply_to => ReplyTo, + correlation_id => <<"corr-123">>, + content_type => <<"text/plain">>, + content_encoding => <<"some encoding">>, + absolute_expiry_time => Now + 100_000, + creation_time => Now, + group_id => <<"my group ID">>, + group_sequence => 16#ff_ff_ff_ff, + reply_to_group_id => <<"other group ID">>}, + amqp10_msg:set_application_properties( + #{<<"k1">> => -3, + <<"k2">> => false, + <<"k3">> => true, + <<"k4">> => <<"hey👋"/utf8>>}, + amqp10_msg:new(<<"t2">>, <<"m2">>))))), + ok = send_message( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:set_application_properties( + #{<<"k1">> => -4}, + amqp10_msg:new(<<"t3">>, <<"m3">>)))), + + ok = wait_for_accepts(3), + ok = detach_link_sync(Sender), + flush(sent), + + Filter1 = filter(<<"k1 <= -3">>), + {ok, R1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter1), + ok = amqp10_client:flow_link_credit(R1, 10, never, true), + receive {amqp10_msg, R1, R1M2} -> + ?assertEqual([<<"m2">>], amqp10_msg:body(R1M2)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, R1, R1M3} -> + ?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_credit_exhausted(R1, ?LINE), + ok = detach_link_sync(R1), + + Filter2 = filter( + <<"header.priority = 200 AND " + "p.message_id = 999 AND " + "p.user_id = 0x6775657374 AND " + "p.to LIKE '/exch_nges/some=%20exchange/rout%' ESCAPE '=' AND " + "p.subject = '🐇' AND " + "p.reply_to LIKE '/queues/some%' AND " + "p.correlation_id IN ('corr-345', 'corr-123') AND " + "p.content_type = 'text/plain' AND " + "p.content_encoding = 'some encoding' AND " + "p.absolute_expiry_time > UTC() AND " + "p.creation_time > UTC() - 60000 AND " + "p.group_id IS NOT NULL AND " + "p.group_sequence = 4294967295 AND " + "p.reply_to_group_id = 'other group ID' AND " + "k1 < 0 AND " + "NOT k2 AND " + "k3 AND " + "k4 NOT LIKE 'hey' AND " + "k5 IS NULL" + /utf8>>), + {ok, R2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + settled, configuration, Filter2), + ok = amqp10_client:flow_link_credit(R2, 10, never, true), + receive {amqp10_msg, R2, R2M2} -> + ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_credit_exhausted(R2, ?LINE), + ok = detach_link_sync(R2), + + Filter3 = filter(<<"absent IS NULL">>), + {ok, R3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + settled, configuration, Filter3), + ok = amqp10_client:flow_link_credit(R3, 10, never, true), + receive {amqp10_msg, R3, R3M1} -> + ?assertEqual([<<"m1">>], amqp10_msg:body(R3M1)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, R3, R3M2} -> + ?assertEqual([<<"m2">>], amqp10_msg:body(R3M2)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, R3, R3M3} -> + ?assertEqual([<<"m3">>], amqp10_msg:body(R3M3)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_credit_exhausted(R3, ?LINE), + ok = detach_link_sync(R3), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Filter a small subset from many messages. +%% We test here that flow control still works correctly. +filter_few_messages_from_many(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + {Connection, Session, LinkPair} = init(Config), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = send_message( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t1">>, <<"first msg">>))), + ok = send_messages(Sender, 5000, false), + ok = send_message( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t2">>, <<"last msg">>))), + ok = wait_for_accepts(5002), + flush(sent), + + %% Our filter should cause us to receive only the first and + %% last message out of the 1002 messages in the stream. + Filter = filter(<<"properties.group_id IS NOT NULL">>), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + settled, configuration, Filter), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + settled, configuration, Filter), + receive {amqp10_event, {link, Receiver1, attached}} -> ok + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_event, {link, Receiver2, attached}} -> ok + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + + ok = amqp10_client:flow_link_credit(Receiver1, 3, never, true), + ok = amqp10_client:flow_link_credit(Receiver2, 3, never, false), + + %% For two links filtering on the same session, we expect that RabbitMQ + %% delivers messages concurrently (instead of scanning the entire stream + %% for the 1st receiver before scanning the entire stream for the 2nd receiver). + receive {amqp10_msg, _, First1} -> + ?assertEqual([<<"first msg">>], amqp10_msg:body(First1)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, _, First2} -> + ?assertEqual([<<"first msg">>], amqp10_msg:body(First2)) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + + receive {amqp10_msg, _, Last1} -> + ?assertEqual([<<"last msg">>], amqp10_msg:body(Last1)) + after 60_000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_msg, _, Last2} -> + ?assertEqual([<<"last msg">>], amqp10_msg:body(Last2)) + after 60_000 -> ct:fail({missing_msg, ?LINE}) + end, + + %% We previously set drain=true for Receiver1 + ok = assert_credit_exhausted(Receiver1, ?LINE), + ok = send_message( + Sender, + amqp10_msg:set_properties( + #{group_id => <<"my group ID">>}, + amqp10_msg:new(<<"t3">>, <<"one more">>))), + receive {amqp10_disposition, {accepted, <<"t3">>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + receive {amqp10_msg, R2, Msg1} -> + ?assertEqual([<<"one more">>], amqp10_msg:body(Msg1)), + ?assertEqual(Receiver2, R2) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_credit_exhausted(Receiver2, ?LINE), + + ok = amqp10_client:flow_link_credit(Receiver1, 1_000_000_000, never, true), + receive {amqp10_msg, R1, Msg2} -> + ?assertEqual([<<"one more">>], amqp10_msg:body(Msg2)), + ?assertEqual(Receiver1, R1) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = assert_credit_exhausted(Receiver1, ?LINE), + + receive {amqp10_msg, _, _} -> ct:fail(unexpected_delivery) + after 10 -> ok + end, + + ok = detach_link_sync(Receiver1), + ok = detach_link_sync(Receiver2), + ok = detach_link_sync(Sender), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +%% Test that SQL and Bloom filters can be used together. +sql_and_bloom_filter(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + ok = send_message( + Sender, + amqp10_msg:set_message_annotations( + #{<<"x-stream-filter-value">> => <<"v1">>}, + amqp10_msg:set_headers( + #{priority => 12}, + amqp10_msg:set_properties( + #{subject => <<"v1">>}, + amqp10_msg:new(<<"t1">>, <<"msg">>))))), + receive {amqp10_disposition, {accepted, <<"t1">>}} -> ok + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + ok = detach_link_sync(Sender), + flush(sent), + + Filter = filter(<<"properties.subject = 'v1' AND header.priority > 10">>), + DesiredFilter = maps:put(<<"my bloom filter">>, + #filter{descriptor = <<"rabbitmq:stream-filter">>, + value = {utf8, <<"v1">>}}, + Filter), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, + unsettled, configuration, DesiredFilter), + receive {amqp10_event, + {link, Receiver, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter}}}}}} -> + DesiredFilterNames = lists:sort(maps:keys(DesiredFilter)), + ActualFilterNames = lists:sort([Name || {{symbol, Name}, _} <- ActualFilter]), + ?assertEqual(DesiredFilterNames, ActualFilterNames) + after 9000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:flow_link_credit(Receiver, 1, never), + receive {amqp10_msg, Receiver, M1} -> + ?assertEqual([<<"msg">>], amqp10_msg:body(M1)), + ok = amqp10_client:accept_msg(Receiver, M1) + after 9000 -> ct:fail({missing_msg, ?LINE}) + end, + ok = detach_link_sync(Receiver), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = end_session_sync(Session), + ok = close_connection_sync(Connection). + +invalid_filter(Config) -> + Stream = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(Stream), + + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {ok, #{}} = rabbitmq_amqp_client:declare_queue( + LinkPair, Stream, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"stream">>}}}), + + %% Trigger a lexer error. + Filter1 = #{?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SQL_FILTER, + value = {utf8, <<"@#$%^&">>}}}, + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, Address, + unsettled, configuration, Filter1), + receive {amqp10_event, + {link, Receiver1, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter1}}}}}} -> + %% RabbitMQ should exclude this filter in its reply attach frame because + %% "the sending endpoint [RabbitMQ] sets the filter actually in place". + ?assertMatch([], ActualFilter1) + after 9000 -> + ct:fail({missing_event, ?LINE}) + end, + ok = detach_link_sync(Receiver1), + + %% Trigger a parser error. We use allowed tokens here, but the grammar is incorrect. + Filter2 = #{?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SQL_FILTER, + value = {utf8, <<"FALSE FALSE">>}}}, + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, Address, + unsettled, configuration, Filter2), + receive {amqp10_event, + {link, Receiver2, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter2}}}}}} -> + ?assertMatch([], ActualFilter2) + after 9000 -> + ct:fail({missing_event, ?LINE}) + end, + ok = detach_link_sync(Receiver2), + + %% SQL filtering should be mutually exclusive with AMQP property filtering + PropsFilter = [{{symbol, <<"subject">>}, {utf8, <<"some subject">>}}], + Filter3 = #{<<"prop name">> => #filter{descriptor = ?DESCRIPTOR_NAME_PROPERTIES_FILTER, + value = {map, PropsFilter}}, + ?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SQL_FILTER, + value = {utf8, <<"TRUE">>}}}, + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session, <<"receiver 3">>, Address, + unsettled, configuration, Filter3), + receive {amqp10_event, + {link, Receiver3, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter3}}}}}} -> + %% We expect only one of the two filters to be actually in place. + ?assertMatch([_], ActualFilter3) + after 9000 -> + ct:fail({missing_event, ?LINE}) + end, + ok = detach_link_sync(Receiver3), + + %% Send invalid UTF-8 in the SQL expression. + InvalidUTF8 = <<255>>, + Filter4 = #{?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_CODE_SQL_FILTER, + value = {utf8, InvalidUTF8}}}, + {ok, Receiver4} = amqp10_client:attach_receiver_link( + Session, <<"receiver 4">>, Address, + unsettled, configuration, Filter4), + receive {amqp10_event, + {link, Receiver4, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter4}}}}}} -> + ?assertMatch([], ActualFilter4) + after 9000 -> + ct:fail({missing_event, ?LINE}) + end, + ok = detach_link_sync(Receiver4), + + %% Send invalid descriptor + Filter5 = #{?FILTER_NAME_SQL => #filter{descriptor = <<"apache.org:invalid:string">>, + value = {utf8, <<"TRUE">>}}}, + {ok, Receiver5} = amqp10_client:attach_receiver_link( + Session, <<"receiver 5">>, Address, + unsettled, configuration, Filter5), + receive {amqp10_event, + {link, Receiver5, + {attached, #'v1_0.attach'{ + source = #'v1_0.source'{filter = {map, ActualFilter5}}}}}} -> + ?assertMatch([], ActualFilter5) + after 9000 -> + ct:fail({missing_event, ?LINE}) + end, + ok = detach_link_sync(Receiver5), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = close_connection_sync(Connection). + +filter(String) + when is_binary(String) -> + #{<<"from start">> => #filter{descriptor = <<"rabbitmq:stream-offset-spec">>, + value = {symbol, <<"first">>}}, + ?FILTER_NAME_SQL => #filter{descriptor = ?DESCRIPTOR_NAME_SQL_FILTER, + value = {utf8, String}}}. + +assert_credit_exhausted(Receiver, Line) -> + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 9000 -> ct:fail({missing_credit_exhausted, Line}) + end. diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl index 8a00be3d11dd..a42c25a7aa9a 100644 --- a/deps/rabbit/test/amqp_jms_SUITE.erl +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -52,7 +52,7 @@ groups() -> suite() -> [ - {timetrap, {minutes, 2}} + {timetrap, {minutes, 5}} ]. init_per_suite(Config) -> diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl index 58312f70becf..431350fb6117 100644 --- a/deps/rabbit/test/amqp_utils.erl +++ b/deps/rabbit/test/amqp_utils.erl @@ -16,6 +16,7 @@ flush/1, wait_for_credit/1, wait_for_accepts/1, + send_message/2, send_messages/3, send_messages/4, detach_link_sync/1, end_session_sync/1, @@ -87,18 +88,10 @@ wait_for_accepts(N) -> ct:fail({missing_accepted, N}) end. -send_messages(Sender, Left, Settled) -> - send_messages(Sender, Left, Settled, <<>>). - -send_messages(_, 0, _, _) -> - ok; -send_messages(Sender, Left, Settled, BodySuffix) -> - Bin = integer_to_binary(Left), - Body = <>, - Msg = amqp10_msg:new(Bin, Body, Settled), +send_message(Sender, Msg) -> case amqp10_client:send_msg(Sender, Msg) of ok -> - send_messages(Sender, Left - 1, Settled, BodySuffix); + ok; {error, insufficient_credit} -> ok = wait_for_credit(Sender), %% The credited event we just processed could have been received some time ago, @@ -110,9 +103,21 @@ send_messages(Sender, Left, Settled, BodySuffix) -> %% but do not process the credited event in our mailbox. %% So, we must be defensive here and assume that the next amqp10_client:send/2 call might return {error, insufficient_credit} %% again causing us then to really wait to receive a credited event (instead of just processing an old credited event). - send_messages(Sender, Left, Settled, BodySuffix) + send_message(Sender, Msg) end. +send_messages(Sender, Left, Settled) -> + send_messages(Sender, Left, Settled, <<>>). + +send_messages(_, 0, _, _) -> + ok; +send_messages(Sender, Left, Settled, BodySuffix) -> + Bin = integer_to_binary(Left), + Body = <>, + Msg = amqp10_msg:new(Bin, Body, Settled), + ok = send_message(Sender, Msg), + send_messages(Sender, Left - 1, Settled, BodySuffix). + detach_link_sync(Link) -> ok = amqp10_client:detach_link(Link), ok = wait_for_link_detach(Link). diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl index 9cc09ceaac98..60d78e5d46df 100644 --- a/deps/rabbit/test/feature_flags_v2_SUITE.erl +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -203,7 +203,9 @@ stop_slave_node(Node) -> persistent_term:erase({?MODULE, Node}), ct:pal("- Stopping slave node `~ts`...", [Node]), - _ = peer:stop(NodePid) + Ret = catch peer:stop(NodePid), + ct:pal(" Ret = ~0p", [Ret]), + ok end. connect_nodes([FirstNode | OtherNodes] = Nodes) -> diff --git a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl index 854250543846..15ae436c656c 100644 --- a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl @@ -130,7 +130,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn1] = open_connections(Config, [0]), [Chan1] = open_channels(Conn1, 1), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(1, count_channels_in(Config, Username), ?A_TOUT), [#tracked_channel{username = Username}] = channels_in(Config, Username), ?awaitMatch(true, is_process_alive(Conn1), ?A_TOUT), @@ -147,7 +147,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn2] = open_connections(Config, [{0, Username2}]), Chans2 = [_|_] = open_channels(Conn2, 5), ?awaitMatch(1, count_connections_in(Config, Username2), ?A_TOUT), - [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + ?awaitMatch([#tracked_connection{username = Username2}], connections_in(Config, Username2), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username2), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username2), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username2), ?A_TOUT), @@ -157,7 +157,7 @@ single_node_user_connection_channel_tracking(Config) -> [Conn3] = open_connections(Config, [0]), Chans3 = [_|_] = open_channels(Conn3, 5), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), @@ -172,7 +172,7 @@ single_node_user_connection_channel_tracking(Config) -> [?awaitMatch(true, is_process_alive(Ch), ?A_TOUT) || Ch <- Chans4], kill_connections([Conn4]), ?awaitMatch(1, count_connections_in(Config, Username), ?A_TOUT), - [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?awaitMatch([#tracked_connection{username = Username}], connections_in(Config, Username), ?A_TOUT), ?awaitMatch(5, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(1, tracked_user_connection_count(Config, Username), ?A_TOUT), ?awaitMatch(5, tracked_user_channel_count(Config, Username), ?A_TOUT), @@ -182,9 +182,11 @@ single_node_user_connection_channel_tracking(Config) -> [Conn5] = open_connections(Config, [0]), Chans5 = [_|_] = open_channels(Conn5, 7), ?awaitMatch(2, count_connections_in(Config, Username), ?A_TOUT), - [Username, Username] = - lists:map(fun (#tracked_connection{username = U}) -> U end, - connections_in(Config, Username)), + ?awaitMatch( + [Username, Username], + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username)), + ?A_TOUT), ?awaitMatch(12, count_channels_in(Config, Username), ?A_TOUT), ?awaitMatch(12, tracked_user_channel_count(Config, Username), ?A_TOUT), ?awaitMatch(2, tracked_user_connection_count(Config, Username), ?A_TOUT), diff --git a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl index 601be3e45227..299315761b2d 100644 --- a/deps/rabbit/test/per_user_connection_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_tracking_SUITE.erl @@ -111,22 +111,44 @@ single_node_list_of_user(Config) -> ?assertEqual(0, count_connections_in(Config, Username)), ?assertEqual(0, count_connections_in(Config, Username2)), +<<<<<<< HEAD [Conn1] = open_connections(Config, [0]), ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), +======= + [Conn1] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), +>>>>>>> ed1cdb598 (per_user_connection_tracking_SUITE: Wait for the expected list of connections) close_connections([Conn1]), ?awaitMatch(0, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [Conn2] = open_connections(Config, [{0, Username2}]), ?awaitMatch(1, count_connections_in(Config, Username2), ?AWAIT_TIMEOUT), - [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + ?awaitMatch( + [#tracked_connection{username = Username2}], + connections_in(Config, Username2), + ?AWAIT_TIMEOUT), +<<<<<<< HEAD [Conn3] = open_connections(Config, [0]), ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), +======= + [Conn3] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), +>>>>>>> ed1cdb598 (per_user_connection_tracking_SUITE: Wait for the expected list of connections) [Conn4] = open_connections(Config, [0]), kill_connections([Conn4]), +<<<<<<< HEAD ?awaitMatch(1, count_connections_in(Config, Username), ?AWAIT_TIMEOUT), [#tracked_connection{username = Username}] = connections_in(Config, Username), @@ -135,6 +157,21 @@ single_node_list_of_user(Config) -> [Username, Username] = lists:map(fun (#tracked_connection{username = U}) -> U end, connections_in(Config, Username)), +======= + ?awaitMatch(1, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [#tracked_connection{username = Username1}], + connections_in(Config, Username1), + ?AWAIT_TIMEOUT), + + [Conn5] = open_connections(Config, [{0, Username1}]), + ?awaitMatch(2, count_connections_in(Config, Username1), ?AWAIT_TIMEOUT), + ?awaitMatch( + [Username1, Username1], + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username1)), + ?AWAIT_TIMEOUT), +>>>>>>> ed1cdb598 (per_user_connection_tracking_SUITE: Wait for the expected list of connections) close_connections([Conn2, Conn3, Conn5]), rabbit_ct_broker_helpers:delete_user(Config, Username2), diff --git a/deps/rabbit/test/proxy_protocol_SUITE.erl b/deps/rabbit/test/proxy_protocol_SUITE.erl index 72e9e37c4b98..a3abc23602e3 100644 --- a/deps/rabbit/test/proxy_protocol_SUITE.erl +++ b/deps/rabbit/test/proxy_protocol_SUITE.erl @@ -10,6 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile(export_all). -define(TIMEOUT, 5000). @@ -65,8 +67,10 @@ proxy_protocol_v1(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. proxy_protocol_v1_tls(Config) -> @@ -80,8 +84,10 @@ proxy_protocol_v1_tls(Config) -> {ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. proxy_protocol_v2_local(Config) -> @@ -97,13 +103,22 @@ proxy_protocol_v2_local(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), + ct:pal("Connection name: ~s", [ConnectionName]), match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]), gen_tcp:close(Socket), + wait_for_connection_close(Config), ok. connection_name() -> - Pids = pg_local:get_members(rabbit_connections), - Pid = lists:nth(1, Pids), + ?awaitMatch([_], pg_local:get_members(rabbit_connections), 30000), + [Pid] = pg_local:get_members(rabbit_connections), {dictionary, Dict} = process_info(Pid, dictionary), {process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict), ConnectionName. + +wait_for_connection_close(Config) -> + ?awaitMatch( + [], + rabbit_ct_broker_helpers:rpc( + Config, 0, pg_local, get_members, [rabbit_connnections]), + 30000). diff --git a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl index a6949065253f..2e0d0aa1075e 100644 --- a/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_dlx_integration_SUITE.erl @@ -230,7 +230,7 @@ delivery_limit(Config) -> {_, #amqp_msg{props = #'P_basic'{headers = Headers}}} = ?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, amqp_channel:call(Ch, #'basic.get'{queue = TargetQ}), - 1000), + 30000), assert_dlx_headers(Headers, <<"delivery_limit">>, SourceQ), ?assertEqual(1, counted(messages_dead_lettered_delivery_limit_total, Config)), eventually(?_assertEqual(1, counted(messages_dead_lettered_confirmed_total, Config))). diff --git a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl index 87f51a4a62b3..1e6657649911 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/jwks_SUITE.erl @@ -896,6 +896,7 @@ test_failed_connection_with_a_token_with_insufficient_resource_permission(Config ?assertExit({{shutdown, {server_initiated_close, 403, _}}, _}, amqp_channel:call(Ch, #'queue.declare'{queue = <<"alt-prefix.eq.1">>, exclusive = true})), + close_connection(Conn). test_failed_token_refresh_case1(Config) -> @@ -941,7 +942,7 @@ test_failed_token_refresh_case2(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 530, _}}}, _}, amqp_connection:open_channel(Conn)), - close_connection(Conn). + wait_for_connection_exit(Conn). cannot_change_username_on_refreshed_token(Config) -> Jwk = @@ -983,4 +984,14 @@ rpc_get_env(Config, Par) -> [rabbitmq_auth_backend_oauth2, Par]). rpc_get_env(Config, Par, Default) -> rpc(Config, 0, application, get_env, - [rabbitmq_auth_backend_oauth2, Par, Default]). \ No newline at end of file + [rabbitmq_auth_backend_oauth2, Par, Default]). + +wait_for_connection_exit(Conn) -> + MRef = erlang:monitor(process, Conn), + receive + {'DOWN', MRef, _Type, _Conn, Reason} -> + ct:pal("Connection ~0p exited: ~p", [Conn, Reason]), + ok + after 30000 -> + ct:fail("Connection ~0p is still up after 30 seconds", [Conn]) + end. diff --git a/deps/rabbitmq_cli/.gitignore b/deps/rabbitmq_cli/.gitignore index 43c231de0dd8..9b987ada02eb 100644 --- a/deps/rabbitmq_cli/.gitignore +++ b/deps/rabbitmq_cli/.gitignore @@ -1 +1,2 @@ /deps/ +/logs diff --git a/deps/rabbitmq_cli/Makefile b/deps/rabbitmq_cli/Makefile index 616e44d640a4..024f71b12e2a 100644 --- a/deps/rabbitmq_cli/Makefile +++ b/deps/rabbitmq_cli/Makefile @@ -123,7 +123,11 @@ tests:: escript test-deps $(verbose) $(MAKE) -C ../../ install-cli $(verbose) $(MAKE) -C ../../ start-background-broker \ PLUGINS="rabbitmq_federation rabbitmq_stomp rabbitmq_stream_management amqp_client" \ - $(if $(filter khepri,$(RABBITMQ_METADATA_STORE)),,RABBITMQ_FEATURE_FLAGS="-khepri_db") + $(if $(filter khepri,$(RABBITMQ_METADATA_STORE)),,RABBITMQ_FEATURE_FLAGS="-khepri_db"); \ + rm -f logs; \ + log_dir=$$(../../sbin/rabbitmqctl eval 'io:format("~s~n", [maps:get(log_base_dir,rabbit_prelaunch:get_context())]).'); \ + log_dir=$$(echo "$$log_dir" | head -n 1); \ + ln -s "$$log_dir" logs $(gen_verbose) $(MIX_TEST) \ $(if $(RABBITMQ_METADATA_STORE),--exclude $(filter-out $(RABBITMQ_METADATA_STORE),khepri mnesia),) \ $(TEST_FILE); \ @@ -160,7 +164,7 @@ endif clean:: clean-mix clean-mix: - $(gen_verbose) rm -f $(ESCRIPT_FILE) $(LINKED_ESCRIPTS) + $(gen_verbose) rm -f $(ESCRIPT_FILE) $(LINKED_ESCRIPTS) logs $(verbose) echo y | mix clean format: diff --git a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl index 35b3a5a70e5b..641f7e8596a3 100644 --- a/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl +++ b/deps/rabbitmq_consistent_hash_exchange/test/rabbit_exchange_type_consistent_hash_SUITE.erl @@ -26,6 +26,9 @@ all() -> {group, khepri_migration} ]. +suite() -> + [{timetrap, {minutes, 5}}]. + groups() -> [ {routing_tests, [], routing_tests()}, @@ -156,7 +159,7 @@ custom_header_undefined(Config) -> Exchange = <<"my exchange">>, Queue = <<"my queue">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), #'exchange.declare_ok'{} = amqp_channel:call( Ch, #'exchange.declare' { @@ -179,7 +182,7 @@ custom_header_undefined(Config) -> ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, amqp_channel:call(Ch, #'basic.get'{queue = Queue})), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), clean_up_test_topology(Config, Exchange, [Queue]), ok. @@ -373,7 +376,7 @@ test_with_timestamp(Config, Qs) -> Qs). test_mutually_exclusive_arguments(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), process_flag(trap_exit, true), Cmd = #'exchange.declare'{ @@ -384,11 +387,11 @@ test_mutually_exclusive_arguments(Config) -> }, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_non_supported_property(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), process_flag(trap_exit, true), Cmd = #'exchange.declare'{ @@ -398,7 +401,7 @@ test_non_supported_property(Config) -> }, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. rnd() -> @@ -411,13 +414,13 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues) -> test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, ?DEFAULT_SAMPLE_COUNT). test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, IterationCount) -> - Chan = rabbit_ct_client_helpers:open_channel(Config), - #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), - CHX = <<"e">>, clean_up_test_topology(Config, CHX, Queues), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), + #'exchange.declare_ok'{} = amqp_channel:call(Chan, #'exchange.declare' { @@ -464,11 +467,11 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, Itera [Chi, Obs]), clean_up_test_topology(Config, CHX, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_binding_with_negative_routing_key(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"bind-fail">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -482,15 +485,15 @@ test_binding_with_negative_routing_key(Config) -> Cmd = #'queue.bind'{exchange = <<"bind-fail">>, routing_key = <<"-1">>}, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), - rabbit_ct_client_helpers:close_channel(Chan), - rabbit_ct_client_helpers:close_channel(Ch2), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2), ok. test_binding_with_non_numeric_routing_key(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"bind-fail">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -505,10 +508,11 @@ test_binding_with_non_numeric_routing_key(Config) -> routing_key = <<"not-a-number">>}, ?assertExit(_, amqp_channel:call(Chan, Cmd)), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch2, #'queue.delete'{queue = Q}), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Ch2), ok. %% @@ -516,7 +520,7 @@ test_binding_with_non_numeric_routing_key(Config) -> %% test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_recovery_between_node_restarts">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -547,11 +551,11 @@ test_durable_exchange_hash_ring_recovery_between_node_restarts(Config) -> assert_ring_consistency(Config, X), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_queue_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_queue_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -576,11 +580,11 @@ test_hash_ring_updates_when_queue_is_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, [Q]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_multiple_queues_are_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_multiple_queues_are_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -611,7 +615,7 @@ test_hash_ring_updates_when_multiple_queues_are_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure(Config) -> @@ -706,7 +710,7 @@ test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closu ok. test_hash_ring_updates_when_exchange_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_exchange_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -734,11 +738,11 @@ test_hash_ring_updates_when_exchange_is_deleted(Config) -> ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_queue_is_unbound(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_queue_is_unbound">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -769,11 +773,11 @@ test_hash_ring_updates_when_queue_is_unbound(Config) -> ?assertEqual(8, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, Queues), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -818,11 +822,11 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_queue_is_deleted(Co assert_ring_consistency(Config, X), clean_up_test_topology(Config, X, [Q1, Q2]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted(Config) -> - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), X = <<"test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted">>, amqp_channel:call(Chan, #'exchange.delete' {exchange = X}), @@ -872,14 +876,14 @@ test_hash_ring_updates_when_duplicate_binding_is_created_and_binding_is_deleted( ?assertEqual(0, count_buckets_of_exchange(Config, X)), clean_up_test_topology(Config, X, [Q1, Q2]), - rabbit_ct_client_helpers:close_channel(Chan), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), ok. %% Follows the setup described in %% https://github.com/rabbitmq/rabbitmq-server/issues/3386#issuecomment-1103929292 node_restart(Config) -> - Chan1 = rabbit_ct_client_helpers:open_channel(Config, 1), - Chan2 = rabbit_ct_client_helpers:open_channel(Config, 2), + {Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1), + {Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 2), X = atom_to_binary(?FUNCTION_NAME), #'exchange.declare_ok'{} = amqp_channel:call(Chan1, @@ -903,8 +907,8 @@ node_restart(Config) -> F(Chan1, QsNode1), F(Chan2, QsNode2), - rabbit_ct_client_helpers:close_channel(Chan1), - rabbit_ct_client_helpers:close_channel(Chan2), + rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1), + rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2), rabbit_ct_broker_helpers:restart_node(Config, 1), rabbit_ct_broker_helpers:restart_node(Config, 2), @@ -942,13 +946,14 @@ count_buckets_of_exchange(Config, X) -> from_mnesia_to_khepri(Config) -> Queues = [Q1, Q2, Q3, Q4] = ?RoutingTestQs, IterationCount = ?DEFAULT_SAMPLE_COUNT, - Chan = rabbit_ct_client_helpers:open_channel(Config, 0), - #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), CHX = <<"e">>, clean_up_test_topology(Config, CHX, Queues), + {Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + #'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}), + #'exchange.declare_ok'{} = amqp_channel:call(Chan, #'exchange.declare' { @@ -973,36 +978,32 @@ from_mnesia_to_khepri(Config) -> case rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db) of ok -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_consistent_hash_exchange_raft_based_metadata_store) of - ok -> - [amqp_channel:call(Chan, - #'basic.publish'{exchange = CHX, routing_key = rnd()}, - #amqp_msg{props = #'P_basic'{}, payload = <<>>}) - || _ <- lists:duplicate(IterationCount, const)], - amqp_channel:wait_for_confirms(Chan, 300), - timer:sleep(500), - Counts = - [begin - #'queue.declare_ok'{message_count = M} = - amqp_channel:call(Chan, #'queue.declare' {queue = Q, - exclusive = true}), - M - end || Q <- Queues], - ?assertEqual(IterationCount, lists:sum(Counts)), %% All messages got routed - %% Chi-square test - %% H0: routing keys are not evenly distributed according to weight - Expected = [IterationCount div 6, IterationCount div 6, (IterationCount div 6) * 2, (IterationCount div 6) * 2], - Obs = lists:zip(Counts, Expected), - Chi = lists:sum([((O - E) * (O - E)) / E || {O, E} <- Obs]), - ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p", - [Chi, Obs]), - clean_up_test_topology(Config, CHX, Queues), - rabbit_ct_client_helpers:close_channel(Chan), - ok; - Skip -> - Skip - end; + [amqp_channel:call(Chan, + #'basic.publish'{exchange = CHX, routing_key = rnd()}, + #amqp_msg{props = #'P_basic'{}, payload = <<>>}) + || _ <- lists:duplicate(IterationCount, const)], + amqp_channel:wait_for_confirms(Chan, 300), + timer:sleep(500), + Counts = + [begin + #'queue.declare_ok'{message_count = M} = + amqp_channel:call(Chan, #'queue.declare' {queue = Q, + exclusive = true}), + M + end || Q <- Queues], + ?assertEqual(IterationCount, lists:sum(Counts)), %% All messages got routed + %% Chi-square test + %% H0: routing keys are not evenly distributed according to weight + Expected = [IterationCount div 6, IterationCount div 6, (IterationCount div 6) * 2, (IterationCount div 6) * 2], + Obs = lists:zip(Counts, Expected), + Chi = lists:sum([((O - E) * (O - E)) / E || {O, E} <- Obs]), + ct:pal("Chi-square test for 3 degrees of freedom is ~p, p = 0.01 is 11.35, observations (counts, expected): ~p", + [Chi, Obs]), + clean_up_test_topology(Config, CHX, Queues), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), + ok; Skip -> + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan), Skip end. @@ -1010,12 +1011,12 @@ clean_up_test_topology(Config) -> clean_up_test_topology(Config, none, ?AllQs). clean_up_test_topology(Config, none, Qs) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], - rabbit_ct_client_helpers:close_channel(Ch); + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch); clean_up_test_topology(Config, X, Qs) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch, #'exchange.delete' {exchange = X}), [amqp_channel:call(Ch, #'queue.delete' {queue = Q}) || Q <- Qs], - rabbit_ct_client_helpers:close_channel(Ch). + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index f69d80a14c03..5c07395b4559 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -179,8 +179,20 @@ init_per_group(Group, Config) -> _ -> merge_app_env(AuthConfig, Conf) end end] ++ +<<<<<<< HEAD rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). +======= + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + case Config2 of + _ when is_list(Config2) -> + util:enable_plugin(Config2, rabbitmq_mqtt), + Config2; + {skip, _} -> + Config2 + end. +>>>>>>> 5c1456b2d (auth_SUITE: Handle error returned by rabbit_ct_broker_helpers) end_per_group(G, Config) when G =:= v4; @@ -1249,6 +1261,15 @@ vhost_connection_limit(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, <<"/">>). +<<<<<<< HEAD +======= +count_connections_per_vhost(Config) -> + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_connection_tracking, count_local_tracked_items_in_vhost, + [<<"/">>]). + +>>>>>>> 02b156155 (auth_SUITE: Wait for connection tracking to be up-to-date) vhost_queue_limit(Config) -> ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, <<"/">>, max_queues, 1), {ok, C} = connect_anonymous(Config), @@ -1268,6 +1289,7 @@ user_connection_limit(Config) -> ok = rabbit_ct_broker_helpers:set_user_limits(Config, DefaultUser, #{max_connections => 1}), {ok, C1} = connect_anonymous(Config, <<"client1">>), {ok, _} = emqtt:connect(C1), + ?awaitMatch(1, count_connections_per_vhost(Config), 30000), {ok, C2} = connect_anonymous(Config, <<"client2">>), ExpectedError = expected_connection_limit_error(Config), unlink(C2), diff --git a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl index 7cae82eda328..34374f793c72 100644 --- a/deps/rabbitmq_mqtt/test/cluster_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/cluster_SUITE.erl @@ -83,7 +83,18 @@ init_per_testcase(Testcase, Config) -> Config1, [fun merge_app_env/1] ++ setup_steps() ++ +<<<<<<< HEAD rabbit_ct_client_helpers:setup_steps()). +======= + rabbit_ct_client_helpers:setup_steps()), + case Config2 of + _ when is_list(Config2) -> + util:enable_plugin(Config2, rabbitmq_mqtt), + Config2; + {skip, _} -> + Config2 + end. +>>>>>>> 0e36184a6 (cluster_SUITE: Handle error returned by rabbit_ct_broker_helpers) end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:run_steps(Config, diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index d6964017dec1..2d839607d4c1 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -189,10 +189,24 @@ init_per_group(Group, Config0) -> Config0, [{rmq_nodes_count, Nodes}, {rmq_nodename_suffix, Suffix}]), +<<<<<<< HEAD rabbit_ct_helpers:run_steps( Config, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). +======= + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + case Config1 of + _ when is_list(Config1) -> + util:enable_plugin(Config1, rabbitmq_mqtt), + Config1; + {skip, _} -> + Config1 + end. +>>>>>>> 8bdbb0fc2 (mqtt_shared_SUITE: Handle error returned by rabbit_ct_broker_helpers) end_per_group(G, Config) when G =:= cluster_size_1; diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index b14c0897f10e..4ba0ea942ff4 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -63,6 +63,7 @@ groups() -> build_info_product_test ]}, {detailed_metrics, [], [ + stream_pub_sub_metrics, detailed_metrics_no_families_enabled_by_default, queue_consumer_count_single_vhost_per_object_test, queue_consumer_count_all_vhosts_per_object_test, @@ -75,7 +76,11 @@ groups() -> vhost_status_metric, exchange_bindings_metric, exchange_names_metric, +<<<<<<< HEAD stream_pub_sub_metrics +======= + detailed_raft_metrics_test +>>>>>>> 267445680 (rabbit_prometheus_http_SUITE: Run `stream_pub_sub_metrics` first) ]}, {special_chars, [], [core_metrics_special_chars]}, {authentication, [], [basic_auth]} @@ -782,6 +787,10 @@ exchange_names_metric(Config) -> ok. stream_pub_sub_metrics(Config) -> + {_, Body0} = http_get_with_pal(Config, "/metrics", [], 200), + Metrics = parse_response(Body0), + ct:pal("Initial metrics: ~p", [Metrics]), + Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1", MsgPerBatch1 = 2, {ok, S1, C1} = publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config), diff --git a/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl index 96e414adc387..580b6ec947d5 100644 --- a/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_inter_cluster_SUITE.erl @@ -9,6 +9,9 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). + +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile([export_all, nowarn_export_all]). -import(rabbit_ct_broker_helpers, [rpc/5]). @@ -72,22 +75,23 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). old_to_new_on_old(Config) -> - ok = shovel(?OLD, ?NEW, ?OLD, Config). + ok = shovel(?FUNCTION_NAME, ?OLD, ?NEW, ?OLD, Config). old_to_new_on_new(Config) -> - ok = shovel(?OLD, ?NEW, ?NEW, Config). + ok = shovel(?FUNCTION_NAME, ?OLD, ?NEW, ?NEW, Config). new_to_old_on_old(Config) -> - ok = shovel(?NEW, ?OLD, ?OLD, Config). + ok = shovel(?FUNCTION_NAME, ?NEW, ?OLD, ?OLD, Config). new_to_old_on_new(Config) -> - ok = shovel(?NEW, ?OLD, ?NEW, Config). + ok = shovel(?FUNCTION_NAME, ?NEW, ?OLD, ?NEW, Config). -shovel(SrcNode, DestNode, ShovelNode, Config) -> +shovel(Caller, SrcNode, DestNode, ShovelNode, Config) -> SrcUri = shovel_test_utils:make_uri(Config, SrcNode), DestUri = shovel_test_utils:make_uri(Config, DestNode), - SrcQ = <<"my source queue">>, - DestQ = <<"my destination queue">>, + ShovelName = atom_to_binary(Caller), + SrcQ = <>, + DestQ = <>, Definition = [ {<<"src-uri">>, SrcUri}, {<<"src-protocol">>, <<"amqp10">>}, @@ -96,7 +100,6 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> {<<"dest-protocol">>, <<"amqp10">>}, {<<"dest-address">>, DestQ} ], - ShovelName = <<"my shovel">>, ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, set, [<<"/">>, <<"shovel">>, ShovelName, Definition, none]), ok = shovel_test_utils:await_shovel(Config, ShovelNode, ShovelName), @@ -125,6 +128,7 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> ok = amqp10_client:flow_link_credit(Receiver, NumMsgs, never), Msgs = receive_messages(Receiver, NumMsgs), + ct:pal("~b messages:~n~p", [length(Msgs), Msgs]), lists:map( fun(N) -> Msg = lists:nth(N, Msgs), @@ -136,8 +140,28 @@ shovel(SrcNode, DestNode, ShovelNode, Config) -> ok = rpc(Config, ShovelNode, rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, ShovelName, none]), ExpectedQueueLen = 0, - ?assertEqual([ExpectedQueueLen], rpc(Config, ?OLD, ?MODULE, delete_queues, [])), - ?assertEqual([ExpectedQueueLen], rpc(Config, ?NEW, ?MODULE, delete_queues, [])). + ?awaitMatch( + [{_, ExpectedQueueLen}], + begin + Ret = rpc(Config, ?OLD, ?MODULE, queues_length, []), + ct:pal("Queues on old: ~p", [Ret]), + Ret + end, + 30000), + ?awaitMatch( + [{_, ExpectedQueueLen}], + begin + Ret = rpc(Config, ?NEW, ?MODULE, queues_length, []), + ct:pal("Queues on new: ~p", [Ret]), + Ret + end, + 30000), + ?assertEqual( + [ExpectedQueueLen], + rpc(Config, ?OLD, ?MODULE, delete_queues, [])), + ?assertEqual( + [ExpectedQueueLen], + rpc(Config, ?NEW, ?MODULE, delete_queues, [])). wait_for_credit(Sender) -> receive @@ -170,6 +194,13 @@ flush(Prefix) -> ok end. +queues_length() -> + [begin + #{<<"name">> := Name} = amqqueue:to_printable(Q), + [{messages, N}] = rabbit_amqqueue:info(Q, [messages]), + {Name, N} + end || Q <- rabbit_amqqueue:list()]. + delete_queues() -> [begin {ok, N} = rabbit_amqqueue:delete(Q, false, false, <<"tests">>), diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py index af643737c23c..a08e5f03f51d 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/base.py @@ -126,7 +126,10 @@ def tearDown(self): if self.conn.is_connected(): try: self.conn.disconnect() - except: + except Exception as inst: + print(type(inst)) + print(inst.args) + print(inst) pass elapsed = time.time() - self._started_at print('{} ({}s)'.format(self.id(), round(elapsed, 2))) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt b/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt index fd2cc9d6beb1..789ce525d372 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/requirements.txt @@ -1,3 +1,3 @@ -stomp.py==8.1.0 -pika==1.1.0 +stomp.py==8.2.0 +pika==1.3.2 rabbitman===0.1.0 diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py index 8b15a5b89b4d..32cdd61e9621 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/test_runner.py @@ -20,7 +20,7 @@ def run_unittests(modules): if name.startswith("Test") and issubclass(obj, unittest.TestCase): suite.addTest(unittest.TestLoader().loadTestsFromTestCase(obj)) - ts = unittest.TextTestRunner().run(unittest.TestSuite(suite)) + ts = unittest.TextTestRunner(verbosity=10).run(unittest.TestSuite(suite)) if ts.errors or ts.failures: sys.exit(1) diff --git a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py index 2aed99ec31f9..a5e783d52d75 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py +++ b/deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_name.py @@ -11,6 +11,7 @@ import base import time import os +import test_util class TestUserGeneratedQueueName(base.BaseTest): @@ -29,6 +30,12 @@ def test_exchange_dest(self): pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) channel = connection.channel() + test_util.rabbitmqctl(['list_queues']) + test_util.rabbitmqctl(['list_connections', 'peer_host', 'peer_port', + 'protocol']) + test_util.rabbitmqctl(['list_stomp_connections', 'peer_host', + 'peer_port', 'protocol']) + # publish a message to the named queue channel.basic_publish( exchange='', @@ -36,11 +43,13 @@ def test_exchange_dest(self): body='Hello World!') # check if we receive the message from the STOMP subscription - self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertTrue(self.listener.wait(30), "initial message not received") self.assertEqual(1, len(self.listener.messages)) - self.conn.disconnect() + # self.conn.disconnect() connection.close() + while not connection.is_closed: + time.sleep(1) def test_topic_dest(self): queueName='my-user-generated-queue-name-topic' @@ -57,6 +66,12 @@ def test_topic_dest(self): pika.ConnectionParameters( host='127.0.0.1', port=int(os.environ["AMQP_PORT"]))) channel = connection.channel() + test_util.rabbitmqctl(['list_queues']) + test_util.rabbitmqctl(['list_connections', 'peer_host', 'peer_port', + 'protocol']) + test_util.rabbitmqctl(['list_stomp_connections', 'peer_host', + 'peer_port', 'protocol']) + # publish a message to the named queue channel.basic_publish( exchange='', @@ -64,11 +79,13 @@ def test_topic_dest(self): body='Hello World!') # check if we receive the message from the STOMP subscription - self.assertTrue(self.listener.wait(5), "initial message not received") + self.assertTrue(self.listener.wait(30), "initial message not received") self.assertEqual(1, len(self.listener.messages)) - self.conn.disconnect() + # self.conn.disconnect() connection.close() + while not connection.is_closed: + time.sleep(1) if __name__ == '__main__': diff --git a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl index e6c69bc17bd1..1b9e6b0b8237 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl @@ -211,7 +211,7 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co %% the coordinator leader node will be isolated ?assertNotEqual(L#node.name, CL), - log("Stream leader and coordinator leader are on ~p", [L#node.name]), + log("Coordinator leader on: ~0p~nStream leader on: ~0p", [CL, L#node.name]), {ok, So0, C0_00} = stream_test_utils:connect(Config, CL), {ok, So1, C1_00} = stream_test_utils:connect(Config, CF1), diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile b/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile index dae43a1ad68c..994584d771a7 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/Makefile @@ -2,7 +2,8 @@ export PATH :=$(CURDIR):$(PATH) HOSTNAME := $(shell hostname) MVN_FLAGS += -Dstream.port=$(STREAM_PORT) \ -Dstream.port.tls=$(STREAM_PORT_TLS) \ - -Dmanagement.port=$(MANAGEMENT_PORT) + -Dmanagement.port=$(MANAGEMENT_PORT) \ + -Dmaven.wagon.http.retryHandler.count=5 .PHONY: tests clean