51
51
% % sequence number initialized at an arbitrary point by the sender."
52
52
% % See rabbit_amqp_session.erl
53
53
-define (INITIAL_DELIVERY_COUNT , 16#ff_ff_ff_ff - 4 ).
54
+ -define (DEFAULT_MAX_LINK_CREDIT , 170 ).
54
55
55
56
-record (pending_ack , {
56
57
delivery_tag ,
@@ -125,7 +126,6 @@ connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
125
126
end .
126
127
127
128
init_source (State = #{source := #{queue := QName0 ,
128
- prefetch_count := Prefetch ,
129
129
consumer_args := Args ,
130
130
current := #{queue_states := QState0 ,
131
131
vhost := VHost } = Current } = Src ,
@@ -138,6 +138,8 @@ init_source(State = #{source := #{queue := QName0,
138
138
false ->
139
139
{credited , credit_api_v1 }
140
140
end ,
141
+ MaxLinkCredit = application :get_env (
142
+ rabbit , max_link_credit , ? DEFAULT_MAX_LINK_CREDIT ),
141
143
QName = rabbit_misc :r (VHost , queue , QName0 ),
142
144
CTag = consumer_tag (Name ),
143
145
case rabbit_amqqueue :with (
@@ -166,7 +168,7 @@ init_source(State = #{source := #{queue := QName0,
166
168
end
167
169
end ) of
168
170
{Remaining , {ok , QState1 }} ->
169
- {ok , QState , Actions } = rabbit_queue_type :credit (QName , CTag , ? INITIAL_DELIVERY_COUNT , Prefetch , false , QState1 ),
171
+ {ok , QState , Actions } = rabbit_queue_type :credit (QName , CTag , ? INITIAL_DELIVERY_COUNT , MaxLinkCredit , false , QState1 ),
170
172
% % TODO handle actions
171
173
State2 = State #{source => Src #{current => Current #{queue_states => QState ,
172
174
consumer_tag => CTag },
0 commit comments