Skip to content

Commit f2740c1

Browse files
dao-junBewareMyPower
authored andcommitted
[fix][ml] Fix the possibility of message loss or disorder when ML PayloadProcessor processing fails (apache#24522)
Co-authored-by: Yunze Xu <[email protected]> (cherry picked from commit a14794a) (cherry picked from commit a378bdc)
1 parent 456ee77 commit f2740c1

File tree

7 files changed

+261
-19
lines changed

7 files changed

+261
-19
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,23 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
420420
*/
421421
long getOffloadedSize();
422422

423+
/**
424+
* Resets the exception thrown by the PayloadProcessor during an add entry operation to null.
425+
* <p>
426+
* **Context:** When an add entry operation fails due to an interceptor, all subsequent incoming add entry
427+
* operations will also fail. This behavior ensures message ordering and consistency.
428+
* <p>
429+
* **Important:** This method MUST only be called after all pending add operations are fully completed
430+
* (e.g., after a Topic is unfenced). Calling it prematurely will prevent the Managed Ledger (ML)
431+
* from being able to write indefinitely.
432+
* <p>
433+
* **Implementation Note:** Downstream projects that support the ML PayloadProcessor should implement
434+
* this method. Otherwise, do not implement it.
435+
*/
436+
default void unfenceForInterceptorException() {
437+
// Default implementation does nothing
438+
}
439+
423440
/**
424441
* Get last offloaded ledgerId. If no offloaded yet, it returns 0.
425442
*

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
238238
private volatile long lastOffloadSuccessTimestamp = 0;
239239
private volatile long lastOffloadFailureTimestamp = 0;
240240

241+
protected volatile ManagedLedgerException interceptorException = null;
242+
241243
private int minBacklogCursorsForCaching = 0;
242244
private int minBacklogEntriesForCaching = 1000;
243245
private int maxBacklogBetweenCursorsForCaching = 1000;
@@ -4305,6 +4307,15 @@ public long getOffloadedSize() {
43054307
return offloadedSize;
43064308
}
43074309

4310+
@Override
4311+
public void unfenceForInterceptorException() {
4312+
this.interceptorException = null;
4313+
}
4314+
4315+
protected void fenceForInterceptorException(ManagedLedgerException e) {
4316+
this.interceptorException = e;
4317+
}
4318+
43084319
@Override
43094320
public long getLastOffloadedLedgerId() {
43104321
return lastOffloadLedgerId;

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,16 @@ public void setCloseWhenDone(boolean closeWhenDone) {
132132

133133
public void initiate() {
134134
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {
135-
ByteBuf duplicateBuffer = data.retainedDuplicate();
135+
// Fail the add operation if the managed ledger is in a state that prevents adding entries.
136+
ManagedLedgerException exbw = ml.interceptorException;
137+
if (exbw != null) {
138+
ml.pendingAddEntries.remove(this);
139+
this.failed(exbw);
140+
// Don't recycle the object here, see: https://lists.apache.org/thread/po08w0tkhc7q8gc5khpdft6stxnr1v2y
141+
return;
142+
}
136143

144+
ByteBuf duplicateBuffer = data.retainedDuplicate();
137145
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
138146
lastInitTime = System.nanoTime();
139147
if (ml.getManagedLedgerInterceptor() != null) {
@@ -142,10 +150,14 @@ public void initiate() {
142150
payloadProcessorHandle = ml.getManagedLedgerInterceptor()
143151
.processPayloadBeforeLedgerWrite(this.getCtx(), duplicateBuffer);
144152
} catch (Exception e) {
153+
ManagedLedgerException mle = new ManagedLedgerException.ManagedLedgerInterceptException(e);
154+
ml.fenceForInterceptorException(mle);
145155
ml.pendingAddEntries.remove(this);
146156
ReferenceCountUtil.safeRelease(duplicateBuffer);
147157
log.error("[{}] Error processing payload before ledger write", ml.getName(), e);
148-
this.failed(new ManagedLedgerException.ManagedLedgerInterceptException(e));
158+
this.failed(mle);
159+
// Don't recycle the object here
160+
// see: https://lists.apache.org/thread/po08w0tkhc7q8gc5khpdft6stxnr1v2y
149161
return;
150162
}
151163
if (payloadProcessorHandle != null) {
@@ -170,7 +182,7 @@ public void initiateShadowWrite() {
170182
//Use entryId in PublishContext and call addComplete directly.
171183
this.addComplete(BKException.Code.OK, ledger, ((Position) ctx).getEntryId(), addOpCount);
172184
} else {
173-
log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state);
185+
log.warn("[{}] initiateShadowWrite with unexpected state {}, expect OPEN state.", ml.getName(), state);
174186
}
175187
}
176188

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4240,6 +4240,7 @@ private synchronized void fence() {
42404240

42414241
private synchronized void unfence() {
42424242
isFenced = false;
4243+
ledger.unfenceForInterceptorException();
42434244
cancelFencedTopicMonitoringTask();
42444245
}
42454246

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.Unpooled;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
25+
26+
@SuppressWarnings("unused") // Used by PublishWithMLPayloadProcessorTest
27+
public class ManagedLedgerPayloadProcessor0 implements ManagedLedgerPayloadProcessor {
28+
29+
private final AtomicInteger counter = new AtomicInteger(0);
30+
private final int failAt = 4;
31+
32+
@Override
33+
public Processor inputProcessor() {
34+
return new Processor() {
35+
@Override
36+
public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
37+
if (counter.incrementAndGet() == failAt) {
38+
try {
39+
Thread.sleep(2000);
40+
} catch (InterruptedException e) {
41+
throw new RuntimeException(e);
42+
}
43+
throw new RuntimeException("Failed to process input payload");
44+
}
45+
byte[] bytes = new byte[inputPayload.readableBytes()];
46+
inputPayload.readBytes(bytes);
47+
inputPayload.release();
48+
return Unpooled.wrappedBuffer(bytes, 0, bytes.length).retainedDuplicate();
49+
}
50+
51+
@Override
52+
public void release(ByteBuf processedPayload) {
53+
processedPayload.release();
54+
}
55+
};
56+
}
57+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import lombok.Cleanup;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.pulsar.client.api.Consumer;
29+
import org.apache.pulsar.client.api.Producer;
30+
import org.apache.pulsar.client.api.ProducerConsumerBase;
31+
import org.apache.pulsar.client.api.Schema;
32+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
33+
import org.testng.Assert;
34+
import org.testng.annotations.AfterClass;
35+
import org.testng.annotations.BeforeClass;
36+
import org.testng.annotations.Test;
37+
38+
@Slf4j
39+
@Test(groups = "broker")
40+
public class PublishWithMLPayloadProcessorTest extends ProducerConsumerBase {
41+
42+
@BeforeClass(alwaysRun = true)
43+
@Override
44+
protected void setup() throws Exception {
45+
conf.setBrokerEntryPayloadProcessors(
46+
Collections.singleton(ManagedLedgerPayloadProcessor0.class.getName()));
47+
super.internalSetup();
48+
super.producerBaseSetup();
49+
admin.tenants().createTenant("my-test-tenant", createDefaultTenantInfo());
50+
admin.namespaces().createNamespace("my-test-tenant/my-test-ns");
51+
}
52+
53+
@AfterClass(alwaysRun = true)
54+
@Override
55+
protected void cleanup() throws Exception {
56+
super.internalCleanup();
57+
}
58+
59+
60+
@Test(timeOut = 30_000)
61+
public void testPublishWithoutDeduplication() throws Exception {
62+
String topic = "persistent://my-test-tenant/my-test-ns/testPublishWithoutDeduplication";
63+
admin.topics().createNonPartitionedTopic(topic);
64+
try {
65+
admin.topicPolicies().setDeduplicationStatus(topic, false);
66+
publishAndVerify(topic, false);
67+
} finally {
68+
admin.topics().delete(topic);
69+
}
70+
}
71+
72+
@Test(timeOut = 30_000)
73+
public void testPublishWithDeduplication() throws Exception {
74+
String topic = "persistent://my-test-tenant/my-test-ns/testPublishWithDeduplication";
75+
admin.topics().createNonPartitionedTopic(topic);
76+
try {
77+
admin.topicPolicies().setDeduplicationStatus(topic, true);
78+
publishAndVerify(topic, true);
79+
} finally {
80+
admin.topics().delete(topic);
81+
}
82+
}
83+
84+
85+
private void publishAndVerify(String topic, boolean enableDeduplication) throws Exception {
86+
@Cleanup
87+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
88+
89+
CountDownLatch latch = new CountDownLatch(10);
90+
for (int i = 0; i < 10; i++) {
91+
producer.sendAsync("message-" + i).whenComplete((ignored, e) -> {
92+
if (e != null) {
93+
log.error("Failed to publish message", e);
94+
}
95+
latch.countDown();
96+
});
97+
}
98+
99+
latch.await();
100+
101+
@Cleanup
102+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
103+
.topic(topic).subscriptionName("my-sub")
104+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
105+
106+
List<String> messageContents = new ArrayList<>();
107+
108+
int messageCount = 0;
109+
for (;;) {
110+
var msg = consumer.receive(3, TimeUnit.SECONDS);
111+
if (msg == null) {
112+
break;
113+
}
114+
String content = msg.getValue();
115+
messageCount++;
116+
if (!messageContents.contains(content)) {
117+
messageContents.add(content);
118+
}
119+
consumer.acknowledge(msg);
120+
}
121+
122+
if (enableDeduplication) {
123+
Assert.assertEquals(messageCount, 10, "Expected 10 total messages");
124+
}
125+
Assert.assertEquals(messageContents.size(), 10);
126+
127+
for (int i = 0; i < 10; i++) {
128+
Assert.assertEquals(messageContents.get(i), "message-" + i);
129+
}
130+
}
131+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ public Processor inputProcessor() {
461461
return new Processor() {
462462
@Override
463463
public ByteBuf process(Object contextObj, ByteBuf inputPayload) {
464+
log.info("testManagedLedgerPayloadInputProcessorFailure.process");
464465
Commands.skipBrokerEntryMetadataIfExist(inputPayload);
465466
if (inputPayload.readBoolean()) {
466467
throw new RuntimeException(failureMsg);
@@ -482,26 +483,38 @@ public void release(ByteBuf processedPayload) {
482483
var successCount = new AtomicInteger(0);
483484
var expectedException = new ArrayList<Exception>();
484485

485-
var addEntryCallback = new AsyncCallbacks.AddEntryCallback() {
486-
@Override
487-
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
488-
successCount.incrementAndGet();
489-
countDownLatch.countDown();
490-
}
491-
492-
@Override
493-
public void addFailed(ManagedLedgerException exception, Object ctx) {
494-
// expected
495-
expectedException.add(exception);
496-
countDownLatch.countDown();
497-
}
498-
};
486+
ByteBuf shouldFail = Unpooled.buffer().writeBoolean(true);
487+
ByteBuf shouldSucceed = Unpooled.buffer().writeBoolean(false);
488+
byte[] shouldFailBytes = new byte[shouldFail.readableBytes()];
489+
shouldFail.readBytes(shouldFailBytes);
490+
byte[] shouldSucceedBytes = new byte[shouldSucceed.readableBytes()];
491+
shouldSucceed.readBytes(shouldSucceedBytes);
492+
shouldSucceed.release();
493+
shouldFail.release();
499494

500495
for (int i = 0; i < count; i++) {
501496
if (i % 2 == 0) {
502-
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(true), addEntryCallback, null);
497+
try {
498+
ledger.addEntry(shouldFailBytes);
499+
successCount.incrementAndGet();
500+
countDownLatch.countDown();
501+
} catch (Exception t) {
502+
expectedException.add(t);
503+
countDownLatch.countDown();
504+
} finally {
505+
ledger.unfenceForInterceptorException();
506+
}
503507
} else {
504-
ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(false), addEntryCallback, null);
508+
try {
509+
ledger.addEntry(shouldSucceedBytes);
510+
successCount.incrementAndGet();
511+
countDownLatch.countDown();
512+
} catch (Exception t) {
513+
expectedException.add(t);
514+
countDownLatch.countDown();
515+
} finally {
516+
ledger.unfenceForInterceptorException();
517+
}
505518
}
506519
}
507520

0 commit comments

Comments
 (0)