Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
Expand All @@ -21,6 +20,8 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

/**
* The job runner class for scheduling async query.
*
Expand All @@ -37,7 +38,7 @@
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
// Share SQL plugin thread pool
private static final String ASYNC_QUERY_THREAD_POOL_NAME =
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME;
SQL_WORKER_THREAD_POOL_NAME;
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);

private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.time.Instant;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
Expand Down Expand Up @@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.node.NodeClient;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

/** The scheduler which schedule the task run in sql-worker thread pool. */
/** The scheduler which schedule the task run in sql_worker thread pool. */
@UtilityClass
public class Scheduler {

public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

public static void schedule(NodeClient client, Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/query-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ Parser parse raw query as Statement and create AbstractPlan. Each AbstractPlan d
### Change of existing logic
1. Remove the schedule logic in NIO thread. After the change,
a. Parser will be executed in NIO thread.
b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql-worker** thread pool.
b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql_worker** thread pool.
21 changes: 21 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ Result set::
"transient": {}
}

Thread Pool Settings
====================

The SQL plugin is integrated with the `OpenSearch Thread Pool Settings <https://docs.opensearch.org/latest/install-and-configure/configuring-opensearch/thread-pool-settings/>`_.
There are two thread pools which can be configured on cluster setup via `settings.yml`::

thread_pool:
sql_worker:
size: 30
queue_size: 100
sql_background_io:
size: 30
queue_size: 1000

The ``sql_worker`` pool corresponds to compute resources related to running queries, such as compute-heavy evaluations on result sets.
This directly maps to the number of queries that can be run concurrently.
This is the primary pool you interact with externally.
``sql_background_io`` is a low-footprint pool for IO requests the plugin makes,
and can be used to limit indirect load that SQL places on your cluster for Calcite-enabled operations.
A ``sql_worker`` thread may spawn multiple background threads.

plugins.query.executionengine.spark.session.limit
==================================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.executor;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
Expand All @@ -30,10 +32,6 @@

/** A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread. */
public class AsyncRestExecutor implements RestExecutor {

/** Custom thread pool name managed by OpenSearch */
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
Expand All @@ -24,9 +26,6 @@
import org.opensearch.transport.client.Client;

public class CursorAsyncRestExecutor {
/** Custom thread pool name managed by OpenSearch */
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class);

/** Delegated rest executor to async */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.plugin;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
Expand Down Expand Up @@ -90,7 +91,7 @@ protected Set<String> responseParams() {

private void schedule(NodeClient client, Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), "sql-worker");
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
}

private Runnable withCurrentContext(final Runnable task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
Expand Down Expand Up @@ -97,7 +98,7 @@ public interface OpenSearchClient {
*/
void schedule(Runnable task);

NodeClient getNodeClient();
Optional<NodeClient> getNodeClient();

/**
* Create PIT for given indices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -218,13 +219,13 @@ public void cleanup(OpenSearchRequest request) {

@Override
public void schedule(Runnable task) {
// at that time, task already running the sql-worker ThreadPool.
// at that time, task already running the sql_worker ThreadPool.
task.run();
}

@Override
public NodeClient getNodeClient() {
return client;
public Optional<NodeClient> getNodeClient() {
return Optional.of(client);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -236,8 +237,8 @@ public void schedule(Runnable task) {
}

@Override
public NodeClient getNodeClient() {
throw new UnsupportedOperationException("Unsupported method.");
public Optional<NodeClient> getNodeClient() {
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
Expand Down Expand Up @@ -47,13 +48,13 @@
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.client.OpenSearchNodeClient;
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.transport.client.node.NodeClient;

/** OpenSearch execution engine implementation. */
public class OpenSearchExecutionEngine implements ExecutionEngine {
Expand Down Expand Up @@ -268,9 +269,9 @@ private void buildResultSet(

/** Registers opensearch-dependent functions */
private void registerOpenSearchFunctions() {
if (client instanceof OpenSearchNodeClient) {
SqlUserDefinedFunction geoIpFunction =
new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP");
Optional<NodeClient> nodeClient = client.getNodeClient();
if (nodeClient.isPresent()) {
SqlUserDefinedFunction geoIpFunction = new GeoIpFunction(nodeClient.get()).toUDF("GEOIP");
PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction);
} else {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class OpenSearchQueryManager implements QueryManager {

private final NodeClient nodeClient;

private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql_worker";
public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io";

@Override
public QueryId submit(AbstractPlan queryPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.transport.client.node.NodeClient;

/** OpenSearch table (index) implementation. */
public class OpenSearchIndex extends AbstractOpenSearchTable {
Expand Down Expand Up @@ -231,27 +233,43 @@ public static class OpenSearchDefaultImplementor extends DefaultImplementor<Open

@Override
public PhysicalPlan visitMLCommons(LogicalMLCommons node, OpenSearchIndexScan context) {
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Machine Learning operators on clients outside of the local node");
}
return new MLCommonsOperator(
visitChild(node, context),
node.getAlgorithm(),
node.getArguments(),
client.getNodeClient());
visitChild(node, context), node.getAlgorithm(), node.getArguments(), nc.get());
}

@Override
public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) {
return new ADOperator(visitChild(node, context), node.getArguments(), client.getNodeClient());
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Anomaly Detector operators on clients outside of the local node");
}
return new ADOperator(visitChild(node, context), node.getArguments(), nc.get());
}

@Override
public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) {
return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient());
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Machine Learning operators on clients outside of the local node");
}
return new MLOperator(visitChild(node, context), node.getArguments(), nc.get());
}

@Override
public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) {
return new OpenSearchEvalOperator(
visitChild(node, context), node.getExpressions(), client.getNodeClient());
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Eval operators on clients outside of the local node");
}
return new OpenSearchEvalOperator(visitChild(node, context), node.getExpressions(), nc.get());
}
}

Expand Down
Loading
Loading