Skip to content

Commit d42cdde

Browse files
committed
Local shovels: renew credit
1 parent 0950a8f commit d42cdde

File tree

2 files changed

+64
-12
lines changed

2 files changed

+64
-12
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,7 @@ init_source(State = #{source := #{queue := QName0,
138138
false ->
139139
{credited, credit_api_v1}
140140
end,
141-
MaxLinkCredit = application:get_env(
142-
rabbit, max_link_credit, ?DEFAULT_MAX_LINK_CREDIT),
141+
MaxLinkCredit = max_link_credit(),
143142
QName = rabbit_misc:r(VHost, queue, QName0),
144143
CTag = consumer_tag(Name),
145144
case rabbit_amqqueue:with(
@@ -173,7 +172,9 @@ init_source(State = #{source := #{queue := QName0,
173172
State2 = State#{source => Src#{current => Current#{queue_states => QState,
174173
consumer_tag => CTag},
175174
remaining => Remaining,
176-
remaining_unacked => Remaining}},
175+
remaining_unacked => Remaining,
176+
delivery_count => ?INITIAL_DELIVERY_COUNT,
177+
credit => MaxLinkCredit}},
177178
handle_queue_actions(Actions, State2);
178179
{0, {error, autodelete}} ->
179180
exit({shutdown, autodelete});
@@ -410,8 +411,10 @@ handle_queue_actions(Actions, State) ->
410411
lists:foldl(
411412
fun({deliver, _CTag, AckRequired, Msgs}, S0) ->
412413
handle_deliver(AckRequired, Msgs, S0);
413-
(Action, S0) ->
414+
({credit_reply, _, _, _, _, _}, S0) ->
414415
%% TODO handle credit_reply
416+
S0;
417+
(Action, S0) ->
415418
rabbit_log:warning("ACTION NOT HANDLED ~p", [Action]),
416419
S0
417420
%% ({queue_down, QRef}, S0) ->
@@ -426,8 +429,8 @@ handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
426429
lists:foldl(fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
427430
DeliveryTag = next_tag(S0),
428431
S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)),
429-
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S)
430-
end, State, Msgs).
432+
sent_pending_delivery(rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S))
433+
end, State, Msgs).
431434

432435
next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
433436
DeliveryTag.
@@ -642,3 +645,31 @@ confirm_to_inbound(ConfirmFun, Seq, Multiple,
642645
rabbit_shovel_behaviour:decr_remaining(Removed,
643646
State#{dest =>
644647
Dst#{unacked => Unacked1}}).
648+
649+
sent_pending_delivery(#{source := #{current := #{consumer_tag := CTag,
650+
vhost := VHost,
651+
queue_states := QState0
652+
} = Current,
653+
delivery_count := DeliveryCount0,
654+
credit := Credit0,
655+
queue := QName0} = Src} = State0) ->
656+
%% TODO add check for credit request in-flight
657+
QName = rabbit_misc:r(VHost, queue, QName0),
658+
DeliveryCount = serial_number:add(DeliveryCount0, 1),
659+
Credit = max(0, Credit0 - 1),
660+
{ok, QState, Actions} = case Credit =:= 0 of
661+
true ->
662+
rabbit_queue_type:credit(
663+
QName, CTag, DeliveryCount, max_link_credit(),
664+
false, QState0);
665+
false ->
666+
{ok, QState0, []}
667+
end,
668+
State = State0#{source => Src#{current => Current#{queue_states => QState},
669+
credit => Credit,
670+
delivery_count => DeliveryCount
671+
}},
672+
handle_queue_actions(Actions, State).
673+
674+
max_link_credit() ->
675+
application:get_env(rabbit, max_link_credit, ?DEFAULT_MAX_LINK_CREDIT).

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ groups() ->
6565
local_to_local_delete_src_queue,
6666
local_to_local_delete_dest_queue,
6767
local_to_local_vhost_access,
68-
local_to_local_user_access
68+
local_to_local_user_access,
69+
local_to_local_credit_flow
6970
]}
7071
].
7172

@@ -916,6 +917,21 @@ local_to_local_user_access(Config) ->
916917
none]),
917918
shovel_test_utils:await_no_shovel(Config, ?PARAM).
918919

920+
local_to_local_credit_flow(Config) ->
921+
Src = ?config(srcq, Config),
922+
Dest = ?config(destq, Config),
923+
with_session(Config,
924+
fun (Sess) ->
925+
shovel_test_utils:set_param(Config, ?PARAM,
926+
[{<<"src-protocol">>, <<"local">>},
927+
{<<"src-queue">>, Src},
928+
{<<"dest-protocol">>, <<"local">>},
929+
{<<"dest-queue">>, Dest}
930+
]),
931+
publish_many(Sess, Src, Dest, <<"tag1">>, 500),
932+
expect_many(Sess, Dest, 500)
933+
end).
934+
919935
%%----------------------------------------------------------------------------
920936
with_session(Config, Fun) ->
921937
with_session(Config, <<"/">>, Fun).
@@ -944,27 +960,32 @@ publish(Sender, Tag, Payload) when is_binary(Payload) ->
944960
exit(publish_disposition_not_received)
945961
end.
946962

947-
publish(Session, Source, Dest, Tag, Payload) ->
963+
publish(Session, Source, Dest, Tag, Payloads) ->
948964
LinkName = <<"dynamic-sender-", Dest/binary>>,
949965
{ok, Sender} = amqp10_client:attach_sender_link(Session, LinkName, Source,
950966
unsettled, unsettled_state),
951967
ok = await_amqp10_event(link, Sender, attached),
952-
publish(Sender, Tag, Payload),
968+
case is_list(Payloads) of
969+
true ->
970+
[publish(Sender, Tag, Payload) || Payload <- Payloads];
971+
false ->
972+
publish(Sender, Tag, Payloads)
973+
end,
953974
amqp10_client:detach_link(Sender).
954975

955976
publish_expect(Session, Source, Dest, Tag, Payload) ->
956977
publish(Session, Source, Dest, Tag, Payload),
957978
expect_one(Session, Dest).
958979

959980
publish_many(Session, Source, Dest, Tag, N) ->
960-
[publish(Session, Source, Dest, Tag, integer_to_binary(Payload))
961-
|| Payload <- lists:seq(1, N)].
981+
Payloads = [integer_to_binary(Payload) || Payload <- lists:seq(1, N)],
982+
publish(Session, Source, Dest, Tag, Payloads).
962983

963984
await_amqp10_event(On, Ref, Evt) ->
964985
receive
965986
{amqp10_event, {On, Ref, Evt}} -> ok
966987
after 5000 ->
967-
exit({amqp10_event_timeout, On, Ref, Evt})
988+
exit({amqp10_event_timeout, On, Ref, Evt})
968989
end.
969990

970991
expect_one(Session, Dest) ->

0 commit comments

Comments
 (0)