Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.search.fieldcaps;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -266,4 +267,12 @@ public void testReturnAllLocal() {
}
}
}

public void testIncludesMinTransportVersion() {
    // The response must advertise the node's current transport version whether
    // or not any concrete index exists, so randomly create one.
    if (randomBoolean()) {
        assertAcked(client().admin().indices().prepareCreate("index"));
    }
    var fieldCapsResponse = client().prepareFieldCaps("_all").setFields("*").get();
    assertThat(fieldCapsResponse.minTransportVersion(), equalTo(TransportVersion.current()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;

Expand All @@ -30,6 +32,9 @@
* Response for {@link FieldCapabilitiesRequest} requests.
*/
public class FieldCapabilitiesResponse extends ActionResponse implements ChunkedToXContentObject {

private static final TransportVersion MIN_TRANSPORT_VERSION = TransportVersion.fromName("min_transport_version");

public static final ParseField INDICES_FIELD = new ParseField("indices");
public static final ParseField FIELDS_FIELD = new ParseField("fields");
private static final ParseField FAILED_INDICES_FIELD = new ParseField("failed_indices");
Expand All @@ -39,40 +44,46 @@ public class FieldCapabilitiesResponse extends ActionResponse implements Chunked
private final Map<String, Map<String, FieldCapabilities>> fields;
private final List<FieldCapabilitiesFailure> failures;
private final List<FieldCapabilitiesIndexResponse> indexResponses;
private final TransportVersion minTransportVersion;

/**
 * Creates a merged response with the given failures and no known minimum
 * transport version.
 *
 * @param indices  the concrete indices the capabilities were resolved against
 * @param fields   field capabilities keyed by field name, then by type
 * @param failures per-index failures collected during resolution
 */
public FieldCapabilitiesResponse(
    String[] indices,
    Map<String, Map<String, FieldCapabilities>> fields,
    List<FieldCapabilitiesFailure> failures
) {
    // Fix: a bad merge left both the old 4-arg and new 5-arg delegation here;
    // only the 5-arg call (minTransportVersion = null, i.e. unknown) is valid.
    this(indices, fields, Collections.emptyList(), failures, null);
}

/**
 * Creates a merged response with no failures and no known minimum transport version.
 *
 * @param indices the concrete indices the capabilities were resolved against
 * @param fields  field capabilities keyed by field name, then by type
 */
public FieldCapabilitiesResponse(String[] indices, Map<String, Map<String, FieldCapabilities>> fields) {
    // Fix: removed the stale pre-change delegation left behind by the merge.
    this(indices, fields, Collections.emptyList(), Collections.emptyList(), null);
}

/**
 * Creates an unmerged response carrying the raw per-index responses.
 *
 * @param indexResponses per-index capabilities, not yet merged
 * @param failures       per-index failures collected during resolution
 */
public FieldCapabilitiesResponse(List<FieldCapabilitiesIndexResponse> indexResponses, List<FieldCapabilitiesFailure> failures) {
    // Fix: removed the stale pre-change delegation left behind by the merge.
    this(Strings.EMPTY_ARRAY, Collections.emptyMap(), indexResponses, failures, null);
}

/**
 * Canonical constructor; all public constructors and {@link Builder#build()}
 * delegate here.
 *
 * @param indices             the concrete indices the capabilities were resolved against
 * @param fields              field capabilities keyed by field name, then by type
 * @param indexResponses      per-index capabilities for the unmerged case
 * @param failures            per-index failures collected during resolution
 * @param minTransportVersion minimum transport version across the involved
 *                            clusters, or {@code null} when unknown
 */
private FieldCapabilitiesResponse(
    String[] indices,
    Map<String, Map<String, FieldCapabilities>> fields,
    List<FieldCapabilitiesIndexResponse> indexResponses,
    // Fix: the merge duplicated the `failures` parameter line; declared once here.
    List<FieldCapabilitiesFailure> failures,
    TransportVersion minTransportVersion
) {
    this.fields = Objects.requireNonNull(fields);
    this.indexResponses = Objects.requireNonNull(indexResponses);
    this.indices = indices;
    this.failures = failures;
    this.minTransportVersion = minTransportVersion;
}

/**
 * Deserialization constructor. The read order must mirror {@code writeTo} exactly.
 */
public FieldCapabilitiesResponse(StreamInput in) throws IOException {
    this.indices = in.readStringArray();
    this.fields = in.readMap(FieldCapabilitiesResponse::readField);
    this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
    this.failures = in.readCollectionAsList(FieldCapabilitiesFailure::new);
    // Peers on a wire version that predates MIN_TRANSPORT_VERSION never send the
    // field, so it is treated as unknown (null) rather than read from the stream.
    this.minTransportVersion = in.getTransportVersion().supports(MIN_TRANSPORT_VERSION)
        ? in.readOptional(TransportVersion::readVersion)
        : null;
}

/**
Expand Down Expand Up @@ -116,13 +127,20 @@ public List<FieldCapabilitiesIndexResponse> getIndexResponses() {
}

/**
 * Get the field capabilities per type for the provided {@code field}, or
 * {@code null} if the field is not present in this response.
 */
public Map<String, FieldCapabilities> getField(String field) {
    return fields.get(field);
}

/**
 * @return the minimum transport version across all clusters involved in the
 *         resolution, or {@code null} when it is unknown — e.g. when a peer on
 *         an older wire version did not report one.
 */
@Nullable
public TransportVersion minTransportVersion() {
    return minTransportVersion;
}

/**
* Returns <code>true</code> if the provided field is a metadata field.
*/
Expand All @@ -144,6 +162,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(fields, FieldCapabilitiesResponse::writeField);
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
out.writeCollection(failures);
if (out.getTransportVersion().supports(MIN_TRANSPORT_VERSION)) {
out.writeOptional((Writer<TransportVersion>) (o, v) -> TransportVersion.writeVersion(v, o), minTransportVersion);
}
}

private static void writeField(StreamOutput out, Map<String, FieldCapabilities> map) throws IOException {
Expand Down Expand Up @@ -184,21 +205,60 @@ public boolean equals(Object o) {
return Arrays.equals(indices, that.indices)
&& Objects.equals(fields, that.fields)
&& Objects.equals(indexResponses, that.indexResponses)
&& Objects.equals(failures, that.failures);
&& Objects.equals(failures, that.failures)
&& Objects.equals(minTransportVersion, that.minTransportVersion);
}

@Override
public int hashCode() {
    // Fix: the merge left the old three-line implementation as unreachable code
    // above the new return; only the single-expression form is kept.
    // Arrays.hashCode is required for `indices`: Objects.hash would hash the
    // array reference by identity, breaking the equals/hashCode contract.
    return Objects.hash(fields, indexResponses, failures, minTransportVersion) * 31 + Arrays.hashCode(indices);
}

@Override
public String toString() {
if (indexResponses.size() > 0) {
return "FieldCapabilitiesResponse{unmerged}";
return indexResponses.isEmpty() ? Strings.toString(this) : "FieldCapabilitiesResponse{unmerged}";
}

/**
 * @return a new {@link Builder} for assembling a {@link FieldCapabilitiesResponse}
 */
public static Builder builder() {
    return new Builder();
}

public static class Builder {
private String[] indices = Strings.EMPTY_ARRAY;
private Map<String, Map<String, FieldCapabilities>> fields = Collections.emptyMap();
private List<FieldCapabilitiesIndexResponse> indexResponses = Collections.emptyList();
private List<FieldCapabilitiesFailure> failures = Collections.emptyList();
private TransportVersion minTransportVersion = null;

private Builder() {}

public Builder withIndices(String[] indices) {
this.indices = indices;
return this;
}

public Builder withFields(Map<String, Map<String, FieldCapabilities>> fields) {
this.fields = fields;
return this;
}

public Builder withIndexResponses(List<FieldCapabilitiesIndexResponse> indexResponses) {
this.indexResponses = indexResponses;
return this;
}

public Builder withFailures(List<FieldCapabilitiesFailure> failures) {
this.failures = failures;
return this;
}

public Builder withMinTransportVersion(TransportVersion minTransportVersion) {
this.minTransportVersion = minTransportVersion;
return this;
}

public FieldCapabilitiesResponse build() {
return new FieldCapabilitiesResponse(indices, fields, indexResponses, failures, minTransportVersion);
}
return Strings.toString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.util.automaton.TooComplexToDeterminizeException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
Expand Down Expand Up @@ -69,6 +70,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -144,22 +146,20 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action
final Executor singleThreadedExecutor = buildSingleThreadedExecutor(searchCoordinationExecutor, LOGGER);
assert task instanceof CancellableTask;
final CancellableTask fieldCapTask = (CancellableTask) task;
// retrieve the initial timestamp in case the action is a cross cluster search
// retrieve the initial timestamp in case the action is a cross-cluster search
long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
final ProjectState projectState = projectResolver.getProjectState(clusterService.state());
final var minTransportVersion = new AtomicReference<>(clusterService.state().getMinTransportVersion());
final Map<String, OriginalIndices> remoteClusterIndices = transportService.getRemoteClusterService()
.groupIndices(request.indicesOptions(), request.indices(), request.returnLocalAll());
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final String[] concreteIndices;
if (localIndices == null) {
// in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices
concreteIndices = Strings.EMPTY_ARRAY;
} else {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), localIndices);
}
// in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices
final String[] concreteIndices = localIndices != null
? indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), localIndices)
: Strings.EMPTY_ARRAY;

if (concreteIndices.length == 0 && remoteClusterIndices.isEmpty()) {
listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap()));
listener.onResponse(FieldCapabilitiesResponse.builder().withMinTransportVersion(minTransportVersion.get()).build());
return;
}

Expand Down Expand Up @@ -235,7 +235,7 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action
if (fieldCapTask.notifyIfCancelled(listener)) {
releaseResourcesOnCancel.run();
} else {
mergeIndexResponses(request, fieldCapTask, indexResponses, indexFailures, listener);
mergeIndexResponses(request, fieldCapTask, indexResponses, indexFailures, minTransportVersion, listener);
}
})) {
// local cluster
Expand Down Expand Up @@ -281,6 +281,12 @@ private void doExecuteForked(Task task, FieldCapabilitiesRequest request, Action
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
}
}
minTransportVersion.accumulateAndGet(response.minTransportVersion(), (lhs, rhs) -> {
if (lhs == null || rhs == null) {
return null;
}
return TransportVersion.min(lhs, rhs);
});
}, ex -> {
for (String index : originalIndices.indices()) {
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
Expand Down Expand Up @@ -360,35 +366,41 @@ private static void mergeIndexResponses(
CancellableTask task,
Map<String, FieldCapabilitiesIndexResponse> indexResponses,
FailureCollector indexFailures,
AtomicReference<TransportVersion> minTransportVersion,
ActionListener<FieldCapabilitiesResponse> listener
) {
List<FieldCapabilitiesFailure> failures = indexFailures.build(indexResponses.keySet());
if (indexResponses.size() > 0) {
if (indexResponses.isEmpty() == false) {
if (request.isMergeResults()) {
ActionListener.completeWith(listener, () -> merge(indexResponses, task, request, failures));
ActionListener.completeWith(listener, () -> merge(indexResponses, task, request, failures, minTransportVersion));
} else {
listener.onResponse(new FieldCapabilitiesResponse(new ArrayList<>(indexResponses.values()), failures));
listener.onResponse(
FieldCapabilitiesResponse.builder()
.withIndexResponses(new ArrayList<>(indexResponses.values()))
.withFailures(failures)
.withMinTransportVersion(minTransportVersion.get())
.build()
);
}
} else {
// we have no responses at all, maybe because of errors
if (indexFailures.isEmpty() == false) {
/*
* Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors.
* Instead, they should always be passed through the response object, as part of "failures".
*/
if (failures.stream()
.anyMatch(
failure -> failure.getException() instanceof IllegalStateException ise
&& ise.getCause() instanceof ElasticsearchTimeoutException
)) {
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures));
} else {
// throw back the first exception
listener.onFailure(failures.get(0).getException());
}
} else if (indexFailures.isEmpty() == false) {
/*
* Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors.
* Instead, they should always be passed through the response object, as part of "failures".
*/
if (failures.stream()
.anyMatch(
failure -> failure.getException() instanceof IllegalStateException ise
&& ise.getCause() instanceof ElasticsearchTimeoutException
)) {
listener.onResponse(
FieldCapabilitiesResponse.builder().withFailures(failures).withMinTransportVersion(minTransportVersion.get()).build()
);
} else {
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
// throw back the first exception
listener.onFailure(failures.get(0).getException());
}
} else {
listener.onResponse(FieldCapabilitiesResponse.builder().withMinTransportVersion(minTransportVersion.get()).build());
}
}

Expand Down Expand Up @@ -423,7 +435,8 @@ private static FieldCapabilitiesResponse merge(
Map<String, FieldCapabilitiesIndexResponse> indexResponsesMap,
CancellableTask task,
FieldCapabilitiesRequest request,
List<FieldCapabilitiesFailure> failures
List<FieldCapabilitiesFailure> failures,
AtomicReference<TransportVersion> minTransportVersion
) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH_COORDINATION); // too expensive to run this on a transport worker
task.ensureNotCancelled();
Expand Down Expand Up @@ -464,7 +477,12 @@ private static FieldCapabilitiesResponse merge(
);
}
}
return new FieldCapabilitiesResponse(indices, Collections.unmodifiableMap(fields), failures);
return FieldCapabilitiesResponse.builder()
.withIndices(indices)
.withFields(Collections.unmodifiableMap(fields))
.withFailures(failures)
.withMinTransportVersion(minTransportVersion.get())
.build();
}

private static boolean shouldLogException(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9202000,9185004
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_9.2.1,9185003
min_transport_version,9185004
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_lookup_join_full_text_function,9201000
min_transport_version,9202000