Skip to content

Commit 90d6171

Browse files
committed
Local shovel: fix initial delivery count and state handling
1 parent 0833155 commit 90d6171

File tree

3 files changed

+17
-50
lines changed

3 files changed

+17
-50
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,10 @@ init_source(State = #{source := #{queue := QName0,
131131
vhost := VHost} = Current} = Src,
132132
name := Name,
133133
ack_mode := AckMode}) ->
134-
%% Should it just use v2 ?
134+
%% TODO put this shovel behind the rabbitmq_4.0.0 feature flag
135135
Mode = case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
136136
true ->
137-
{credited, Prefetch};
137+
{credited, ?INITIAL_DELIVERY_COUNT};
138138
false ->
139139
{credited, credit_api_v1}
140140
end,
@@ -152,7 +152,7 @@ init_source(State = #{source := #{queue := QName0,
152152
channel_pid => self(),
153153
limiter_pid => none,
154154
limiter_active => false,
155-
mode => Mode, %%{simple_prefetch, Prefetch},
155+
mode => Mode,
156156
consumer_tag => CTag,
157157
exclusive_consume => false,
158158
args => Args,
@@ -168,10 +168,11 @@ init_source(State = #{source := #{queue := QName0,
168168
{Remaining, {ok, QState1}} ->
169169
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, ?INITIAL_DELIVERY_COUNT, Prefetch, false, QState1),
170170
%% TODO handle actions
171-
State#{source => Src#{current => Current#{queue_states => QState,
172-
consumer_tag => CTag},
173-
remaining => Remaining,
174-
remaining_unacked => Remaining}};
171+
State2 = State#{source => Src#{current => Current#{queue_states => QState,
172+
consumer_tag => CTag},
173+
remaining => Remaining,
174+
remaining_unacked => Remaining}},
175+
handle_queue_actions(Actions, State2);
175176
{0, {error, autodelete}} ->
176177
exit({shutdown, autodelete});
177178
{_Remaining, {error, Reason}} ->
@@ -262,12 +263,6 @@ close_source(_) ->
262263
%% No consumer tag, no consumer to cancel
263264
ok.
264265

265-
handle_source(#'basic.ack'{delivery_tag = Seq, multiple = Multiple},
266-
State = #{ack_mode := on_confirm}) ->
267-
confirm_to_inbound(fun(Tag, Multi, StateX) ->
268-
rabbit_shovel_behaviour:ack(Tag, Multi, StateX)
269-
end, Seq, Multiple, State);
270-
271266
handle_source({queue_event, #resource{name = Queue,
272267
kind = queue,
273268
virtual_host = VHost} = QRef, Evt},
@@ -413,8 +408,10 @@ handle_queue_actions(Actions, State) ->
413408
lists:foldl(
414409
fun({deliver, _CTag, AckRequired, Msgs}, S0) ->
415410
handle_deliver(AckRequired, Msgs, S0);
416-
(_, _) ->
417-
not_handled
411+
(Action, S0) ->
412+
%% TODO handle credit_reply
413+
rabbit_log:warning("ACTION NOT HANDLED ~p", [Action]),
414+
S0
418415
%% ({queue_down, QRef}, S0) ->
419416
%% State;
420417
%% ({block, QName}, S0) ->
@@ -563,9 +560,9 @@ settle(Op, DeliveryTag, Multiple, #{unacked_message_q := UAMQ0,
563560
MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked],
564561
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of
565562
{ok, QState1, Actions} ->
566-
QState = handle_queue_actions(Actions, QState1),
567-
State#{source => Src#{current => Current#{queue_states => QState}},
568-
unacked_message_q => UAMQ};
563+
State#{source => Src#{current => Current#{queue_states => QState1}},
564+
unacked_message_q => UAMQ},
565+
handle_queue_actions(Actions, State);
569566
{'protocol_error', Type, Reason, Args} ->
570567
rabbit_log:error("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
571568
[Op, Type, io_lib:format(Reason, Args)]),

deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ handle_info(Msg, State) ->
108108
handle_msg(Msg, State).
109109

110110
handle_msg(Msg, State = #state{config = Config, name = Name}) ->
111-
rabbit_log:warning("HANDLING MESSAGE ~p", [Msg]),
112111
case rabbit_shovel_behaviour:handle_source(Msg, Config) of
113112
not_handled ->
114113
case rabbit_shovel_behaviour:handle_dest(Msg, Config) of

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ groups() ->
5151
local_to_local_no_ack,
5252
local_to_local_quorum_no_ack,
5353
local_to_local_stream_no_ack,
54-
local_to_local_dest_stream_no_ack,
5554
local_to_local_on_publish,
5655
local_to_local_quorum_on_publish,
5756
local_to_local_stream_on_publish,
@@ -611,42 +610,14 @@ local_to_local_quorum_no_ack(Config) ->
611610
local_to_local_stream_no_ack(Config) ->
612611
Src = ?config(srcq, Config),
613612
Dest = ?config(destq, Config),
614-
VHost = <<"/">>,
615-
declare_queue(Config, VHost, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
616-
declare_queue(Config, VHost, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
617-
shovel_test_utils:set_param_nowait(Config, ?PARAM,
618-
[{<<"src-protocol">>, <<"local">>},
619-
{<<"src-predeclared">>, true},
620-
{<<"src-queue">>, Src},
621-
{<<"dest-protocol">>, <<"local">>},
622-
{<<"dest-predeclared">>, true},
623-
{<<"dest-queue">>, Dest},
624-
{<<"ack-mode">>, <<"no-ack">>}
625-
]),
626-
%% Streams require consumer acknowledgments
627-
shovel_test_utils:await_no_shovel(Config, ?PARAM),
628-
%% The shovel parameter is only deleted when 'delete-after'
629-
%% is used. In any other failure, the shovel should
630-
%% remain and try to restart
631-
?awaitMatch([{_Name, dynamic, {terminated, _}, _, _}],
632-
rabbit_ct_broker_helpers:rpc(Config, 0,
633-
rabbit_shovel_status, status, []),
634-
30000),
635-
?assertNotMatch(
636-
not_found,
637-
rabbit_ct_broker_helpers:rpc(
638-
Config, 0, rabbit_runtime_parameters, lookup,
639-
[VHost, <<"shovel">>, ?PARAM])).
640-
641-
local_to_local_dest_stream_no_ack(Config) ->
642-
Src = ?config(srcq, Config),
643-
Dest = ?config(destq, Config),
613+
declare_queue(Config, <<"/">>, Src, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
644614
declare_queue(Config, <<"/">>, Dest, [{<<"x-queue-type">>, longstr, <<"stream">>}]),
645615
with_session(Config,
646616
fun (Sess) ->
647617
shovel_test_utils:set_param(Config, ?PARAM,
648618
[{<<"src-protocol">>, <<"local">>},
649619
{<<"src-queue">>, Src},
620+
{<<"src-predeclared">>, true},
650621
{<<"dest-protocol">>, <<"local">>},
651622
{<<"dest-predeclared">>, true},
652623
{<<"dest-queue">>, Dest},

0 commit comments

Comments
 (0)