Skip to content

Add Snappy support to remote write endpoint#143994

Merged
felixbarny merged 15 commits intoelastic:mainfrom
felixbarny:remote-write-snappy-2
Mar 13, 2026
Merged

Add Snappy support to remote write endpoint#143994
felixbarny merged 15 commits intoelastic:mainfrom
felixbarny:remote-write-snappy-2

Conversation

@felixbarny
Copy link
Member

@felixbarny felixbarny commented Mar 11, 2026

The Prometheus remote write 1.0 spec mandates Snappy block format compression (Content-Encoding: snappy). Netty's built-in HttpContentDecompressor intercepts this header but uses SnappyFrameDecoder (the framed format), which is incompatible. Valid Prometheus remote write requests are rejected before reaching the REST handler.

This PR exempts the _prometheus path from Netty's built-in Snappy framed decoding and adds a BodyPostProcessor hook to IndexingPressureAwareContentAggregator. It adds a SnappyBlockDecoder that contains a fork of Netty's Snappy decoding logic which has the following improvements:

  • On-demand page allocation vs. Netty's out.ensureWritable(uncompressedLength) pre-allocation — prevents a malicious preamble from causing a huge allocation before any data is decoded and avoids humongous object allocations.
  • maxSize check — defends against decompression bombs.
  • Final length validation (out.written != uncompressedLength) — Netty doesn't check this.
  • No state machine — appropriate since ES receives the full HTTP body, unlike Netty's incremental channel pipeline.
  • Buffers compressed bytes before starting the decompression. This lowers the memory footprint during accumulation and is appropriate for Snappy block format as we can't release any partial chunks because there may be back-references to them later on.

Alternative to #142102

To only review the introduction of BodyPostProcessor, have a look at #144035.

@felixbarny felixbarny requested a review from DaveCTurner March 11, 2026 09:16
@felixbarny felixbarny self-assigned this Mar 11, 2026
@elasticsearchmachine elasticsearchmachine added external-contributor Pull request authored by a developer outside the Elasticsearch team Team:StorageEngine v9.4.0 labels Mar 11, 2026
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-storage-engine (Team:StorageEngine)

Copy link
Contributor

@DaveCTurner DaveCTurner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good - some small changes requested inline but nothing substantial.

@felixbarny
Copy link
Member Author

Thanks a lot for the thorough review ❤️
I tried to address all comments. PTAL.

Copy link
Contributor

@DaveCTurner DaveCTurner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One nit, otherwise LGTM

@DaveCTurner
Copy link
Contributor

WDYT about a test like this? Can be added in a follow-up if you'd prefer

    /**
     * Randomly synthesize a valid Snappy block as a sequence of arbitrary tagged elements and assert it decodes correctly.
     */
    public void testSyntheticCompressedStream() throws IOException {
        final var uncompressed = new byte[scaledRandomIntBetween(0, ByteSizeUnit.MB.toIntBytes(32))];
        final var compressed = new RecyclerBytesStreamOutput(recycler);
        writeVarint(compressed, uncompressed.length);

        int uncompressedPosition = 0;
        while (true) {
            int remaining = uncompressed.length - uncompressedPosition;
            if (remaining == 0) {
                break;
            }
            if (uncompressedPosition == 0 || randomBoolean()) {
                final var literal = randomByteArrayOfLength(scaledRandomIntBetween(1, remaining));
                System.arraycopy(literal, 0, uncompressed, uncompressedPosition, literal.length);
                writeLiteralLength(compressed, literal.length);
                compressed.write(literal);
                uncompressedPosition += literal.length;
            } else {
                final int copyLength = between(1, Math.min(remaining, 64));
                int copyPosition = between(0, uncompressedPosition - 1);
                writeCopy(compressed, uncompressedPosition - copyPosition, copyLength);
                for (int i = 0; i < copyLength; i++) {
                    uncompressed[uncompressedPosition++] = uncompressed[copyPosition++];
                }
            }
        }

        try (var decoded = decoder.process(compressed.moveToBytesReference(), uncompressed.length + between(0, 1024))) {
            assertThat(decoded, equalBytes(new BytesArray(uncompressed)));
        }
    }

    private void writeLiteralLength(OutputStream out, int length) throws IOException {
        int offsetLength = length - 1;
        if (offsetLength > 0xFFFFFF || randomBoolean()) {
            out.write(63 << 2);
            out.write(offsetLength & 0xFF);
            out.write((offsetLength >> 8) & 0xFF);
            out.write((offsetLength >> 16) & 0xFF);
            out.write((offsetLength >> 24) & 0xFF);
        } else if (offsetLength > 0xFFFF || randomBoolean()) {
            out.write(62 << 2);
            out.write(offsetLength & 0xFF);
            out.write((offsetLength >> 8) & 0xFF);
            out.write((offsetLength >> 16) & 0xFF);
        } else if (offsetLength > 0xFF || randomBoolean()) {
            out.write(61 << 2);
            out.write(offsetLength & 0xFF);
            out.write((offsetLength >> 8) & 0xFF);
        } else if (offsetLength > 59 || randomBoolean()) {
            out.write(60 << 2);
            out.write(offsetLength & 0xFF);
        } else {
            out.write(offsetLength << 2);
        }
    }

    private void writeCopy(OutputStream out, int offset, int length) throws IOException {
        if (offset > 0xFFFF || randomBoolean()) {
            out.write(0x03 | ((length - 1) << 2));
            out.write(offset & 0xFF);
            out.write((offset >> 8) & 0xFF);
            out.write((offset >> 16) & 0xFF);
            out.write((offset >> 24) & 0xFF);
        } else if (offset > 0x7FF || ((length - 4) | 7) != 7 || randomBoolean()) {
            out.write(0x02 | ((length - 1) << 2));
            out.write(offset & 0xFF);
            out.write((offset >> 8) & 0xFF);
        } else {
            out.write(0x01 | (length - 4 << 2) | (((offset >> 8) & 0x07) << 5));
            out.write(offset & 0xFF);
        }
    }

@felixbarny felixbarny enabled auto-merge (squash) March 12, 2026 15:14
@felixbarny felixbarny merged commit ad6d82f into elastic:main Mar 13, 2026
36 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

external-contributor Pull request authored by a developer outside the Elasticsearch team >non-issue :StorageEngine/TSDB You know, for Metrics Team:StorageEngine v9.4.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants