From 29acdad0b19feb3100aa5199494ce32407460c2a Mon Sep 17 00:00:00 2001
From: Surya Sashank Nistala
Date: Fri, 21 Mar 2025 16:08:43 -0700
Subject: [PATCH] change monitor fan out logic to handle empty request and checking node attributes before fanning out

Signed-off-by: Surya Sashank Nistala
---
Review note: MonitorFanOutUtils.kt was revised in review (di_number is parsed instead of cast, and the
coordinator's attribute is null-checked); the blob id on its index line predates that revision.

 .../alerting/DocumentLevelMonitorRunner.kt    |  5 --
 .../opensearch/alerting/MonitorFanOutUtils.kt | 62 +++++++++++++++++++
 .../RemoteDocumentLevelMonitorRunner.kt       |  7 +--
 .../TransportDocLevelMonitorFanOutAction.kt   | 16 +++++
 4 files changed, 79 insertions(+), 11 deletions(-)
 create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/MonitorFanOutUtils.kt

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index 903f2ecc7..65a6eb62d 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -7,7 +7,6 @@ package org.opensearch.alerting
 
 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
-import org.opensearch.Version
 import org.opensearch.action.ActionListenerResponseHandler
 import org.opensearch.action.support.GroupedActionListener
 import org.opensearch.alerting.util.IndexUtils
@@ -582,10 +581,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         return allShards.filter { it.primary() }.size
     }
 
-    private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map {
-        return monitorCtx.clusterService!!.state().nodes.dataNodes.filter { it.value.version >= Version.CURRENT }
-    }
-
     private fun distributeShards(
         monitorCtx: MonitorRunnerExecutionContext,
         allNodes: List,
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorFanOutUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorFanOutUtils.kt
new file mode 100644
index 000000000..88f710445
--- /dev/null
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorFanOutUtils.kt
@@ -0,0 +1,62 @@
+package org.opensearch.alerting
+
+import org.apache.logging.log4j.LogManager
+import org.apache.logging.log4j.Logger
+import org.opensearch.Version
+import org.opensearch.cluster.node.DiscoveryNode
+
+private val logger: Logger = LogManager.getLogger("FanOutEligibility")
+
+/** Node attribute compared between the coordinator and a candidate node before fanning out. */
+private const val DI_NUMBER_ATTRIBUTE = "di_number"
+
+/**
+ * Decides whether [candidateNode] may receive a fan-out request from [coordinatorNode].
+ *
+ * When both nodes carry a `di_number` attribute that parses as an integer, the candidate is eligible
+ * only if its value is greater than or equal to the coordinator's. In every other case (attribute
+ * missing on either node, or not parseable as an integer) the check fails open and returns `true`.
+ *
+ * @param candidateNode node being considered as a fan-out target
+ * @param coordinatorNode node coordinating the fan-out
+ * @return `false` only when both `di_number` values are integers and the candidate's is lower
+ */
+fun isNodeEligibleForFanOut(
+    candidateNode: DiscoveryNode,
+    coordinatorNode: DiscoveryNode,
+): Boolean {
+    // Attribute missing on either side: nothing to compare, so keep the node eligible.
+    val candidateRaw = candidateNode.attributes[DI_NUMBER_ATTRIBUTE] ?: return true
+    val coordinatorRaw = coordinatorNode.attributes[DI_NUMBER_ATTRIBUTE] ?: return true
+
+    // Node attribute values are strings, so they must be parsed, not cast.
+    val candidateDiNumber = candidateRaw.toIntOrNull()
+    val coordinatorDiNumber = coordinatorRaw.toIntOrNull()
+    if (candidateDiNumber == null || coordinatorDiNumber == null) {
+        logger.warn(
+            "Non-numeric {} attribute (candidate [{}], coordinator [{}]); treating node {} as eligible for fan-out",
+            DI_NUMBER_ATTRIBUTE, candidateRaw, coordinatorRaw, candidateNode.id
+        )
+        return true
+    }
+    return candidateDiNumber >= coordinatorDiNumber
+}
+
+/**
+ * Returns the data nodes that a monitor run may fan out to.
+ *
+ * A data node qualifies when its version is at least [Version.CURRENT] and it passes
+ * [isNodeEligibleForFanOut] against the local node, which acts as the coordinator.
+ *
+ * @param monitorCtx execution context supplying the cluster service
+ * @return the qualifying entries of the cluster state's data-node map
+ * @throws IllegalStateException if [MonitorRunnerExecutionContext.clusterService] is not set
+ */
+fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map<String, DiscoveryNode> {
+    val clusterService = checkNotNull(monitorCtx.clusterService) { "clusterService is not initialized" }
+    // Resolve the coordinator once rather than once per candidate node.
+    val coordinatorNode = clusterService.localNode()
+    return clusterService.state().nodes.dataNodes.filter { (_, node) ->
+        node.version >= Version.CURRENT && isNodeEligibleForFanOut(node, coordinatorNode)
+    }
+}
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt
index c74b4e23c..54daff6fa 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt
@@ -7,13 +7,12 @@ package org.opensearch.alerting.remote.monitors
 
 import org.apache.logging.log4j.LogManager
 import org.opensearch.ExceptionsHelper
-import org.opensearch.Version
 import org.opensearch.alerting.MonitorMetadataService
 import org.opensearch.alerting.MonitorRunner
 import org.opensearch.alerting.MonitorRunnerExecutionContext
+import org.opensearch.alerting.getNodes
 import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.cluster.metadata.IndexMetadata
-import org.opensearch.cluster.node.DiscoveryNode
 import org.opensearch.cluster.routing.ShardRouting
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse
@@ -207,10 +206,6 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
         }
     }
 
-    private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map {
-        return monitorCtx.clusterService!!.state().nodes.dataNodes.filter { it.value.version >= Version.CURRENT }
-    }
-
     private fun distributeShards(
         monitorCtx: MonitorRunnerExecutionContext,
         allNodes: List,
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
index 1e5990a40..97a670e37 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt
@@ -207,6 +207,22 @@ class TransportDocLevelMonitorFanOutAction
         listener: ActionListener
     ) {
         try {
+            // handle situations where coordinator node is not on the same opensearch version or
+            // from the same domain instance and return no-op response instead of failing entire fan-out
+            if (request.indexExecutionContext == null) {
+                listener.onResponse(
+                    DocLevelMonitorFanOutResponse(
+                        nodeId = clusterService.localNode().id,
+                        executionId = request.executionId,
+                        monitorId = "",
+                        lastRunContexts = mutableMapOf(),
+                        inputResults = InputRunResults(emptyList()),
+                        triggerResults = emptyMap(),
+                        exception = null
+                    )
+                )
+                return
+            }
             val monitor = request.monitor
             var monitorResult = MonitorRunResult(monitor.name, Instant.now(), Instant.now())
             val updatedIndexNames = request.indexExecutionContext!!.updatedIndexNames