Skip to content

Commit e58f4b4

Browse files
authored
Introduce TimeSeriesRoutingIdFieldMapper and use it to create TSDB ids (elastic#106080)
Supporting non-keyword fields requires updating non-keyword fields in the routing path to be included in routing calculations. Routing is performed in coordinating nodes that lack mappings (or mappings haven't been created yet, for dynamically-defined dimensions), so the routing hash they calculate are passed to data nodes and stored in a new fields, namely _ts_routind_hash. This is included in the _id field, in turn, so that it can consistently reach the right shard for get-by-id and delete-by-id operations. A few interesting points: - The hash is passed from the coordinating to data nodes using the `routing` field in `IndexRequest`; adding another field to the latter requires updating dozens of classes. - We explicitly skip (double-) storing the hash to the routing field, as the latter is not optimized for storage using the TSDB codec. - The routing hash may not be available in Translog operations, it can then be retrieved from the `id` prefix. Related to elastic#103567
1 parent c59d4bc commit e58f4b4

36 files changed

+586
-313
lines changed

modules/lang-painless/src/main/java/org/elasticsearch/painless/action/PainlessExecuteAction.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,13 @@
5656
import org.elasticsearch.index.Index;
5757
import org.elasticsearch.index.IndexMode;
5858
import org.elasticsearch.index.IndexService;
59+
import org.elasticsearch.index.IndexVersions;
5960
import org.elasticsearch.index.mapper.DateFieldMapper;
6061
import org.elasticsearch.index.mapper.DocumentMapper;
6162
import org.elasticsearch.index.mapper.OnScriptError;
6263
import org.elasticsearch.index.mapper.ParsedDocument;
6364
import org.elasticsearch.index.mapper.SourceToParse;
65+
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
6466
import org.elasticsearch.index.query.AbstractQueryBuilder;
6567
import org.elasticsearch.index.query.QueryBuilder;
6668
import org.elasticsearch.index.query.SearchExecutionContext;
@@ -808,13 +810,18 @@ private static Response prepareRamIndex(
808810
try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig(defaultAnalyzer))) {
809811
BytesReference document = request.contextSetup.document;
810812
XContentType xContentType = request.contextSetup.xContentType;
811-
String id;
812-
if (indexService.getIndexSettings().getMode() == IndexMode.TIME_SERIES) {
813-
id = null; // The id gets auto generated for time series indices.
814-
} else {
815-
id = "_id";
816-
}
817-
SourceToParse sourceToParse = new SourceToParse(id, document, xContentType);
813+
814+
SourceToParse sourceToParse = (indexService.getIndexSettings().getMode() == IndexMode.TIME_SERIES)
815+
? new SourceToParse(
816+
null,
817+
document,
818+
xContentType,
819+
indexService.getIndexSettings().getIndexVersionCreated().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID)
820+
? TimeSeriesRoutingHashFieldMapper.DUMMY_ENCODED_VALUE
821+
: null
822+
)
823+
: new SourceToParse("_id", document, xContentType);
824+
818825
DocumentMapper documentMapper = indexService.mapperService().documentMapper();
819826
if (documentMapper == null) {
820827
documentMapper = DocumentMapper.createEmpty(indexService.mapperService());

modules/parent-join/src/main/java/org/elasticsearch/join/mapper/ParentJoinFieldMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ public void parse(DocumentParserContext context) throws IOException {
293293
if (parent == null) {
294294
throw new IllegalArgumentException("[parent] is missing for join field [" + name() + "]");
295295
}
296-
if (context.sourceToParse().routing() == null) {
296+
if (context.routing() == null) {
297297
throw new IllegalArgumentException("[routing] is missing for join field [" + name() + "]");
298298
}
299299
String fieldName = fieldType().joiner.parentJoinField(name);

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -689,9 +689,7 @@ private static Engine.Result performOpOnReplica(
689689
indexRequest.id(),
690690
indexRequest.source(),
691691
indexRequest.getContentType(),
692-
indexRequest.routing(),
693-
Map.of(),
694-
DocumentSizeObserver.EMPTY_INSTANCE
692+
indexRequest.routing()
695693
);
696694
result = replica.applyIndexOperationOnReplica(
697695
primaryResponse.getSeqNo(),

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
881881

882882
@Override
883883
public int route(IndexRouting indexRouting) {
884-
return indexRouting.indexShard(id, routing, contentType, source);
884+
return indexRouting.indexShard(id, routing, contentType, source, this::routing);
885885
}
886886

887887
public IndexRequest setRequireAlias(boolean requireAlias) {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.elasticsearch.common.util.ByteUtils;
2222
import org.elasticsearch.common.xcontent.XContentHelper;
2323
import org.elasticsearch.core.Nullable;
24+
import org.elasticsearch.index.IndexVersions;
25+
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
2426
import org.elasticsearch.transport.Transports;
2527
import org.elasticsearch.xcontent.XContentParser;
2628
import org.elasticsearch.xcontent.XContentParser.Token;
@@ -35,6 +37,7 @@
3537
import java.util.List;
3638
import java.util.Map;
3739
import java.util.Set;
40+
import java.util.function.Consumer;
3841
import java.util.function.IntConsumer;
3942
import java.util.function.IntSupplier;
4043
import java.util.function.Predicate;
@@ -74,7 +77,13 @@ private IndexRouting(IndexMetadata metadata) {
7477
* Called when indexing a document to generate the shard id that should contain
7578
* a document with the provided parameters.
7679
*/
77-
public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source);
80+
public abstract int indexShard(
81+
String id,
82+
@Nullable String routing,
83+
XContentType sourceType,
84+
BytesReference source,
85+
Consumer<String> routingHashSetter
86+
);
7887

7988
/**
8089
* Called when updating a document to generate the shard id that should contain
@@ -153,7 +162,13 @@ public void process(IndexRequest indexRequest) {
153162
}
154163

155164
@Override
156-
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
165+
public int indexShard(
166+
String id,
167+
@Nullable String routing,
168+
XContentType sourceType,
169+
BytesReference source,
170+
Consumer<String> routingHashSetter
171+
) {
157172
if (id == null) {
158173
throw new IllegalStateException("id is required and should have been set by process");
159174
}
@@ -237,12 +252,14 @@ public void collectSearchShards(String routing, IntConsumer consumer) {
237252
public static class ExtractFromSource extends IndexRouting {
238253
private final Predicate<String> isRoutingPath;
239254
private final XContentParserConfiguration parserConfig;
255+
private final boolean trackTimeSeriesRoutingHash;
240256

241257
ExtractFromSource(IndexMetadata metadata) {
242258
super(metadata);
243259
if (metadata.isRoutingPartitionedIndex()) {
244260
throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path");
245261
}
262+
trackTimeSeriesRoutingHash = metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
246263
List<String> routingPaths = metadata.getRoutingPaths();
247264
isRoutingPath = Regex.simpleMatcher(routingPaths.toArray(String[]::new));
248265
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(Set.copyOf(routingPaths), null, true);
@@ -256,10 +273,20 @@ public boolean matchesField(String fieldName) {
256273
public void process(IndexRequest indexRequest) {}
257274

258275
@Override
259-
public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
276+
public int indexShard(
277+
String id,
278+
@Nullable String routing,
279+
XContentType sourceType,
280+
BytesReference source,
281+
Consumer<String> routingHashSetter
282+
) {
260283
assert Transports.assertNotTransportThread("parsing the _source can get slow");
261284
checkNoRouting(routing);
262-
return hashToShardId(hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty));
285+
int hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
286+
if (trackTimeSeriesRoutingHash) {
287+
routingHashSetter.accept(TimeSeriesRoutingHashFieldMapper.encode(hash));
288+
}
289+
return hashToShardId(hash);
263290
}
264291

265292
public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
@@ -334,16 +361,13 @@ private void extractItem(String path, XContentParser source) throws IOException
334361
source.nextToken();
335362
break;
336363
case VALUE_STRING:
364+
case VALUE_NUMBER:
337365
hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
338366
source.nextToken();
339367
break;
340368
case VALUE_NULL:
341369
source.nextToken();
342370
break;
343-
case VALUE_NUMBER: // allow parsing numbers assuming routing fields are always keyword fields
344-
hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
345-
source.nextToken();
346-
break;
347371
default:
348372
throw new ParsingException(
349373
source.getTokenLocation(),

server/src/main/java/org/elasticsearch/index/IndexMode.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.index.mapper.RoutingFieldMapper;
2929
import org.elasticsearch.index.mapper.SourceFieldMapper;
3030
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
31+
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
3132
import org.elasticsearch.index.mapper.TsidExtractingIdFieldMapper;
3233

3334
import java.io.IOException;
@@ -92,6 +93,12 @@ public MetadataFieldMapper timeSeriesIdFieldMapper() {
9293
return null;
9394
}
9495

96+
@Override
97+
public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() {
98+
// non time-series indices must not have a TimeSeriesRoutingIdFieldMapper
99+
return null;
100+
}
101+
95102
@Override
96103
public IdFieldMapper idFieldMapperWithoutFieldData() {
97104
return ProvidedIdFieldMapper.NO_FIELD_DATA;
@@ -185,6 +192,11 @@ public MetadataFieldMapper timeSeriesIdFieldMapper() {
185192
return TimeSeriesIdFieldMapper.INSTANCE;
186193
}
187194

195+
@Override
196+
public MetadataFieldMapper timeSeriesRoutingHashFieldMapper() {
197+
return TimeSeriesRoutingHashFieldMapper.INSTANCE;
198+
}
199+
188200
public IdFieldMapper idFieldMapperWithoutFieldData() {
189201
return TsidExtractingIdFieldMapper.INSTANCE;
190202
}
@@ -322,6 +334,13 @@ public String getName() {
322334
*/
323335
public abstract MetadataFieldMapper timeSeriesIdFieldMapper();
324336

337+
/**
338+
* Return an instance of the {@link TimeSeriesRoutingHashFieldMapper} that generates
339+
* the _ts_routing_hash field. The field mapper will be added to the list of the metadata
340+
* field mappers for the index.
341+
*/
342+
public abstract MetadataFieldMapper timeSeriesRoutingHashFieldMapper();
343+
325344
/**
326345
* How {@code time_series_dimension} fields are handled by indices in this mode.
327346
*/

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ private static IndexVersion def(int id, Version luceneVersion) {
102102
public static final IndexVersion UPGRADE_LUCENE_9_9_2 = def(8_502_00_0, Version.LUCENE_9_9_2);
103103
public static final IndexVersion TIME_SERIES_ID_HASHING = def(8_502_00_1, Version.LUCENE_9_9_2);
104104
public static final IndexVersion UPGRADE_TO_LUCENE_9_10 = def(8_503_00_0, Version.LUCENE_9_10_0);
105+
public static final IndexVersion TIME_SERIES_ROUTING_HASH_IN_ID = def(8_504_00_0, Version.LUCENE_9_10_0);
105106

106107
/*
107108
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/index/codec/PerFieldMapperCodec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ boolean useTSDBDocValuesFormat(final String field) {
115115

116116
private boolean excludeFields(String fieldName) {
117117
// Avoid using tsdb codec for fields like _seq_no, _primary_term.
118-
// But _tsid should always use the tesbd codec.
119-
return fieldName.startsWith("_") && fieldName.equals("_tsid") == false;
118+
// But _tsid and _ts_routing_hash should always use the tsdb codec.
119+
return fieldName.startsWith("_") && fieldName.equals("_tsid") == false && fieldName.equals("_ts_routing_hash") == false;
120120
}
121121

122122
private boolean isTimeSeriesModeIndex() {

server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,9 @@
5959
import org.elasticsearch.index.mapper.VersionFieldMapper;
6060
import org.elasticsearch.index.shard.ShardId;
6161
import org.elasticsearch.index.translog.Translog;
62-
import org.elasticsearch.plugins.internal.DocumentSizeObserver;
6362

6463
import java.io.IOException;
6564
import java.util.Collections;
66-
import java.util.Map;
6765
import java.util.Set;
6866
import java.util.concurrent.atomic.AtomicReference;
6967

@@ -254,14 +252,7 @@ private LeafReader getDelegate() {
254252
private LeafReader createInMemoryLeafReader() {
255253
assert Thread.holdsLock(this);
256254
final ParsedDocument parsedDocs = documentParser.parseDocument(
257-
new SourceToParse(
258-
operation.id(),
259-
operation.source(),
260-
XContentHelper.xContentType(operation.source()),
261-
operation.routing(),
262-
Map.of(),
263-
DocumentSizeObserver.EMPTY_INSTANCE
264-
),
255+
new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
265256
mappingLookup
266257
);
267258

server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
9999
context.version(),
100100
context.seqID(),
101101
context.id(),
102-
source.routing(),
102+
context.routing(),
103103
context.reorderParentAndGetDocs(),
104104
context.sourceToParse().source(),
105105
context.sourceToParse().getXContentType(),

0 commit comments

Comments
 (0)