Skip to content

Commit bb5e1d9

Browse files
committed
Local shovels: handle credit on sender side
1 parent 78167f0 commit bb5e1d9

File tree

2 files changed

+30
-37
lines changed

2 files changed

+30
-37
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -318,10 +318,10 @@ handle_dest(_Msg, State) ->
318318
State.
319319

320320
ack(DeliveryTag, Multiple, State) ->
321-
settle(complete, DeliveryTag, Multiple, State).
321+
maybe_grant_or_stash_credit(settle(complete, DeliveryTag, Multiple, State)).
322322

323323
nack(DeliveryTag, Multiple, State) ->
324-
settle(discard, DeliveryTag, Multiple, State).
324+
maybe_grant_or_stash_credit(settle(discard, DeliveryTag, Multiple, State)).
325325

326326
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
327327
unacked := Unacked} = Dest,
@@ -398,11 +398,6 @@ parse_parameter(Param, Fun, Value) ->
398398
fail({invalid_parameter_value, Param, Err})
399399
end.
400400

401-
parse_non_negative_integer(N) when is_integer(N) andalso N >= 0 ->
402-
N;
403-
parse_non_negative_integer(N) ->
404-
fail({require_non_negative_integer, N}).
405-
406401
parse_binary(Binary) when is_binary(Binary) ->
407402
Binary;
408403
parse_binary(NotABinary) ->
@@ -431,8 +426,8 @@ handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
431426
fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
432427
DeliveryTag = next_tag(S0),
433428
S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)),
434-
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, sent_delivery(S))
435-
end, State, Msgs)).
429+
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S)
430+
end, sent_delivery(State, length(Msgs)), Msgs)).
436431

437432
next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
438433
DeliveryTag.
@@ -443,15 +438,13 @@ increase_next_tag(#{source := Source = #{current := Current = #{next_tag := Deli
443438
handle_dest_queue_actions(Actions, State) ->
444439
lists:foldl(
445440
fun({settled, _QName, MsgSeqNos}, S0) ->
446-
maybe_grant_or_stash_credit(
447-
confirm_to_inbound(fun(Tag, StateX) ->
448-
rabbit_shovel_behaviour:ack(Tag, false, StateX)
449-
end, MsgSeqNos, S0));
441+
confirm_to_inbound(fun(Tag, StateX) ->
442+
rabbit_shovel_behaviour:ack(Tag, false, StateX)
443+
end, MsgSeqNos, S0);
450444
({rejected, _QName, MsgSeqNos}, S0) ->
451-
maybe_grant_or_stash_credit(
452-
confirm_to_inbound(fun(Tag, StateX) ->
453-
rabbit_shovel_behaviour:nack(Tag, false, StateX)
454-
end, MsgSeqNos, S0));
445+
confirm_to_inbound(fun(Tag, StateX) ->
446+
rabbit_shovel_behaviour:nack(Tag, false, StateX)
447+
end, MsgSeqNos, S0);
455448
%% TODO handle {block, QName}
456449
(_Action, S0) ->
457450
S0
@@ -646,25 +639,25 @@ confirm_to_inbound(ConfirmFun, SeqNos, State)
646639
confirm_to_inbound(ConfirmFun, Seq,
647640
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
648641
#{Seq := InTag} = Unacked,
649-
State = ConfirmFun(InTag, State0),
650642
Unacked1 = maps:remove(Seq, Unacked),
651-
rabbit_shovel_behaviour:decr_remaining(
652-
1, State#{dest => Dst#{unacked => Unacked1}}).
643+
State = rabbit_shovel_behaviour:decr_remaining(
644+
1, State0#{dest => Dst#{unacked => Unacked1}}),
645+
ConfirmFun(InTag, State).
653646

654647
sent_delivery(#{source := #{delivery_count := DeliveryCount0,
655648
credit := Credit0,
656649
queue_delivery_count := QDeliveryCount0,
657650
queue_credit := QCredit0} = Src
658-
} = State0) ->
659-
DeliveryCount = serial_number:add(DeliveryCount0, 1),
660-
Credit = max(0, Credit0 - 1),
661-
QDeliveryCount = serial_number:add(QDeliveryCount0, 1),
662-
QCredit = max(0, QCredit0 - 1),
663-
State = State0#{source => Src#{credit => Credit,
664-
delivery_count => DeliveryCount,
665-
queue_credit => QCredit,
666-
queue_delivery_count => QDeliveryCount
667-
}}.
651+
} = State0, NumMsgs) ->
652+
DeliveryCount = serial_number:add(DeliveryCount0, NumMsgs),
653+
Credit = max(0, Credit0 - NumMsgs),
654+
QDeliveryCount = serial_number:add(QDeliveryCount0, NumMsgs),
655+
QCredit = max(0, QCredit0 - NumMsgs),
656+
State0#{source => Src#{credit => Credit,
657+
delivery_count => DeliveryCount,
658+
queue_credit => QCredit,
659+
queue_delivery_count => QDeliveryCount
660+
}}.
668661

669662
maybe_grant_or_stash_credit(#{source := #{queue := QName0,
670663
credit := Credit,

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -958,8 +958,8 @@ local_to_local_credit_flow(Config, AckMode) ->
958958
{<<"dest-queue">>, Dest},
959959
{<<"ack-mode">>, AckMode}
960960
]),
961-
publish_many(Sess, Src, Dest, <<"tag1">>, 500),
962-
expect_many(Sess, Dest, 500)
961+
publish_many(Sess, Src, Dest, <<"tag1">>, 1000),
962+
expect_many(Sess, Dest, 1000)
963963
end).
964964

965965
local_to_local_quorum_credit_flow_on_confirm(Config) ->
@@ -988,8 +988,8 @@ local_to_local_quorum_credit_flow(Config, AckMode) ->
988988
{<<"dest-predeclared">>, true},
989989
{<<"ack-mode">>, AckMode}
990990
]),
991-
publish_many(Sess, Src, Dest, <<"tag1">>, 500),
992-
expect_many(Sess, Dest, 500)
991+
publish_many(Sess, Src, Dest, <<"tag1">>, 1000),
992+
expect_many(Sess, Dest, 1000)
993993
end).
994994

995995
local_to_local_stream_credit_flow_on_confirm(Config) ->
@@ -1020,12 +1020,12 @@ local_to_local_stream_credit_flow(Config, AckMode) ->
10201020
]),
10211021

10221022
Receiver = subscribe(Sess, Dest),
1023-
publish_many(Sess, Src, Dest, <<"tag1">>, 500),
1024-
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 500}, _}],
1023+
publish_many(Sess, Src, Dest, <<"tag1">>, 1000),
1024+
?awaitMatch([{_Name, dynamic, {running, _}, #{forwarded := 1000}, _}],
10251025
rabbit_ct_broker_helpers:rpc(Config, 0,
10261026
rabbit_shovel_status, status, []),
10271027
30000),
1028-
_ = expect(Receiver, 500, []),
1028+
_ = expect(Receiver, 1000, []),
10291029
amqp10_client:detach_link(Receiver)
10301030
end).
10311031

0 commit comments

Comments
 (0)