Skip to content
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
980723e
poc
n1v0lg Jun 24, 2025
4ff5e6e
poc
n1v0lg Jun 24, 2025
3df7a1d
Fix ups
n1v0lg Jun 26, 2025
c21a0a9
The missing commit
n1v0lg Jun 26, 2025
5266376
Merge branch 'main' into poc-flat-world
n1v0lg Jul 9, 2025
650dd77
Merge branch 'main' into poc-flat-world
n1v0lg Jul 10, 2025
9d63d77
More
n1v0lg Jul 10, 2025
8eec08e
Moar
n1v0lg Jul 10, 2025
fe6b696
Also resolver
n1v0lg Jul 10, 2025
8dee748
field caps
n1v0lg Jul 10, 2025
f9d6407
Merge branch 'main' into poc-flat-world
n1v0lg Jul 11, 2025
be2ab99
SPI
n1v0lg Jul 11, 2025
710d789
SPI
n1v0lg Jul 11, 2025
3587e6a
Remote conns
n1v0lg Jul 11, 2025
26a49c0
More
n1v0lg Jul 12, 2025
2a45f5b
Extract interface
n1v0lg Jul 12, 2025
9d29d4f
Inject authenticator
n1v0lg Jul 13, 2025
0ef2258
Simplify
n1v0lg Jul 13, 2025
88104ce
Scope
n1v0lg Jul 13, 2025
035ffc1
Compile
n1v0lg Jul 14, 2025
f5cad57
Merge branch 'main' into poc-cps-e2e
n1v0lg Jul 14, 2025
fe3dea0
Query routing
n1v0lg Jul 14, 2025
1153dfa
Some clean up
n1v0lg Jul 14, 2025
3fd532e
WIP esql
n1v0lg Jul 15, 2025
512222e
Fix interface
n1v0lg Jul 15, 2025
b867e69
WIP resolver
n1v0lg Jul 15, 2025
08d3e4e
Only inject resolver
n1v0lg Jul 15, 2025
d36c3f6
Clean up
n1v0lg Jul 15, 2025
d20a8c9
esql patch by @idegtiarenko
n1v0lg Jul 16, 2025
32ed394
More context
n1v0lg Jul 17, 2025
67f5cdf
More still
n1v0lg Jul 17, 2025
e25d8b2
CrossProjectAware
n1v0lg Jul 17, 2025
137195f
Javadoc
n1v0lg Jul 17, 2025
ee93568
Merge and clean up
n1v0lg Aug 18, 2025
18d6bcb
More
n1v0lg Aug 19, 2025
2ae90af
Local index resolution
n1v0lg Aug 19, 2025
2b092fb
WIP
n1v0lg Aug 20, 2025
d834784
More
n1v0lg Aug 22, 2025
7de6de0
More
n1v0lg Aug 22, 2025
9e53cc3
WIP e2e error handling
n1v0lg Aug 22, 2025
2d8a118
Flat world maybe
n1v0lg Aug 25, 2025
01ad89b
Merge branch 'main' into poc-cps-e2e
n1v0lg Aug 25, 2025
4cc41c3
Small clean up
n1v0lg Aug 25, 2025
254977c
TODO
n1v0lg Aug 25, 2025
e427303
A v big refactor
n1v0lg Aug 27, 2025
efbd82e
V confusing double resolution fix
n1v0lg Aug 27, 2025
d8103f9
Ominous TODO
n1v0lg Aug 27, 2025
1d3dc40
Nits
n1v0lg Aug 27, 2025
c03c73a
Tweaks
n1v0lg Aug 28, 2025
1f04a53
[CI] Auto commit changes from spotless
Aug 28, 2025
9f341be
Merge
n1v0lg Aug 28, 2025
7c6b1fc
Consolidate more index resolution
n1v0lg Aug 28, 2025
91bf747
Missing
n1v0lg Aug 28, 2025
266a822
fix
n1v0lg Aug 28, 2025
bf307e2
Undo
n1v0lg Aug 28, 2025
768affc
More
n1v0lg Aug 28, 2025
d1bd040
Merge
n1v0lg Aug 28, 2025
a78010b
TODO
n1v0lg Aug 28, 2025
ca5b153
Project routing
n1v0lg Aug 28, 2025
aaab227
More abstractions
n1v0lg Aug 29, 2025
1967bfa
Fixes
n1v0lg Aug 29, 2025
700af92
More
n1v0lg Aug 29, 2025
652caf3
Merge branch 'main' into poc-cps-e2e
n1v0lg Aug 29, 2025
3a8c4dc
Renames
n1v0lg Aug 29, 2025
24b9f7c
Clean up
n1v0lg Aug 31, 2025
804069c
More clean up
n1v0lg Sep 1, 2025
a61ad84
Also xpack
n1v0lg Sep 1, 2025
ab675dc
Merge branch 'main' into poc-cps-e2e
n1v0lg Sep 1, 2025
b11d30a
Checkstyle
n1v0lg Sep 1, 2025
cd6511f
Nits
n1v0lg Sep 2, 2025
da74cb5
Rename
n1v0lg Sep 3, 2025
5373143
Merge
n1v0lg Sep 3, 2025
6544589
Cut down on number of classes
n1v0lg Sep 3, 2025
7b7aa7d
More
n1v0lg Sep 3, 2025
2af9134
Clean up
n1v0lg Sep 4, 2025
c17b5c5
More
n1v0lg Sep 4, 2025
7d9a438
Tweaks
n1v0lg Sep 8, 2025
80bd89e
Nits
n1v0lg Sep 9, 2025
0a4046c
Fix compl
n1v0lg Sep 9, 2025
49814b1
Merge branch 'main' into poc-cps-e2e
n1v0lg Sep 12, 2025
01276d5
Revert "Merge branch 'main' into poc-cps-e2e"
n1v0lg Sep 12, 2025
d1bb772
CPS service and fix refactor
n1v0lg Sep 12, 2025
d5af5bc
Index resolution
n1v0lg Sep 12, 2025
f380e82
xpack
n1v0lg Sep 12, 2025
fd1df71
Persist headers
n1v0lg Sep 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch;

import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.transport.RemoteClusterService;

import java.util.List;

public interface RewritableIndicesRequest extends IndicesRequest {
boolean rewritten();

void rewritten(List<RewrittenIndexExpression> indexExpressions);

boolean checkRemote(List<RemoteClusterService.RemoteTag> tags);

record RewrittenIndexExpression(String original, List<String> rewritten) {}
Copy link
Contributor

@quux00 quux00 Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need to check (and track) whether each component of the rewritten list is flat or qualified? Is that left for a later impl or do you think we don't need to track that here?

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.action.search;

import org.elasticsearch.RewritableIndicesRequest;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
Expand All @@ -53,7 +55,11 @@
* @see Client#search(SearchRequest)
* @see SearchResponse
*/
public class SearchRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, Rewriteable<SearchRequest> {
public class SearchRequest extends LegacyActionRequest
implements
RewritableIndicesRequest,
IndicesRequest.Replaceable,
Rewriteable<SearchRequest> {

public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

Expand All @@ -69,6 +75,10 @@ public class SearchRequest extends LegacyActionRequest implements IndicesRequest
private SearchType searchType = SearchType.DEFAULT;

private String[] indices = Strings.EMPTY_ARRAY;
private List<RemoteClusterService.RemoteTag> routingTags = List.of();

@Nullable
private List<RewrittenIndexExpression> indexExpressions;

@Nullable
private String routing;
Expand Down Expand Up @@ -400,6 +410,11 @@ public SearchRequest indices(String... indices) {
return this;
}

public SearchRequest routingTags(List<RemoteClusterService.RemoteTag> routingTags) {
this.routingTags = routingTags;
return this;
}

private static void validateIndices(String... indices) {
Objects.requireNonNull(indices, "indices must not be null");
for (String index : indices) {
Expand Down Expand Up @@ -853,4 +868,30 @@ public String toString() {
+ source
+ '}';
}

@Override
public boolean rewritten() {
return indexExpressions != null;
}

@Override
public void rewritten(List<RewrittenIndexExpression> indexExpressions) {
assert false == rewritten();
this.indexExpressions = indexExpressions;
indices(indexExpressions.stream().flatMap(indexExpression -> indexExpression.rewritten().stream()).toArray(String[]::new));
}

@Override
public boolean checkRemote(List<RemoteClusterService.RemoteTag> tags) {
if (routingTags.isEmpty()) {
return true; // no routing requested, so no constraints
}
// if any tag in routingTags matches one in tags, return true
for (RemoteClusterService.RemoteTag tag : routingTags) {
if (tags.contains(tag)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.DEFAULT_PRE_FILTER_SHARD_SIZE,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.REMOTE_CLUSTER_TAGS,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
RemoteClusterService.REMOTE_NODE_ATTRIBUTE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.rest.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
Expand All @@ -35,6 +37,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.term.TermSuggestionBuilder;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.usage.SearchUsageHolder;
import org.elasticsearch.xcontent.XContentParser;

Expand Down Expand Up @@ -63,6 +66,7 @@ public class RestSearchAction extends BaseRestHandler {
public static final String TYPED_KEYS_PARAM = "typed_keys";
public static final String INCLUDE_NAMED_QUERIES_SCORE_PARAM = "include_named_queries_score";
public static final Set<String> RESPONSE_PARAMS = Set.of(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM, INCLUDE_NAMED_QUERIES_SCORE_PARAM);
private static final Logger log = LogManager.getLogger(RestSearchAction.class);

private final SearchUsageHolder searchUsageHolder;
private final Predicate<NodeFeature> clusterSupportsFeature;
Expand Down Expand Up @@ -98,6 +102,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
client.threadPool().getThreadContext().setErrorTraceTransportHeader(request);
}
SearchRequest searchRequest = new SearchRequest();

// access the BwC param, but just drop it
// this might be set by old clients
request.param("min_compatible_shard_node");
Expand Down Expand Up @@ -167,6 +172,16 @@ public static void parseSearchRequest(
searchRequest.source(new SearchSourceBuilder());
}
searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));

var routingTags = request.param("query_routing", null);
if (routingTags != null) {
searchRequest.routingTags(
Arrays.stream(Strings.splitStringByCommaToArray(routingTags)).map(RemoteClusterService.RemoteTag::fromString).toList()
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give an example of the type/format of tags you are allowing here? And is there an implicit "ANDing" of those tags?

As a first impl that might be useful.

Of course to state the obvious (not a criticism)

  1. it doesn't handle ES|QL if that is embedded in the query
  2. it doesn't support the complex AND, OR, NOT and grouping logic we'll eventually have to implement
    so later we'll need some additional constructs/interfaces around parsing that more complex scenario.

} else {
log.info("No routing tags");
}

if (requestContentParser != null) {
if (searchUsageHolder == null) {
searchRequest.source().parseXContent(requestContentParser, true, clusterSupportsFeature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -97,6 +98,33 @@ public final class RemoteClusterService extends RemoteClusterAware
(ns, key) -> boolSetting(key, true, new RemoteConnectionEnabled<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope)
);

public record RemoteTag(String key, String value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local project also has tags, so my guess is that this doesn't belong in RemoteClusterService and shouldn't be called RemoteTag? Maybe MetadataTag or ProjectTag?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ProjectTag makes sense though we might also go broader and just call it metadata in case we also want it to cover project alias and such.

public static RemoteTag fromString(String tag) {
if (tag == null || tag.isEmpty()) {
throw new IllegalArgumentException("Remote tag must not be null or empty");
}
// - as a separator to simplify search path param parsing; won't be like this in the real implementation
int idx = tag.indexOf('-');
if (idx < 0) {
return new RemoteTag(tag, "");
} else {
return new RemoteTag(tag.substring(0, idx), tag.substring(idx + 1));
}
}
}

public static final Setting.AffixSetting<List<RemoteTag>> REMOTE_CLUSTER_TAGS = Setting.affixKeySetting(
"cluster.remote.",
"tags",
(ns, key) -> Setting.listSetting(
key,
Collections.emptyList(),
RemoteTag::fromString,
Setting.Property.Dynamic,
Setting.Property.NodeScope
)
);

public static final Setting.AffixSetting<TimeValue> REMOTE_CLUSTER_PING_SCHEDULE = Setting.affixKeySetting(
"cluster.remote.",
"transport.ping_schedule",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.security;

import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;

public interface CustomRemoteServerTransportInterceptor {
// TODO probably don't want this
boolean enabled();

// TODO this should be a wrapper around TransportInterceptor.AsyncSender instead
<T extends TransportResponse> void sendRequest(
TransportInterceptor.AsyncSender sender,
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
);

CustomServerTransportFilter getFilter();

class Default implements CustomRemoteServerTransportInterceptor {
@Override
public boolean enabled() {
return false;
}

@Override
public <T extends TransportResponse> void sendRequest(
TransportInterceptor.AsyncSender sender,
Transport.Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
sender.sendRequest(connection, action, request, options, handler);
}

@Override
public CustomServerTransportFilter getFilter() {
return new CustomServerTransportFilter.Default();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.security;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.TransportRequest;

public interface CustomServerTransportFilter {
void filter(String securityAction, TransportRequest request, ActionListener<Void> authenticationListener);

class Default implements CustomServerTransportFilter {
@Override
public void filter(String securityAction, TransportRequest request, ActionListener<Void> listener) {
listener.onResponse(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.core.security.authc.service.ServiceAccountTokenStore;
import org.elasticsearch.xpack.core.security.authc.support.UserRoleMapper;
import org.elasticsearch.xpack.core.security.authz.AuthorizationEngine;
import org.elasticsearch.xpack.core.security.authz.CustomIndicesRequestRewriter;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;

Expand Down Expand Up @@ -133,6 +134,14 @@ default CustomApiKeyAuthenticator getCustomApiKeyAuthenticator(SecurityComponent
return null;
}

default CustomIndicesRequestRewriter getCustomIndicesRequestRewriter(SecurityComponents components) {
return null;
}

default CustomRemoteServerTransportInterceptor getCustomRemoteServerTransportInterceptor(SecurityComponents components) {
return null;
}

/**
* Returns a authorization engine for authorizing requests, or null to use the default authorization mechanism.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.security.authz;

import org.elasticsearch.RewritableIndicesRequest;

public interface CustomIndicesRequestRewriter {
void rewrite(RewritableIndicesRequest request);

class Default implements CustomIndicesRequestRewriter {
@Override
public void rewrite(RewritableIndicesRequest request) {
// No rewriting by default
// This is a no-op implementation
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected ChannelHandler getClientChannelInitializer(DiscoveryNode node, Connect

@Override
protected InboundPipeline getInboundPipeline(Channel channel, boolean isRemoteClusterServerChannel) {
if (false == isRemoteClusterServerChannel) {
if (false == isRemoteClusterServerChannel || crossClusterAccessAuthenticationService.skipTransportCheck()) {
return super.getInboundPipeline(channel, false);
} else {
return new InboundPipeline(
Expand Down
Loading