Skip to content

Commit 9ab8c19

Browse files
3paccccccsrinath-ctds
authored andcommitted
[fix][test] Fix flaky KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached (apache#24955)
(cherry picked from commit aeb1bd1) (cherry picked from commit 421d646)
1 parent 6c17641 commit 9ab8c19

File tree

3 files changed

+18
-17
lines changed

3 files changed

+18
-17
lines changed

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public CompletableFuture<ReadHandle> execute() {
256256
} else {
257257
return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
258258
lh.getLedgerMetadata(), lh.entries,
259-
PulsarMockBookKeeper.this::getReadHandleInterceptor));
259+
PulsarMockBookKeeper.this::getReadHandleInterceptor, lh.totalLengthCounter));
260260
}
261261
});
262262
}
@@ -299,6 +299,7 @@ public void shutdown() {
299299
}
300300
for (PulsarMockLedgerHandle ledger : ledgers.values()) {
301301
ledger.entries.clear();
302+
ledger.totalLengthCounter.set(0);
302303
}
303304
scheduler.shutdown();
304305
ledgers.clear();

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@
1919
package org.apache.bookkeeper.client;
2020

2121
import com.google.common.annotations.VisibleForTesting;
22-
import com.google.common.collect.Lists;
2322
import io.netty.buffer.ByteBuf;
2423
import io.netty.buffer.Unpooled;
2524
import java.security.GeneralSecurityException;
2625
import java.util.ArrayDeque;
2726
import java.util.ArrayList;
2827
import java.util.Arrays;
28+
import java.util.Collections;
2929
import java.util.Enumeration;
3030
import java.util.List;
3131
import java.util.Queue;
3232
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.atomic.AtomicLong;
3334
import lombok.Getter;
3435
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
3536
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -53,7 +54,7 @@
5354
*/
5455
public class PulsarMockLedgerHandle extends LedgerHandle {
5556

56-
final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
57+
final List<LedgerEntryImpl> entries = Collections.synchronizedList(new ArrayList<>());
5758
final PulsarMockBookKeeper bk;
5859
final long id;
5960
final DigestType digest;
@@ -63,6 +64,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
6364
@VisibleForTesting
6465
@Getter
6566
boolean fenced = false;
67+
// Count for total length of the entries
68+
final AtomicLong totalLengthCounter = new AtomicLong(0);
6669

6770
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
6871
DigestType digest, byte[] passwd) throws GeneralSecurityException {
@@ -73,7 +76,8 @@ public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
7376
this.digest = digest;
7477
this.passwd = Arrays.copyOf(passwd, passwd.length);
7578

76-
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, bk::getReadHandleInterceptor);
79+
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries,
80+
bk::getReadHandleInterceptor, totalLengthCounter);
7781
}
7882

7983
@Override
@@ -151,6 +155,7 @@ public long addEntry(byte[] data) throws InterruptedException, BKException {
151155
}
152156

153157
lastEntry = entries.size();
158+
totalLengthCounter.addAndGet(data.length);
154159
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data)));
155160
return lastEntry;
156161
}
@@ -185,6 +190,7 @@ public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object
185190
lastEntry = entries.size();
186191
byte[] storedData = new byte[data.readableBytes()];
187192
data.readBytes(storedData);
193+
totalLengthCounter.addAndGet(storedData.length);
188194
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
189195
storedData.length, Unpooled.wrappedBuffer(storedData)));
190196
return FutureUtils.value(lastEntry);
@@ -225,12 +231,7 @@ public long getLastAddConfirmed() {
225231

226232
@Override
227233
public long getLength() {
228-
long length = 0;
229-
for (LedgerEntryImpl entry : entries) {
230-
length += entry.getLength();
231-
}
232-
233-
return length;
234+
return totalLengthCounter.get();
234235
}
235236

236237

testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.atomic.AtomicLong;
2425
import java.util.function.Supplier;
2526
import lombok.extern.slf4j.Slf4j;
2627
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -42,15 +43,18 @@ class PulsarMockReadHandle implements ReadHandle {
4243
private final LedgerMetadata metadata;
4344
private final List<LedgerEntryImpl> entries;
4445
private final Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier;
46+
private final AtomicLong totalLengthCounter;
4547

4648
PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata,
4749
List<LedgerEntryImpl> entries,
48-
Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier) {
50+
Supplier<PulsarMockReadHandleInterceptor> readHandleInterceptorSupplier,
51+
AtomicLong totalLengthCounter) {
4952
this.bk = bk;
5053
this.ledgerId = ledgerId;
5154
this.metadata = metadata;
5255
this.entries = entries;
5356
this.readHandleInterceptorSupplier = readHandleInterceptorSupplier;
57+
this.totalLengthCounter = totalLengthCounter;
5458
}
5559

5660
@Override
@@ -99,12 +103,7 @@ public long getLastAddConfirmed() {
99103

100104
@Override
101105
public long getLength() {
102-
long length = 0;
103-
for (LedgerEntryImpl entry : entries) {
104-
length += entry.getLength();
105-
}
106-
107-
return length;
106+
return totalLengthCounter.get();
108107
}
109108

110109
@Override

0 commit comments

Comments
 (0)