Skip to content

Commit 655c47c

Browse files
atris authored and tandonks committed
Add ACL-aware routing processors for multi-tenant document routing (opensearch-project#18834)
Introduces two new processors to enable document routing based on ACL metadata:

- AclRoutingProcessor (ingest pipeline):
  * Extracts ACL field value and generates deterministic routing using MurmurHash3
  * Configurable options: acl_field, target_field (default: _routing), ignore_missing, override_existing
  * Ensures documents with same ACL values colocate on same shard
- AclRoutingSearchProcessor (search pipeline):
  * Automatically extracts ACL values from term/terms/bool queries
  * Sets routing on search requests to target specific shards
  * Supports nested bool query traversal (must/filter/should clauses)
  * Configurable extraction with extract_from_query flag

Implementation details:
- Both processors use identical MurmurHash3.hash128() with Base64 encoding for consistent routing value generation
- Registered in IngestCommonModulePlugin and SearchPipelineCommonModulePlugin
- Comprehensive unit tests and integration tests for both processors
- Follows existing processor patterns (similar to HierarchicalRoutingProcessor)

Use case: Improves query performance in multi-tenant environments by ensuring tenant-specific documents are colocated and queries are routed to relevant shards.

Resolves opensearch-project#18829

--------

Signed-off-by: Atri Sharma <[email protected]>
1 parent c443f4c commit 655c47c

File tree

11 files changed

+1252
-3
lines changed

11 files changed

+1252
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
77
### Added
88
- [Feature Request] Enhance Terms lookup query to support query clause instead of docId ([#18195](https://github.com/opensearch-project/OpenSearch/issues/18195))
99
- Add hierarchical routing processors for ingest and search pipelines ([#18826](https://github.com/opensearch-project/OpenSearch/pull/18826))
10+
- Add ACL-aware routing processors for ingest and search pipelines ([#18834](https://github.com/opensearch-project/OpenSearch/pull/18834))
1011
- Add support for Warm Indices Write Block on Flood Watermark breach ([#18375](https://github.com/opensearch-project/OpenSearch/pull/18375))
1112
- FS stats for warm nodes based on addressable space ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
1213
- Add support for custom index name resolver from cluster plugin ([#18593](https://github.com/opensearch-project/OpenSearch/pull/18593))
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
12+
import org.opensearch.action.get.GetResponse;
13+
import org.opensearch.action.index.IndexRequest;
14+
import org.opensearch.action.search.SearchResponse;
15+
import org.opensearch.common.hash.MurmurHash3;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.core.common.bytes.BytesReference;
18+
import org.opensearch.core.xcontent.MediaTypeRegistry;
19+
import org.opensearch.index.query.TermQueryBuilder;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.search.SearchHit;
22+
import org.opensearch.search.builder.SearchSourceBuilder;
23+
import org.opensearch.test.OpenSearchIntegTestCase;
24+
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.Arrays;
27+
import java.util.Base64;
28+
import java.util.Collection;
29+
import java.util.Map;
30+
31+
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
32+
import static org.hamcrest.Matchers.equalTo;
33+
34+
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST)
35+
public class AclRoutingProcessorIT extends OpenSearchIntegTestCase {
36+
37+
private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();
38+
39+
@Override
40+
protected Collection<Class<? extends Plugin>> nodePlugins() {
41+
return Arrays.asList(IngestCommonModulePlugin.class);
42+
}
43+
44+
public void testAclRoutingProcessor() throws Exception {
45+
// Create ingest pipeline with ACL routing processor
46+
String pipelineId = "acl-routing-test";
47+
BytesReference pipelineConfig = BytesReference.bytes(
48+
jsonBuilder().startObject()
49+
.startArray("processors")
50+
.startObject()
51+
.startObject("acl_routing")
52+
.field("acl_field", "team")
53+
.field("target_field", "_routing")
54+
.endObject()
55+
.endObject()
56+
.endArray()
57+
.endObject()
58+
);
59+
60+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
61+
62+
// Create index with multiple shards - don't set default pipeline, use explicit pipeline parameter
63+
String indexName = "test-acl-routing";
64+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
65+
Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build()
66+
)
67+
.mapping(
68+
jsonBuilder().startObject()
69+
.startObject("properties")
70+
.startObject("team")
71+
.field("type", "keyword")
72+
.endObject()
73+
.startObject("content")
74+
.field("type", "text")
75+
.endObject()
76+
.endObject()
77+
.endObject()
78+
);
79+
80+
client().admin().indices().create(createIndexRequest).get();
81+
82+
// Index documents with explicit pipeline parameter
83+
client().index(
84+
new IndexRequest(indexName).id("1")
85+
.source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 1").endObject())
86+
.setPipeline(pipelineId)
87+
).get();
88+
89+
client().index(
90+
new IndexRequest(indexName).id("2")
91+
.source(jsonBuilder().startObject().field("team", "team-alpha").field("content", "Alpha content 2").endObject())
92+
.setPipeline(pipelineId)
93+
).get();
94+
95+
client().index(
96+
new IndexRequest(indexName).id("3")
97+
.source(jsonBuilder().startObject().field("team", "team-beta").field("content", "Beta content").endObject())
98+
.setPipeline(pipelineId)
99+
).get();
100+
101+
// Refresh to make documents searchable
102+
client().admin().indices().prepareRefresh(indexName).get();
103+
104+
// Test search functionality - documents should be searchable
105+
SearchResponse searchResponse = client().prepareSearch(indexName)
106+
.setSource(new SearchSourceBuilder().query(new TermQueryBuilder("team", "team-alpha")))
107+
.get();
108+
109+
assertThat("Should find alpha team documents", searchResponse.getHits().getTotalHits().value(), equalTo(2L));
110+
111+
for (SearchHit hit : searchResponse.getHits().getHits()) {
112+
String team = (String) hit.getSourceAsMap().get("team");
113+
assertEquals("Found document should be from team alpha", "team-alpha", team);
114+
}
115+
}
116+
117+
public void testAclRoutingWithIgnoreMissing() throws Exception {
118+
// Create pipeline with ignore_missing = true
119+
String pipelineId = "acl-routing-ignore-missing";
120+
BytesReference pipelineConfig = BytesReference.bytes(
121+
jsonBuilder().startObject()
122+
.startArray("processors")
123+
.startObject()
124+
.startObject("acl_routing")
125+
.field("acl_field", "nonexistent_field")
126+
.field("target_field", "_routing")
127+
.field("ignore_missing", true)
128+
.endObject()
129+
.endObject()
130+
.endArray()
131+
.endObject()
132+
);
133+
134+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
135+
136+
String indexName = "test-ignore-missing";
137+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
138+
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build()
139+
);
140+
141+
client().admin().indices().create(createIndexRequest).get();
142+
143+
// Index document without the ACL field
144+
IndexRequest indexRequest = new IndexRequest(indexName).id("missing1")
145+
.source(
146+
jsonBuilder().startObject().field("other_field", "some value").field("content", "Document without ACL field").endObject()
147+
)
148+
.setPipeline(pipelineId);
149+
150+
client().index(indexRequest).get();
151+
client().admin().indices().prepareRefresh(indexName).get();
152+
153+
// Document should be indexed without routing since field was missing and ignored
154+
GetResponse doc = client().prepareGet(indexName, "missing1").get();
155+
assertTrue("Document should be indexed even with missing ACL field", doc.isExists());
156+
}
157+
158+
public void testAclRoutingWithCustomTargetField() throws Exception {
159+
// Create pipeline with custom target field
160+
String pipelineId = "acl-routing-custom-target";
161+
BytesReference pipelineConfig = BytesReference.bytes(
162+
jsonBuilder().startObject()
163+
.startArray("processors")
164+
.startObject()
165+
.startObject("acl_routing")
166+
.field("acl_field", "department")
167+
.field("target_field", "custom_routing")
168+
.endObject()
169+
.endObject()
170+
.endArray()
171+
.endObject()
172+
);
173+
174+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
175+
176+
String indexName = "test-custom-target";
177+
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(
178+
Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0).put("index.default_pipeline", pipelineId).build()
179+
);
180+
181+
client().admin().indices().create(createIndexRequest).get();
182+
183+
// Index document
184+
IndexRequest indexRequest = new IndexRequest(indexName).id("custom1")
185+
.source(jsonBuilder().startObject().field("department", "engineering").field("content", "Engineering document").endObject())
186+
.setPipeline(pipelineId);
187+
188+
client().index(indexRequest).get();
189+
client().admin().indices().prepareRefresh(indexName).get();
190+
191+
GetResponse doc = client().prepareGet(indexName, "custom1").get();
192+
assertTrue("Document should exist", doc.isExists());
193+
194+
// Check that custom routing field was set
195+
Map<String, Object> source = doc.getSource();
196+
assertNotNull("Custom routing field should be set", source.get("custom_routing"));
197+
assertEquals("Custom routing should match expected value", generateRoutingValue("engineering"), source.get("custom_routing"));
198+
}
199+
200+
public void testAclRoutingProcessorRegistration() throws Exception {
201+
// Verify processor is registered by attempting to create a pipeline
202+
String pipelineId = "test-acl-processor-registration";
203+
BytesReference pipelineConfig = BytesReference.bytes(
204+
jsonBuilder().startObject()
205+
.startArray("processors")
206+
.startObject()
207+
.startObject("acl_routing")
208+
.field("acl_field", "team")
209+
.endObject()
210+
.endObject()
211+
.endArray()
212+
.endObject()
213+
);
214+
215+
// This should succeed if processor is properly registered
216+
client().admin().cluster().preparePutPipeline(pipelineId, pipelineConfig, MediaTypeRegistry.JSON).get();
217+
218+
// Verify pipeline was created
219+
var getPipelineResponse = client().admin().cluster().prepareGetPipeline(pipelineId).get();
220+
assertTrue("Pipeline should be created successfully", getPipelineResponse.isFound());
221+
222+
// Clean up
223+
client().admin().cluster().prepareDeletePipeline(pipelineId).get();
224+
}
225+
226+
// Helper method to generate routing value (mirrors processor logic)
227+
private String generateRoutingValue(String aclValue) {
228+
// Use MurmurHash3 for consistent hashing (same as processor)
229+
byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8);
230+
MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128());
231+
232+
// Convert to base64 for routing value
233+
byte[] hashBytes = new byte[16];
234+
System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8);
235+
System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8);
236+
237+
return BASE64_ENCODER.encodeToString(hashBytes);
238+
}
239+
240+
private byte[] longToBytes(long value) {
241+
byte[] result = new byte[8];
242+
for (int i = 7; i >= 0; i--) {
243+
result[i] = (byte) (value & 0xFF);
244+
value >>= 8;
245+
}
246+
return result;
247+
}
248+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.common.hash.MurmurHash3;
12+
import org.opensearch.ingest.AbstractProcessor;
13+
import org.opensearch.ingest.ConfigurationUtils;
14+
import org.opensearch.ingest.IngestDocument;
15+
import org.opensearch.ingest.Processor;
16+
17+
import java.nio.charset.StandardCharsets;
18+
import java.util.Base64;
19+
import java.util.Map;
20+
21+
/**
22+
* Processor that sets the _routing field based on ACL metadata.
23+
*/
24+
public final class AclRoutingProcessor extends AbstractProcessor {
25+
26+
public static final String TYPE = "acl_routing";
27+
private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();
28+
29+
private final String aclField;
30+
private final String targetField;
31+
private final boolean ignoreMissing;
32+
private final boolean overrideExisting;
33+
34+
AclRoutingProcessor(
35+
String tag,
36+
String description,
37+
String aclField,
38+
String targetField,
39+
boolean ignoreMissing,
40+
boolean overrideExisting
41+
) {
42+
super(tag, description);
43+
this.aclField = aclField;
44+
this.targetField = targetField;
45+
this.ignoreMissing = ignoreMissing;
46+
this.overrideExisting = overrideExisting;
47+
}
48+
49+
@Override
50+
public IngestDocument execute(IngestDocument document) throws Exception {
51+
Object aclValue = document.getFieldValue(aclField, Object.class, ignoreMissing);
52+
53+
if (aclValue == null) {
54+
if (ignoreMissing) {
55+
return document;
56+
}
57+
throw new IllegalArgumentException("field [" + aclField + "] not present as part of path [" + aclField + "]");
58+
}
59+
60+
// Check if routing already exists
61+
if (!overrideExisting && document.hasField(targetField)) {
62+
return document;
63+
}
64+
65+
String routingValue = generateRoutingValue(aclValue.toString());
66+
document.setFieldValue(targetField, routingValue);
67+
68+
return document;
69+
}
70+
71+
private String generateRoutingValue(String aclValue) {
72+
// Use MurmurHash3 for consistent hashing
73+
byte[] bytes = aclValue.getBytes(StandardCharsets.UTF_8);
74+
MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128());
75+
76+
// Convert to base64 for routing value
77+
byte[] hashBytes = new byte[16];
78+
System.arraycopy(longToBytes(hash.h1), 0, hashBytes, 0, 8);
79+
System.arraycopy(longToBytes(hash.h2), 0, hashBytes, 8, 8);
80+
81+
return BASE64_ENCODER.encodeToString(hashBytes);
82+
}
83+
84+
private byte[] longToBytes(long value) {
85+
byte[] result = new byte[8];
86+
for (int i = 7; i >= 0; i--) {
87+
result[i] = (byte) (value & 0xFF);
88+
value >>= 8;
89+
}
90+
return result;
91+
}
92+
93+
@Override
94+
public String getType() {
95+
return TYPE;
96+
}
97+
98+
public static final class Factory implements Processor.Factory {
99+
100+
@Override
101+
public AclRoutingProcessor create(
102+
Map<String, Processor.Factory> registry,
103+
String processorTag,
104+
String description,
105+
Map<String, Object> config
106+
) throws Exception {
107+
String aclField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "acl_field");
108+
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "_routing");
109+
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
110+
boolean overrideExisting = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override_existing", true);
111+
112+
return new AclRoutingProcessor(processorTag, description, aclField, targetField, ignoreMissing, overrideExisting);
113+
}
114+
}
115+
}

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
121121
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
122122
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
123123
processors.put(HierarchicalRoutingProcessor.TYPE, new HierarchicalRoutingProcessor.Factory());
124+
processors.put(AclRoutingProcessor.TYPE, new AclRoutingProcessor.Factory());
124125
return filterForAllowlistSetting(parameters.env.settings(), processors);
125126
}
126127

0 commit comments

Comments
 (0)