Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.legacy.executor.AsyncRestExecutor;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
Expand All @@ -21,6 +20,8 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

/**
* The job runner class for scheduling async query.
*
Expand All @@ -37,7 +38,7 @@
public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner {
// Share SQL plugin thread pool
private static final String ASYNC_QUERY_THREAD_POOL_NAME =
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME;
SQL_WORKER_THREAD_POOL_NAME;
private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);

private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.time.Instant;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
Expand Down Expand Up @@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() {
spyJobRunner.runJob(request, context);

ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME))
verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME))
.submit(captor.capture());

Runnable runnable = captor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.node.NodeClient;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

/** The scheduler which schedules tasks to run in the sql-worker thread pool. */
@UtilityClass
public class Scheduler {

public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

public static void schedule(NodeClient client, Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
Expand Down
21 changes: 21 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ Result set::
"transient": {}
}

Thread Pool Settings
====================

The SQL plugin is integrated with the `OpenSearch Thread Pool Settings <https://docs.opensearch.org/latest/install-and-configure/configuring-opensearch/thread-pool-settings/>`_.
The plugin uses two thread pools, which can be configured at cluster setup in ``opensearch.yml``::

    thread_pool:
      sql-worker:
        size: 30
        queue_size: 100
      sql_background_io:
        size: 30
        queue_size: 1000

The ``sql-worker`` pool provides the compute resources for running queries, such as compute-heavy evaluations on result sets.
Its size directly maps to the number of queries that can run concurrently.
This is the primary pool that you interact with externally.
The ``sql_background_io`` pool is a low-footprint pool for I/O requests that the plugin makes,
and can be used to limit the indirect load that SQL places on your cluster for Calcite-enabled operations.
A single ``sql-worker`` thread may spawn multiple background threads.

plugins.query.executionengine.spark.session.limit
==================================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.executor;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
Expand All @@ -30,10 +32,6 @@

/** A RestExecutor wrapper that executes requests asynchronously to avoid blocking the transport thread. */
public class AsyncRestExecutor implements RestExecutor {

/** Custom thread pool name managed by OpenSearch */
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
Expand All @@ -24,9 +26,6 @@
import org.opensearch.transport.client.Client;

public class CursorAsyncRestExecutor {
/** Custom thread pool name managed by OpenSearch */
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class);

/** Delegated rest executor to async */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.plugin;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
Expand Down Expand Up @@ -90,7 +91,7 @@ protected Set<String> responseParams() {

private void schedule(NodeClient client, Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), "sql-worker");
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
}

private Runnable withCurrentContext(final Runnable task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
Expand Down Expand Up @@ -97,7 +98,7 @@ public interface OpenSearchClient {
*/
void schedule(Runnable task);

NodeClient getNodeClient();
Optional<NodeClient> getNodeClient();

/**
* Create PIT for given indices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -223,8 +224,8 @@ public void schedule(Runnable task) {
}

@Override
public NodeClient getNodeClient() {
return client;
public Optional<NodeClient> getNodeClient() {
return Optional.of(client);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -236,8 +237,8 @@ public void schedule(Runnable task) {
}

@Override
public NodeClient getNodeClient() {
throw new UnsupportedOperationException("Unsupported method.");
public Optional<NodeClient> getNodeClient() {
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
Expand Down Expand Up @@ -47,13 +48,13 @@
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.client.OpenSearchNodeClient;
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.transport.client.node.NodeClient;

/** OpenSearch execution engine implementation. */
public class OpenSearchExecutionEngine implements ExecutionEngine {
Expand Down Expand Up @@ -268,9 +269,9 @@ private void buildResultSet(

/** Registers opensearch-dependent functions */
private void registerOpenSearchFunctions() {
if (client instanceof OpenSearchNodeClient) {
SqlUserDefinedFunction geoIpFunction =
new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP");
Optional<NodeClient> nodeClient = client.getNodeClient();
if (nodeClient.isPresent()) {
SqlUserDefinedFunction geoIpFunction = new GeoIpFunction(nodeClient.get()).toUDF("GEOIP");
PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction);
} else {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class OpenSearchQueryManager implements QueryManager {

private final NodeClient nodeClient;

private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";
public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io";

@Override
public QueryId submit(AbstractPlan queryPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.transport.client.node.NodeClient;

/** OpenSearch table (index) implementation. */
public class OpenSearchIndex extends AbstractOpenSearchTable {
Expand Down Expand Up @@ -231,27 +233,43 @@ public static class OpenSearchDefaultImplementor extends DefaultImplementor<Open

@Override
public PhysicalPlan visitMLCommons(LogicalMLCommons node, OpenSearchIndexScan context) {
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Machine Learning operators on clients outside of the local node");
}
return new MLCommonsOperator(
visitChild(node, context),
node.getAlgorithm(),
node.getArguments(),
client.getNodeClient());
visitChild(node, context), node.getAlgorithm(), node.getArguments(), nc.get());
}

@Override
public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) {
return new ADOperator(visitChild(node, context), node.getArguments(), client.getNodeClient());
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Anomaly Detector operators on clients outside of the local node");
}
return new ADOperator(visitChild(node, context), node.getArguments(), nc.get());
}

@Override
public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) {
return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient());
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Machine Learning operators on clients outside of the local node");
}
return new MLOperator(visitChild(node, context), node.getArguments(), nc.get());
}

@Override
public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) {
return new OpenSearchEvalOperator(
visitChild(node, context), node.getExpressions(), client.getNodeClient());
Optional<NodeClient> nc = client.getNodeClient();
if (nc.isEmpty()) {
throw new UnsupportedOperationException(
"Unable to run Eval operators on clients outside of the local node");
}
return new OpenSearchEvalOperator(visitChild(node, context), node.getExpressions(), nc.get());
}
}

Expand Down
Loading
Loading