diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt index 09f8d2e00..b3655e226 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt @@ -12,13 +12,13 @@ import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionRequest import org.opensearch.action.delete.DeleteRequest -import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.alerting.comments.CommentsIndices.Companion.ALL_COMMENTS_INDEX_PATTERN import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.suspendUntil import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings @@ -38,6 +38,8 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.query.QueryBuilders +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -49,6 +51,7 @@ private val log = LogManager.getLogger(TransportDeleteAlertingCommentAction::cla class TransportDeleteAlertingCommentAction @Inject constructor( transportService: TransportService, val client: Client, + val sdkClient: SdkClient, actionFilters: ActionFilters, val clusterService: ClusterService, settings: Settings, @@ -151,11 +154,15 @@ class TransportDeleteAlertingCommentAction @Inject constructor( .version(true) .seqNoAndPrimaryTerm(true) .query(queryBuilder) - val searchRequest = SearchRequest() - .source(searchSourceBuilder) + val searchRequest = SearchDataObjectRequest.builder() + .searchSourceBuilder(searchSourceBuilder) .indices(ALL_COMMENTS_INDEX_PATTERN) + .build() + + val searchResponse: SearchResponse = sdkClient.suspendUntil { + searchDataObjectAsync(searchRequest).whenComplete(it) + } - val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } val comments = searchResponse.hits.map { hit -> val xcp = XContentHelper.createParser( NamedXContentRegistry.EMPTY, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt index d0d3e45f7..c04c6cf5a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt @@ -6,8 +6,8 @@ package org.opensearch.alerting.transport import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException import org.opensearch.OpenSearchStatusException -import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction @@ -29,6 +29,9 @@ import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.index.IndexNotFoundException +import org.opensearch.remote.metadata.client.GetDataObjectRequest +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -36,6 +39,7 @@ import org.opensearch.transport.client.Client class TransportGetWorkflowAction @Inject constructor( transportService: TransportService, val client: Client, + val sdkClient: SdkClient, actionFilters: ActionFilters, val xContentRegistry: NamedXContentRegistry, val clusterService: ClusterService, @@ -56,94 +60,97 @@ class TransportGetWorkflowAction @Inject constructor( override fun doExecute(task: Task, getWorkflowRequest: GetWorkflowRequest, actionListener: ActionListener) { val user = readUserFromThreadContext(client) - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getWorkflowRequest.workflowId) + val getRequest = GetDataObjectRequest.builder() + .index(ScheduledJob.SCHEDULED_JOBS_INDEX) + .id(getWorkflowRequest.workflowId) + .build() if (!validateUserBackendRoles(user, actionListener)) { return } client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - log.error("Workflow with ${getWorkflowRequest.workflowId} not found") - actionListener.onFailure( - AlertingException.wrap( - OpenSearchStatusException( - "Workflow not found.", - RestStatus.NOT_FOUND + sdkClient.getDataObjectAsync(getRequest) + .whenComplete( + SdkClientUtils.wrapGetCompletion(object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + log.error("Workflow with ${getWorkflowRequest.workflowId} not found") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) ) ) - ) - return - } + return + } - var workflow: Workflow? = null - if (!response.isSourceEmpty) { - XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON - ).use { xcp -> - val compositeMonitor = ScheduledJob.parse(xcp, response.id, response.version) - if (compositeMonitor is Workflow) { - workflow = compositeMonitor - } else { - log.error("Wrong monitor type returned") - actionListener.onFailure( - AlertingException.wrap( - OpenSearchStatusException( - "Workflow not found.", - RestStatus.NOT_FOUND + var workflow: Workflow? = null + if (!response.isSourceEmpty) { + XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON + ).use { xcp -> + val compositeMonitor = ScheduledJob.parse(xcp, response.id, response.version) + if (compositeMonitor is Workflow) { + workflow = compositeMonitor + } else { + log.error("Wrong monitor type returned") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) ) ) - ) - return - } + return + } - // security is enabled and filterby is enabled - if (!checkUserPermissionsWithResource( - user, - workflow?.user, - actionListener, - "workflow", - getWorkflowRequest.workflowId - ) - ) { - return + // security is enabled and filterby is enabled + if (!checkUserPermissionsWithResource( + user, + workflow?.user, + actionListener, + "workflow", + getWorkflowRequest.workflowId + ) + ) { + return + } } } - } - actionListener.onResponse( - GetWorkflowResponse( - response.id, - response.version, - response.seqNo, - response.primaryTerm, - RestStatus.OK, - workflow + actionListener.onResponse( + GetWorkflowResponse( + response.id, + response.version, + response.seqNo, + response.primaryTerm, + RestStatus.OK, + workflow + ) ) - ) - } + } - override fun onFailure(t: Exception) { - log.error("Getting the workflow failed", t) + override fun onFailure(t: Exception) { + log.error("Getting the workflow failed", t) - if (t is IndexNotFoundException) { - actionListener.onFailure( - OpenSearchStatusException( - "Workflow not found", - RestStatus.NOT_FOUND + if (t is IndexNotFoundException || t is OpenSearchException && t.cause is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Workflow not found", + RestStatus.NOT_FOUND + ) ) - ) - } else { - actionListener.onFailure(AlertingException.wrap(t)) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } } - } - } - ) + }) + ) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailAccountAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailAccountAction.kt index b63d9a488..28746eb54 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailAccountAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailAccountAction.kt @@ -20,6 +20,9 @@ import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -27,6 +30,7 @@ import org.opensearch.transport.client.Client class TransportSearchEmailAccountAction @Inject constructor( transportService: TransportService, val client: Client, + val sdkClient: SdkClient, actionFilters: ActionFilters, val clusterService: ClusterService, settings: Settings @@ -54,19 +58,24 @@ class TransportSearchEmailAccountAction @Inject constructor( return } + val searchDataObjectRequest = SearchDataObjectRequest.builder() + .indices(*searchRequest.indices()) + .searchSourceBuilder(searchRequest.source()) + .build() + client.threadPool().threadContext.stashContext().use { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - actionListener.onResponse(response) - } + sdkClient.searchDataObjectAsync(searchDataObjectRequest) + .whenComplete( + SdkClientUtils.wrapSearchCompletion(object : ActionListener { + override fun onResponse(response: SearchResponse) { + actionListener.onResponse(response) + } - override fun onFailure(e: Exception) { - actionListener.onFailure(e) - } - } - ) + override fun onFailure(e: Exception) { + actionListener.onFailure(e) + } + }) + ) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailGroupAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailGroupAction.kt index f6b3b92d1..9a8f3c729 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailGroupAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchEmailGroupAction.kt @@ -20,6 +20,9 @@ import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -27,6 +30,7 @@ import org.opensearch.transport.client.Client class TransportSearchEmailGroupAction @Inject constructor( transportService: TransportService, val client: Client, + val sdkClient: SdkClient, actionFilters: ActionFilters, val clusterService: ClusterService, settings: Settings @@ -54,19 +58,22 @@ class TransportSearchEmailGroupAction @Inject constructor( return } + val searchDataObjectRequest = SearchDataObjectRequest.builder().indices(*searchRequest.indices()) + .searchSourceBuilder(searchRequest.source()).build() + client.threadPool().threadContext.stashContext().use { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - actionListener.onResponse(response) - } + sdkClient.searchDataObjectAsync(searchDataObjectRequest) + .whenComplete( + SdkClientUtils.wrapSearchCompletion(object : ActionListener { + override fun onResponse(response: SearchResponse) { + actionListener.onResponse(response) + } - override fun onFailure(e: Exception) { - actionListener.onFailure(e) - } - } - ) + override fun onFailure(e: Exception) { + actionListener.onFailure(e) + } + }) + ) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index 73a883de4..0e900a725 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -31,6 +31,9 @@ import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.ExistsQueryBuilder import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.client.SearchDataObjectRequest +import org.opensearch.remote.metadata.common.SdkClientUtils import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import org.opensearch.transport.client.Client @@ -41,6 +44,7 @@ class TransportSearchMonitorAction @Inject constructor( transportService: TransportService, val settings: Settings, val client: Client, + val sdkClient: SdkClient, clusterService: ClusterService, actionFilters: ActionFilters, val namedWriteableRegistry: NamedWriteableRegistry @@ -101,18 +105,25 @@ class TransportSearchMonitorAction @Inject constructor( } fun search(searchRequest: SearchRequest, actionListener: ActionListener) { - client.search( - searchRequest, - object : ActionListener { - override fun onResponse(response: SearchResponse) { - actionListener.onResponse(response) - } + val searchDataObjectRequest = SearchDataObjectRequest.builder() + .indices(*searchRequest.indices()) + .searchSourceBuilder(searchRequest.source()) + .build() - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } - } - ) + client.threadPool().threadContext.stashContext().use { + sdkClient.searchDataObjectAsync(searchDataObjectRequest) + .whenComplete( + SdkClientUtils.wrapSearchCompletion(object : ActionListener { + override fun onResponse(response: SearchResponse) { + actionListener.onResponse(response) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + ) + } } private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/SdkClientExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/SdkClientExtensions.kt new file mode 100644 index 000000000..77e28642b --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/SdkClientExtensions.kt @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import kotlinx.coroutines.suspendCancellableCoroutine +import org.opensearch.OpenSearchStatusException +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.remote.metadata.client.SdkClient +import org.opensearch.remote.metadata.common.SdkClientUtils +import java.util.function.BiConsumer +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Suspends until an SdkClient async operation completes, with parsing and exception handling. + * + * @param block A lambda that receives a [BiConsumer] to handle the [CompletionStage]'s result or exception. + * @return The parsed result of type [R] (e.g., [IndexResponse], [GetResponse]) after converting the raw response. + * @throws IllegalArgumentException If the raw response does not have a `parser()` method or cannot be parsed to [R]. + */ +suspend inline fun SdkClient.suspendUntil( + crossinline block: SdkClient.(BiConsumer) -> Unit +): R { + try { + val rawResponse: Raw = suspendCancellableCoroutine { cont -> + block( + BiConsumer { result, exception -> + if (exception != null) { + cont.resumeWithException(SdkClientUtils.unwrapAndConvertToException(exception)) + } else { + cont.resume(result) + } + } + ) + } + val parser = rawResponse::class.java.getMethod("parser").invoke(rawResponse) as? XContentParser + ?: throw IllegalArgumentException("Missing parser() on ${rawResponse::class.simpleName}") + val parsed = R::class.java.getMethod("fromXContent", XContentParser::class.java) + .invoke(null, parser) as? R + ?: throw IllegalArgumentException("Failed to parse response into ${R::class.simpleName}") + return parsed + } catch (e: Throwable) { + throw handleException(e) + } +} + +/** + * Centralized Method for Handling different types of exceptions, + * transforming them into appropriate OpenSearch exceptions + * + * @param exception The exception to process + * @return The appropriate matching exception found + */ +fun handleException(exception: Throwable): Throwable { + if (isVersionConflict(exception)) { + return OpenSearchStatusException( + exception.message ?: "Version conflict occurred", + RestStatus.CONFLICT + ) + } + return exception +} + +/** + * Checks if an exception represents a version conflict. + * + * @param exception The exception to check. + * @return True if the exception or its cause is a [VersionConflictEngineException], false otherwise. + */ +private fun isVersionConflict(exception: Throwable): Boolean { + val cause = exception.cause ?: exception + return cause is VersionConflictEngineException || + (cause is OpenSearchStatusException && cause.cause is VersionConflictEngineException) +}