diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java index feacb615390..ab39ecd62d0 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java @@ -12,7 +12,6 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.plugins.Plugin; -import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; @@ -21,6 +20,8 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.Client; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; + /** * The job runner class for scheduling async query. 
* @@ -37,7 +38,7 @@ public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner { // Share SQL plugin thread pool private static final String ASYNC_QUERY_THREAD_POOL_NAME = - AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME; + SQL_WORKER_THREAD_POOL_NAME; private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class); private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner(); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java index 30b242db816..a34ad909365 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import java.time.Instant; import org.apache.logging.log4j.LogManager; @@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() { spyJobRunner.runJob(request, context); ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); - verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME)) .submit(captor.capture()); Runnable runnable = captor.getValue(); @@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() { spyJobRunner.runJob(request, context); ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); - verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME)) .submit(captor.capture()); Runnable runnable = captor.getValue(); diff 
--git a/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java b/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java index 1cf54ffd88d..8aa96338948 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java @@ -11,13 +11,11 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.node.NodeClient; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; /** The scheduler which schedule the task run in sql-worker thread pool. */ @UtilityClass public class Scheduler { - - public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - public static void schedule(NodeClient client, Runnable task) { ThreadPool threadPool = client.threadPool(); threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 61fc9fa83d9..b0d43218a73 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -216,6 +216,27 @@ Result set:: "transient": {} } +Thread Pool Settings +==================== + +The SQL plugin is integrated with the `OpenSearch Thread Pool Settings `_. +There are two thread pools that can be configured at cluster setup via ``opensearch.yml``:: + + thread_pool: + sql-worker: + size: 30 + queue_size: 100 + sql_background_io: + size: 30 + queue_size: 1000 + +The ``sql-worker`` pool corresponds to compute resources related to running queries, such as compute-heavy evaluations on result sets. +This directly maps to the number of queries that can be run concurrently. +This is the primary pool you interact with externally.
+``sql_background_io`` is a low-footprint pool for IO requests the plugin makes, +and can be used to limit indirect load that SQL places on your cluster for Calcite-enabled operations. +A ``sql-worker`` thread may spawn multiple background threads. + plugins.query.executionengine.spark.session.limit ================================================== diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java index 2b16b584453..15580cf6477 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.executor; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; + import java.io.IOException; import java.time.Duration; import java.util.Map; @@ -30,10 +32,6 @@ /** A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread. 
*/ public class AsyncRestExecutor implements RestExecutor { - - /** Custom thread pool name managed by OpenSearch */ - public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class); /** diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java index 799aa55cf40..0a1e043b811 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.executor.cursor; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; + import java.io.IOException; import java.time.Duration; import java.util.Map; @@ -24,9 +26,6 @@ import org.opensearch.transport.client.Client; public class CursorAsyncRestExecutor { - /** Custom thread pool name managed by OpenSearch */ - public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class); /** Delegated rest executor to async */ diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java index 32b4d17ecda..bbf1d351850 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java @@ -6,6 +6,7 @@ package org.opensearch.sql.legacy.plugin; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import com.google.common.collect.ImmutableList; import 
java.util.Arrays; @@ -90,7 +91,7 @@ protected Set responseParams() { private void schedule(NodeClient client, Runnable task) { ThreadPool threadPool = client.threadPool(); - threadPool.schedule(withCurrentContext(task), new TimeValue(0), "sql-worker"); + threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } private Runnable withCurrentContext(final Runnable task) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java index 0261bc98120..68350c5a0fd 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.sql.opensearch.mapping.IndexMapping; @@ -97,7 +98,7 @@ public interface OpenSearchClient { */ void schedule(Runnable task); - NodeClient getNodeClient(); + Optional getNodeClient(); /** * Create PIT for given indices diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index 152fe499d2e..dab4b1e8ff1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -11,6 +11,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.function.Predicate; @@ -223,8 +224,8 @@ public void schedule(Runnable task) { } @Override - public NodeClient 
getNodeClient() { - return client; + public Optional getNodeClient() { + return Optional.of(client); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index 87b171707bb..427eb7d6b03 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; @@ -236,8 +237,8 @@ public void schedule(Runnable task) { } @Override - public NodeClient getNodeClient() { - throw new UnsupportedOperationException("Unsupported method."); + public Optional getNodeClient() { + return Optional.empty(); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index b5c2d6edccc..d13d52e3d5f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -15,6 +15,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; @@ -47,13 +48,13 @@ import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.PPLFuncImpTable; import org.opensearch.sql.opensearch.client.OpenSearchClient; -import org.opensearch.sql.opensearch.client.OpenSearchNodeClient; import 
org.opensearch.sql.opensearch.executor.protector.ExecutionProtector; import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction; import org.opensearch.sql.opensearch.functions.GeoIpFunction; import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.transport.client.node.NodeClient; /** OpenSearch execution engine implementation. */ public class OpenSearchExecutionEngine implements ExecutionEngine { @@ -268,9 +269,9 @@ private void buildResultSet( /** Registers opensearch-dependent functions */ private void registerOpenSearchFunctions() { - if (client instanceof OpenSearchNodeClient) { - SqlUserDefinedFunction geoIpFunction = - new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP"); + Optional nodeClient = client.getNodeClient(); + if (nodeClient.isPresent()) { + SqlUserDefinedFunction geoIpFunction = new GeoIpFunction(nodeClient.get()).toUDF("GEOIP"); PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction); } else { logger.info( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java index 76218d8295d..75cc5280214 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java @@ -21,7 +21,8 @@ public class OpenSearchQueryManager implements QueryManager { private final NodeClient nodeClient; - private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io"; @Override public QueryId submit(AbstractPlan queryPlan) { diff --git 
a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index c8b00c6daed..3ea7280b16b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -10,6 +10,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import lombok.Getter; @@ -45,6 +46,7 @@ import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.transport.client.node.NodeClient; /** OpenSearch table (index) implementation. */ public class OpenSearchIndex extends AbstractOpenSearchTable { @@ -231,27 +233,43 @@ public static class OpenSearchDefaultImplementor extends DefaultImplementor nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Machine Learning operators on clients outside of the local node"); + } return new MLCommonsOperator( - visitChild(node, context), - node.getAlgorithm(), - node.getArguments(), - client.getNodeClient()); + visitChild(node, context), node.getAlgorithm(), node.getArguments(), nc.get()); } @Override public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) { - return new ADOperator(visitChild(node, context), node.getArguments(), client.getNodeClient()); + Optional nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Anomaly Detector operators on clients outside of the local node"); + } + return new ADOperator(visitChild(node, context), node.getArguments(), nc.get()); } @Override public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan 
context) { - return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient()); + Optional nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Machine Learning operators on clients outside of the local node"); + } + return new MLOperator(visitChild(node, context), node.getArguments(), nc.get()); } @Override public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) { - return new OpenSearchEvalOperator( - visitChild(node, context), node.getExpressions(), client.getNodeClient()); + Optional nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Eval operators on clients outside of the local node"); + } + return new OpenSearchEvalOperator(visitChild(node, context), node.getExpressions(), nc.get()); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java new file mode 100644 index 00000000000..4019346e055 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.exception.NonFallbackCalciteException; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.request.OpenSearchRequest; 
+import org.opensearch.sql.opensearch.response.OpenSearchResponse; + +/** + * Utility class for asynchronously scanning an index. This lets us send background requests to the + * index while we work on processing the previous batch. + * + *

Lifecycle

+ * + * The typical usage pattern is: + * + *
+ *   1. Create scanner: new BackgroundSearchScanner(client)
+ *   2. Start initial scan: startScanning(request)
+ *   3. Fetch batches in a loop: fetchNextBatch(request, maxWindow)
+ *   4. Close scanner when done: close()
+ * 
+ * + *

Async vs Sync Behavior

+ * + * The scanner attempts to operate asynchronously when possible to improve performance: + * + *
    + *
  • When async is available (client has thread pool access): - Next batch is pre-fetched while + * current batch is being processed - Reduces latency between batches + *
  • When async is not available (client lacks thread pool access): - Falls back to synchronous + * fetching - Each batch is fetched only when needed + *
+ * + *

Termination Conditions

+ * + * Scanning will stop when any of these conditions are met: + * + *
    + *
  • An empty response is received (lastBatch = true) + *
  • Response is an aggregation or count response (fetchOnce = true) + *
  • Response size is less than maxResultWindow (fetchOnce = true) + *
+ * + * Note: This class should be explicitly closed when no longer needed to ensure proper resource + * cleanup. + */ +public class BackgroundSearchScanner { + private final OpenSearchClient client; + @Nullable private final Executor backgroundExecutor; + private CompletableFuture nextBatchFuture = null; + private boolean stopIteration = false; + + public BackgroundSearchScanner(OpenSearchClient client) { + this.client = client; + // We can only actually do the background operation if we have the ability to access the thread + // pool. Otherwise, fallback to synchronous fetch. + if (client.getNodeClient().isPresent()) { + this.backgroundExecutor = + client.getNodeClient().get().threadPool().executor(SQL_BACKGROUND_THREAD_POOL_NAME); + } else { + this.backgroundExecutor = null; + } + } + + private boolean isAsync() { + return backgroundExecutor != null; + } + + /** + * @return Whether the search scanner has fetched all batches + */ + public boolean isScanDone() { + return stopIteration; + } + + /** + * Initiates the scanning process. If async operations are available, this will trigger the first + * background fetch. + * + * @param request The OpenSearch request to execute + */ + public void startScanning(OpenSearchRequest request) { + if (isAsync()) { + nextBatchFuture = + CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); + } + } + + private OpenSearchResponse getCurrentResponse(OpenSearchRequest request) { + if (isAsync()) { + try { + return nextBatchFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new NonFallbackCalciteException( + "Failed to fetch data from the index: the background task failed or interrupted.\n" + + " Inner error: " + + e.getMessage()); + } + } else { + return client.search(request); + } + } + + /** + * Fetches the next batch of results. If async is enabled and more batches are expected, this will + * also trigger the next background fetch. 
+ * + * @param request The OpenSearch request to execute + * @param maxResultWindow Maximum number of results to fetch per batch + * @return SearchBatchResult containing the current batch's iterator and completion status + * @throws NonFallbackCalciteException if the background fetch fails or is interrupted + */ + public SearchBatchResult fetchNextBatch(OpenSearchRequest request, int maxResultWindow) { + OpenSearchResponse response = getCurrentResponse(request); + + // Determine if we need future batches + if (response.isAggregationResponse() + || response.isCountResponse() + || response.getHitsSize() < maxResultWindow) { + stopIteration = true; + } + + Iterator iterator; + if (!response.isEmpty()) { + iterator = response.iterator(); + + // Pre-fetch next batch if needed + if (!stopIteration && isAsync()) { + nextBatchFuture = + CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); + } + } else { + iterator = Collections.emptyIterator(); + stopIteration = true; + } + + return new SearchBatchResult(iterator, stopIteration); + } + + /** + * Resets the scanner to its initial state, allowing a new scan to begin. This clears all + * completion flags and initiates a new background fetch if async is enabled. + * + * @param request The OpenSearch request to execute + */ + public void reset(OpenSearchRequest request) { + stopIteration = false; + startScanning(request); + } + + /** + * Releases resources associated with this scanner. Cancels any pending background fetches and + * marks the scan as complete. The scanner cannot be reused after closing without calling reset(). 
+ */ + public void close() { + stopIteration = true; + if (nextBatchFuture != null) { + nextBatchFuture.cancel(true); + } + } + + public record SearchBatchResult(Iterator iterator, boolean stopIteration) {} +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index c4118a965da..e684d128914 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -17,7 +17,6 @@ import org.opensearch.sql.monitor.ResourceMonitor; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.request.OpenSearchRequest; -import org.opensearch.sql.opensearch.response.OpenSearchResponse; /** * Supports a simple iteration over a collection for OpenSearch index @@ -31,6 +30,8 @@ public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; + private final BackgroundSearchScanner bgScanner; + private final List fields; /** Search request. */ @@ -49,15 +50,12 @@ public class OpenSearchIndexEnumerator implements Enumerator { private final ResourceMonitor monitor; /** Number of rows returned. */ - private Integer queryCount; + private Integer queryCount = 0; /** Search response for current batch. 
*/ private Iterator iterator; - private ExprValue current; - - /** flag to indicate whether fetch more than one batch */ - private boolean fetchOnce = false; + private ExprValue current = null; public OpenSearchIndexEnumerator( OpenSearchClient client, @@ -66,33 +64,24 @@ public OpenSearchIndexEnumerator( int maxResultWindow, OpenSearchRequest request, ResourceMonitor monitor) { - this.client = client; + if (!monitor.isHealthy()) { + throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); + } + this.fields = fields; this.request = request; this.maxResponseSize = maxResponseSize; this.maxResultWindow = maxResultWindow; this.monitor = monitor; - this.queryCount = 0; - this.current = null; - if (!this.monitor.isHealthy()) { - throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); - } + this.client = client; + this.bgScanner = new BackgroundSearchScanner(client); + this.bgScanner.startScanning(request); } - private void fetchNextBatch() { - OpenSearchResponse response = client.search(request); - if (response.isAggregationResponse() - || response.isCountResponse() - || response.getHitsSize() < maxResultWindow) { - // no need to fetch next batch if it's for an aggregation - // or the length of response hits is less than max result window size. 
- fetchOnce = true; - } - if (!response.isEmpty()) { - iterator = response.iterator(); - } else if (iterator == null) { - iterator = Collections.emptyIterator(); - } + private Iterator fetchNextBatch() { + BackgroundSearchScanner.SearchBatchResult result = + bgScanner.fetchNextBatch(request, maxResultWindow); + return result.iterator(); } @Override @@ -121,8 +110,8 @@ public boolean moveNext() { throw new NonFallbackCalciteException("insufficient resources to load next row, quit."); } - if (iterator == null || (!iterator.hasNext() && !fetchOnce)) { - fetchNextBatch(); + if (iterator == null || (!iterator.hasNext() && !this.bgScanner.isScanDone())) { + iterator = fetchNextBatch(); } if (iterator.hasNext()) { current = iterator.next(); @@ -135,18 +124,16 @@ public boolean moveNext() { @Override public void reset() { - OpenSearchResponse response = client.search(request); - if (!response.isEmpty()) { - iterator = response.iterator(); - } else { - iterator = Collections.emptyIterator(); - } + bgScanner.reset(request); + iterator = bgScanner.fetchNextBatch(request, maxResultWindow).iterator(); queryCount = 0; } @Override public void close() { iterator = Collections.emptyIterator(); + queryCount = 0; + bgScanner.close(); if (request != null) { client.forceCleanup(request); request = null; diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java index 6be02c9d6f1..8cee2cf3f3a 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import lombok.SneakyThrows; import org.apache.commons.lang3.reflect.FieldUtils; @@ -527,7 
+528,7 @@ void meta_with_IOException() throws IOException { @Test void ml_with_exception() { - assertThrows(UnsupportedOperationException.class, () -> client.getNodeClient()); + assertEquals(Optional.empty(), client.getNodeClient()); } private Map mockFieldMappings(String indexName, String mappings) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java index 85d0a4e94fa..1977a03a1be 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Optional; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; @@ -18,6 +19,7 @@ import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.transport.client.node.NodeClient; @ExtendWith(MockitoExtension.class) public class OpenSearchDefaultImplementorTest { @@ -27,7 +29,8 @@ public class OpenSearchDefaultImplementorTest { @Test public void visitMachineLearning() { LogicalMLCommons node = Mockito.mock(LogicalMLCommons.class, Answers.RETURNS_DEEP_STUBS); - Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(node.getChild().getFirst()).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); OpenSearchIndex.OpenSearchDefaultImplementor implementor = new OpenSearchIndex.OpenSearchDefaultImplementor(client); assertNotNull(implementor.visitMLCommons(node, null)); @@ -36,7 +39,8 @@ public void 
visitMachineLearning() { @Test public void visitAD() { LogicalAD node = Mockito.mock(LogicalAD.class, Answers.RETURNS_DEEP_STUBS); - Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(node.getChild().getFirst()).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); OpenSearchIndex.OpenSearchDefaultImplementor implementor = new OpenSearchIndex.OpenSearchDefaultImplementor(client); assertNotNull(implementor.visitAD(node, null)); @@ -45,7 +49,8 @@ public void visitAD() { @Test public void visitML() { LogicalML node = Mockito.mock(LogicalML.class, Answers.RETURNS_DEEP_STUBS); - Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(node.getChild().getFirst()).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); OpenSearchIndex.OpenSearchDefaultImplementor implementor = new OpenSearchIndex.OpenSearchDefaultImplementor(client); assertNotNull(implementor.visitML(node, null)); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 6a4713dc917..7bba55955b2 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -38,6 +39,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; 
import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.ast.tree.Sort; @@ -58,6 +60,7 @@ import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.physical.PhysicalPlanDSL; +import org.opensearch.transport.client.node.NodeClient; @ExtendWith(MockitoExtension.class) class OpenSearchIndexTest { @@ -225,6 +228,7 @@ void implementRelationOperatorWithOptimization() { @Test void implementOtherLogicalOperators() { when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); + when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); NamedExpression include = named("age", ref("age", INTEGER)); ReferenceExpression exclude = ref("name", STRING); ReferenceExpression dedupeField = ref("name", STRING); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScannerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScannerTest.java new file mode 100644 index 00000000000..f4a7f297df9 --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScannerTest.java @@ -0,0 +1,150 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import 
org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.request.OpenSearchRequest; +import org.opensearch.sql.opensearch.response.OpenSearchResponse; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.node.NodeClient; + +class BackgroundSearchScannerTest { + private OpenSearchClient client; + private NodeClient nodeClient; + private ThreadPool threadPool; + private OpenSearchRequest request; + private BackgroundSearchScanner scanner; + private ExecutorService executor; + + @BeforeEach + void setUp() { + client = mock(OpenSearchClient.class); + nodeClient = mock(NodeClient.class); + threadPool = mock(ThreadPool.class); + request = mock(OpenSearchRequest.class); + executor = Executors.newSingleThreadExecutor(); + + when(client.getNodeClient()).thenReturn(Optional.of(nodeClient)); + when(nodeClient.threadPool()).thenReturn(threadPool); + when(threadPool.executor(any())).thenReturn(executor); + + scanner = new BackgroundSearchScanner(client); + } + + @Test + void testSyncFallbackWhenNoNodeClient() { + // Setup client without node client + OpenSearchClient syncClient = mock(OpenSearchClient.class); + when(syncClient.getNodeClient()).thenReturn(Optional.empty()); + scanner = new BackgroundSearchScanner(syncClient); + + OpenSearchResponse response = mockResponse(false, false, 10); + when(syncClient.search(request)).thenReturn(response); + + scanner.startScanning(request); + BackgroundSearchScanner.SearchBatchResult result = scanner.fetchNextBatch(request, 10); + + assertFalse( + result.stopIteration(), "Expected iteration to continue after fetching one full page"); + verify(syncClient, times(1)).search(request); + } + + @Test + void testCompleteScanWithMultipleBatches() { + // First batch: normal response + OpenSearchResponse response1 = mockResponse(false, false, 10); + // 
Second batch: empty response + OpenSearchResponse response2 = mockResponse(true, false, 5); + + when(client.search(request)).thenReturn(response1).thenReturn(response2); + + scanner.startScanning(request); + + // First batch + BackgroundSearchScanner.SearchBatchResult result1 = scanner.fetchNextBatch(request, 10); + assertFalse( + result1.stopIteration(), "Expected iteration to continue after fetching 10/15 results"); + assertTrue(result1.iterator().hasNext()); + + // Second batch + BackgroundSearchScanner.SearchBatchResult result2 = scanner.fetchNextBatch(request, 10); + assertTrue(result2.stopIteration()); + assertFalse(result2.iterator().hasNext()); + } + + @Test + void testFetchOnceForAggregationResponse() { + OpenSearchResponse response = mockResponse(false, true, 1); + when(client.search(request)).thenReturn(response); + + scanner.startScanning(request); + BackgroundSearchScanner.SearchBatchResult result = scanner.fetchNextBatch(request, 10); + + assertTrue(scanner.isScanDone()); + } + + @Test + void testFetchOnceWhenResultsBelowWindow() { + OpenSearchResponse response = mockResponse(false, false, 5); + when(client.search(request)).thenReturn(response); + + scanner.startScanning(request); + BackgroundSearchScanner.SearchBatchResult result = scanner.fetchNextBatch(request, 10); + + assertTrue(scanner.isScanDone()); + } + + @Test + void testReset() { + OpenSearchResponse response1 = mockResponse(false, false, 5); + OpenSearchResponse response2 = mockResponse(true, false, 0); + + when(client.search(request)).thenReturn(response1).thenReturn(response2); + + scanner.startScanning(request); + scanner.fetchNextBatch(request, 10); + scanner.fetchNextBatch(request, 10); + + assertTrue(scanner.isScanDone()); + + scanner.reset(request); + + assertFalse(scanner.isScanDone()); + } + + private OpenSearchResponse mockResponse(boolean isEmpty, boolean isAggregation, int numResults) { + OpenSearchResponse response = mock(OpenSearchResponse.class); + 
when(response.isEmpty()).thenReturn(isEmpty); + when(response.isAggregationResponse()).thenReturn(isAggregation); + + if (numResults > 0) { + ExprValue[] values = new ExprValue[numResults]; + Arrays.fill(values, mock(ExprValue.class)); + when(response.iterator()).thenReturn(Arrays.asList(values).iterator()); + } else { + when(response.iterator()).thenReturn(Collections.emptyIterator()); + } + + when(response.getHitsSize()).thenReturn(numResults); + return response; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index fe4be96fc07..343a36880fc 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -5,8 +5,9 @@ package org.opensearch.sql.plugin; -import static java.util.Collections.singletonList; import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; import com.google.common.collect.ImmutableList; @@ -84,7 +85,6 @@ import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse; import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse; import org.opensearch.sql.legacy.esdomain.LocalClusterState; -import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; import org.opensearch.sql.legacy.plugin.RestSqlStatsAction; @@ -314,13 +314,24 @@ public ScheduledJobParser getJobParser() { @Override public List> getExecutorBuilders(Settings settings) { - return 
singletonList( + // The worker pool is the primary pool where most of the work is done. The background thread + // pool is a separate queue for asynchronous requests to other nodes. We keep them separate to + // prevent deadlocks during async fetches on small node counts. Tasks in the background pool + // should do no work except I/O to other services. + return List.of( new FixedExecutorBuilder( settings, - AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME, + SQL_WORKER_THREAD_POOL_NAME, OpenSearchExecutors.allocatedProcessors(settings), 1000, - null)); + "thread_pool." + SQL_WORKER_THREAD_POOL_NAME), + new FixedExecutorBuilder( + settings, + SQL_BACKGROUND_THREAD_POOL_NAME, + settings.getAsInt( + "thread_pool.search.size", OpenSearchExecutors.allocatedProcessors(settings)), + 1000, + "thread_pool." + SQL_BACKGROUND_THREAD_POOL_NAME)); } @Override