Skip to content

Commit 4d827b1

Browse files
committed
ESQL: Enable new data types with created version (elastic#136327)
Enables `dense_vector` and `aggregate_metric_double` if all nodes in all of the target clusters support it. Relates to elastic#135193
1 parent 89e2f54 commit 4d827b1

File tree

25 files changed

+361
-344
lines changed

25 files changed

+361
-344
lines changed

docs/changelog/136327.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 136327
2+
summary: Enable new data types with created version
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/reference/query-languages/esql/_snippets/functions/parameters/text_embedding.md

Lines changed: 0 additions & 10 deletions
This file was deleted.

docs/reference/query-languages/esql/images/functions/text_embedding.svg

Lines changed: 0 additions & 1 deletion
This file was deleted.

docs/reference/query-languages/esql/kibana/definition/functions/text_embedding.json

Lines changed: 0 additions & 12 deletions
This file was deleted.

docs/reference/query-languages/esql/kibana/docs/functions/text_embedding.md

Lines changed: 0 additions & 10 deletions
This file was deleted.

x-pack/plugin/build.gradle

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,19 +142,6 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
142142
task.skipTest("ml/sparse_vector_search/Search on a sparse_vector field with dots in the field names", "Vectors are no longer returned by default")
143143
task.skipTest("ml/sparse_vector_search/Search on a nested sparse_vector field with dots in the field names and conflicting child fields", "Vectors are no longer returned by default")
144144
task.skipTest("esql/190_lookup_join/lookup-no-key-only-key", "Requires the fix")
145-
task.skipTest("esql/40_tsdb/aggregate_metric_double unsortable", "Extra function required to enable the field type")
146-
task.skipTest("esql/40_tsdb/avg of aggregate_metric_double", "Extra function required to enable the field type")
147-
task.skipTest("esql/40_tsdb/grouping stats on aggregate_metric_double", "Extra function required to enable the field type")
148-
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing min and max", "Extra function required to enable the field type")
149-
task.skipTest("esql/40_tsdb/render aggregate_metric_double when missing value", "Extra function required to enable the field type")
150-
task.skipTest("esql/40_tsdb/sorting with aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
151-
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double missing min and max", "Extra function required to enable the field type")
152-
task.skipTest("esql/40_tsdb/to_string aggregate_metric_double", "Extra function required to enable the field type")
153-
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double with partial submetrics", "Extra function required to enable the field type")
154-
task.skipTest("esql/46_downsample/MV_EXPAND on non-MV aggregate metric double", "Extra function required to enable the field type")
155-
task.skipTest("esql/46_downsample/Query stats on downsampled index", "Extra function required to enable the field type")
156-
task.skipTest("esql/46_downsample/Render stats from downsampled index", "Extra function required to enable the field type")
157-
task.skipTest("esql/46_downsample/Sort from multiple indices one with aggregate metric double", "Extra function required to enable the field type")
158145
})
159146

160147
tasks.named('yamlRestCompatTest').configure {

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.esql.core.type;
88

99
import org.apache.lucene.util.BytesRef;
10+
import org.elasticsearch.Build;
1011
import org.elasticsearch.TransportVersion;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -748,7 +749,7 @@ public DataType counter() {
748749

749750
@Override
750751
public void writeTo(StreamOutput out) throws IOException {
751-
if (supportedVersion.supportedOn(out.getTransportVersion()) == false) {
752+
if (supportedVersion.supportedOn(out.getTransportVersion(), Build.current().isSnapshot()) == false) {
752753
/*
753754
* TODO when we implement version aware planning flip this to an IllegalStateException
754755
* so we throw a 500 error. It'll be our bug then. Right now it's a sign that the user

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/SupportedVersion.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111
import org.elasticsearch.TransportVersion;
1212

1313
public interface SupportedVersion {
14-
boolean supportedOn(TransportVersion version);
14+
boolean supportedOn(TransportVersion version, boolean currentBuildIsSnapshot);
1515

1616
default boolean supportedLocally() {
17-
return supportedOn(TransportVersion.current());
17+
return supportedOn(TransportVersion.current(), Build.current().isSnapshot());
1818
}
1919

2020
SupportedVersion SUPPORTED_ON_ALL_NODES = new SupportedVersion() {
2121
@Override
22-
public boolean supportedOn(TransportVersion version) {
22+
public boolean supportedOn(TransportVersion version, boolean currentBuildIsSnapshot) {
2323
return true;
2424
}
2525

@@ -56,8 +56,8 @@ public String toString() {
5656
// Check usage of this constant to be sure.
5757
SupportedVersion UNDER_CONSTRUCTION = new SupportedVersion() {
5858
@Override
59-
public boolean supportedOn(TransportVersion version) {
60-
return Build.current().isSnapshot();
59+
public boolean supportedOn(TransportVersion version, boolean currentBuildIsSnapshot) {
60+
return currentBuildIsSnapshot;
6161
}
6262

6363
@Override
@@ -76,8 +76,8 @@ public String toString() {
7676
static SupportedVersion supportedSince(TransportVersion supportedVersion) {
7777
return new SupportedVersion() {
7878
@Override
79-
public boolean supportedOn(TransportVersion version) {
80-
return version.supports(supportedVersion) || Build.current().isSnapshot();
79+
public boolean supportedOn(TransportVersion version, boolean currentBuildIsSnapshot) {
80+
return version.supports(supportedVersion) || currentBuildIsSnapshot;
8181
}
8282

8383
@Override

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java

Lines changed: 89 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.IOException;
3333
import java.util.ArrayList;
3434
import java.util.Arrays;
35+
import java.util.Comparator;
3536
import java.util.List;
3637
import java.util.Locale;
3738
import java.util.Map;
@@ -43,6 +44,7 @@
4344
import static org.elasticsearch.test.ListMatcher.matchesList;
4445
import static org.elasticsearch.test.MapMatcher.assertMap;
4546
import static org.elasticsearch.test.MapMatcher.matchesMap;
47+
import static org.elasticsearch.xpack.esql.action.EsqlResolveFieldsResponse.RESOLVE_FIELDS_RESPONSE_CREATED_TV;
4648
import static org.hamcrest.Matchers.any;
4749
import static org.hamcrest.Matchers.anyOf;
4850
import static org.hamcrest.Matchers.containsString;
@@ -76,11 +78,6 @@ public class AllSupportedFieldsTestCase extends ESRestTestCase {
7678

7779
@ParametersFactory(argumentFormatting = "pref=%s mode=%s")
7880
public static List<Object[]> args() {
79-
if (Build.current().isSnapshot()) {
80-
// We only test behavior in release builds. Snapshot builds will have data types enabled that are still under construction.
81-
return List.of();
82-
}
83-
8481
List<Object[]> args = new ArrayList<>();
8582
for (MappedFieldType.FieldExtractPreference extractPreference : Arrays.asList(
8683
null,
@@ -102,7 +99,7 @@ protected AllSupportedFieldsTestCase(MappedFieldType.FieldExtractPreference extr
10299
this.indexMode = indexMode;
103100
}
104101

105-
protected record NodeInfo(String cluster, String id, TransportVersion version, Set<String> roles) {}
102+
protected record NodeInfo(String cluster, String id, boolean snapshot, TransportVersion version, Set<String> roles) {}
106103

107104
private static Map<String, NodeInfo> nodeToInfo;
108105

@@ -126,6 +123,19 @@ protected boolean fetchDenseVectorAggMetricDoubleIfFns() throws IOException {
126123
return clusterHasCapability("GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_FNS")).orElse(false);
127124
}
128125

126+
private static Boolean denseVectorAggMetricDoubleIfVersion;
127+
128+
private boolean denseVectorAggMetricDoubleIfVersion() throws IOException {
129+
if (denseVectorAggMetricDoubleIfVersion == null) {
130+
denseVectorAggMetricDoubleIfVersion = fetchDenseVectorAggMetricDoubleIfVersion();
131+
}
132+
return denseVectorAggMetricDoubleIfVersion;
133+
}
134+
135+
protected boolean fetchDenseVectorAggMetricDoubleIfVersion() throws IOException {
136+
return clusterHasCapability("GET", "/_query", List.of(), List.of("DENSE_VECTOR_AGG_METRIC_DOUBLE_IF_VERSION")).orElse(false);
137+
}
138+
129139
private static Boolean supportsNodeAssignment;
130140

131141
protected boolean supportsNodeAssignment() throws IOException {
@@ -153,11 +163,21 @@ protected static Map<String, NodeInfo> fetchNodeToInfo(RestClient client, String
153163
String id = (String) n.getKey();
154164
Map<?, ?> nodeInfo = (Map<?, ?>) n.getValue();
155165
String nodeName = (String) extractValue(nodeInfo, "name");
166+
167+
/*
168+
* Figuring out if a node is a snapshot is kind of tricky. The main version
169+
* doesn't include -SNAPSHOT. But ${VERSION}-SNAPSHOT is in the node info
170+
* *somewhere*. So we do this silly toString here.
171+
*/
172+
String version = (String) extractValue(nodeInfo, "version");
173+
boolean snapshot = nodeInfo.toString().contains(version + "-SNAPSHOT");
174+
156175
TransportVersion transportVersion = TransportVersion.fromId((Integer) extractValue(nodeInfo, "transport_version"));
157176
List<?> roles = (List<?>) nodeInfo.get("roles");
177+
158178
nodeToInfo.put(
159179
nodeName,
160-
new NodeInfo(cluster, id, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet()))
180+
new NodeInfo(cluster, id, snapshot, transportVersion, roles.stream().map(Object::toString).collect(Collectors.toSet()))
161181
);
162182
}
163183

@@ -175,6 +195,22 @@ public void createIndices() throws IOException {
175195
}
176196
}
177197

198+
/**
199+
* Make sure the test doesn't run on snapshot builds. Release builds only.
200+
* <p>
201+
* {@link Build#isSnapshot()} checks if the version under test is a snapshot.
202+
* But! This test runs against many versions and if *any* are snapshots
203+
* then this will fail. So we check the versions of each node in the cluster too.
204+
* </p>
205+
*/
206+
@Before
207+
public void skipSnapshots() throws IOException {
208+
assumeFalse("Only supported on production builds", Build.current().isSnapshot());
209+
for (NodeInfo n : allNodeToInfo().values()) {
210+
assumeFalse("Only supported on production builds", n.snapshot());
211+
}
212+
}
213+
178214
// TODO: Also add a test for _tsid once we can determine the minimum transport version of all nodes.
179215
public final void testFetchAll() throws IOException {
180216
Map<String, Object> response = esql("""
@@ -212,7 +248,7 @@ public final void testFetchAll() throws IOException {
212248
if (supportedInIndex(type) == false) {
213249
continue;
214250
}
215-
expectedValues = expectedValues.entry(fieldName(type), expectedValue(type));
251+
expectedValues = expectedValues.entry(fieldName(type), expectedValue(type, nodeInfo));
216252
}
217253
expectedValues = expectedValues.entry("_id", any(String.class))
218254
.entry("_ignored", nullValue())
@@ -227,15 +263,23 @@ public final void testFetchAll() throws IOException {
227263
profileLogger.clearProfile();
228264
}
229265

230-
// Tests a workaround and will become obsolete once we can determine the actual minimum transport version of all nodes.
266+
/**
267+
* Tests fetching {@code dense_vector} if possible. Uses the {@code dense_vector_agg_metric_double_if_fns}
268+
* work around if required.
269+
*/
231270
public final void testFetchDenseVector() throws IOException {
232271
Map<String, Object> response;
233272
try {
234-
response = esql("""
235-
| EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector
273+
String request = """
236274
| KEEP _index, f_dense_vector
237275
| LIMIT 1000
238-
""");
276+
""";
277+
if (denseVectorAggMetricDoubleIfVersion() == false) {
278+
request = """
279+
| EVAL k = v_l2_norm(f_dense_vector, [1]) // workaround to enable fetching dense_vector
280+
""" + request;
281+
}
282+
response = esql(request);
239283
if ((Boolean) response.get("is_partial")) {
240284
Map<?, ?> clusters = (Map<?, ?>) response.get("_clusters");
241285
Map<?, ?> details = (Map<?, ?>) clusters.get("details");
@@ -410,7 +454,7 @@ private void createAllTypesDoc(RestClient client, String indexName) throws IOExc
410454
}
411455

412456
// This will become dependent on the minimum transport version of all nodes once we can determine that.
413-
private Matcher<?> expectedValue(DataType type) {
457+
private Matcher<?> expectedValue(DataType type, NodeInfo nodeInfo) throws IOException {
414458
return switch (type) {
415459
case BOOLEAN -> equalTo(true);
416460
case COUNTER_LONG, LONG, COUNTER_INTEGER, INTEGER, UNSIGNED_LONG, SHORT, BYTE -> equalTo(1);
@@ -429,14 +473,24 @@ private Matcher<?> expectedValue(DataType type) {
429473
case GEO_SHAPE -> equalTo("POINT (-71.34 41.12)");
430474
case NULL -> nullValue();
431475
case AGGREGATE_METRIC_DOUBLE -> {
432-
// Currently, we cannot tell if all nodes support it or not so we treat it as unsupported.
433-
// TODO: Fix this once we know the node versions.
434-
yield nullValue();
476+
/*
477+
* We need both AGGREGATE_METRIC_DOUBLE_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV
478+
* but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it.
479+
*/
480+
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
481+
yield nullValue();
482+
}
483+
yield equalTo("{\"min\":-302.5,\"max\":702.3,\"sum\":200.0,\"value_count\":25}");
435484
}
436485
case DENSE_VECTOR -> {
437-
// Currently, we cannot tell if all nodes support it or not so we treat it as unsupported.
438-
// TODO: Fix this once we know the node versions.
439-
yield nullValue();
486+
/*
487+
* We need both DENSE_VECTOR_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV
488+
* but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it.
489+
*/
490+
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
491+
yield nullValue();
492+
}
493+
yield equalTo(List.of(0.5, 10.0, 5.9999995));
440494
}
441495
default -> throw new AssertionError("unsupported field type [" + type + "]");
442496
};
@@ -507,7 +561,7 @@ private Map<String, Object> nameToValue(List<String> names, List<?> values) {
507561
}
508562

509563
// This will become dependent on the minimum transport version of all nodes once we can determine that.
510-
private Matcher<String> expectedType(DataType type) {
564+
private Matcher<String> expectedType(DataType type) throws IOException {
511565
return switch (type) {
512566
case COUNTER_DOUBLE, COUNTER_LONG, COUNTER_INTEGER -> {
513567
if (indexMode == IndexMode.TIME_SERIES) {
@@ -518,10 +572,16 @@ private Matcher<String> expectedType(DataType type) {
518572
case BYTE, SHORT -> equalTo("integer");
519573
case HALF_FLOAT, SCALED_FLOAT, FLOAT -> equalTo("double");
520574
case NULL -> equalTo("keyword");
521-
// Currently unsupported without TS command or KNN function
522-
case AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR ->
523-
// TODO: Fix this once we know the node versions.
524-
equalTo("unsupported");
575+
case AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> {
576+
/*
577+
* We need both <type_name>_CREATED and RESOLVE_FIELDS_RESPONSE_CREATED_TV
578+
* but RESOLVE_FIELDS_RESPONSE_CREATED_TV came last so it's enough to check just it.
579+
*/
580+
if (minVersion().supports(RESOLVE_FIELDS_RESPONSE_CREATED_TV) == false) {
581+
yield equalTo("unsupported");
582+
}
583+
yield equalTo(type.esType());
584+
}
525585
default -> equalTo(type.esType());
526586
};
527587
}
@@ -555,9 +615,13 @@ private Map<String, NodeInfo> expectedIndices() throws IOException {
555615
name = e.getValue().cluster + ":" + name;
556616
}
557617
// We should only end up with one per cluster
558-
result.put(name, new NodeInfo(e.getValue().cluster, null, e.getValue().version(), null));
618+
result.put(name, new NodeInfo(e.getValue().cluster, null, e.getValue().snapshot(), e.getValue().version(), null));
559619
}
560620
}
561621
return result;
562622
}
623+
624+
protected TransportVersion minVersion() throws IOException {
625+
return allNodeToInfo().values().stream().map(NodeInfo::version).min(Comparator.naturalOrder()).get();
626+
}
563627
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/dense_vector-bit.csv-spec

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
retrieveBitVectorData
2-
required_capability: dense_vector_field_type_released
2+
required_capability: dense_vector_agg_metric_double_if_version
33
required_capability: l2_norm_vector_similarity_function
44

55
FROM dense_vector
6-
| EVAL k = v_l2_norm(bit_vector, [1]) // workaround to enable fetching dense_vector
76
| KEEP id, bit_vector
87
| SORT id
98
;
@@ -16,11 +15,11 @@ id:l | bit_vector:dense_vector
1615
;
1716

1817
denseBitVectorWithEval
19-
required_capability: dense_vector_field_type_released
18+
required_capability: dense_vector_agg_metric_double_if_version
2019
required_capability: l2_norm_vector_similarity_function
2120

2221
FROM dense_vector
23-
| EVAL v = bit_vector, k = v_l2_norm(bit_vector, [1]) // workaround to enable fetching dense_vector
22+
| EVAL v = bit_vector
2423
| KEEP id, v
2524
| SORT id
2625
;
@@ -33,14 +32,13 @@ id:l | v:dense_vector
3332
;
3433

3534
denseBitVectorWithRenameAndDrop
36-
required_capability: dense_vector_field_type_released
35+
required_capability: dense_vector_agg_metric_double_if_version
3736
required_capability: l2_norm_vector_similarity_function
3837

3938
FROM dense_vector
4039
| EVAL v = bit_vector
41-
| EVAL k = v_l2_norm(bit_vector, [1]) // workaround to enable fetching dense_vector
4240
| RENAME v AS new_vector
43-
| DROP float_vector, byte_vector, bit_vector, k
41+
| DROP float_vector, byte_vector, bit_vector
4442
| SORT id
4543
;
4644

0 commit comments

Comments
 (0)