Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -92,9 +93,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen

private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>();

private CompletableFuture<Position> publishFuture = getTransactionBufferFuture()
.thenApply(__ -> PositionFactory.EARLIEST);

/**
* Stores the low watermark of each transaction coordinator (TC): the key is the TC ID and the value is
* the low watermark of that TC.
*/
Expand All @@ -108,6 +106,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen

private final AbortedTxnProcessor.SnapshotType snapshotType;
private final MaxReadPositionCallBack maxReadPositionCallBack;
/** While the first snapshot is in progress, subsequent publish tasks are queued here until it completes. **/
private final LinkedList<PendingAppendingTxnBufferTask> pendingAppendingTxnBufferTasks = new LinkedList<>();

private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) {
return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
Expand Down Expand Up @@ -232,16 +232,6 @@ public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
return CompletableFuture.completedFuture(null);
}

/**
 * Test hook: replaces the publish future held by this transaction buffer.
 *
 * @param publishFuture the future to store in place of the current one
 */
@VisibleForTesting
public void setPublishFuture(CompletableFuture<Position> publishFuture) {
    final CompletableFuture<Position> replacement = publishFuture;
    this.publishFuture = replacement;
}

/**
 * Test hook: exposes the publish future currently held by this transaction buffer.
 *
 * @return the stored publish future
 */
@VisibleForTesting
public CompletableFuture<Position> getPublishFuture() {
    return this.publishFuture;
}

@VisibleForTesting
public CompletableFuture<Void> getTransactionBufferFuture() {
return transactionBufferFuture;
Expand All @@ -267,47 +257,146 @@ public long getCommittedTxnCount() {
return this.txnCommittedCounter.sum();
}

/**
 * A publish request that has been queued instead of being appended immediately.
 *
 * @param txnId                transaction the message belongs to
 * @param sequenceId           sequence id supplied by the caller of {@code appendBufferToTxn}
 * @param buffer               message payload; the queue holds a retained reference to it
 * @param pendingPublishFuture future handed back to the caller, completed once the request is resolved
 */
private record PendingAppendingTxnBufferTask(TxnID txnId, long sequenceId, ByteBuf buffer,
        CompletableFuture<Position> pendingPublishFuture) {

    /**
     * Abandons this request: releases the payload reference first, then fails the caller's future.
     *
     * @param throwable the reason reported to the caller
     */
    void fail(Throwable throwable) {
        buffer().release();
        pendingPublishFuture().completeExceptionally(throwable);
    }
}

@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
// Method `takeAbortedTxnsSnapshot` will be executed in the different thread.
// So we need to retain the buffer in this thread. It will be released after message persistent.
buffer.retain();
CompletableFuture<Position> future = getPublishFuture().thenCompose(ignore -> {
if (checkIfNoSnapshot()) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
// `publishFuture` will be completed after message persistent, so there will not be two threads
// writing snapshots at the same time.
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> {
if (changeToReadyStateFromNoSnapshot()) {
timer.newTimeout(TopicTransactionBuffer.this,
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
completableFuture.complete(null);
} else {
log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot",
topic.getName());
completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(
"Transaction Buffer take first snapshot failed, the current state is: " + getState()));
}
}).exceptionally(exception -> {
log.error("Topic {} failed to take snapshot", this.topic.getName());
completableFuture.completeExceptionally(exception);
return null;
});
return completableFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer));
} else if (checkIfReady()) {
return internalAppendBufferToTxn(txnId, buffer);
} else {
// `publishFuture` will be completed after transaction buffer recover completely
// during initializing, so this case should not happen.
synchronized (pendingAppendingTxnBufferTasks) {
// The first snapshot is in progress, the following publish tasks will be pending.
if (!pendingAppendingTxnBufferTasks.isEmpty()) {
CompletableFuture<Position> res = new CompletableFuture<>();
buffer.retain();
pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
return res;
}

// `publishFuture` will be completed after transaction buffer recover completely
// during initializing, so this case should not happen.
if (!checkIfReady() && !checkIfNoSnapshot() && !checkIfFirstSnapshotting() && !checkIfInitializing()) {
log.error("[{}] unexpected state: {} when try to take the first transaction buffer snapshot",
topic.getName(), getState());
return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException(
"Transaction Buffer recover failed, the current state is: " + getState()));
}
}).whenComplete(((position, throwable) -> buffer.release()));
setPublishFuture(future);
return future;

// The transaction buffer is ready to write.
if (checkIfReady()) {
return internalAppendBufferToTxn(txnId, buffer, sequenceId);
}

// Pending the current publishing and trigger new snapshot if needed.
CompletableFuture<Position> res = new CompletableFuture<>();
buffer.retain();
pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));

final java.util.function.Consumer<Throwable> failPendingTasks = throwable -> {
synchronized (pendingAppendingTxnBufferTasks) {
PendingAppendingTxnBufferTask pendingTask = null;
while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) {
pendingTask.fail(throwable);
}
}
};

final Runnable flushPendingTasks = () -> {
PendingAppendingTxnBufferTask pendingTask = null;
try {
synchronized (pendingAppendingTxnBufferTasks) {
while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) {
final ByteBuf data = pendingTask.buffer;
final CompletableFuture<Position> pendingFuture =
pendingTask.pendingPublishFuture;
internalAppendBufferToTxn(pendingTask.txnId, pendingTask.buffer,
pendingTask.sequenceId)
.whenComplete((positionAdded, ex3) -> {
data.release();
if (ex3 != null) {
pendingFuture.completeExceptionally(ex3);
return;
}
pendingFuture.complete(positionAdded);
});
}
}
} catch (Exception e) {
// If there are some error when adding entries or caching entries, this log will be printed.
log.error("[{}] Failed to flush pending publishing requests after taking the first"
+ " snapshot.",
topic.getName(), e);
if (pendingTask != null) {
pendingTask.fail(e);
}
failPendingTasks.accept(e);
}
};

// Trigger the first snapshot.
transactionBufferFuture.whenComplete((ignore1, ex1) -> {
if (ex1 != null) {
log.error("[{}] Transaction buffer recover failed", topic.getName(), ex1);
failPendingTasks.accept(ex1);
return;
}
if (changeToFirstSnapshotting()) {
log.info("[{}] Start to take the first snapshot", topic.getName());
// Flush pending publishing after the first snapshot finished.
takeFirstSnapshot().whenComplete((ignore2, ex2) -> {
if (ex2 != null) {
log.error("[{}] Failed to take the first snapshot, flushing failed publishing requests",
topic.getName(), ex2);
failPendingTasks.accept(ex2);
return;
}
log.info("[{}] Finished to take the first snapshot, flushing publishing {} requests",
topic.getName(), pendingAppendingTxnBufferTasks.size());
flushPendingTasks.run();
});
} else if (checkIfReady()) {
log.info("[{}] No need to take the first snapshot, flushing publishing {} requests",
topic.getName(), pendingAppendingTxnBufferTasks.size());
flushPendingTasks.run();
} else {
log.error("[{}] Transaction buffer recover failed, current state is {}", topic.getName(),
getState());
failPendingTasks.accept(new BrokerServiceException.ServiceUnitNotReadyException(
"Transaction Buffer recover failed, the current state is: " + getState()));
}
});
return res;
}
}

/**
 * Takes an aborted-transactions snapshot at the current {@code maxReadPosition} and, on success,
 * switches the buffer to the Ready state and schedules this buffer on the timer after
 * {@code takeSnapshotIntervalTime} milliseconds.
 *
 * @return a future completed normally once the state change to Ready succeeded; completed exceptionally
 *         with {@code ServiceUnitNotReadyException} when the state change fails, or with the original
 *         cause when the snapshot itself fails
 */
private CompletableFuture<Void> takeFirstSnapshot() {
    CompletableFuture<Void> firstSnapshottingFuture = new CompletableFuture<>();
    snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> {
        if (changeToReadyStateFromNoSnapshot()) {
            timer.newTimeout(TopicTransactionBuffer.this,
                    takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
            firstSnapshottingFuture.complete(null);
        } else {
            log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot",
                    topic.getName());
            firstSnapshottingFuture.completeExceptionally(new BrokerServiceException
                    .ServiceUnitNotReadyException(
                    "Transaction Buffer take first snapshot failed, the current state is: " + getState()));
        }
    }).exceptionally(exception -> {
        // Pass the throwable as the trailing argument so the cause and stack trace reach the log.
        log.error("Topic {} failed to take snapshot", this.topic.getName(), exception);
        firstSnapshottingFuture.completeExceptionally(exception);
        return null;
    });
    return firstSnapshottingFuture;
}

private CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer) {
@VisibleForTesting
protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer, long seq) {
CompletableFuture<Position> completableFuture = new CompletableFuture<>();
Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits());
if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) {
Expand Down Expand Up @@ -550,7 +639,16 @@ public CompletableFuture<Void> clearSnapshot() {

/**
 * Closes the transaction buffer.
 *
 * <p>Under the monitor of {@code pendingAppendingTxnBufferTasks}, every still-queued publish request
 * is failed with a {@code ServiceUnitNotReadyException} (unless the buffer was already closed), and
 * only then is the state switched to Close. The state must not be changed before the check, otherwise
 * the check always sees Close and the queued requests are never failed.
 *
 * @return the future returned by closing the snapshot processor
 */
@Override
public CompletableFuture<Void> closeAsync() {
    synchronized (pendingAppendingTxnBufferTasks) {
        if (!checkIfClosed()) {
            PendingAppendingTxnBufferTask pendingTask = null;
            Throwable t = new BrokerServiceException.ServiceUnitNotReadyException("Topic is closed");
            while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) {
                pendingTask.fail(t);
            }
        }
        changeToCloseState();
    }
    return this.snapshotAbortedTxnProcessor.closeAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public enum State {
Initializing,
Ready,
Close,
NoSnapshot
NoSnapshot,
FirstSnapshotting
}

private static final AtomicReferenceFieldUpdater<TopicTransactionBufferState, State> STATE_UPDATER =
Expand All @@ -59,13 +60,25 @@ protected boolean changeToInitializingState() {
}

/**
 * Atomically moves the state from FirstSnapshotting to Ready.
 *
 * <p>Despite the method name, the expected current state is FirstSnapshotting rather than NoSnapshot:
 * {@code changeToFirstSnapshotting()} moves NoSnapshot to FirstSnapshotting, so a transition that
 * expected NoSnapshot would fail whenever it runs after that call.
 *
 * @return {@code true} if the state was FirstSnapshotting and is now Ready, {@code false} otherwise
 */
protected boolean changeToReadyStateFromNoSnapshot() {
    return STATE_UPDATER.compareAndSet(this, State.FirstSnapshotting, State.Ready);
}

/**
 * Atomically moves the state from NoSnapshot to FirstSnapshotting.
 *
 * @return {@code true} if the state was NoSnapshot and has been switched, {@code false} otherwise
 */
protected boolean changeToFirstSnapshotting() {
    final boolean switched = STATE_UPDATER.compareAndSet(this, State.NoSnapshot, State.FirstSnapshotting);
    return switched;
}

/** Unconditionally sets the state to Close, whatever the current state is. */
protected void changeToCloseState() {
    final State closed = State.Close;
    STATE_UPDATER.set(this, closed);
}

/**
 * Reports whether the current state is Initializing.
 *
 * @return {@code true} if the state read at call time is Initializing
 */
public boolean checkIfInitializing() {
    final State current = STATE_UPDATER.get(this);
    return current == State.Initializing;
}

/**
 * Reports whether the current state is FirstSnapshotting.
 *
 * @return {@code true} if the state read at call time is FirstSnapshotting
 */
public boolean checkIfFirstSnapshotting() {
    final State current = STATE_UPDATER.get(this);
    return current == State.FirstSnapshotting;
}

/**
 * Reports whether the current state is Ready.
 *
 * @return {@code true} if the state read at call time is Ready
 */
public boolean checkIfReady() {
    final State current = STATE_UPDATER.get(this);
    return current == State.Ready;
}
Expand All @@ -74,6 +87,10 @@ public boolean checkIfNoSnapshot() {
return STATE_UPDATER.get(this) == State.NoSnapshot;
}

/**
 * Reports whether the current state is Close.
 *
 * @return {@code true} if the state read at call time is Close
 */
public boolean checkIfClosed() {
    final State current = STATE_UPDATER.get(this);
    return current == State.Close;
}

/**
 * Reads the current state through the field updater.
 *
 * @return the state at call time
 */
public State getState() {
    final State current = STATE_UPDATER.get(this);
    return current;
}
Expand Down
Loading
Loading