Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 3602c9e

Browse files
gaoran10eolivelli
andauthored
[bugfix] AppendRecordsContext cannot be Recyclable (#1967)
(cherry picked from commit 0b6fd77) ### Motivation AppendRecordsContext cannot be Recyclable. ### Modifications Don't recycle the AppendRecordsContext. Co-authored-by: Enrico Olivelli <[email protected]>
1 parent e931b6d commit 3602c9e

File tree

2 files changed

+9
-31
lines changed

2 files changed

+9
-31
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
875875
PartitionLog.AppendOrigin.Client,
876876
appendRecordsContext
877877
).whenComplete((response, ex) -> {
878-
appendRecordsContext.recycle();
879878
if (ex != null) {
880879
resultFuture.completeExceptionally(ex.getCause());
881880
return;
@@ -2449,7 +2448,6 @@ protected void handleWriteTxnMarkers(KafkaHeaderAndRequest kafkaHeaderAndRequest
24492448
PartitionLog.AppendOrigin.Coordinator,
24502449
appendRecordsContext
24512450
).whenComplete((result, ex) -> {
2452-
appendRecordsContext.recycle();
24532451
if (ex != null) {
24542452
log.error("[{}] Append txn marker ({}) failed.", ctx.channel(), marker, ex);
24552453
Map<TopicPartition, Errors> currentErrors = new HashMap<>();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/AppendRecordsContext.java

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,59 +14,39 @@
1414
package io.streamnative.pulsar.handlers.kop.storage;
1515

1616
import io.netty.channel.ChannelHandlerContext;
17-
import io.netty.util.Recycler;
1817
import io.streamnative.pulsar.handlers.kop.KafkaTopicManager;
1918
import io.streamnative.pulsar.handlers.kop.PendingTopicFutures;
2019
import java.util.Map;
2120
import java.util.function.Consumer;
21+
import lombok.AllArgsConstructor;
2222
import lombok.Getter;
23+
import lombok.extern.slf4j.Slf4j;
2324
import org.apache.kafka.common.TopicPartition;
2425

2526
/**
2627
* AppendRecordsContext is use for pass parameters to ReplicaManager, to avoid long parameter lists.
2728
*/
29+
@Slf4j
30+
@AllArgsConstructor
2831
@Getter
2932
public class AppendRecordsContext {
30-
private static final Recycler<AppendRecordsContext> RECYCLER = new Recycler<AppendRecordsContext>() {
31-
protected AppendRecordsContext newObject(Handle<AppendRecordsContext> handle) {
32-
return new AppendRecordsContext(handle);
33-
}
34-
};
35-
36-
private final Recycler.Handle<AppendRecordsContext> recyclerHandle;
3733
private KafkaTopicManager topicManager;
3834
private Consumer<Integer> startSendOperationForThrottling;
3935
private Consumer<Integer> completeSendOperationForThrottling;
4036
private Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap;
4137
private ChannelHandlerContext ctx;
4238

43-
private AppendRecordsContext(Recycler.Handle<AppendRecordsContext> recyclerHandle) {
44-
this.recyclerHandle = recyclerHandle;
45-
}
46-
4739
// recycler and get for this object
4840
public static AppendRecordsContext get(final KafkaTopicManager topicManager,
4941
final Consumer<Integer> startSendOperationForThrottling,
5042
final Consumer<Integer> completeSendOperationForThrottling,
5143
final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap,
5244
final ChannelHandlerContext ctx) {
53-
AppendRecordsContext context = RECYCLER.get();
54-
context.topicManager = topicManager;
55-
context.startSendOperationForThrottling = startSendOperationForThrottling;
56-
context.completeSendOperationForThrottling = completeSendOperationForThrottling;
57-
context.pendingTopicFuturesMap = pendingTopicFuturesMap;
58-
context.ctx = ctx;
59-
60-
return context;
61-
}
62-
63-
public void recycle() {
64-
topicManager = null;
65-
startSendOperationForThrottling = null;
66-
completeSendOperationForThrottling = null;
67-
pendingTopicFuturesMap = null;
68-
recyclerHandle.recycle(this);
69-
ctx = null;
45+
return new AppendRecordsContext(topicManager,
46+
startSendOperationForThrottling,
47+
completeSendOperationForThrottling,
48+
pendingTopicFuturesMap,
49+
ctx);
7050
}
7151

7252
}

0 commit comments

Comments
 (0)