@poorbarcode
Contributor

@poorbarcode poorbarcode commented Nov 4, 2025

Motivation

Issue 1: concurrent initialisation of the transaction buffer snapshot

Before #21406, the snapshot was taken while the persistent topic was initialising, so there was no concurrency. After #21406, taking the transaction buffer snapshot is triggered by publishing messages, so several publish requests can attempt it concurrently. #21406 did not handle this case, which caused the following errors:

2025-11-04T22:44:14,413 - WARN  - [pulsar-io-28-3:PersistentTopic] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] Failed to persist msg in store: org.apache.pulsar.broker.service.BrokerServiceException$ServiceUnitNotReadyException: Transaction Buffer take first snapshot failed, the current state is: Ready
2025-11-04T22:44:14,413 - INFO  - [pulsar-io-28-3:PersistentTopic] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] Un-fencing topic...
2025-11-04T22:44:14,414 - INFO  - [pulsar-client-io-96-3:ClientCnx] - [localhost/127.0.0.1:57291] Broker notification of closed producer: 0, assignedBrokerUrl: null, assignedBrokerUrlTls: null
2025-11-04T22:44:14,412 - WARN  - [pulsar-client-io-262-3:ClientCnx] - [id: 0xe9ef6b71, L:/127.0.0.1:57301 - R:localhost/127.0.0.1:57291] Received send error from server: PersistenceError : org.apache.bookkeeper.mledger.ManagedLedgerException: org.apache.pulsar.broker.service.BrokerServiceException$ServiceUnitNotReadyException: Transaction Buffer take first snapshot failed, the current state is: Ready
2025-11-04T22:44:14,412 - WARN  - [pulsar-client-io-262-3:ClientCnx] - [id: 0xe9ef6b71, L:/127.0.0.1:57301 - R:localhost/127.0.0.1:57291] Producer with id 0 not found while handling send error
2025-11-04T22:44:14,413 - INFO  - [pulsar-client-io-96-3:ProducerImpl] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] [test-0-1] Created producer on cnx [id: 0x2f0343b6, L:/127.0.0.1:57296 - R:localhost/127.0.0.1:57291]
2025-11-04T22:44:14,413 - INFO  - [pulsar-client-io-96-3:ProducerImpl] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] [test-0-1] Re-Sending 1 messages to server
2025-11-04T22:44:14,413 - INFO  - [broker-topic-workers-OrderedExecutor-8-0:ServerCnx] - [/127.0.0.1:57298] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973}, client=[id: 0x960ce44f, L:/127.0.0.1:57291 - R:/127.0.0.1:57298] [SR:127.0.0.1, state:Connected], producerName=test-0-3, producerId=0}, role: null
2025-11-04T22:44:14,413 - INFO  - [pulsar-io-28-3:Producer] - Disconnecting producer: Producer{topic=PersistentTopic{topic=persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973}, client=[id: 0x960ce44f, L:/127.0.0.1:57291 - R:/127.0.0.1:57298] [SR:127.0.0.1, state:Connected], producerName=test-0-3, producerId=0}, assignedBrokerLookupData: Optional.empty
2025-11-04T22:44:14,413 - INFO  - [pulsar-io-28-3:Producer] - Disconnecting producer: Producer{topic=PersistentTopic{topic=persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973}, client=[id: 0x039d7a90, L:/127.0.0.1:57291 - R:/127.0.0.1:57296] [SR:127.0.0.1, state:Connected], producerName=test-0-1, producerId=0}, assignedBrokerLookupData: Optional.empty
2025-11-04T22:44:14,413 - WARN  - [pulsar-io-28-3:PersistentTopic] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] Failed to persist msg in store: org.apache.pulsar.broker.service.BrokerServiceException$ServiceUnitNotReadyException: Transaction Buffer take first snapshot failed, the current state is: Ready
2025-11-04T22:44:14,413 - INFO  - [pulsar-io-28-3:PersistentTopic] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] Un-fencing topic...
2025-11-04T22:44:14,414 - INFO  - [pulsar-client-io-96-3:ClientCnx] - [localhost/127.0.0.1:57291] Broker notification of closed producer: 0, assignedBrokerUrl: null, assignedBrokerUrlTls: null
2025-11-04T22:44:14,414 - INFO  - [pulsar-client-io-163-3:ProducerImpl] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] [test-0-3] Created producer on cnx [id: 0xfbd3c65b, L:/127.0.0.1:57298 - R:localhost/127.0.0.1:57291]
2025-11-04T22:44:14,414 - INFO  - [pulsar-client-io-96-3:ConnectionHandler] - [persistent://public/txn/tp-8064cb9f-1f8f-44f1-8bf2-872cc3870973] [test-0-1] Closed connection [id: 0x2f0343b6, L:/127.0.0.1:57296 - R:localhost/127.0.0.1:57291] -- Will try again in 0.1 s, hostUrl: null
2025-11-04T22:44:14,414 - WARN  - [pulsar-client-io-96-3:ClientCnx] - [id: 0x2f0343b6, L:/127.0.0.1:57296 - R:localhost/127.0.0.1:57291] Received send error from server: PersistenceError : org.apache.bookkeeper.mledger.ManagedLedgerException: org.apache.pulsar.broker.service.BrokerServiceException$ServiceUnitNotReadyException: Transaction Buffer take first snapshot failed, the current state is: Ready
2025-11-04T22:44:14,414 - WARN  - [pulsar-client-io-96-3:ClientCnx] - [id: 0x2f0343b6, L:/127.0.0.1:57296 - R:localhost/127.0.0.1:57291] Producer with id 0 not found while handling send error
2025-11-04T22:44:14,414 - INFO  - [pulsar-client-io-163-3:ClientCnx] - [localhost/127.0.0.1:57291] Broker notification of closed producer: 0, assignedBrokerUrl: null, assignedBrokerUrlTls: null
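
The race behind the "take first snapshot failed, the current state is: Ready" errors above can be illustrated with a small, self-contained model. This is only a sketch under assumptions: `FirstSnapshotDemo`, `racyEnsureReady`, `sharedEnsureReady`, and the two-value `State` enum are hypothetical names invented for illustration, not Pulsar classes, and the single-flight variant is one possible way to avoid the failure, not necessarily what this PR does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical model of first-snapshot initialisation; this is not Pulsar code. */
final class FirstSnapshotDemo {
    enum State { NO_SNAPSHOT, READY }

    final AtomicReference<State> state = new AtomicReference<>(State.NO_SNAPSHOT);
    // Holds the one shared snapshot attempt used by sharedEnsureReady.
    final AtomicReference<CompletableFuture<Void>> inFlight = new AtomicReference<>();

    /** Racy: every caller that sees NO_SNAPSHOT takes its own snapshot; the CAS loser fails. */
    CompletableFuture<Void> racyEnsureReady(Supplier<CompletableFuture<Void>> takeSnapshot) {
        if (state.get() == State.READY) {
            return CompletableFuture.completedFuture(null);
        }
        return takeSnapshot.get().thenCompose(ignore -> {
            if (state.compareAndSet(State.NO_SNAPSHOT, State.READY)) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            return CompletableFuture.<Void>failedFuture(new IllegalStateException(
                    "take first snapshot failed, the current state is: " + state.get()));
        });
    }

    /** Single-flight: concurrent callers share one attempt; only that attempt changes the state. */
    CompletableFuture<Void> sharedEnsureReady(Supplier<CompletableFuture<Void>> takeSnapshot) {
        if (state.get() == State.READY) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> mine = new CompletableFuture<>();
        CompletableFuture<Void> winner = inFlight.updateAndGet(cur -> cur != null ? cur : mine);
        if (winner == mine) {
            // Retry after a failed snapshot is omitted to keep the sketch short.
            takeSnapshot.get().whenComplete((v, e) -> {
                if (e != null) {
                    mine.completeExceptionally(e);
                } else {
                    state.set(State.READY);
                    mine.complete(null);
                }
            });
        }
        return winner;
    }

    public static void main(String[] args) {
        FirstSnapshotDemo racy = new FirstSnapshotDemo();
        CompletableFuture<Void> io1 = new CompletableFuture<>();
        CompletableFuture<Void> io2 = new CompletableFuture<>();
        CompletableFuture<Void> a = racy.racyEnsureReady(() -> io1);
        CompletableFuture<Void> b = racy.racyEnsureReady(() -> io2); // state is still NO_SNAPSHOT
        io1.complete(null);
        io2.complete(null);
        System.out.println("racy:   a failed=" + a.isCompletedExceptionally()
                + ", b failed=" + b.isCompletedExceptionally());

        FirstSnapshotDemo shared = new FirstSnapshotDemo();
        CompletableFuture<Void> io = new CompletableFuture<>();
        AtomicInteger snapshots = new AtomicInteger();
        Supplier<CompletableFuture<Void>> take = () -> {
            snapshots.incrementAndGet();
            return io;
        };
        CompletableFuture<Void> c = shared.sharedEnsureReady(take);
        CompletableFuture<Void> d = shared.sharedEnsureReady(take);
        io.complete(null);
        System.out.println("shared: snapshots=" + snapshots.get()
                + ", c failed=" + c.isCompletedExceptionally()
                + ", d failed=" + d.isCompletedExceptionally());
    }
}
```

In the racy variant the second caller is rejected even though the buffer has in fact become ready, which mirrors the log message above. In the single-flight variant both callers wait on the same future and only one snapshot is taken.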

Issue 2: publishing messages before the transaction buffer is recovered.

Before #21406: the wrong variable was used when reconstructing the class. The correct variable is snapshotAbortedTxnProcessor, but publishFuture was used instead.

As a result, transaction buffer recovery and taking a transaction buffer snapshot could execute concurrently.


Issue 3 (performance): the second write request can only be executed after the first one has responded to the client

There is a variable named publishFuture. Each publish is chained onto it, so a publish can only start after the previous one has completed.

https://github.com/apache/pulsar/pull/21406/files#diff-ecd728301a585f256e8a649b5e65b28c166194477355b3a1eefc198d014c25d3R255-R288

public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
    // Each append is chained onto the previous one, so it starts only after the previous append completes.
    CompletableFuture<Position> future = this.publishFuture.thenCompose(ignore -> {
        return internalAppendBufferToTxn(txnId, buffer);
    });
    this.publishFuture = future;
    return future;
}
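
To make the cost of this chaining concrete, here is a minimal single-threaded sketch that contrasts it with gating writes only on a one-time readiness future. Everything in it is an assumption made for illustration: `PublishOrderingDemo`, `chainedAppend`, `gatedAppend`, and the `storageAck` futures are hypothetical, positions are modelled as `Long`, and the gated variant is just one possible alternative that relies on the storage layer preserving submission order. It is not presented as the change made in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of two ways to order writes; not Pulsar's API, not thread-safe. */
final class PublishOrderingDemo {
    // Names of writes that have actually been handed to storage, in order.
    final List<String> started = new ArrayList<>();
    CompletableFuture<Long> publishFuture = CompletableFuture.completedFuture(-1L);

    /** Chained: a write is handed to storage only after the previous write has completed. */
    CompletableFuture<Long> chainedAppend(String name, CompletableFuture<Long> storageAck) {
        CompletableFuture<Long> future = publishFuture.thenCompose(ignore -> {
            started.add(name);
            return storageAck;
        });
        publishFuture = future;
        return future;
    }

    /** Gated: a write waits only for one-time readiness, then is handed to storage at once. */
    CompletableFuture<Long> gatedAppend(String name, CompletableFuture<Void> ready,
                                        CompletableFuture<Long> storageAck) {
        return ready.thenCompose(ignore -> {
            started.add(name);
            return storageAck;
        });
    }

    public static void main(String[] args) {
        PublishOrderingDemo chained = new PublishOrderingDemo();
        CompletableFuture<Long> ack1 = new CompletableFuture<>();
        CompletableFuture<Long> ack2 = new CompletableFuture<>();
        chained.chainedAppend("m1", ack1);
        chained.chainedAppend("m2", ack2);
        System.out.println("chained, before first ack: " + chained.started);
        ack1.complete(1L);
        System.out.println("chained, after first ack:  " + chained.started);

        PublishOrderingDemo gated = new PublishOrderingDemo();
        CompletableFuture<Void> ready = CompletableFuture.completedFuture(null);
        gated.gatedAppend("m1", ready, new CompletableFuture<>());
        gated.gatedAppend("m2", ready, new CompletableFuture<>());
        System.out.println("gated, before any ack:     " + gated.started);
    }
}
```

With chaining, the second write is not even submitted until the first acknowledgement arrives, so throughput is bounded by one round trip per message. With gating, both writes are in flight before any acknowledgement.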

Modifications

  • Fix Issue 1 and Issue 2 described in Motivation
  • Fix the performance issue described in Motivation, Issue 3

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

…currently executed transaction buffer snaoshot
@poorbarcode poorbarcode added this to the 4.2.0 milestone Nov 4, 2025
@poorbarcode poorbarcode self-assigned this Nov 4, 2025
@poorbarcode poorbarcode added type/bug The PR fixed a bug or issue reported a bug ready-to-test release/4.1.2 release/4.0.8 labels Nov 4, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Nov 4, 2025
@lhotari lhotari changed the title [fix][broker]Trsaction messages can never be sent successfully if concurrently taking transaction buffer snapshot [fix][broker]Transactional messages can never be sent successfully if concurrently taking transaction buffer snapshot Nov 4, 2025
@lhotari
Member

lhotari commented Nov 4, 2025

Checkstyle errors:

Error:  src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:[291] (sizes) LineLength: Line is longer than 120 characters (found 126).
Error:  src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java:[35,8] (imports) UnusedImports: Unused import: java.util.WeakHashMap.

Please run mvn initialize checkstyle:check to quickly run checkstyle locally.

@lhotari
Member

lhotari commented Nov 4, 2025

test failures

  Error:  Tests run: 13, Failures: 1, Errors: 0, Skipped: 10, Time elapsed: 41.26 s <<< FAILURE! -- in org.apache.pulsar.broker.transaction.TransactionProduceTest
  Error:  org.apache.pulsar.broker.transaction.TransactionProduceTest.produceAndAbortTest -- Time elapsed: 1.323 s <<< FAILURE!
  java.lang.AssertionError: Tnx commit failed!
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.apache.pulsar.broker.transaction.TransactionProduceTest.lambda$checkMessageId$0(TransactionProduceTest.java:242)
  	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
  	at org.apache.pulsar.broker.transaction.TransactionProduceTest.checkMessageId(TransactionProduceTest.java:223)
  	at org.apache.pulsar.broker.transaction.TransactionProduceTest.produceTest(TransactionProduceTest.java:140)
  	at org.apache.pulsar.broker.transaction.TransactionProduceTest.produceAndAbortTest(TransactionProduceTest.java:103)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)
  Error:  Tests run: 22, Failures: 1, Errors: 0, Skipped: 16, Time elapsed: 62.50 s <<< FAILURE! -- in org.apache.pulsar.broker.transaction.buffer.TopicTransactionBufferTest
  Error:  org.apache.pulsar.broker.transaction.buffer.TopicTransactionBufferTest.testMessagePublishInOrder -- Time elapsed: 0.221 s <<< FAILURE!
  java.lang.AssertionError: expected [0] but found [28]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertEqualsImpl(Assert.java:149)
  	at org.testng.Assert.assertEquals(Assert.java:131)
  	at org.testng.Assert.assertEquals(Assert.java:1418)
  	at org.testng.Assert.assertEquals(Assert.java:1438)
  	at org.apache.pulsar.broker.transaction.buffer.TopicTransactionBufferTest.testMessagePublishInOrder(TopicTransactionBufferTest.java:538)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)
  Error:  Tests run: 110, Failures: 1, Errors: 0, Skipped: 106, Time elapsed: 628.6 s <<< FAILURE! -- in org.apache.pulsar.client.impl.TransactionEndToEndTest
  Error:  org.apache.pulsar.client.impl.TransactionEndToEndTest.produceCommitTest[true](3) -- Time elapsed: 300.0 s <<< FAILURE!
  org.testng.internal.thread.ThreadTimeoutException: Method org.apache.pulsar.client.impl.TransactionEndToEndTest.produceCommitTest() didn't finish within the time-out 300000
  	at java.base/jdk.internal.misc.Unsafe.park(Native Method)
  	at java.base/java.util.concurrent.locks.LockSupport.park(LockSupport.java:221)
  	at java.base/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1864)
  	at java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3780)
  	at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3725)
  	at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
  	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
  	at org.apache.pulsar.client.impl.TransactionEndToEndTest.produceCommitTest(TransactionEndToEndTest.java:349)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)

@poorbarcode poorbarcode requested a review from lhotari November 5, 2025 17:57
@poorbarcode poorbarcode requested a review from lhotari November 6, 2025 13:09
@poorbarcode poorbarcode requested a review from lhotari November 7, 2025 02:09
@codecov-commenter

codecov-commenter commented Nov 7, 2025

Codecov Report

❌ Patch coverage is 68.86792% with 33 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.38%. Comparing base (676ba07) to head (29e8d32).
⚠️ Report is 19 commits behind head on master.

Files with missing lines                                 Patch %   Lines
...ransaction/buffer/impl/TopicTransactionBuffer.java    69.69%    24 Missing and 6 partials ⚠️
...ction/buffer/impl/TopicTransactionBufferState.java    57.14%    0 Missing and 3 partials ⚠️
Additional details and impacted files

@@              Coverage Diff              @@
##             master   #24945       +/-   ##
=============================================
+ Coverage     38.56%   74.38%   +35.82%     
- Complexity    13262    33673    +20411     
=============================================
  Files          1856     1920       +64     
  Lines        145287   150138     +4851     
  Branches      16877    17414      +537     
=============================================
+ Hits          56025   111687    +55662     
+ Misses        81696    29570    -52126     
- Partials       7566     8881     +1315     
Flag        Coverage Δ
inttests    26.34% <40.56%> (+0.16%) ⬆️
systests    22.84% <0.00%> (+0.08%) ⬆️
unittests   73.93% <68.86%> (+39.19%) ⬆️

Files with missing lines                                 Coverage Δ
...ction/buffer/impl/TopicTransactionBufferState.java    88.46% <57.14%> (-2.02%) ⬇️
...ransaction/buffer/impl/TopicTransactionBuffer.java    83.48% <69.69%> (+46.14%) ⬆️

... and 1423 files with indirect coverage changes


@BewareMyPower
Contributor

testRefCountWhenAppendBufferToTxn failed, but this should not be a regression: it fails only because the exception message now differs from the previous one.


@poorbarcode poorbarcode merged commit f29ca21 into apache:master Nov 11, 2025
97 of 100 checks passed
lhotari pushed a commit that referenced this pull request Nov 11, 2025
… concurrently taking transaction buffer snapshot (#24945)

(cherry picked from commit f29ca21)
lhotari pushed a commit that referenced this pull request Nov 11, 2025
… concurrently taking transaction buffer snapshot (#24945)

(cherry picked from commit f29ca21)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
… concurrently taking transaction buffer snapshot (apache#24945)

(cherry picked from commit f29ca21)
(cherry picked from commit 85e8b5d)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 14, 2025
… concurrently taking transaction buffer snapshot (apache#24945)

(cherry picked from commit f29ca21)
(cherry picked from commit 85e8b5d)

5 participants