Skip to content

Commit ff98f6f

Browse files
committed
Fix flake in stream plugin test suite
The closing sequence must account for consumer update and metadata update frames the broker sends when a consumer group changes and when a stream is deleted.
1 parent cdd9ba1 commit ff98f6f

File tree

1 file changed

+45
-5
lines changed

1 file changed

+45
-5
lines changed

deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,20 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) ->
177177
delete_stream(stream_port(Config, 0), S),
178178

179179
%% online consumers should receive a metadata update frame (stream deleted)
180-
%% we unqueue the this frame before closing the connection
180+
%% we unqueue this frame before closing the connection
181181
%% directly closing the connection of the cancelled consumer
182+
%% Edge case:
183+
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
184+
%% This is because the active consumer is removed from the group and this triggers
185+
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
186+
%% stream is deleted, so the rebalancing does not take place.
187+
%% We just tolerate an extra frame when closing their respective connections.
182188
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
183189
log("Expecting frame in consumer ~p", [K]),
184190
{Cmd1, C1} = receive_commands(S0, C0),
185191
log("Received ~p", [Cmd1]),
186192
log("Closing"),
187-
{ok, _} = stream_test_utils:close(S0, C1);
193+
{ok, _} = close_connection(S0, C1);
188194
(K, {S0, C0}) ->
189195
log("Closing ~p", [K]),
190196
{ok, _} = stream_test_utils:close(S0, C0)
@@ -290,12 +296,18 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co
290296
%% online consumers should receive a metadata update frame (stream deleted)
291297
%% we unqueue this frame before closing the connection
292298
%% directly closing the connection of the cancelled consumer
299+
%% Edge case:
300+
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
301+
%% This is because the active consumer is removed from the group and this triggers
302+
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
303+
%% stream is deleted, so the rebalancing does not take place.
304+
%% We just tolerate an extra frame when closing their respective connections.
293305
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
294306
log("Expecting frame in consumer ~p", [K]),
295307
{Cmd1, C1} = receive_commands(S0, C0),
296308
log("Received ~p", [Cmd1]),
297309
log("Closing"),
298-
{ok, _} = stream_test_utils:close(S0, C1);
310+
{ok, _} = close_connection(S0, C1);
299311
(K, {S0, C0}) ->
300312
log("Closing ~p", [K]),
301313
{ok, _} = stream_test_utils:close(S0, C0)
@@ -395,12 +407,18 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) -
395407
%% online consumers should receive a metadata update frame (stream deleted)
396408
%% we unqueue this frame before closing the connection
397409
%% directly closing the connection of the cancelled consumer
410+
%% Edge case:
411+
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
412+
%% This is because the active consumer is removed from the group and this triggers
413+
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
414+
%% stream is deleted, so the rebalancing does not take place.
415+
%% We just tolerate an extra frame when closing their respective connections.
398416
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
399417
log("Expecting frame in consumer ~p", [K]),
400418
{Cmd1, C1} = receive_commands(S0, C0),
401419
log("Received ~p", [Cmd1]),
402420
log("Closing"),
403-
{ok, _} = stream_test_utils:close(S0, C1);
421+
{ok, _} = close_connection(S0, C1);
404422
(K, {S0, C0}) ->
405423
log("Closing ~p", [K]),
406424
{ok, _} = stream_test_utils:close(S0, C0)
@@ -516,12 +534,18 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit
516534
%% online consumers should receive a metadata update frame (stream deleted)
517535
%% we unqueue this frame before closing the connection
518536
%% directly closing the connection of the cancelled consumer
537+
%% Edge case:
538+
%% the waiting consumer can get 2 frames: consumer_update then metadata_update.
539+
%% This is because the active consumer is removed from the group and this triggers
540+
%% a rebalancing. The 2 remaining consumers are most of the time cancelled when the
541+
%% stream is deleted, so the rebalancing does not take place.
542+
%% We just tolerate an extra frame when closing their respective connections.
519543
maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId ->
520544
log("Expecting frame in consumer ~p", [K]),
521545
{Cmd1, C1} = receive_commands(S0, C0),
522546
log("Received ~p", [Cmd1]),
523547
log("Closing"),
524-
{ok, _} = stream_test_utils:close(S0, C1);
548+
{ok, _} = close_connection(S0, C1);
525549
(K, {S0, C0}) ->
526550
log("Closing ~p", [K]),
527551
{ok, _} = stream_test_utils:close(S0, C0)
@@ -858,3 +882,19 @@ log(Format) ->
858882

859883
log(Format, Args) ->
860884
ct:pal(Format, Args).
885+
886+
%% Initiates an orderly shutdown of a stream-protocol connection: sends a
%% close request on Sock and hands off to pump_until_close/3, which drains
%% any frames the broker emits first (e.g. consumer_update or
%% metadata_update during SAC group rebalancing) before the close response.
%% Returns {ok, C1} with the updated protocol state.
close_connection(Sock, C) ->
    Reason = <<"OK">>,
    Frame = rabbit_stream_core:frame({request, 1, {close, ?RESPONSE_CODE_OK, Reason}}),
    ok = gen_tcp:send(Sock, Frame),
    pump_until_close(Sock, C, 10).
891+
892+
%% Reads frames off Sock until the close confirmation arrives, discarding
%% every other frame along the way. Gives up (failing the test case) once
%% the attempt budget reaches zero, so an unresponsive broker cannot hang
%% the suite. Returns {ok, C1} on success.
pump_until_close(_Sock, _C, 0) ->
    ct:fail("did not get close response");
pump_until_close(Sock, C0, AttemptsLeft) ->
    {Cmd, C1} = stream_test_utils:receive_stream_commands(Sock, C0),
    case Cmd of
        {response, 1, {close, ?RESPONSE_CODE_OK}} ->
            %% broker acknowledged our close request
            {ok, C1};
        _Other ->
            %% tolerated extra frame (consumer_update / metadata_update);
            %% drop it and keep waiting
            pump_until_close(Sock, C1, AttemptsLeft - 1)
    end.

0 commit comments

Comments
 (0)