Skip to content

Commit 906130e

Browse files
BewareMyPowersrinath-ctds
authored andcommitted
[fix][broker] Fix deduplication replay might never complete for exceptions (apache#24511)
(cherry picked from commit d95cf3f) (cherry picked from commit 0c1ccb8)
1 parent fd60413 commit 906130e

File tree

7 files changed

+536
-131
lines changed

7 files changed

+536
-131
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import org.apache.pulsar.common.classification.InterfaceAudience;
23+
import org.apache.pulsar.common.classification.InterfaceStability;
24+
25+
@InterfaceAudience.LimitedPrivate
26+
@InterfaceStability.Evolving
27+
public interface EntryProcessor {
28+
29+
void process(Position position, ByteBuf buffer);
30+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger;
20+
21+
import static org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntries;
22+
import java.util.Optional;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.Executor;
25+
import lombok.Getter;
26+
import lombok.RequiredArgsConstructor;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.jspecify.annotations.Nullable;
29+
30+
/**
31+
* The task to perform replay on the whole managed ledger from a given position.
32+
*/
33+
@RequiredArgsConstructor
34+
@Slf4j
35+
public class ManagedLedgerReplayTask {
36+
37+
private final String name;
38+
private final Executor executor; // run user-provided processor on entry
39+
private final int maxEntriesPerRead;
40+
@Getter // NOTE: the getter must be called in the callback of `replay` for thread safety
41+
private int numEntriesProcessed = 0;
42+
43+
/**
44+
* This method will read entries from `cursor` until the last confirmed entry. `processor` will be applied on each
45+
* entry.
46+
*
47+
* @param cursor the managed cursor to read entries
48+
* @param processor the user-provided processor that accepts the position and data buffer of the entry
49+
* @return the future of the optional last position processed:
50+
* 1. If there is no more entry to read, return an empty optional.
51+
* 2. Otherwise, if no exception was thrown, it will always be the position of the last entry.
52+
* 3. If any exception is thrown from {@link EntryProcessor#process}, it will be the position of the last
53+
* entry that has been processed successfully.
54+
* 4. If an unexpected exception is thrown, the future will complete exceptionally.
55+
* @apiNote The implementation of `processor` should not call `release()` on the buffer because this method will
56+
* eventually release the buffer after it's processed.
57+
*/
58+
public CompletableFuture<Optional<Position>> replay(ManagedCursor cursor, EntryProcessor processor) {
59+
try {
60+
numEntriesProcessed = 0;
61+
cursor.setAlwaysInactive(); // don't cache the replayed entries
62+
if (!cursor.hasMoreEntries()) {
63+
return CompletableFuture.completedFuture(Optional.empty());
64+
}
65+
return readAndProcess(cursor, null, processor);
66+
} catch (Throwable throwable) {
67+
return CompletableFuture.failedFuture(throwable);
68+
}
69+
}
70+
71+
private CompletableFuture<Optional<Position>> readAndProcess(
72+
ManagedCursor cursor, @Nullable Position lastProcessedPosition, EntryProcessor processor) {
73+
return readEntries(cursor, maxEntriesPerRead, PositionFactory.LATEST).thenComposeAsync(entries -> {
74+
try {
75+
Position processedPosition = lastProcessedPosition;
76+
for (final var entry : entries) {
77+
final var position = entry.getPosition();
78+
final var buffer = entry.getDataBuffer();
79+
// Pass a duplicated buffer to `processor` in case the buffer is retained and stored somewhere else
80+
// and then process all buffers in batch.
81+
try {
82+
processor.process(position, buffer);
83+
} catch (Throwable throwable) {
84+
log.error("[{}] Failed to process entry {}", name, position, throwable);
85+
return CompletableFuture.completedFuture(Optional.ofNullable(processedPosition));
86+
}
87+
// It does not need to be atomic because the update happens before the future completes
88+
numEntriesProcessed++;
89+
processedPosition = position;
90+
}
91+
if (cursor.hasMoreEntries()) {
92+
return readAndProcess(cursor, processedPosition, processor);
93+
} else {
94+
return CompletableFuture.completedFuture(Optional.ofNullable(processedPosition));
95+
}
96+
} finally {
97+
entries.forEach(Entry::release);
98+
}
99+
}, executor);
100+
}
101+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger.util;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
24+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
25+
import org.apache.bookkeeper.mledger.Entry;
26+
import org.apache.bookkeeper.mledger.ManagedCursor;
27+
import org.apache.bookkeeper.mledger.ManagedLedger;
28+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
29+
import org.apache.bookkeeper.mledger.Position;
30+
import org.apache.pulsar.common.classification.InterfaceStability;
31+
32+
/**
33+
* This util class contains some future-based methods to replace callback-based APIs. With a callback-based API, if any
34+
* exception is thrown in the callback, the callback will never have a chance to be called. While with a future-based
35+
* API, if any exception is thrown in future's callback (e.g. `thenApply`), the future will eventually be completed
36+
* exceptionally. In addition, future-based API is easier for users to switch a different executor to execute the
37+
* callback (e.g. `thenApplyAsync`).
38+
*/
39+
@InterfaceStability.Evolving
40+
public class ManagedLedgerUtils {
41+
42+
public static CompletableFuture<ManagedCursor> openCursor(ManagedLedger ml, String cursorName) {
43+
final var future = new CompletableFuture<ManagedCursor>();
44+
ml.asyncOpenCursor(cursorName, new AsyncCallbacks.OpenCursorCallback() {
45+
@Override
46+
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
47+
future.complete(cursor);
48+
}
49+
50+
@Override
51+
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
52+
future.completeExceptionally(exception);
53+
}
54+
}, null);
55+
return future;
56+
}
57+
58+
public static CompletableFuture<List<Entry>> readEntries(ManagedCursor cursor, int numberOfEntriesToRead,
59+
Position maxPosition) {
60+
final var future = new CompletableFuture<List<Entry>>();
61+
cursor.asyncReadEntries(numberOfEntriesToRead, new AsyncCallbacks.ReadEntriesCallback() {
62+
@Override
63+
public void readEntriesComplete(List<Entry> entries, Object ctx) {
64+
future.complete(entries);
65+
}
66+
67+
@Override
68+
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
69+
future.completeExceptionally(exception);
70+
}
71+
}, null, maxPosition);
72+
return future;
73+
}
74+
75+
public static CompletableFuture<Void> markDelete(ManagedCursor cursor, Position position,
76+
Map<String, Long> properties) {
77+
final var future = new CompletableFuture<Void>();
78+
cursor.asyncMarkDelete(position, properties, new AsyncCallbacks.MarkDeleteCallback() {
79+
@Override
80+
public void markDeleteComplete(Object ctx) {
81+
future.complete(null);
82+
}
83+
84+
@Override
85+
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
86+
future.completeExceptionally(exception);
87+
}
88+
}, null);
89+
return future;
90+
}
91+
}

0 commit comments

Comments
 (0)