Skip to content

Commit 74c3b57

Browse files
authored
[improve][client]Improve transaction log when a TXN command timeout (#24230)
1 parent 2d78cbd commit 74c3b57

File tree

1 file changed

+37
-7
lines changed

1 file changed

+37
-7
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit)
234234
}
235235
long requestId = client.newRequestId();
236236
ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout));
237-
OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client);
237+
String description = String.format("Create new transaction %s", transactionCoordinatorId);
238+
OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client, description, cnx());
238239
internalPinnedExecutor.execute(() -> {
239240
pendingRequests.put(requestId, op);
240241
timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
@@ -315,8 +316,10 @@ public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<S
315316
long requestId = client.newRequestId();
316317
ByteBuf cmd = Commands.newAddPartitionToTxn(
317318
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions);
319+
String description = String.format("Add partition %s to TXN %s", String.valueOf(partitions),
320+
String.valueOf(txnID));
318321
OpForVoidCallBack op = OpForVoidCallBack
319-
.create(cmd, callback, client);
322+
.create(cmd, callback, client, description, cnx());
320323
internalPinnedExecutor.execute(() -> {
321324
pendingRequests.put(requestId, op);
322325
timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
@@ -400,7 +403,9 @@ public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<Subscripti
400403
long requestId = client.newRequestId();
401404
ByteBuf cmd = Commands.newAddSubscriptionToTxn(
402405
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
403-
OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client);
406+
String description = String.format("Add subscription %s to TXN %s", toStringSubscriptionList(subscriptionList),
407+
String.valueOf(txnID));
408+
OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client, description, cnx());
404409
internalPinnedExecutor.execute(() -> {
405410
pendingRequests.put(requestId, op);
406411
timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
@@ -411,6 +416,17 @@ public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<Subscripti
411416
return callback;
412417
}
413418

419+
private String toStringSubscriptionList(List<Subscription> list) {
420+
if (list == null || list.isEmpty()) {
421+
return "[]";
422+
}
423+
StringBuilder builder = new StringBuilder("[");
424+
for (Subscription subscription : list) {
425+
builder.append(String.format("%s %s", subscription.getTopic(), subscription.getSubscription()));
426+
}
427+
return builder.append("]").toString();
428+
}
429+
414430
public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) {
415431
final boolean hasError = response.hasError();
416432
final ServerError error;
@@ -482,7 +498,8 @@ public CompletableFuture<Void> endTxnAsync(TxnID txnID, TxnAction action) {
482498
long requestId = client.newRequestId();
483499
BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
484500
ByteBuf buf = Commands.serializeWithSize(cmd);
485-
OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client);
501+
String description = String.format("End [%s] TXN %s", String.valueOf(action), String.valueOf(txnID));
502+
OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client, description, cnx());
486503
internalPinnedExecutor.execute(() -> {
487504
pendingRequests.put(requestId, op);
488505
timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
@@ -572,13 +589,16 @@ private abstract static class OpBase<T> {
572589
protected ByteBuf cmd;
573590
protected CompletableFuture<T> callback;
574591
protected Backoff backoff;
592+
protected String description;
593+
protected ClientCnx clientCnx;
575594

576595
abstract void recycle();
577596
}
578597

579598
private static class OpForTxnIdCallBack extends OpBase<TxnID> {
580599

581-
static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client) {
600+
static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client,
601+
String description, ClientCnx clientCnx) {
582602
OpForTxnIdCallBack op = RECYCLER.get();
583603
op.callback = callback;
584604
op.cmd = cmd;
@@ -588,6 +608,8 @@ static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback,
588608
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
589609
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
590610
.create();
611+
op.description = description;
612+
op.clientCnx = clientCnx;
591613
return op;
592614
}
593615

@@ -600,6 +622,8 @@ void recycle() {
600622
this.backoff = null;
601623
this.cmd = null;
602624
this.callback = null;
625+
this.description = null;
626+
this.clientCnx = null;
603627
recyclerHandle.recycle(this);
604628
}
605629

@@ -615,7 +639,8 @@ protected OpForTxnIdCallBack newObject(Handle<OpForTxnIdCallBack> handle) {
615639
private static class OpForVoidCallBack extends OpBase<Void> {
616640

617641

618-
static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client) {
642+
static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client,
643+
String description, ClientCnx clientCnx) {
619644
OpForVoidCallBack op = RECYCLER.get();
620645
op.callback = callback;
621646
op.cmd = cmd;
@@ -625,6 +650,8 @@ static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, P
625650
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
626651
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
627652
.create();
653+
op.description = description;
654+
op.clientCnx = clientCnx;
628655
return op;
629656
}
630657

@@ -637,6 +664,8 @@ void recycle() {
637664
this.backoff = null;
638665
this.cmd = null;
639666
this.callback = null;
667+
this.description = null;
668+
this.clientCnx = null;
640669
recyclerHandle.recycle(this);
641670
}
642671

@@ -744,7 +773,8 @@ public void run(Timeout timeout) throws Exception {
744773
OpBase<?> op = pendingRequests.remove(lastPolled.requestId);
745774
if (op != null && !op.callback.isDone()) {
746775
op.callback.completeExceptionally(new PulsarClientException.TimeoutException(
747-
"Could not get response from transaction meta store within given timeout."));
776+
String.format("%s failed due to timeout. connection: %s. pending-queue: %s",
777+
op.description, op.clientCnx, pendingRequests.size())));
748778
if (LOG.isDebugEnabled()) {
749779
LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId);
750780
}

0 commit comments

Comments
 (0)