Skip to content

Commit 0f7e41e

Browse files
feat(tsdb): add composable pipeline framework for ES94 TSDB codec (#143589)
Introduce the foundation layer for the ES94 pipeline codec, which replaces the monolithic ES819 encoding with a composable pipeline of transform and payload stages. This is PR 1 of a series. It contains no concrete stage implementations, only the type system, wire format, metadata I/O, block format, and context objects that subsequent PRs build on.

Key components:
- StageId: wire format registry of stage byte identifiers
- StageSpec: sealed hierarchy of stage specifications with parameters
- PipelineDescriptor: serialization/deserialization of pipeline configuration
- FieldDescriptor: versioned envelope for pipeline descriptors
- PipelineConfig: fluent builder API for pipeline construction
- BlockFormat: per-block encode/decode layout (bitmap + payload + metadata)
- EncodingContext / DecodingContext: mutable per-block state, reused via clear()
- MetadataBuffer / MetadataWriter / MetadataReader: gather-scatter metadata I/O
- PayloadEncoder / PayloadDecoder: interfaces for terminal payload stages
1 parent af03d6b commit 0f7e41e

25 files changed

+2941
-0
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.pipeline;
11+
12+
import org.apache.lucene.store.DataInput;
13+
import org.apache.lucene.store.DataOutput;
14+
import org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadDecoder;
15+
import org.elasticsearch.index.codec.tsdb.pipeline.numeric.PayloadEncoder;
16+
17+
import java.io.IOException;
18+
19+
/**
20+
* Defines how each block of encoded values is written to the data file.
21+
*
22+
* <p>Data file layout:
23+
* <pre>
24+
* +------------------+----------------------------------------+
25+
* | Block 0 | [bitmap][payload][stage metadata] |
26+
* | Block 1 | [bitmap][payload][stage metadata] |
27+
* | ... | ... |
28+
* | Block N-1 | [bitmap][payload][stage metadata] |
29+
* +------------------+----------------------------------------+
30+
* | Block Offsets | DirectMonotonicWriter encoded offsets |
31+
* +------------------+----------------------------------------+
32+
* </pre>
33+
*
34+
* <p>Each block contains:
35+
* <ul>
36+
* <li><strong>bitmap</strong>: 1 byte ({@code <= 8} stages) or 2 bytes ({@code > 8} stages)
37+
* indicating which stages were applied</li>
38+
* <li><strong>payload</strong>: the encoded values written by the terminal payload stage</li>
39+
* <li><strong>stage metadata</strong>: per-stage metadata written by transformation stages
40+
* (e.g., GCD divisor)</li>
41+
* </ul>
42+
*
43+
* <p>The layout is designed for sequential decoding: the bitmap comes first so the
44+
* decoder immediately knows which stages to reverse, followed by the payload and
45+
* then stage metadata in reverse stage order (see {@link EncodingContext#writeStageMetadata}).
46+
* This means the decoder can read every section in a single forward pass with no
47+
* seeking or buffering. See {@link FieldDescriptor} for the metadata file format
48+
* that describes pipeline configuration.
49+
*/
50+
public final class BlockFormat {
51+
52+
private BlockFormat() {}
53+
54+
/**
55+
* Writes a block of encoded values to the data output.
56+
*
57+
* @param out the data output stream
58+
* @param values the values to encode
59+
* @param payloadStage the terminal payload encoder
60+
* @param context the encoding context with block metadata
61+
* @throws IOException if an I/O error occurs
62+
*/
63+
public static void writeBlock(
64+
final DataOutput out,
65+
final long[] values,
66+
final PayloadEncoder payloadStage,
67+
final EncodingContext context
68+
) throws IOException {
69+
writeHeader(out, context);
70+
payloadStage.encode(values, context.valueCount(), out, context);
71+
context.writeStageMetadata(out);
72+
}
73+
74+
/**
75+
* Reads a block of encoded values from the data input.
76+
*
77+
* @param in the data input stream
78+
* @param values the output array to populate
79+
* @param payloadStage the terminal payload decoder
80+
* @param context the decoding context with block metadata
81+
* @param payloadPosition the pipeline position of the payload stage
82+
* @return the number of values decoded
83+
* @throws IOException if an I/O error occurs
84+
*/
85+
public static int readBlock(
86+
final DataInput in,
87+
final long[] values,
88+
final PayloadDecoder payloadStage,
89+
final DecodingContext context,
90+
int payloadPosition
91+
) throws IOException {
92+
readHeader(in, context);
93+
if (context.isStageApplied(payloadPosition) == false) {
94+
throw new IOException("Payload stage not applied - possible data corruption");
95+
}
96+
return payloadStage.decode(values, in, context);
97+
}
98+
99+
static void writeHeader(final DataOutput out, final EncodingContext context) throws IOException {
100+
final short bitmap = context.positionBitmap();
101+
if (context.pipelineLength() <= 8) {
102+
out.writeByte((byte) bitmap);
103+
} else {
104+
out.writeShort(bitmap);
105+
}
106+
}
107+
108+
static void readHeader(final DataInput in, final DecodingContext context) throws IOException {
109+
final int pipelineLength = context.pipelineLength();
110+
if (pipelineLength <= 0) {
111+
throw new IOException("Pipeline must be set for decoding");
112+
}
113+
114+
final short bitmap;
115+
if (pipelineLength <= 8) {
116+
bitmap = (short) (in.readByte() & 0xFF);
117+
} else {
118+
bitmap = in.readShort();
119+
}
120+
context.setPositionBitmap(bitmap);
121+
}
122+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb.pipeline;
11+
12+
import org.apache.lucene.store.DataInput;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* Mutable per-block context for decoding, tracking the position bitmap and
18+
* delegating metadata reads to the underlying {@link DataInput}. Reused
19+
* across blocks via {@link #clear()}.
20+
*/
21+
public final class DecodingContext implements MetadataReader {
22+
23+
private final int pipelineLength;
24+
private final int blockSize;
25+
26+
private DataInput dataInput;
27+
private short positionBitmap;
28+
29+
/**
30+
* Creates a decoding context.
31+
*
32+
* @param blockSize the number of values per block
33+
* @param pipelineLength the number of stages in the pipeline
34+
*/
35+
public DecodingContext(int blockSize, int pipelineLength) {
36+
this.blockSize = blockSize;
37+
this.pipelineLength = pipelineLength;
38+
}
39+
40+
/**
41+
* Sets the data input stream for reading block data and stage metadata.
42+
*
43+
* @param dataInput the data input stream
44+
*/
45+
public void setDataInput(final DataInput dataInput) {
46+
this.dataInput = dataInput;
47+
}
48+
49+
/**
50+
* Returns the number of stages in the pipeline.
51+
*
52+
* @return the pipeline length
53+
*/
54+
public int pipelineLength() {
55+
return pipelineLength;
56+
}
57+
58+
/**
59+
* Sets the bitmap of applied stage positions, read from the block header.
60+
*
61+
* @param bitmap the position bitmap
62+
*/
63+
void setPositionBitmap(short bitmap) {
64+
this.positionBitmap = bitmap;
65+
}
66+
67+
/**
68+
* Returns {@code true} if the stage at the given position was applied.
69+
*
70+
* @param position the zero-based stage index
71+
* @return whether the stage was applied
72+
*/
73+
public boolean isStageApplied(int position) {
74+
assert position >= 0 && position < pipelineLength : "Position out of range: " + position;
75+
return (positionBitmap & (1 << position)) != 0;
76+
}
77+
78+
/**
79+
* Returns the metadata reader for accessing stage metadata.
80+
*
81+
* @return the metadata reader
82+
*/
83+
public MetadataReader metadata() {
84+
return this;
85+
}
86+
87+
/**
88+
* Returns the block size.
89+
*
90+
* @return the number of values per block
91+
*/
92+
public int blockSize() {
93+
return blockSize;
94+
}
95+
96+
/**
97+
* Resets this context for reuse with the next block.
98+
*
99+
* <p>NOTE: dataInput is intentionally nulled. Unlike EncodingContext which
100+
* owns its MetadataBuffer, DecodingContext does not own the DataInput (it is injected).
101+
* Nulling forces the caller to provide a fresh DataInput via {@link #setDataInput} before
102+
* each block, which is a fail-fast against silently reading garbage from a stale stream.
103+
*/
104+
public void clear() {
105+
positionBitmap = 0;
106+
dataInput = null;
107+
}
108+
109+
@Override
110+
public byte readByte() throws IOException {
111+
return dataInput.readByte();
112+
}
113+
114+
@Override
115+
public int readZInt() throws IOException {
116+
return dataInput.readZInt();
117+
}
118+
119+
@Override
120+
public long readZLong() throws IOException {
121+
return dataInput.readZLong();
122+
}
123+
124+
@Override
125+
public long readLong() throws IOException {
126+
return dataInput.readLong();
127+
}
128+
129+
@Override
130+
public int readInt() throws IOException {
131+
return dataInput.readInt();
132+
}
133+
134+
@Override
135+
public int readVInt() throws IOException {
136+
return dataInput.readVInt();
137+
}
138+
139+
@Override
140+
public long readVLong() throws IOException {
141+
return dataInput.readVLong();
142+
}
143+
144+
@Override
145+
public void readBytes(final byte[] bytes, int offset, int length) throws IOException {
146+
dataInput.readBytes(bytes, offset, length);
147+
}
148+
}

0 commit comments

Comments
 (0)