Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
Expand Down Expand Up @@ -417,21 +420,39 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
);
}

ch.pipeline()
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
.addLast("encoder", new HttpResponseEncoder() {
@Override
protected boolean isContentAlwaysEmpty(HttpResponse msg) {
// non-chunked responses (Netty4HttpResponse extends Netty's DefaultFullHttpResponse) with chunked transfer
// encoding are only sent by us in response to HEAD requests and must always have an empty body
if (msg instanceof Netty4FullHttpResponse netty4FullHttpResponse && HttpUtil.isTransferEncodingChunked(msg)) {
assert netty4FullHttpResponse.content().isReadable() == false;
return true;
}
return super.isContentAlwaysEmpty(msg);
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor() { // this handles request body decompression
private String currentUri;

@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, java.util.List<Object> out) throws Exception {
if (msg instanceof HttpRequest request) {
currentUri = request.uri();
}
super.decode(ctx, msg, out);
}

@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
if (currentUri != null && currentUri.startsWith("/_prometheus") && "snappy".equalsIgnoreCase(contentEncoding)) {
// Prometheus remote write uses raw Snappy block format, not the framed
// format that Netty's SnappyFrameDecoder expects. Skip auto-decompression
// and let the application layer handle it.
return null;
}
})
.addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));
return super.newContentDecoder(contentEncoding);
}
}).addLast("encoder", new HttpResponseEncoder() {
@Override
protected boolean isContentAlwaysEmpty(HttpResponse msg) {
// non-chunked responses (Netty4HttpResponse extends Netty's DefaultFullHttpResponse) with chunked transfer
// encoding are only sent by us in response to HEAD requests and must always have an empty body
if (msg instanceof Netty4FullHttpResponse netty4FullHttpResponse && HttpUtil.isTransferEncodingChunked(msg)) {
assert netty4FullHttpResponse.content().isReadable() == false;
return true;
}
return super.isContentAlwaysEmpty(msg);
}
}).addLast(new Netty4HttpContentSizeHandler(decoder, handlingSettings.maxContentLength()));

if (handlingSettings.compression()) {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.compressionLevel()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.IndexingPressure;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;

/**
* Accumulates a streamed HTTP request body while tracking memory usage via {@link IndexingPressure}.
Expand All @@ -35,6 +37,28 @@
*/
public class IndexingPressureAwareContentAggregator implements BaseRestHandler.RequestBodyChunkConsumer {

/**
 * Hook that rewrites the fully-accumulated request body (for example to decompress it)
 * before it is delivered to the {@link CompletionHandler}. An implementation that
 * produces a new reference is responsible for releasing the one it was given.
 */
@FunctionalInterface
public interface BodyPostProcessor {

    /** Identity transform: passes the accumulated body through untouched. */
    BodyPostProcessor NOOP = (raw, limit) -> raw;

    /**
     * Post-processes the accumulated request body (e.g. decompression).
     *
     * @param body the accumulated raw body. Ownership transfers to this method: unless the
     *             same reference is returned, the implementation must close it, and the
     *             caller must not touch it after this method returns.
     * @param maxSize the maximum permitted size for the result; the returned body must not exceed it
     * @return the post-processed body; the caller is responsible for closing the returned reference
     * @throws IOException on processing failure
     */
    ReleasableBytesReference process(ReleasableBytesReference body, long maxSize) throws IOException;
}

/**
* Callback for request body accumulation lifecycle events.
*/
Expand All @@ -61,6 +85,7 @@ public interface CompletionHandler {
private final IndexingPressure.Coordinating coordinating;
private final long maxRequestSize;
private final CompletionHandler completionHandler;
private final BodyPostProcessor bodyPostProcessor;

private ArrayList<ReleasableBytesReference> chunks;
private long accumulatedSize;
Expand All @@ -70,12 +95,14 @@ public IndexingPressureAwareContentAggregator(
RestRequest request,
IndexingPressure.Coordinating coordinating,
long maxRequestSize,
CompletionHandler completionHandler
CompletionHandler completionHandler,
BodyPostProcessor bodyPostProcessor
) {
this.request = request;
this.coordinating = coordinating;
this.maxRequestSize = maxRequestSize;
this.completionHandler = completionHandler;
this.bodyPostProcessor = Objects.requireNonNull(bodyPostProcessor);
}

@Override
Expand All @@ -91,21 +118,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
}

accumulatedSize += chunk.length();
if (accumulatedSize > maxRequestSize) {
chunk.close();
closed = true;
if (chunks != null) {
Releasables.close(chunks);
chunks = null;
}
coordinating.close();
completionHandler.onFailure(
channel,
new ElasticsearchStatusException(
"request body too large, max [" + maxRequestSize + "] bytes",
RestStatus.REQUEST_ENTITY_TOO_LARGE
)
);
if (failIfAboveLimit(channel, chunk)) {
return;
}

Expand All @@ -126,16 +139,55 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
}
chunks = null;

try {
fullBody = bodyPostProcessor.process(fullBody, maxRequestSize);
} catch (Exception e) {
closeOnFailure(channel, e, fullBody);
return;
}
accumulatedSize = fullBody.length();
if (failIfAboveLimit(channel, fullBody)) {
return;
}

long excess = maxRequestSize - accumulatedSize;
if (excess > 0) {
coordinating.reduceBytes(excess);
}

closed = true;
completionHandler.onComplete(channel, fullBody, coordinating);
}
}

/**
 * Checks the accumulated size against the configured request-size limit and, on overflow,
 * performs the full failure path (releases resources, responds with 413).
 *
 * @param channel    channel used to report the failure to the client
 * @param releasable in-flight reference handed to {@link #closeOnFailure} for release
 * @return {@code true} if the limit was exceeded and failure handling was performed, otherwise {@code false}
 */
private boolean failIfAboveLimit(RestChannel channel, Releasable releasable) {
    if (accumulatedSize <= maxRequestSize) {
        return false;
    }
    var tooLarge = new ElasticsearchStatusException(
        "request body too large, max [" + maxRequestSize + "] bytes",
        RestStatus.REQUEST_ENTITY_TOO_LARGE
    );
    closeOnFailure(channel, tooLarge, releasable);
    return true;
}

/**
 * Releases everything this aggregator holds and notifies the completion handler of the failure.
 * After this returns the aggregator is closed and ignores further chunks.
 *
 * @param channel    channel used to report the failure
 * @param e          failure cause forwarded to {@link CompletionHandler#onFailure}
 * @param releasable in-flight reference (current chunk or assembled body) to release first
 */
private void closeOnFailure(RestChannel channel, Exception e, Releasable releasable) {
    releasable.close();
    var buffered = chunks;
    chunks = null;
    if (buffered != null) {
        Releasables.close(buffered);
    }
    closed = true;
    coordinating.close();
    completionHandler.onFailure(channel, e);
}

@Override
public void streamClose() {
if (closed == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.rest;

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.test.rest.FakeRestRequest;
import org.junit.After;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -177,6 +179,60 @@ public void testReducesPressureToActualSize() {
assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes());
}

/**
 * A post-processor may grow the body (e.g. decompression). After completion the tracked
 * coordinating bytes must reflect the post-processed size, not the wire size.
 */
public void testPostProcessorExpandsContent() {
    final long limit = 1024;
    final int wireSize = 50;
    final int decompressedSize = 200;
    final byte[] decompressed = randomByteArrayOfLength(decompressedSize);

    // Post-processor discards the raw body and substitutes a larger fixed payload.
    initAggregator(limit, (raw, max) -> {
        raw.close();
        return new ReleasableBytesReference(new BytesArray(decompressed), () -> {});
    });

    // The full limit is reserved up front.
    assertEquals(limit, indexingPressure.stats().getCurrentCoordinatingBytes());

    stream.sendNext(randomReleasableBytesReference(wireSize), true);

    assertNotNull(contentRef.get());
    assertEquals(decompressedSize, contentRef.get().length());
    // Pressure was adjusted to the expanded size rather than the reserved limit.
    assertEquals(decompressedSize, indexingPressure.stats().getCurrentCoordinatingBytes());

    pressureRef.get().close();
    assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes());
}

/**
 * A body that fits on the wire but expands beyond the limit after post-processing
 * must still be rejected with 413.
 */
public void testPostProcessorResultExceedsMaxSize() {
    final long limit = 100;
    final int wireSize = 50;
    final int decompressedSize = 200;
    final byte[] decompressed = randomByteArrayOfLength(decompressedSize);

    // The substituted payload (200 bytes) exceeds the 100-byte limit.
    initAggregator(limit, (raw, max) -> {
        raw.close();
        return new ReleasableBytesReference(new BytesArray(decompressed), () -> {});
    });

    stream.sendNext(randomReleasableBytesReference(wireSize), true);

    assertTooLargeRejected();
}

/**
 * A post-processor failure must release the accumulated chunk and all indexing-pressure
 * bytes, and report the error through the channel.
 */
public void testPostProcessorThrowsReleasesResources() {
    initAggregator(1024, (raw, max) -> { throw new IOException("decompression failed"); });

    final var sent = randomReleasableBytesReference(64);
    stream.sendNext(sent, true);

    // No content delivered; a failure response was produced instead.
    assertNull(contentRef.get());
    assertNotNull(channel.capturedResponse());
    // The chunk and the reserved coordinating bytes were both released.
    assertFalse(sent.hasReferences());
    assertEquals(0, indexingPressure.stats().getCurrentCoordinatingBytes());
}

private RestRequest newStreamedRequest(FakeHttpBodyStream stream) {
var httpRequest = new FakeRestRequest.FakeHttpRequest(
RestRequest.Method.POST,
Expand All @@ -195,6 +251,10 @@ private void assertTooLargeRejected() {
}

// Convenience overload: wires up the aggregator without any body post-processing.
private void initAggregator(long maxSize) {
    initAggregator(maxSize, IndexingPressureAwareContentAggregator.BodyPostProcessor.NOOP);
}

private void initAggregator(long maxSize, IndexingPressureAwareContentAggregator.BodyPostProcessor postProcessor) {
var request = newStreamedRequest(stream);
channel = new FakeRestChannel(request, true, 1);
var coordinating = indexingPressure.markCoordinatingOperationStarted(1, maxSize, false);
Expand All @@ -213,7 +273,8 @@ public void onComplete(RestChannel ch, ReleasableBytesReference content, Releasa
public void onFailure(RestChannel ch, Exception e) {
ch.sendResponse(new RestResponse(RestStatus.REQUEST_ENTITY_TOO_LARGE, e.getMessage()));
}
}
},
postProcessor
);
stream.setHandler(new HttpBody.ChunkHandler() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

package org.elasticsearch.xpack.prometheus;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.compression.Snappy;

import org.apache.http.HttpHeaders;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.util.EntityUtils;
Expand Down Expand Up @@ -253,7 +258,8 @@ private void sendAndAssertSuccess(RemoteWrite.WriteRequest writeRequest) throws

/**
 * Snappy-compresses the given write request, POSTs it to {@code endpoint} with a
 * {@code Content-Encoding: snappy} header, and asserts a 204 No Content response.
 */
private void sendAndAssertSuccess(RemoteWrite.WriteRequest writeRequest, String endpoint) throws IOException {
    Request request = new Request("POST", endpoint);
    // Dead first setEntity call removed: its uncompressed entity was immediately
    // overwritten by the compressed one below and never sent.
    request.setEntity(new ByteArrayEntity(snappyEncode(writeRequest.toByteArray()), ContentType.create("application/x-protobuf")));
    request.setOptions(request.getOptions().toBuilder().addHeader(HttpHeaders.CONTENT_ENCODING, "snappy"));
    Response response = client().performRequest(request);
    assertThat(response.getStatusLine().getStatusCode(), equalTo(204));
}
Expand All @@ -264,7 +270,8 @@ private String sendAndAssertBadRequest(RemoteWrite.WriteRequest writeRequest) th

private String sendAndAssertBadRequest(RemoteWrite.WriteRequest writeRequest, String endpoint) throws IOException {
Request request = new Request("POST", endpoint);
request.setEntity(new ByteArrayEntity(writeRequest.toByteArray(), ContentType.create("application/x-protobuf")));
request.setEntity(new ByteArrayEntity(snappyEncode(writeRequest.toByteArray()), ContentType.create("application/x-protobuf")));
request.setOptions(request.getOptions().toBuilder().addHeader(HttpHeaders.CONTENT_ENCODING, "snappy"));
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
return EntityUtils.toString(e.getResponse().getEntity());
Expand Down Expand Up @@ -337,4 +344,19 @@ private boolean dataStreamExists(String dataStream) throws IOException {
throw e;
}
}

/**
 * Compresses {@code input} with Netty's block Snappy encoder, producing the payload
 * format the Prometheus remote-write endpoint expects.
 */
private static byte[] snappyEncode(byte[] input) {
    final ByteBuf source = Unpooled.wrappedBuffer(input);
    final ByteBuf target = Unpooled.buffer(input.length);
    try {
        new Snappy().encode(source, target, input.length);
        final byte[] compressed = new byte[target.readableBytes()];
        target.readBytes(compressed);
        return compressed;
    } finally {
        // Release both buffers even if encoding throws.
        source.release();
        target.release();
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

package org.elasticsearch.xpack.prometheus;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.FeatureFlag;
Expand Down Expand Up @@ -43,6 +45,7 @@ public class PrometheusPlugin extends Plugin implements ActionPlugin {

private final SetOnce<PrometheusIndexTemplateRegistry> indexTemplateRegistry = new SetOnce<>();
private final SetOnce<IndexingPressure> indexingPressure = new SetOnce<>();
private final SetOnce<Recycler<BytesRef>> recycler = new SetOnce<>();
private final boolean enabled;
private final long maxProtobufContentLengthBytes;

Expand All @@ -56,6 +59,7 @@ public Collection<?> createComponents(PluginServices services) {
Settings settings = services.environment().settings();
ClusterService clusterService = services.clusterService();
indexingPressure.set(services.indexingPressure());
recycler.set(services.bigArrays().bytesRefRecycler());
indexTemplateRegistry.set(
new PrometheusIndexTemplateRegistry(
settings,
Expand Down Expand Up @@ -93,7 +97,7 @@ public Collection<RestHandler> getRestHandlers(
) {
if (enabled) {
assert indexingPressure.get() != null : "indexing pressure must be set if plugin is enabled";
return List.of(new PrometheusRemoteWriteRestAction(indexingPressure.get(), maxProtobufContentLengthBytes));
return List.of(new PrometheusRemoteWriteRestAction(indexingPressure.get(), maxProtobufContentLengthBytes, recycler.get()));
}
return List.of();
}
Expand Down
Loading
Loading