Skip to content

Commit 6487b40

Browse files
atrissunqijun.jun
authored andcommitted
opensearch-project#17593 Add Extension Points For Pre and Post Collection of Scores in QueryPhase (opensearch-project#18814)
Signed-off-by: Atri Sharma <[email protected]> Signed-off-by: sunqijun.jun <[email protected]>
1 parent 53b47fc commit 6487b40

File tree

7 files changed

+717
-49
lines changed

7 files changed

+717
-49
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- [Workload Management] Modify logging message in WorkloadGroupService ([#18712](https://github.com/opensearch-project/OpenSearch/pull/18712))
2727
- Add BooleanQuery rewrite moving constant-scoring must clauses to filter clauses ([#18510](https://github.com/opensearch-project/OpenSearch/issues/18510))
2828
- Add functionality for plugins to inject QueryCollectorContext during QueryPhase ([#18637](https://github.com/opensearch-project/OpenSearch/pull/18637))
29+
- Add QueryPhaseListener interface for pre/post collection hooks ([#17593](https://github.com/opensearch-project/OpenSearch/issues/17593))
2930
- Add support for non-timing info in profiler ([#18460](https://github.com/opensearch-project/OpenSearch/issues/18460))
3031
- [Rule-based auto tagging] Bug fix and improvements ([#18726](https://github.com/opensearch-project/OpenSearch/pull/18726))
3132
- Extend Approximation Framework to other numeric types ([#18530](https://github.com/opensearch-project/OpenSearch/issues/18530))
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.query;
10+
11+
import org.apache.lucene.search.Collector;
12+
import org.apache.lucene.search.CollectorManager;
13+
import org.apache.lucene.search.Query;
14+
import org.opensearch.common.annotation.InternalApi;
15+
import org.opensearch.search.internal.ContextIndexSearcher;
16+
import org.opensearch.search.internal.SearchContext;
17+
18+
import java.io.IOException;
19+
import java.util.LinkedList;
20+
import java.util.List;
21+
import java.util.Optional;
22+
23+
import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;
24+
25+
/**
26+
* Abstract base class for QueryPhaseSearcher implementations that provides
27+
* extension hook execution logic using the template pattern.
28+
*
29+
* @opensearch.internal
30+
*/
31+
@InternalApi
32+
public abstract class AbstractQueryPhaseSearcher implements QueryPhaseSearcher {
33+
34+
@Override
35+
public final boolean searchWith(
36+
SearchContext searchContext,
37+
ContextIndexSearcher searcher,
38+
Query query,
39+
LinkedList<QueryCollectorContext> collectors,
40+
boolean hasFilterCollector,
41+
boolean hasTimeout
42+
) throws IOException {
43+
List<QueryPhaseListener> listeners = queryPhaseListeners();
44+
45+
// Execute beforeCollection listeners
46+
for (QueryPhaseListener listener : listeners) {
47+
listener.beforeCollection(searchContext);
48+
}
49+
50+
boolean shouldRescore = doSearchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
51+
// Execute afterCollection listeners
52+
for (QueryPhaseListener listener : listeners) {
53+
listener.afterCollection(searchContext);
54+
}
55+
return shouldRescore;
56+
}
57+
58+
/**
59+
* Template method for actual search implementation.
60+
* Subclasses must implement this to define their specific search behavior.
61+
*/
62+
protected abstract boolean doSearchWith(
63+
SearchContext searchContext,
64+
ContextIndexSearcher searcher,
65+
Query query,
66+
LinkedList<QueryCollectorContext> collectors,
67+
boolean hasFilterCollector,
68+
boolean hasTimeout
69+
) throws IOException;
70+
71+
/**
72+
* Common method to create QueryCollectorContext that can be used by all implementations.
73+
*/
74+
protected QueryCollectorContext getQueryCollectorContext(SearchContext searchContext, boolean hasFilterCollector) throws IOException {
75+
// create the top docs collector last when the other collectors are known
76+
final Optional<QueryCollectorContext> queryCollectorContextOpt = QueryCollectorContextSpecRegistry.getQueryCollectorContextSpec(
77+
searchContext,
78+
new QueryCollectorArguments.Builder().hasFilterCollector(hasFilterCollector).build()
79+
).map(queryCollectorContextSpec -> new QueryCollectorContext(queryCollectorContextSpec.getContextName()) {
80+
@Override
81+
Collector create(Collector in) throws IOException {
82+
return queryCollectorContextSpec.create(in);
83+
}
84+
85+
@Override
86+
CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) throws IOException {
87+
return queryCollectorContextSpec.createManager(in);
88+
}
89+
90+
@Override
91+
void postProcess(QuerySearchResult result) throws IOException {
92+
queryCollectorContextSpec.postProcess(result);
93+
}
94+
});
95+
if (queryCollectorContextOpt.isPresent()) {
96+
return queryCollectorContextOpt.get();
97+
} else {
98+
return createTopDocsCollectorContext(searchContext, hasFilterCollector);
99+
}
100+
}
101+
}

server/src/main/java/org/opensearch/search/query/ConcurrentQueryPhaseSearcher.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.opensearch.search.internal.ContextIndexSearcher;
2020
import org.opensearch.search.internal.SearchContext;
2121
import org.opensearch.search.profile.query.ProfileCollectorManager;
22-
import org.opensearch.search.query.QueryPhase.DefaultQueryPhaseSearcher;
2322

2423
import java.io.IOException;
2524
import java.util.LinkedList;
@@ -30,7 +29,7 @@
3029
* The implementation of the {@link QueryPhaseSearcher} which attempts to use concurrent
3130
* search of Apache Lucene segments if it has been enabled.
3231
*/
33-
public class ConcurrentQueryPhaseSearcher extends DefaultQueryPhaseSearcher {
32+
public class ConcurrentQueryPhaseSearcher extends AbstractQueryPhaseSearcher {
3433
private static final Logger LOGGER = LogManager.getLogger(ConcurrentQueryPhaseSearcher.class);
3534
private final AggregationProcessor aggregationProcessor = new ConcurrentAggregationProcessor();
3635

@@ -40,15 +39,15 @@ public class ConcurrentQueryPhaseSearcher extends DefaultQueryPhaseSearcher {
4039
public ConcurrentQueryPhaseSearcher() {}
4140

4241
@Override
43-
protected boolean searchWithCollector(
42+
protected boolean doSearchWith(
4443
SearchContext searchContext,
4544
ContextIndexSearcher searcher,
4645
Query query,
4746
LinkedList<QueryCollectorContext> collectors,
48-
QueryCollectorContext queryCollectorContext,
4947
boolean hasFilterCollector,
5048
boolean hasTimeout
5149
) throws IOException {
50+
QueryCollectorContext queryCollectorContext = getQueryCollectorContext(searchContext, hasFilterCollector);
5251
return searchWithCollectorManager(
5352
searchContext,
5453
searcher,

server/src/main/java/org/opensearch/search/query/QueryPhase.java

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,13 @@
7676
import java.util.List;
7777
import java.util.Map;
7878
import java.util.Objects;
79-
import java.util.Optional;
8079
import java.util.concurrent.ExecutorService;
8180
import java.util.stream.Collectors;
8281

8382
import static org.opensearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
8483
import static org.opensearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
8584
import static org.opensearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
8685
import static org.opensearch.search.query.QueryCollectorContext.createMultiCollectorContext;
87-
import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;
8886

8987
/**
9088
* Query phase of a search request, used to run the query and get back from each shard information about the matching documents
@@ -411,7 +409,7 @@ public static class TimeExceededException extends RuntimeException {
411409
*
412410
* @opensearch.internal
413411
*/
414-
public static class DefaultQueryPhaseSearcher implements QueryPhaseSearcher {
412+
public static class DefaultQueryPhaseSearcher extends AbstractQueryPhaseSearcher {
415413
private final AggregationProcessor aggregationProcessor;
416414

417415
/**
@@ -422,7 +420,7 @@ protected DefaultQueryPhaseSearcher() {
422420
}
423421

424422
@Override
425-
public boolean searchWith(
423+
protected boolean doSearchWith(
426424
SearchContext searchContext,
427425
ContextIndexSearcher searcher,
428426
Query query,
@@ -447,47 +445,6 @@ protected boolean searchWithCollector(
447445
boolean hasTimeout
448446
) throws IOException {
449447
QueryCollectorContext queryCollectorContext = getQueryCollectorContext(searchContext, hasFilterCollector);
450-
return searchWithCollector(searchContext, searcher, query, collectors, queryCollectorContext, hasFilterCollector, hasTimeout);
451-
}
452-
453-
private QueryCollectorContext getQueryCollectorContext(SearchContext searchContext, boolean hasFilterCollector) throws IOException {
454-
// create the top docs collector last when the other collectors are known
455-
final Optional<QueryCollectorContext> queryCollectorContextOpt = QueryCollectorContextSpecRegistry.getQueryCollectorContextSpec(
456-
searchContext,
457-
new QueryCollectorArguments.Builder().hasFilterCollector(hasFilterCollector).build()
458-
).map(queryCollectorContextSpec -> new QueryCollectorContext(queryCollectorContextSpec.getContextName()) {
459-
@Override
460-
Collector create(Collector in) throws IOException {
461-
return queryCollectorContextSpec.create(in);
462-
}
463-
464-
@Override
465-
CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in)
466-
throws IOException {
467-
return queryCollectorContextSpec.createManager(in);
468-
}
469-
470-
@Override
471-
void postProcess(QuerySearchResult result) throws IOException {
472-
queryCollectorContextSpec.postProcess(result);
473-
}
474-
});
475-
if (queryCollectorContextOpt.isPresent()) {
476-
return queryCollectorContextOpt.get();
477-
} else {
478-
return createTopDocsCollectorContext(searchContext, hasFilterCollector);
479-
}
480-
}
481-
482-
protected boolean searchWithCollector(
483-
SearchContext searchContext,
484-
ContextIndexSearcher searcher,
485-
Query query,
486-
LinkedList<QueryCollectorContext> collectors,
487-
QueryCollectorContext queryCollectorContext,
488-
boolean hasFilterCollector,
489-
boolean hasTimeout
490-
) throws IOException {
491448
return QueryPhase.searchWithCollector(
492449
searchContext,
493450
searcher,
@@ -498,5 +455,6 @@ protected boolean searchWithCollector(
498455
hasTimeout
499456
);
500457
}
458+
501459
}
502460
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.query;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.search.internal.SearchContext;
13+
14+
/**
15+
* Listener interface that allows plugins to hook into the query phase
16+
* before and after collection. This enables custom CollectorManager
17+
* implementations and data processing for advanced search features like
18+
* hybrid queries and neural search.
19+
*
20+
* <p>This API is experimental and may change in future versions based on
21+
* feedback from plugin implementations.</p>
22+
*
23+
* @opensearch.api
24+
*/
25+
@ExperimentalApi
26+
public interface QueryPhaseListener {
27+
28+
/**
29+
* Called before collection begins in the query phase.
30+
* This allows extensions to set up custom state or modify the search context
31+
* before the main query execution.
32+
*
33+
* @param searchContext the current search context
34+
*/
35+
void beforeCollection(SearchContext searchContext);
36+
37+
/**
38+
* Called after collection completes in the query phase.
39+
* This allows extensions to process collected data or perform
40+
* post-collection operations.
41+
*
42+
* @param searchContext the current search context
43+
*/
44+
void afterCollection(SearchContext searchContext);
45+
46+
}

server/src/main/java/org/opensearch/search/query/QueryPhaseSearcher.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import org.opensearch.search.internal.SearchContext;
1818

1919
import java.io.IOException;
20+
import java.util.Collections;
2021
import java.util.LinkedList;
22+
import java.util.List;
2123

2224
/**
2325
* The extension point which allows to plug in custom search implementation to be
@@ -53,4 +55,12 @@ boolean searchWith(
5355
default AggregationProcessor aggregationProcessor(SearchContext searchContext) {
5456
return new DefaultAggregationProcessor();
5557
}
58+
59+
/**
60+
* Get the list of query phase listeners that should be executed before and after score collection.
61+
* @return list of query phase listeners, empty list if none
62+
*/
63+
default List<QueryPhaseListener> queryPhaseListeners() {
64+
return Collections.emptyList();
65+
}
5666
}

0 commit comments

Comments
 (0)