Skip to content

Commit aeb1bd1

Browse files
authored
[fix][test] Fix flaky KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached (#24955)
1 parent 0896c0a commit aeb1bd1

File tree

3 files changed

+18
-17
lines changed

3 files changed

+18
-17
lines changed

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public CompletableFuture<ReadHandle> execute() {
260260
} else {
261261
return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
262262
lh.getLedgerMetadata(), lh.entries,
263-
PulsarMockBookKeeper.this::getReadHandleInterceptor));
263+
PulsarMockBookKeeper.this::getReadHandleInterceptor, lh.totalLengthCounter));
264264
}
265265
});
266266
}
@@ -303,6 +303,7 @@ public void shutdown() {
303303
}
304304
for (PulsarMockLedgerHandle ledger : ledgers.values()) {
305305
ledger.entries.clear();
306+
ledger.totalLengthCounter.set(0);
306307
}
307308
scheduler.shutdown();
308309
ledgers.clear();

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@
1919
package org.apache.bookkeeper.client;
2020

2121
import com.google.common.annotations.VisibleForTesting;
22-
import com.google.common.collect.Lists;
2322
import io.netty.buffer.ByteBuf;
2423
import io.netty.buffer.Unpooled;
2524
import java.security.GeneralSecurityException;
2625
import java.util.ArrayDeque;
2726
import java.util.ArrayList;
2827
import java.util.Arrays;
28+
import java.util.Collections;
2929
import java.util.Enumeration;
3030
import java.util.List;
3131
import java.util.Queue;
3232
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.atomic.AtomicLong;
3334
import lombok.Getter;
3435
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
3536
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -53,7 +54,7 @@
5354
*/
5455
public class PulsarMockLedgerHandle extends LedgerHandle {
5556

56-
final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
57+
final List<LedgerEntryImpl> entries = Collections.synchronizedList(new ArrayList<>());
5758
final PulsarMockBookKeeper bk;
5859
final long id;
5960
final DigestType digest;
@@ -63,6 +64,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
6364
@VisibleForTesting
6465
@Getter
6566
boolean fenced = false;
67+
// Count for total length of the entries
68+
final AtomicLong totalLengthCounter = new AtomicLong(0);
6669

6770
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
6871
DigestType digest, byte[] passwd) throws GeneralSecurityException {
@@ -73,7 +76,8 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
7376
this.digest = digest;
7477
this.passwd = Arrays.copyOf(passwd, passwd.length);
7578

76-
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor);
79+
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries,
80+
bk::getReadHandleInterceptor, totalLengthCounter);
7781
}
7882

7983
@Override
@@ -159,6 +163,7 @@ public long addEntry(byte[] data) throws InterruptedException, BKException {
159163
}
160164

161165
lastEntry = entries.size();
166+
totalLengthCounter.addAndGet(data.length);
162167
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data)));
163168
return lastEntry;
164169
}
@@ -191,6 +196,7 @@ public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object
191196
lastEntry = entries.size();
192197
byte[] storedData = new byte[data.readableBytes()];
193198
data.readBytes(storedData);
199+
totalLengthCounter.addAndGet(storedData.length);
194200
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
195201
storedData.length, Unpooled.wrappedBuffer(storedData)));
196202
return FutureUtils.value(lastEntry);
@@ -231,12 +237,7 @@ public long getLastAddConfirmed() {
231237

232238
@Override
233239
public long getLength() {
234-
long length = 0;
235-
for (LedgerEntryImpl entry : entries) {
236-
length += entry.getLength();
237-
}
238-
239-
return length;
240+
return totalLengthCounter.get();
240241
}
241242

242243

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.atomic.AtomicLong;
2425
import java.util.function.Supplier;
2526
import lombok.extern.slf4j.Slf4j;
2627
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -42,15 +43,18 @@ class PulsarMockReadHandle implements ReadHandle {
4243
private final LedgerMetadata metadata;
4344
private final List<LedgerEntryImpl> entries;
4445
private final Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier;
46+
private final AtomicLong totalLengthCounter;
4547

4648
PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata,
4749
List<LedgerEntryImpl> entries,
48-
Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier) {
50+
Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier,
51+
AtomicLong totalLengthCounter) {
4952
this.bk = bk;
5053
this.ledgerId = ledgerId;
5154
this.metadata = metadata;
5255
this.entries = entries;
5356
this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
57+
this.totalLengthCounter = totalLengthCounter;
5458
}
5559

5660
@Override
@@ -99,12 +103,7 @@ public long getLastAddConfirmed() {
99103

100104
@Override
101105
public long getLength() {
102-
long length = 0;
103-
for (LedgerEntryImpl entry : entries) {
104-
length += entry.getLength();
105-
}
106-
107-
return length;
106+
return totalLengthCounter.get();
108107
}
109108

110109
@Override

0 commit comments

Comments
 (0)