Skip to content

Commit 7002848

Browse files
Loïc Hoguinmergify[bot]
authored andcommitted
CQv1: Don't limit messages in memory based on consume rate
The v1 index is not optimised for reading messages except when the entire segment is read. So we always do that. This change was made because when the read is inefficient and TTL is used the queue can get unresponsive while getting the TTL messages dropped. In that case the queue may drop messages slower than they expire and as a result will not process any Erlang messages until it has dropped all messages in the queue. (cherry picked from commit d3aa298)
1 parent f2c70d7 commit 7002848

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2601,14 +2601,22 @@ maybe_deltas_to_betas(DelsAndAcksFun,
26012601
count = DeltaCount,
26022602
transient = Transient,
26032603
end_seq_id = DeltaSeqIdEnd } = Delta,
2604+
%% For v1 we always want to read messages up to the next segment boundary.
2605+
%% This is because v1 is not optimised for multiple reads from the same
2606+
%% segment: every time we read messages from a segment it has to read
2607+
%% and parse the entire segment from disk, filtering the messages we
2608+
%% requested afterwards.
2609+
%%
2610+
%% For v2 we want to limit the number of messages read at once to lower
2611+
%% the memory footprint. We use the consume rate to determine how many
2612+
%% messages we read.
2613+
DeltaSeqLimit = case Version of
2614+
1 -> DeltaSeqIdEnd;
2615+
2 -> DeltaSeqId + MemoryLimit
2616+
end,
26042617
DeltaSeqId1 =
26052618
lists:min([IndexMod:next_segment_boundary(DeltaSeqId),
2606-
%% We must limit the number of messages read at once
2607-
%% otherwise the queue will attempt to read up to segment_entry_count()
2608-
%% messages from the index each time. The value is determined
2609-
%% using the consuming rate.
2610-
DeltaSeqId + MemoryLimit,
2611-
DeltaSeqIdEnd]),
2619+
DeltaSeqLimit, DeltaSeqIdEnd]),
26122620
{List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState),
26132621
{List, StoreState2} = case Version of
26142622
1 -> {List0, StoreState};

0 commit comments

Comments
 (0)