Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 122 additions & 3 deletions big_tests/tests/mod_event_pusher_rabbit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

-define(RABBIT_HTTP_ENDPOINT, "http://127.0.0.1:15672").

-define(UTILS_MODULE, mod_event_pusher_rabbit_utils).

-type rabbit_binding() :: {Queue :: binary(),
Exchange :: binary(),
RoutingKey :: binary()}.
Expand All @@ -55,7 +57,8 @@ all() ->
{group, chat_message_publish},
{group, group_chat_message_publish},
{group, instrumentation},
{group, filter_and_metadata}
{group, filter_and_metadata},
{group, single_worker}
].

groups() ->
Expand All @@ -68,7 +71,8 @@ groups() ->
{chat_message_publish, [], chat_message_publish_tests()},
{group_chat_message_publish, [], group_chat_message_publish_tests()},
{instrumentation, [], instrumentation_tests()},
{filter_and_metadata, [], filter_and_metadata_tests()}].
{filter_and_metadata, [], filter_and_metadata_tests()},
{single_worker, [{repeat_until_any_fail, 50}], single_worker_tests()}].

pool_startup_tests() ->
[rabbit_pool_starts_with_default_config].
Expand Down Expand Up @@ -108,6 +112,12 @@ filter_and_metadata_tests() ->
[messages_published_events_are_not_executed,
presence_messages_are_properly_formatted_with_metadata].

single_worker_tests() ->
[connection_is_restarted_on_error,
connection_is_restarted_with_retries,
connection_is_restarted_with_retries_and_queue_limit,
worker_is_restarted_after_failed_retries].

suite() ->
escalus:suite().

Expand All @@ -126,13 +136,16 @@ init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp_client),
muc_helper:load_muc(),
mongoose_helper:inject_module(mod_event_pusher_filter),
mongoose_helper:inject_module(?UTILS_MODULE),
rpc(mim(), ?UTILS_MODULE, start, []),
escalus:init_per_suite(Config);
false ->
{skip, "RabbitMQ server is not available on default port."}
end.

end_per_suite(Config) ->
escalus_fresh:clean(),
rpc(mim(), ?UTILS_MODULE, stop, []),
muc_helper:unload_muc(),
escalus:end_per_suite(Config),
instrument_helper:stop().
Expand Down Expand Up @@ -579,6 +592,105 @@ messages_published_events_are_executed(Config) ->
end, #{expected_count => 2}) % for sender and receiver
end).

connection_is_restarted_on_error(Config) ->
escalus:story(
Config, [{bob, 1}],
fun(Bob) ->
%% GIVEN intermittent rabbit connection failure
BobJID = client_lower_short_jid(Bob),
listen_to_presence_events_from_rabbit([BobJID], Config),
{ok, Worker} = get_rabbit_worker(),
simulate_rabbit_connection_error(),

%% WHEN user sends presence
send_presence_stanzas([Bob], 1),

%% THEN event is delivered because the worker kept its queue
?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID},
#amqp_msg{}}, timer:seconds(5)),
?assertEqual({ok, Worker}, get_rabbit_worker())
end).

connection_is_restarted_with_retries(Config) ->
escalus:story(
Config, [{bob, 1}],
fun(Bob) ->
%% GIVEN an intermittent rabbit connection failure lasting for 2 connect attempts
BobJID = client_lower_short_jid(Bob),
listen_to_presence_events_from_rabbit([BobJID], Config),
{ok, Worker} = get_rabbit_worker(),
simulate_rabbit_connection_error(2),

%% WHEN user sends presence
send_presence_stanzas([Bob], 1),

%% THEN event is delivered because the worker kept its queue
?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID},
#amqp_msg{}}, timer:seconds(5)),
?assertEqual({ok, Worker}, get_rabbit_worker())
end).

connection_is_restarted_with_retries_and_queue_limit(Config) ->
escalus:story(
Config, [{bob, 1}],
fun(Bob) ->
%% GIVEN an intermittent rabbit connection failure lasting for 2 connect attempts
BobJID = client_lower_short_jid(Bob),
listen_to_presence_events_from_rabbit([BobJID], Config),
{ok, Worker} = get_rabbit_worker(),
simulate_rabbit_connection_error(2),

%% WHEN user sends 2 presences
send_presence_stanzas([Bob], 2),

%% THEN: - first event is delivered because the worker kept its queue
%% - second event is dropped because max_worker_queue_len is 1
DecodedMessage = get_decoded_message_from_rabbit(BobJID),
?assertMatch(#{<<"present">> := false}, DecodedMessage),
assert_no_message_from_rabbit([BobJID]),
?assertEqual({ok, Worker}, get_rabbit_worker())
end).

worker_is_restarted_after_failed_retries(Config) ->
escalus:story(
Config, [{bob, 1}],
fun(Bob) ->
%% GIVEN an intermittent rabbit connection failure lasting for 3 connect attempts
BobJID = client_lower_short_jid(Bob),
listen_to_presence_events_from_rabbit([BobJID], Config),
{ok, Worker} = get_rabbit_worker(),
simulate_rabbit_connection_error(3),

%% WHEN user sends presence
send_presence_stanzas([Bob], 1),

%% THEN event is dropped because worker is restarted (reconnect.attempts was 2)
assert_no_message_from_rabbit([BobJID]),
wait_for_new_rabbit_worker(Worker), % wait until there is a new worker
send_presence_stanzas([Bob], 1),
?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID}, #amqp_msg{}},
timer:seconds(5))
end).

simulate_rabbit_connection_error() ->
rpc(mim(), ?UTILS_MODULE, ?FUNCTION_NAME, [domain(), 5671, 0]).

simulate_rabbit_connection_error(Count) ->
rpc(mim(), ?UTILS_MODULE, ?FUNCTION_NAME, [domain(), 5671, Count]).

clean_up_rabbit_reconnect_errors() ->
rpc(mim(), ?UTILS_MODULE, ?FUNCTION_NAME, []).

%% Make sure there is a rabbit worker again
wait_for_new_rabbit_worker(Worker) ->
{ok, {ok, NewWorker}} =
wait_helper:wait_until(fun get_rabbit_worker/0, true,
#{validator => fun({ok, W}) -> W =/= Worker end}),
NewWorker.

get_rabbit_worker() ->
rpc(mim(), mongoose_wpool, get_worker, [rabbit, domain(), event_pusher]).

%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------
Expand Down Expand Up @@ -809,13 +921,20 @@ start_rabbit_tls_wpool(Host, GroupName) ->
BasicConnOpts = #{tls => tls_config(), port => 5671, virtual_host => ?VHOST},
ConnOpts = maps:merge(BasicConnOpts, extra_conn_opts(GroupName)),
ensure_vhost(?VHOST),
start_rabbit_wpool(Host, BasicOpts#{conn_opts => ConnOpts}).
start_rabbit_wpool(Host, maps:merge(BasicOpts#{conn_opts => ConnOpts}, extra_opts(GroupName))).

extra_conn_opts(presence_status_publish_with_confirms) ->
#{confirms_enabled => true};
extra_conn_opts(single_worker) ->
#{reconnect => #{attempts => 2, delay => 1000}}; % Note: in case of flaky tests, increase delay
extra_conn_opts(_GroupName) ->
#{}.

extra_opts(single_worker) ->
#{opts => #{workers => 1, max_worker_queue_len => 1}};
extra_opts(_) ->
#{}.

tls_config() ->
#{certfile => "priv/ssl/fake_cert.pem",
keyfile => "priv/ssl/fake_key.pem",
Expand Down
35 changes: 35 additions & 0 deletions big_tests/tests/mod_event_pusher_rabbit_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-module(mod_event_pusher_rabbit_utils).

-export([start/0, stop/0, simulate_rabbit_connection_error/3]).

start() ->
meck:new(gen_tcp, [unstick, no_link, passthrough]).

stop() ->
meck:unload(gen_tcp).

simulate_rabbit_connection_error(HostType, Port, ReconnectFailures) ->
{ok, Worker} = mongoose_wpool:get_worker(rabbit, HostType, event_pusher),
State = sys:get_state(Worker),
c:c(wpool_process, [{d, 'TEST'}]), % export get_state/1
#{connection := Connection} = wpool_process:get_state(State),
simulate_tcp_connect_errors(Port, ReconnectFailures),
MonitorRef = monitor(process, Connection),
Connection ! {socket_error, simulated},
receive {'DOWN', MonitorRef, process, Connection, _} -> ok end.

simulate_tcp_connect_errors(_Port, 0) ->
ok;
simulate_tcp_connect_errors(Port, Count) ->
persistent_term:put({tcp_connect_errors, Port}, Count),
meck:expect(gen_tcp, connect, fun tcp_connect/4).

tcp_connect(Address, Port, Opts, Timeout) ->
case persistent_term:get({tcp_connect_errors, Port}, 0) of
0 ->
persistent_term:erase({tcp_connect_errors, Port}),
meck:passthrough([Address, Port, Opts, Timeout]);
N when N > 0 ->
persistent_term:put({tcp_connect_errors, Port}, N - 1),
{error, simulated_reconnect_error}
end.
21 changes: 21 additions & 0 deletions doc/configuration/outgoing-connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,27 @@ Sets the RabbitMQ Virtual Host. The host needs to exist, as it is **not** create

Enables/disables one-to-one publishers confirms.

### `outgoing_pools.rabbit.*.connection.reconnect.attempts`
* **Syntax:** non-negative integer
* **Default:** 0
* **Example:** `reconnect.attempts = 5`

By default, a failed connection attempt results in an immediate restart of the affected worker.
When this happens, its incoming request queue is lost, and any requests present in the queue are dropped.
To avoid this, you can use this option to specify a number of reconnection attempts before the worker is restarted.

!!! Warning
Using this option might result in a lot of requests being accumulated in the worker queues - especially if `reconnect.delay` multiplied by `reconnect.attempts` is a long time period.
Thus, we recommend using the [`max_worker_queue_len`](#outgoing_poolsmax_worker_queue_len) option as a safety valve is such cases.

### `outgoing_pools.rabbit.*.connection.reconnect.delay`
* **Syntax:** non-negative integer (milliseconds)
* **Default:** 2000
* **Example:** `reconnect.delay = 5000`

Delay (in milliseconds) between consecutive reconnection attempts.
This option is effective only if the value of `reconnect.attempts` is positive.

---
To enable TLS, you need to include the [TLS section](#tls-options) in the connection options.

Expand Down
13 changes: 12 additions & 1 deletion src/config/mongoose_config_spec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,8 @@ outgoing_pool_connection(<<"rabbit">>) ->
<<"virtual_host">> => #option{type = binary,
validate = non_empty},
<<"confirms_enabled">> => #option{type = boolean},
<<"tls">> => tls([client])
<<"tls">> => tls([client]),
<<"reconnect">> => rabbit_reconnect()
},
include = always,
defaults = #{<<"host">> => "localhost",
Expand Down Expand Up @@ -631,6 +632,16 @@ sql_tls() ->
sql_tls_extra() ->
#section{items = #{<<"required">> => #option{type = boolean}}}.

%% path: outgoing_pools.rabbit.*.connection.reconnect
rabbit_reconnect() ->
#section{items = #{<<"attempts">> => #option{type = integer, validate = non_negative},
<<"delay">> => #option{type = integer, validate = non_negative}},
defaults = #{<<"attempts">> => 0,
<<"delay">> => 2000 % milliseconds
},
include = always
}.

%% TLS options

tls(Entities) when is_list(Entities) ->
Expand Down
58 changes: 46 additions & 12 deletions src/mongoose_rabbit_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@
username := binary(),
password := binary(),
virtual_host := binary(),
confirms_enabled := boolean()}.
confirms_enabled := boolean(),
reconnect := reconnect()}.

-type reconnect() :: #{attempts := non_neg_integer(),
delay := non_neg_integer() % milliseconds
}.

-type publish_result() :: boolean() | timeout | {channel_exception, any(), any()}.

Expand Down Expand Up @@ -86,6 +91,11 @@ handle_cast({amqp_publish, Method, Payload}, State) ->
handle_amqp_publish(Method, Payload, State).

-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info({'DOWN', _Ref, process, Connection, _}, State) ->
{noreply, case State of
#{connection := Connection} -> establish_rabbit_connection(State);
#{} -> State % probably already reconnected
end};
handle_info(Req, State) ->
?UNEXPECTED_INFO(Req),
{noreply, State}.
Expand Down Expand Up @@ -161,37 +171,61 @@ maybe_wait_for_confirms(_, _) ->
true.

-spec maybe_restart_rabbit_connection(state()) -> state().
maybe_restart_rabbit_connection(#{connection := Conn} = State) ->
case is_process_alive(Conn) of
maybe_restart_rabbit_connection(#{connection := Connection, opts := Opts} = State) ->
case is_process_alive(Connection) of
true ->
{ok, Channel} = amqp_connection:open_channel(Conn),
State#{channel := Channel};
State#{channel := open_amqp_channel(Connection, Opts)};
false ->
establish_rabbit_connection(State)
end.

-spec establish_rabbit_connection(state()) -> state().
establish_rabbit_connection(State) ->
establish_rabbit_connection(State = #{opts := #{reconnect := #{attempts := Attempts}}}) ->
establish_rabbit_connection(State, Attempts).

-spec establish_rabbit_connection(state(), non_neg_integer()) -> state().
establish_rabbit_connection(State, RemainingAttempts) ->
case start_amqp_connection(State) of
{ok, NewState} ->
NewState;
{error, Error} when RemainingAttempts > 0 ->
?LOG_WARNING(#{what => rabbit_connection_failed, reason => Error, worker_state => State,
remaining_attempts => RemainingAttempts}),
#{opts := #{reconnect := #{delay := Delay}}} = State,
timer:sleep(Delay),
establish_rabbit_connection(State, RemainingAttempts - 1);
{error, Error} when RemainingAttempts =:= 0 ->
ErrorInfo = #{what => rabbit_connection_failed, reason => Error, worker_state => State},
?LOG_ERROR(ErrorInfo),
exit(ErrorInfo)
end.

-spec start_amqp_connection(state()) -> {ok, state()} | {error, term()}.
start_amqp_connection(State) ->
#{opts := Opts, host_type := HostType, pool_tag := PoolTag} = State,
case amqp_connection:start(mongoose_amqp:network_params(Opts)) of
{ok, Connection} ->
monitor(process, Connection), % resulting ref is ignored as there is only one monitor
mongoose_instrument:execute(wpool_rabbit_connections,
#{host_type => HostType, pool_tag => PoolTag},
#{active => 1, opened => 1}),
{ok, Channel} = amqp_connection:open_channel(Connection),
maybe_enable_confirms(Channel, Opts),
Channel = open_amqp_channel(Connection, Opts),
?LOG_DEBUG(#{what => rabbit_connection_established,
host_type => HostType, pool_tag => PoolTag, opts => Opts}),
State#{connection => Connection, channel => Channel};
{ok, State#{connection => Connection, channel => Channel}};
{error, Error} ->
mongoose_instrument:execute(wpool_rabbit_connections,
#{host_type => HostType, pool_tag => PoolTag},
#{failed => 1}),
?LOG_ERROR(#{what => rabbit_connection_failed, reason => Error,
host_type => HostType, pool_tag => PoolTag, opts => Opts}),
exit("connection to a Rabbit server failed")
{error, Error}
end.

-spec open_amqp_channel(pid(), opts()) -> pid().
open_amqp_channel(Connection, Opts) ->
{ok, Channel} = amqp_connection:open_channel(Connection),
maybe_enable_confirms(Channel, Opts),
Channel.

-spec close_rabbit_connection(Connection :: pid(), Channel :: pid(),
HostType :: mongooseim:host_type_or_global(), PoolTag :: atom()) ->
ok | no_return().
Expand Down
5 changes: 4 additions & 1 deletion test/common/config_parser_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ options("outgoing_pools") ->
servers => ["ldap-server.example.com"]}},
#{type => rabbit, scope => host_type, tag => event_pusher,
opts => #{workers => 20, max_worker_queue_len => 100},
conn_opts => #{confirms_enabled => true}},
conn_opts => #{confirms_enabled => true,
reconnect => #{attempts => 5, delay => 2000}}},
#{type => rdbms,
opts => #{workers => 5},
conn_opts => #{query_timeout => 5000, keepalive_interval => 30,
Expand Down Expand Up @@ -1300,6 +1301,8 @@ default_config([outgoing_pools, Type, _Tag, opts]) ->
default_pool_wpool_opts(Type);
default_config([outgoing_pools, Type, _Tag, conn_opts]) ->
default_pool_conn_opts(Type);
default_config([outgoing_pools, rabbit, _Tag, conn_opts, reconnect]) ->
#{attempts => 10, delay => 5000};
default_config([outgoing_pools, _Type, _Tag, conn_opts, tls]) ->
maps:merge(default_tls(), #{server_name_indication => default_sni()});
default_config([outgoing_pools, _Type, _Tag, conn_opts, tls, server_name_indication]) ->
Expand Down
Loading