@@ -2601,14 +2601,22 @@ maybe_deltas_to_betas(DelsAndAcksFun,
26012601 count = DeltaCount ,
26022602 transient = Transient ,
26032603 end_seq_id = DeltaSeqIdEnd } = Delta ,
2604+ % % For v1 we always want to read messages up to the next segment boundary.
2605+ % % This is because v1 is not optimised for multiple reads from the same
2606+ % % segment: every time we read messages from a segment it has to read
2607+ % % and parse the entire segment from disk, filtering the messages we
2608+ % % requested afterwards.
2609+ % %
2610+ % % For v2 we want to limit the number of messages read at once to lower
2611+ % % the memory footprint. We use the consume rate to determine how many
2612+ % % messages we read.
2613+ DeltaSeqLimit = case Version of
2614+ 1 -> DeltaSeqIdEnd ;
2615+ 2 -> DeltaSeqId + MemoryLimit
2616+ end ,
26042617 DeltaSeqId1 =
26052618 lists :min ([IndexMod :next_segment_boundary (DeltaSeqId ),
2606- % % We must limit the number of messages read at once
2607- % % otherwise the queue will attempt to read up to segment_entry_count()
2608- % % messages from the index each time. The value is determined
2609- % % using the consuming rate.
2610- DeltaSeqId + MemoryLimit ,
2611- DeltaSeqIdEnd ]),
2619+ DeltaSeqLimit , DeltaSeqIdEnd ]),
26122620 {List0 , IndexState1 } = IndexMod :read (DeltaSeqId , DeltaSeqId1 , IndexState ),
26132621 {List , StoreState2 } = case Version of
26142622 1 -> {List0 , StoreState };
0 commit comments