2323import io .netty .util .Timeout ;
2424import io .netty .util .Timer ;
2525import io .netty .util .TimerTask ;
26+ import java .util .LinkedList ;
2627import java .util .List ;
2728import java .util .concurrent .CompletableFuture ;
2829import java .util .concurrent .ConcurrentHashMap ;
@@ -92,9 +93,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
9293
9394 private final CompletableFuture <Void > transactionBufferFuture = new CompletableFuture <>();
9495
95- private CompletableFuture <Position > publishFuture = getTransactionBufferFuture ()
96- .thenApply (__ -> PositionFactory .EARLIEST );
97-
9896 /**
9997 * This map stores the low water marks: the key is the TC ID and the value is the low water mark of that TC.
10098 */
@@ -108,6 +106,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
108106
109107 private final AbortedTxnProcessor .SnapshotType snapshotType ;
110108 private final MaxReadPositionCallBack maxReadPositionCallBack ;
109+ /** While the first snapshot is in progress, subsequent publishing tasks are queued here. **/
110+ private final LinkedList <PendingAppendingTxnBufferTask > pendingAppendingTxnBufferTasks = new LinkedList <>();
111111
112112 private static AbortedTxnProcessor createSnapshotProcessor (PersistentTopic topic ) {
113113 return topic .getBrokerService ().getPulsar ().getConfiguration ().isTransactionBufferSegmentedSnapshotEnabled ()
@@ -229,16 +229,6 @@ public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
229229 return CompletableFuture .completedFuture (null );
230230 }
231231
232- @ VisibleForTesting
233- public void setPublishFuture (CompletableFuture <Position > publishFuture ) {
234- this .publishFuture = publishFuture ;
235- }
236-
237- @ VisibleForTesting
238- public CompletableFuture <Position > getPublishFuture () {
239- return publishFuture ;
240- }
241-
242232 @ VisibleForTesting
243233 public CompletableFuture <Void > getTransactionBufferFuture () {
244234 return transactionBufferFuture ;
@@ -264,47 +254,146 @@ public long getCommittedTxnCount() {
264254 return this .txnCommittedCounter .sum ();
265255 }
266256
257+ private record PendingAppendingTxnBufferTask (TxnID txnId , long sequenceId , ByteBuf buffer ,
258+ CompletableFuture <Position > pendingPublishFuture ) {
259+
260+ void fail (Throwable throwable ) {
261+ buffer .release ();
262+ pendingPublishFuture .completeExceptionally (throwable );
263+ }
264+ }
265+
267266 @ Override
268267 public CompletableFuture <Position > appendBufferToTxn (TxnID txnId , long sequenceId , ByteBuf buffer ) {
269- // Method `takeAbortedTxnsSnapshot` will be executed in the different thread.
270- // So we need to retain the buffer in this thread. It will be released after message persistent.
271- buffer .retain ();
272- CompletableFuture <Position > future = getPublishFuture ().thenCompose (ignore -> {
273- if (checkIfNoSnapshot ()) {
274- CompletableFuture <Void > completableFuture = new CompletableFuture <>();
275- // `publishFuture` will be completed after message persistent, so there will not be two threads
276- // writing snapshots at the same time.
277- snapshotAbortedTxnProcessor .takeAbortedTxnsSnapshot (maxReadPosition ).thenRun (() -> {
278- if (changeToReadyStateFromNoSnapshot ()) {
279- timer .newTimeout (TopicTransactionBuffer .this ,
280- takeSnapshotIntervalTime , TimeUnit .MILLISECONDS );
281- completableFuture .complete (null );
282- } else {
283- log .error ("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot" ,
284- topic .getName ());
285- completableFuture .completeExceptionally (new BrokerServiceException .ServiceUnitNotReadyException (
286- "Transaction Buffer take first snapshot failed, the current state is: " + getState ()));
287- }
288- }).exceptionally (exception -> {
289- log .error ("Topic {} failed to take snapshot" , this .topic .getName ());
290- completableFuture .completeExceptionally (exception );
291- return null ;
292- });
293- return completableFuture .thenCompose (__ -> internalAppendBufferToTxn (txnId , buffer ));
294- } else if (checkIfReady ()) {
295- return internalAppendBufferToTxn (txnId , buffer );
296- } else {
297- // `publishFuture` will be completed after transaction buffer recover completely
298- // during initializing, so this case should not happen.
268+ synchronized (pendingAppendingTxnBufferTasks ) {
269+ // The first snapshot is in progress, the following publish tasks will be pending.
270+ if (!pendingAppendingTxnBufferTasks .isEmpty ()) {
271+ CompletableFuture <Position > res = new CompletableFuture <>();
272+ buffer .retain ();
273+ pendingAppendingTxnBufferTasks .offer (new PendingAppendingTxnBufferTask (txnId , sequenceId , buffer , res ));
274+ return res ;
275+ }
276+
277+ // `publishFuture` will be completed after transaction buffer recover completely
278+ // during initializing, so this case should not happen.
279+ if (!checkIfReady () && !checkIfNoSnapshot () && !checkIfFirstSnapshotting () && !checkIfInitializing ()) {
280+ log .error ("[{}] unexpected state: {} when try to take the first transaction buffer snapshot" ,
281+ topic .getName (), getState ());
299282 return FutureUtil .failedFuture (new BrokerServiceException .ServiceUnitNotReadyException (
300283 "Transaction Buffer recover failed, the current state is: " + getState ()));
301284 }
302- }).whenComplete (((position , throwable ) -> buffer .release ()));
303- setPublishFuture (future );
304- return future ;
285+
286+ // The transaction buffer is ready to write.
287+ if (checkIfReady ()) {
288+ return internalAppendBufferToTxn (txnId , buffer , sequenceId );
289+ }
290+
291+ // Pending the current publishing and trigger new snapshot if needed.
292+ CompletableFuture <Position > res = new CompletableFuture <>();
293+ buffer .retain ();
294+ pendingAppendingTxnBufferTasks .offer (new PendingAppendingTxnBufferTask (txnId , sequenceId , buffer , res ));
295+
296+ final java .util .function .Consumer <Throwable > failPendingTasks = throwable -> {
297+ synchronized (pendingAppendingTxnBufferTasks ) {
298+ PendingAppendingTxnBufferTask pendingTask = null ;
299+ while ((pendingTask = pendingAppendingTxnBufferTasks .poll ()) != null ) {
300+ pendingTask .fail (throwable );
301+ }
302+ }
303+ };
304+
305+ final Runnable flushPendingTasks = () -> {
306+ PendingAppendingTxnBufferTask pendingTask = null ;
307+ try {
308+ synchronized (pendingAppendingTxnBufferTasks ) {
309+ while ((pendingTask = pendingAppendingTxnBufferTasks .poll ()) != null ) {
310+ final ByteBuf data = pendingTask .buffer ;
311+ final CompletableFuture <Position > pendingFuture =
312+ pendingTask .pendingPublishFuture ;
313+ internalAppendBufferToTxn (pendingTask .txnId , pendingTask .buffer ,
314+ pendingTask .sequenceId )
315+ .whenComplete ((positionAdded , ex3 ) -> {
316+ data .release ();
317+ if (ex3 != null ) {
318+ pendingFuture .completeExceptionally (ex3 );
319+ return ;
320+ }
321+ pendingFuture .complete (positionAdded );
322+ });
323+ }
324+ }
325+ } catch (Exception e ) {
326+ // If there are some error when adding entries or caching entries, this log will be printed.
327+ log .error ("[{}] Failed to flush pending publishing requests after taking the first"
328+ + " snapshot." ,
329+ topic .getName (), e );
330+ if (pendingTask != null ) {
331+ pendingTask .fail (e );
332+ }
333+ failPendingTasks .accept (e );
334+ }
335+ };
336+
337+ // Trigger the first snapshot.
338+ transactionBufferFuture .whenComplete ((ignore1 , ex1 ) -> {
339+ if (ex1 != null ) {
340+ log .error ("[{}] Transaction buffer recover failed" , topic .getName (), ex1 );
341+ failPendingTasks .accept (ex1 );
342+ return ;
343+ }
344+ if (changeToFirstSnapshotting ()) {
345+ log .info ("[{}] Start to take the first snapshot" , topic .getName ());
346+ // Flush pending publishing after the first snapshot finished.
347+ takeFirstSnapshot ().whenComplete ((ignore2 , ex2 ) -> {
348+ if (ex2 != null ) {
349+ log .error ("[{}] Failed to take the first snapshot, flushing failed publishing requests" ,
350+ topic .getName (), ex2 );
351+ failPendingTasks .accept (ex2 );
352+ return ;
353+ }
354+ log .info ("[{}] Finished to take the first snapshot, flushing publishing {} requests" ,
355+ topic .getName (), pendingAppendingTxnBufferTasks .size ());
356+ flushPendingTasks .run ();
357+ });
358+ } else if (checkIfReady ()) {
359+ log .info ("[{}] No need to take the first snapshot, flushing publishing {} requests" ,
360+ topic .getName (), pendingAppendingTxnBufferTasks .size ());
361+ flushPendingTasks .run ();
362+ } else {
363+ log .error ("[{}] Transaction buffer recover failed, current state is {}" , topic .getName (),
364+ getState ());
365+ failPendingTasks .accept (new BrokerServiceException .ServiceUnitNotReadyException (
366+ "Transaction Buffer recover failed, the current state is: " + getState ()));
367+ }
368+ });
369+ return res ;
370+ }
371+ }
372+
373+ private CompletableFuture <Void > takeFirstSnapshot () {
374+ CompletableFuture <Void > firstSnapshottingFuture = new CompletableFuture <>();
375+ snapshotAbortedTxnProcessor .takeAbortedTxnsSnapshot (maxReadPosition ).thenRun (() -> {
376+ if (changeToReadyStateFromNoSnapshot ()) {
377+ timer .newTimeout (TopicTransactionBuffer .this ,
378+ takeSnapshotIntervalTime , TimeUnit .MILLISECONDS );
379+ firstSnapshottingFuture .complete (null );
380+ } else {
381+ log .error ("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot" ,
382+ topic .getName ());
383+ firstSnapshottingFuture .completeExceptionally (new BrokerServiceException
384+ .ServiceUnitNotReadyException (
385+ "Transaction Buffer take first snapshot failed, the current state is: " + getState ()));
386+ }
387+ }).exceptionally (exception -> {
388+ log .error ("Topic {} failed to take snapshot" , this .topic .getName ());
389+ firstSnapshottingFuture .completeExceptionally (exception );
390+ return null ;
391+ });
392+ return firstSnapshottingFuture ;
305393 }
306394
307- private CompletableFuture <Position > internalAppendBufferToTxn (TxnID txnId , ByteBuf buffer ) {
395+ @ VisibleForTesting
396+ protected CompletableFuture <Position > internalAppendBufferToTxn (TxnID txnId , ByteBuf buffer , long seq ) {
308397 CompletableFuture <Position > completableFuture = new CompletableFuture <>();
309398 Long lowWaterMark = lowWaterMarks .get (txnId .getMostSigBits ());
310399 if (lowWaterMark != null && lowWaterMark >= txnId .getLeastSigBits ()) {
@@ -547,7 +636,16 @@ public CompletableFuture<Void> clearSnapshot() {
547636
548637 @ Override
549638 public CompletableFuture <Void > closeAsync () {
550- changeToCloseState ();
639+ synchronized (pendingAppendingTxnBufferTasks ) {
640+ if (!checkIfClosed ()) {
641+ PendingAppendingTxnBufferTask pendingTask = null ;
642+ Throwable t = new BrokerServiceException .ServiceUnitNotReadyException ("Topic is closed" );
643+ while ((pendingTask = pendingAppendingTxnBufferTasks .poll ()) != null ) {
644+ pendingTask .fail (t );
645+ }
646+ }
647+ changeToCloseState ();
648+ }
551649 return this .snapshotAbortedTxnProcessor .closeAsync ();
552650 }
553651
0 commit comments