2323import io .netty .util .Timeout ;
2424import io .netty .util .Timer ;
2525import io .netty .util .TimerTask ;
26+ import java .util .LinkedList ;
2627import java .util .List ;
2728import java .util .concurrent .CompletableFuture ;
2829import java .util .concurrent .ConcurrentHashMap ;
@@ -92,9 +93,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
9293
9394 private final CompletableFuture <Void > transactionBufferFuture = new CompletableFuture <>();
9495
95- private CompletableFuture <Position > publishFuture = getTransactionBufferFuture ()
96- .thenApply (__ -> PositionFactory .EARLIEST );
97-
9896 /**
9997 * Stores the lowWaterMark of each TC: the key is the TC ID and the value is the lowWaterMark of that TC.
10098 */
@@ -108,6 +106,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
108106
109107 private final AbortedTxnProcessor .SnapshotType snapshotType ;
110108 private final MaxReadPositionCallBack maxReadPositionCallBack ;
109+ /** While the first snapshot is in progress, subsequent publishing tasks are queued here as pending. **/
110+ private final LinkedList <PendingAppendingTxnBufferTask > pendingAppendingTxnBufferTasks = new LinkedList <>();
111111
112112 private static AbortedTxnProcessor createSnapshotProcessor (PersistentTopic topic ) {
113113 return topic .getBrokerService ().getPulsar ().getConfiguration ().isTransactionBufferSegmentedSnapshotEnabled ()
@@ -232,16 +232,6 @@ public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
232232 return CompletableFuture .completedFuture (null );
233233 }
234234
235- @ VisibleForTesting
236- public void setPublishFuture (CompletableFuture <Position > publishFuture ) {
237- this .publishFuture = publishFuture ;
238- }
239-
240- @ VisibleForTesting
241- public CompletableFuture <Position > getPublishFuture () {
242- return publishFuture ;
243- }
244-
245235 @ VisibleForTesting
246236 public CompletableFuture <Void > getTransactionBufferFuture () {
247237 return transactionBufferFuture ;
@@ -267,47 +257,146 @@ public long getCommittedTxnCount() {
267257 return this .txnCommittedCounter .sum ();
268258 }
269259
/**
 * A publish request that has been queued instead of being appended immediately.
 * It carries the transaction id, the sequence id, the payload buffer and the future that was
 * returned to the caller of appendBufferToTxn.
 */
260+ private record PendingAppendingTxnBufferTask (TxnID txnId , long sequenceId , ByteBuf buffer ,
261+ CompletableFuture <Position > pendingPublishFuture ) {
262+
/**
 * Fails this queued request: releases the buffer and completes the caller's future
 * exceptionally with the given throwable.
 */
263+ void fail (Throwable throwable ) {
// The buffer is retained when the task is enqueued, so it is released here because it will never be appended.
264+ buffer .release ();
265+ pendingPublishFuture .completeExceptionally (throwable );
266+ }
267+ }
268+
/**
 * Appends a message buffer to the given transaction, or queues it while the buffer is not ready.
 *
 * All decisions are made while holding the monitor of pendingAppendingTxnBufferTasks:
 * - if tasks are already queued, the request is retained and queued behind them;
 * - if the state is none of Ready, NoSnapshot, FirstSnapshotting or Initializing, a failed future
 *   carrying a ServiceUnitNotReadyException is returned;
 * - if the state is Ready, the buffer is appended directly through internalAppendBufferToTxn;
 * - otherwise the request is retained and queued, and a callback is registered on
 *   transactionBufferFuture that takes the first snapshot if needed and then flushes or fails the queue.
 *
 * @param txnId      the transaction the buffer belongs to
 * @param sequenceId the sequence id passed through to internalAppendBufferToTxn
 * @param buffer     the payload; it is retained only when the request is queued
 * @return a future completed with the position of the appended entry, or completed exceptionally
 */
270269 @ Override
271270 public CompletableFuture <Position > appendBufferToTxn (TxnID txnId , long sequenceId , ByteBuf buffer ) {
272- // Method `takeAbortedTxnsSnapshot` will be executed in the different thread.
273- // So we need to retain the buffer in this thread. It will be released after message persistent.
274- buffer .retain ();
275- CompletableFuture <Position > future = getPublishFuture ().thenCompose (ignore -> {
276- if (checkIfNoSnapshot ()) {
277- CompletableFuture <Void > completableFuture = new CompletableFuture <>();
278- // `publishFuture` will be completed after message persistent, so there will not be two threads
279- // writing snapshots at the same time.
280- snapshotAbortedTxnProcessor .takeAbortedTxnsSnapshot (maxReadPosition ).thenRun (() -> {
281- if (changeToReadyStateFromNoSnapshot ()) {
282- timer .newTimeout (TopicTransactionBuffer .this ,
283- takeSnapshotIntervalTime , TimeUnit .MILLISECONDS );
284- completableFuture .complete (null );
285- } else {
286- log .error ("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot" ,
287- topic .getName ());
288- completableFuture .completeExceptionally (new BrokerServiceException .ServiceUnitNotReadyException (
289- "Transaction Buffer take first snapshot failed, the current state is: " + getState ()));
290- }
291- }).exceptionally (exception -> {
292- log .error ("Topic {} failed to take snapshot" , this .topic .getName ());
293- completableFuture .completeExceptionally (exception );
294- return null ;
295- });
296- return completableFuture .thenCompose (__ -> internalAppendBufferToTxn (txnId , buffer ));
297- } else if (checkIfReady ()) {
298- return internalAppendBufferToTxn (txnId , buffer );
299- } else {
300- // `publishFuture` will be completed after transaction buffer recover completely
301- // during initializing, so this case should not happen.
// The queue's own monitor guards both the queue contents and the state checks below.
271+ synchronized (pendingAppendingTxnBufferTasks ) {
272+ // Tasks are already queued, so this publish is queued behind them to keep the arrival order.
273+ if (!pendingAppendingTxnBufferTasks .isEmpty ()) {
274+ CompletableFuture <Position > res = new CompletableFuture <>();
// Retain the buffer for as long as the task stays queued; it is released when the task is flushed or failed.
275+ buffer .retain ();
276+ pendingAppendingTxnBufferTasks .offer (new PendingAppendingTxnBufferTask (txnId , sequenceId , buffer , res ));
277+ return res ;
278+ }
279+
280+ // Any state other than Ready, NoSnapshot, FirstSnapshotting or Initializing is unexpected here,
281+ // so the publish is rejected immediately with a failed future.
282+ if (!checkIfReady () && !checkIfNoSnapshot () && !checkIfFirstSnapshotting () && !checkIfInitializing ()) {
283+ log .error ("[{}] unexpected state: {} when try to take the first transaction buffer snapshot" ,
284+ topic .getName (), getState ());
302285 return FutureUtil .failedFuture (new BrokerServiceException .ServiceUnitNotReadyException (
303286 "Transaction Buffer recover failed, the current state is: " + getState ()));
304287 }
305- }).whenComplete (((position , throwable ) -> buffer .release ()));
306- setPublishFuture (future );
307- return future ;
288+
289+ // The transaction buffer is ready to write; no retain is needed on this direct path.
290+ if (checkIfReady ()) {
291+ return internalAppendBufferToTxn (txnId , buffer , sequenceId );
292+ }
293+
294+ // Queue the current publish and trigger the first snapshot if needed.
295+ CompletableFuture <Position > res = new CompletableFuture <>();
296+ buffer .retain ();
297+ pendingAppendingTxnBufferTasks .offer (new PendingAppendingTxnBufferTask (txnId , sequenceId , buffer , res ));
298+
// Drains the queue under its monitor and fails every queued task with the given throwable.
299+ final java .util .function .Consumer <Throwable > failPendingTasks = throwable -> {
300+ synchronized (pendingAppendingTxnBufferTasks ) {
301+ PendingAppendingTxnBufferTask pendingTask = null ;
302+ while ((pendingTask = pendingAppendingTxnBufferTasks .poll ()) != null ) {
303+ pendingTask .fail (throwable );
304+ }
305+ }
306+ };
307+
// Drains the queue under its monitor and appends every queued task in order.
// NOTE(review): internalAppendBufferToTxn is invoked while the queue monitor is held; confirm it never blocks.
308+ final Runnable flushPendingTasks = () -> {
309+ PendingAppendingTxnBufferTask pendingTask = null ;
310+ try {
311+ synchronized (pendingAppendingTxnBufferTasks ) {
312+ while ((pendingTask = pendingAppendingTxnBufferTasks .poll ()) != null ) {
// Capture the buffer and future in finals so the completion callback does not read the reassigned loop variable.
313+ final ByteBuf data = pendingTask .buffer ;
314+ final CompletableFuture <Position > pendingFuture =
315+ pendingTask .pendingPublishFuture ;
316+ internalAppendBufferToTxn (pendingTask .txnId , pendingTask .buffer ,
317+ pendingTask .sequenceId )
318+ .whenComplete ((positionAdded , ex3 ) -> {
// Release the reference retained at enqueue time, whether the append succeeded or failed.
319+ data .release ();
320+ if (ex3 != null ) {
321+ pendingFuture .completeExceptionally (ex3 );
322+ return ;
323+ }
324+ pendingFuture .complete (positionAdded );
325+ });
326+ }
327+ }
328+ } catch (Exception e ) {
329+ // Reached when a synchronous exception escapes while appending a queued entry.
330+ log .error ("[{}] Failed to flush pending publishing requests after taking the first"
331+ + " snapshot." ,
332+ topic .getName (), e );
// The task being processed was already polled off the queue, so it is failed separately from the rest.
333+ if (pendingTask != null ) {
334+ pendingTask .fail (e );
335+ }
336+ failPendingTasks .accept (e );
337+ }
338+ };
339+
340+ // Once transactionBufferFuture completes, take the first snapshot if needed and resolve the queue.
341+ transactionBufferFuture .whenComplete ((ignore1 , ex1 ) -> {
342+ if (ex1 != null ) {
343+ log .error ("[{}] Transaction buffer recover failed" , topic .getName (), ex1 );
344+ failPendingTasks .accept (ex1 );
345+ return ;
346+ }
347+ if (changeToFirstSnapshotting ()) {
348+ log .info ("[{}] Start to take the first snapshot" , topic .getName ());
349+ // Flush pending publishing after the first snapshot finished.
350+ takeFirstSnapshot ().whenComplete ((ignore2 , ex2 ) -> {
351+ if (ex2 != null ) {
352+ log .error ("[{}] Failed to take the first snapshot, flushing failed publishing requests" ,
353+ topic .getName (), ex2 );
354+ failPendingTasks .accept (ex2 );
355+ return ;
356+ }
357+ log .info ("[{}] Finished to take the first snapshot, flushing publishing {} requests" ,
358+ topic .getName (), pendingAppendingTxnBufferTasks .size ());
359+ flushPendingTasks .run ();
360+ });
361+ } else if (checkIfReady ()) {
// The state is already Ready, so the queued publishes can be appended without taking a snapshot.
362+ log .info ("[{}] No need to take the first snapshot, flushing publishing {} requests" ,
363+ topic .getName (), pendingAppendingTxnBufferTasks .size ());
364+ flushPendingTasks .run ();
365+ } else {
366+ log .error ("[{}] Transaction buffer recover failed, current state is {}" , topic .getName (),
367+ getState ());
368+ failPendingTasks .accept (new BrokerServiceException .ServiceUnitNotReadyException (
369+ "Transaction Buffer recover failed, the current state is: " + getState ()));
370+ }
371+ });
372+ return res ;
373+ }
374+ }
375+
/**
 * Takes an aborted-transactions snapshot at maxReadPosition and then tries to move the buffer to Ready.
 *
 * @return a future completed with null when the snapshot was taken and the state change succeeded;
 *         completed exceptionally when the snapshot fails or the state change is refused
 */
376+ private CompletableFuture <Void > takeFirstSnapshot () {
377+ CompletableFuture <Void > firstSnapshottingFuture = new CompletableFuture <>();
378+ snapshotAbortedTxnProcessor .takeAbortedTxnsSnapshot (maxReadPosition ).thenRun (() -> {
// NOTE(review): the caller enters this method after changeToFirstSnapshotting(); confirm that
// changeToReadyStateFromNoSnapshot() accepts that state as its starting point.
379+ if (changeToReadyStateFromNoSnapshot ()) {
// Schedule this buffer as a timer task after takeSnapshotIntervalTime milliseconds.
380+ timer .newTimeout (TopicTransactionBuffer .this ,
381+ takeSnapshotIntervalTime , TimeUnit .MILLISECONDS );
382+ firstSnapshottingFuture .complete (null );
383+ } else {
384+ log .error ("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot" ,
385+ topic .getName ());
386+ firstSnapshottingFuture .completeExceptionally (new BrokerServiceException
387+ .ServiceUnitNotReadyException (
388+ "Transaction Buffer take first snapshot failed, the current state is: " + getState ()));
389+ }
390+ }).exceptionally (exception -> {
// NOTE(review): the exception is not passed to the logger here; it is only propagated through the future.
391+ log .error ("Topic {} failed to take snapshot" , this .topic .getName ());
392+ firstSnapshottingFuture .completeExceptionally (exception );
393+ return null ;
394+ });
395+ return firstSnapshottingFuture ;
308396 }
309397
310- private CompletableFuture <Position > internalAppendBufferToTxn (TxnID txnId , ByteBuf buffer ) {
398+ @ VisibleForTesting
399+ protected CompletableFuture <Position > internalAppendBufferToTxn (TxnID txnId , ByteBuf buffer , long seq ) {
311400 CompletableFuture <Position > completableFuture = new CompletableFuture <>();
312401 Long lowWaterMark = lowWaterMarks .get (txnId .getMostSigBits ());
313402 if (lowWaterMark != null && lowWaterMark >= txnId .getLeastSigBits ()) {
@@ -550,7 +639,16 @@ public CompletableFuture<Void> clearSnapshot() {
550639
/**
 * Closes the transaction buffer.
 * Under the monitor of pendingAppendingTxnBufferTasks, every queued publish is failed with a
 * ServiceUnitNotReadyException unless the buffer is already closed, and the state is changed to closed.
 *
 * @return the future returned by closing the snapshot processor
 */
551640 @ Override
552641 public CompletableFuture <Void > closeAsync () {
553- changeToCloseState ();
// Hold the same monitor as appendBufferToTxn so draining the queue and the state change happen together.
642+ synchronized (pendingAppendingTxnBufferTasks ) {
643+ if (!checkIfClosed ()) {
644+ PendingAppendingTxnBufferTask pendingTask = null ;
645+ Throwable t = new BrokerServiceException .ServiceUnitNotReadyException ("Topic is closed" );
// Fail every queued publish; fail() also releases the buffer held by each task.
646+ while ((pendingTask = pendingAppendingTxnBufferTasks .poll ()) != null ) {
647+ pendingTask .fail (t );
648+ }
649+ }
650+ changeToCloseState ();
651+ }
554652 return this .snapshotAbortedTxnProcessor .closeAsync ();
555653 }
556654
0 commit comments