Skip to content

Commit 24a6c1c

Browse files
committed
Flush and buffer reuse
1 parent 523331e commit 24a6c1c

File tree

4 files changed

+237
-5
lines changed

4 files changed

+237
-5
lines changed

resp-decoder/src/main/java/org/infinispan/server/resp/NettyChannelState.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,29 @@
1212
public class NettyChannelState {
1313
public EmbeddedChannel channel;
1414

15+
public enum DECODER {
16+
CURRENT,
17+
NEW
18+
}
19+
20+
@Param
21+
public DECODER decoder;
22+
1523
@Setup
1624
public void initializeState() {
1725

1826
channel = new EmbeddedChannel();
1927
channel.config().setAllocator(PooledByteBufAllocator.DEFAULT);
2028

21-
OurRespHandler ourRespHandler = new OurRespHandler();
22-
23-
channel.pipeline()
24-
.addLast(new RespDecoder(ourRespHandler));
29+
switch (decoder) {
30+
case NEW:
31+
channel.pipeline()
32+
.addLast(new NewDecoder(new NewRespHandler()));
33+
break;
34+
case CURRENT:
35+
channel.pipeline()
36+
.addLast(new RespDecoder(new OurRespHandler()));
37+
break;
38+
}
2539
}
2640
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package org.infinispan.server.resp;
2+
3+
import java.util.List;
4+
import java.util.concurrent.CompletionStage;
5+
6+
import org.infinispan.commons.util.Util;
7+
import org.infinispan.util.concurrent.CompletionStages;
8+
9+
import io.netty.buffer.ByteBuf;
10+
import io.netty.buffer.Unpooled;
11+
import io.netty.channel.ChannelHandlerContext;
12+
import io.netty.util.concurrent.FastThreadLocal;
13+
14+
public class NewDecoder extends RespDecoder {
15+
public NewDecoder(NewRespHandler initialHandler) {
16+
super(initialHandler);
17+
initialHandler.writer = this::flushBuffer;
18+
initialHandler.allocator = this::retrieveBuffer;
19+
}
20+
21+
protected static final int THREAD_LOCAL_CAPACITY = 1024;
22+
23+
private ChannelHandlerContext ctx;
24+
25+
private boolean isReading;
26+
private ByteBuf pendingBuffer;
27+
28+
private static final FastThreadLocal<ByteBuf> BYTE_BUF_FAST_THREAD_LOCAL = new FastThreadLocal<>() {
29+
@Override
30+
protected ByteBuf initialValue() {
31+
return Unpooled.directBuffer(THREAD_LOCAL_CAPACITY, THREAD_LOCAL_CAPACITY);
32+
}
33+
34+
@Override
35+
protected void onRemoval(ByteBuf value) {
36+
value.release();
37+
}
38+
};
39+
40+
protected void flushBuffer(ByteBuf buffer) {
41+
assert buffer == pendingBuffer : "Buffer mismatch expected " + pendingBuffer + " but was " + buffer;
42+
// Only flush the buffer it is done outside of the read loop, as we handle that ourselves
43+
if (!isReading) {
44+
ctx.writeAndFlush(pendingBuffer, ctx.voidPromise());
45+
pendingBuffer = null;
46+
}
47+
}
48+
49+
private void flushPendingBuffer() {
50+
if (pendingBuffer != null) {
51+
ctx.writeAndFlush(pendingBuffer, ctx.voidPromise());
52+
pendingBuffer = null;
53+
}
54+
}
55+
56+
@Override
57+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
58+
flushPendingBuffer();
59+
super.channelInactive(ctx);
60+
}
61+
62+
@Override
63+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
64+
this.ctx = ctx;
65+
super.handlerAdded(ctx);
66+
}
67+
68+
protected ByteBuf retrieveBuffer(int requiredBytes) {
69+
if (pendingBuffer != null) {
70+
if (requiredBytes < pendingBuffer.writableBytes()) {
71+
return pendingBuffer;
72+
}
73+
ctx.write(pendingBuffer, ctx.voidPromise());
74+
pendingBuffer = null;
75+
}
76+
ByteBuf buf = BYTE_BUF_FAST_THREAD_LOCAL.get();
77+
if (requiredBytes < buf.writableBytes()) {
78+
// This will reserve the buffer for our usage only, other channels in same event loop may try to reserve
79+
if (buf.refCnt() == 1) {
80+
buf.retain();
81+
buf.clear();
82+
pendingBuffer = buf;
83+
return buf;
84+
}
85+
}
86+
int reserveSize = Math.max(requiredBytes, 4096);
87+
pendingBuffer = ctx.alloc().buffer(reserveSize, reserveSize);
88+
return pendingBuffer;
89+
}
90+
91+
@Override
92+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
93+
isReading = false;
94+
flushPendingBuffer();
95+
super.channelReadComplete(ctx);
96+
}
97+
98+
@Override
99+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
100+
isReading = true;
101+
super.channelRead(ctx, msg);
102+
}
103+
104+
@Override
105+
protected boolean handleCommandAndArguments(ChannelHandlerContext ctx, String command, List<byte[]> arguments) {
106+
boolean canContinue = handleCommandAndArgumentsOverride(ctx, command, arguments);
107+
if (canContinue) {
108+
if (ctx.channel().bytesBeforeUnwritable() < pendingBuffer.readableBytes()) {
109+
ctx.writeAndFlush(pendingBuffer);
110+
pendingBuffer = null;
111+
// TODO: we should probably check for writeability still and block reading if possible until is cleared
112+
}
113+
} else {
114+
assert !ctx.channel().config().isAutoRead();
115+
}
116+
return canContinue;
117+
}
118+
119+
protected boolean handleCommandAndArgumentsOverride(ChannelHandlerContext ctx, String command, List<byte[]> arguments) {
120+
if (log.isTraceEnabled()) {
121+
log.tracef("Received command: %s with arguments %s for %s", command, Util.toStr(arguments), ctx.channel());
122+
}
123+
124+
CompletionStage<RespRequestHandler> stage = requestHandler.handleRequest(ctx, command, arguments);
125+
if (CompletionStages.isCompletedSuccessfully(stage)) {
126+
requestHandler = CompletionStages.join(stage);
127+
return true;
128+
}
129+
log.tracef("Disabling auto read for channel %s until previous command is complete", ctx.channel());
130+
// Disable reading any more from socket - until command is complete
131+
ctx.channel().config().setAutoRead(false);
132+
stage.whenComplete((handler, t) -> {
133+
assert ctx.channel().eventLoop().inEventLoop();
134+
log.tracef("Re-enabling auto read for channel %s as previous command is complete", ctx.channel());
135+
ctx.channel().config().setAutoRead(true);
136+
if (t != null) {
137+
exceptionCaught(ctx, t);
138+
} else {
139+
// Instate the new handler if there was no exception
140+
requestHandler = handler;
141+
}
142+
143+
ctx.read();
144+
});
145+
return false;
146+
}
147+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package org.infinispan.server.resp;
2+
3+
import java.nio.charset.StandardCharsets;
4+
import java.util.List;
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.CompletionStage;
7+
import java.util.function.Consumer;
8+
import java.util.function.IntFunction;
9+
10+
import org.infinispan.commons.util.concurrent.CompletableFutures;
11+
import org.infinispan.util.function.TriConsumer;
12+
13+
import io.netty.buffer.ByteBuf;
14+
import io.netty.channel.ChannelHandlerContext;
15+
import io.netty.util.AttributeKey;
16+
17+
public class NewRespHandler extends RespRequestHandler {
18+
private static final CompletableFuture<byte[]> GET_FUTURE = CompletableFuture.completedFuture(new byte[] { 0x1, 0x12});
19+
public static byte[] OK = "+OK\r\n".getBytes(StandardCharsets.US_ASCII);
20+
21+
public IntFunction<ByteBuf> allocator;
22+
public Consumer<ByteBuf> writer;
23+
24+
@Override
25+
public CompletionStage<RespRequestHandler> handleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) {
26+
switch (type) {
27+
case "GET":
28+
return stageToReturn(GET_FUTURE, ctx, GET_TRICONSUMER);
29+
case "SET":
30+
return stageToReturn(CompletableFutures.completedNull(), ctx, SET_TRICONSUMER);
31+
}
32+
return super.handleRequest(ctx, type, arguments);
33+
}
34+
35+
private final TriConsumer<byte[], ChannelHandlerContext, Throwable> SET_TRICONSUMER = (ignore, innerCtx, t) -> {
36+
if (t != null) {
37+
throw new AssertionError(t);
38+
} else {
39+
ByteBuf buf = allocator.apply(OK.length);
40+
writer.accept(buf.writeBytes(OK));
41+
}
42+
};
43+
44+
private final TriConsumer<byte[], ChannelHandlerContext, Throwable> GET_TRICONSUMER = (innerValueBytes, innerCtx, t) -> {
45+
if (t != null) {
46+
throw new AssertionError(t);
47+
} else if (innerValueBytes != null) {
48+
ByteBuf buf = bytesToResult(innerValueBytes, allocator);
49+
writer.accept(buf);
50+
} else {
51+
innerCtx.writeAndFlush(RespRequestHandler.stringToByteBuf("$-1\r\n", innerCtx.alloc()), innerCtx.voidPromise());
52+
}
53+
};
54+
55+
protected static ByteBuf bytesToResult(byte[] result, IntFunction<ByteBuf> allocator) {
56+
int length = result.length;
57+
int stringLength = stringSize(length);
58+
59+
// Need 5 extra for $ and 2 sets of /r/n
60+
int exactSize = stringLength + length + 5;
61+
ByteBuf buffer = allocator.apply(exactSize);
62+
buffer.writeByte('$');
63+
// This method is anywhere from 10-100% faster than ByteBufUtil.writeAscii and avoids allocations
64+
setIntChars(length, stringLength, buffer);
65+
buffer.writeByte('\r').writeByte('\n');
66+
buffer.writeBytes(result);
67+
buffer.writeByte('\r').writeByte('\n');
68+
69+
return buffer;
70+
}
71+
}

resp-decoder/src/main/java/org/infinispan/server/resp/PipelineState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void initializeState() {
3434

3535
CompositeByteBuf setBuffer = Unpooled.compositeBuffer(messageCount);
3636
for (int i = 0; i < messageCount; ++i) {
37-
getBuffer.addComponent(true, Unpooled.copiedBuffer("*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n", CharsetUtil.US_ASCII));
37+
setBuffer.addComponent(true, Unpooled.copiedBuffer("*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n", CharsetUtil.US_ASCII));
3838
}
3939

4040
SET_REQUEST = Unpooled.unreleasableBuffer(setBuffer);

0 commit comments

Comments
 (0)