|
1 | 1 | package org.infinispan.server.resp; |
2 | 2 |
|
| 3 | +import java.nio.charset.StandardCharsets; |
3 | 4 | import java.util.List; |
4 | 5 | import java.util.concurrent.CompletableFuture; |
5 | 6 | import java.util.concurrent.CompletionStage; |
| 7 | +import java.util.function.BiConsumer; |
| 8 | +import java.util.function.Consumer; |
| 9 | +import java.util.function.IntFunction; |
6 | 10 |
|
7 | 11 | import org.infinispan.commons.util.concurrent.CompletableFutures; |
8 | 12 | import org.infinispan.util.function.TriConsumer; |
9 | 13 |
|
10 | 14 | import io.netty.buffer.ByteBuf; |
11 | | -import io.netty.buffer.Unpooled; |
12 | 15 | import io.netty.channel.ChannelHandlerContext; |
13 | | -import io.netty.util.CharsetUtil; |
| 16 | +import io.netty.util.AttributeKey; |
14 | 17 |
|
15 | 18 | public class OurRespHandler extends RespRequestHandler { |
16 | 19 | private static final CompletableFuture<byte[]> GET_FUTURE = CompletableFuture.completedFuture(new byte[] { 0x1, 0x12}); |
17 | | - public static ByteBuf OK = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("+OK\r\n", CharsetUtil.US_ASCII)); |
18 | | - |
19 | | - // Returns a cached OK status that is retained for multiple uses |
20 | | - static ByteBuf statusOK() { |
21 | | - return OK.duplicate(); |
22 | | - } |
| 20 | + public static byte[] OK = "+OK\r\n".getBytes(StandardCharsets.US_ASCII); |
23 | 21 |
|
24 | 22 | @Override |
25 | | - public CompletionStage<RespRequestHandler> handleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) { |
| 23 | + public CompletionStage<RespRequestHandler> actualHandleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) { |
26 | 24 | switch (type) { |
27 | 25 | case "GET": |
28 | 26 | return stageToReturn(GET_FUTURE, ctx, GET_TRICONSUMER); |
29 | 27 | case "SET": |
30 | | - return stageToReturn(CompletableFutures.completedNull(), ctx, SET_TRICONSUMER); |
| 28 | + return stageToReturn(CompletableFutures.completedNull(), ctx, OK_BICONSUMER); |
31 | 29 | } |
32 | 30 | return super.handleRequest(ctx, type, arguments); |
33 | 31 | } |
34 | 32 |
|
35 | | - private static final TriConsumer<byte[], ChannelHandlerContext, Throwable> SET_TRICONSUMER = (ignore, innerCtx, t) -> { |
36 | | - if (t != null) { |
37 | | - throw new AssertionError(t); |
38 | | - } else { |
39 | | - innerCtx.writeAndFlush(statusOK(), innerCtx.voidPromise()); |
40 | | - } |
41 | | - }; |
| 33 | + protected static final BiConsumer<Object, ByteBufPool> OK_BICONSUMER = (ignore, alloc) -> |
| 34 | + alloc.acquire(OK.length).writeBytes(OK); |
42 | 35 |
|
43 | | - private static final TriConsumer<byte[], ChannelHandlerContext, Throwable> GET_TRICONSUMER = (innerValueBytes, innerCtx, t) -> { |
44 | | - if (t != null) { |
45 | | - throw new AssertionError(t); |
46 | | - } else if (innerValueBytes != null) { |
47 | | - ByteBuf buf = bytesToResult(innerValueBytes, innerCtx.alloc()); |
48 | | - innerCtx.writeAndFlush(buf, innerCtx.voidPromise()); |
| 36 | + protected static final BiConsumer<byte[], ByteBufPool> GET_TRICONSUMER = (innerValueBytes, alloc) -> { |
| 37 | + if (innerValueBytes != null) { |
| 38 | + bytesToResult(innerValueBytes, alloc); |
49 | 39 | } else { |
50 | | - innerCtx.writeAndFlush(RespRequestHandler.stringToByteBuf("$-1\r\n", innerCtx.alloc()), innerCtx.voidPromise()); |
| 40 | + stringToByteBuf("$-1\r\n", alloc); |
51 | 41 | } |
52 | 42 | }; |
53 | 43 | } |
0 commit comments