Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.Version
import org.opensearch.action.ActionListenerResponseHandler
import org.opensearch.action.support.GroupedActionListener
import org.opensearch.alerting.util.IndexUtils
Expand Down Expand Up @@ -582,10 +581,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return allShards.filter { it.primary() }.size
}

/**
 * Returns the data nodes from the current cluster state whose version is at least
 * [Version.CURRENT], keyed the same way as the cluster state's data-node map.
 *
 * Throws a [NullPointerException] if [MonitorRunnerExecutionContext.clusterService] is null.
 */
private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map<String, DiscoveryNode> =
    monitorCtx.clusterService!!
        .state()
        .nodes
        .dataNodes
        .filterValues { node -> node.version >= Version.CURRENT }

private fun distributeShards(
monitorCtx: MonitorRunnerExecutionContext,
allNodes: List<String>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.Version
import org.opensearch.cluster.node.DiscoveryNode

private val logger: Logger = LogManager.getLogger("FanOutEligibility")

/**
 * Decides whether [candidateNode] may be handed fan-out work by [coordinatorNode].
 *
 * The decision is based on the `di_number` node attribute (presumably a domain-instance
 * number; confirm against the deployment that sets it):
 *
 * - if both nodes carry a `di_number` that parses as an integer, the candidate is eligible
 *   only when the two values are equal;
 * - if either node has no usable `di_number`, or evaluation fails for any reason, the
 *   candidate is treated as eligible so that a bad attribute never blocks fan-out.
 *
 * @param candidateNode node being considered as a fan-out target
 * @param coordinatorNode node that is distributing the fan-out work
 * @return `true` if the candidate may take part in the fan-out, `false` otherwise
 */
fun isNodeEligibleForFanOut(
    candidateNode: DiscoveryNode,
    coordinatorNode: DiscoveryNode,
): Boolean {
    return try {
        val candidateDiNumber = diNumberOf(candidateNode)
        val coordinatorDiNumber = diNumberOf(coordinatorNode)
        // Require an exact match rather than ">=" so that fan-out stays within one instance
        // and never depends on nodes with a higher di_number being backward compatible.
        candidateDiNumber == null || coordinatorDiNumber == null || candidateDiNumber == coordinatorDiNumber
    } catch (e: Exception) {
        // Best effort: a failure while evaluating the criteria must not exclude the node.
        logger.error("Error in isNodeEligibleForFanOut criteria evaluation", e)
        true
    }
}

/**
 * Reads the `di_number` attribute of [node] as an integer.
 *
 * The value is converted through its string form instead of being cast, so both a string
 * attribute such as `"7"` and a numeric one are accepted.
 *
 * @return the parsed value, or `null` when the attribute is absent or is not an integer
 */
private fun diNumberOf(node: DiscoveryNode): Int? {
    val attributes: Map<String, Any?> = node.attributes
    return attributes["di_number"]?.toString()?.trim()?.toIntOrNull()
}

/**
 * Returns the data nodes from the current cluster state that can take part in a fan-out.
 *
 * A node is kept when its version is at least [Version.CURRENT] and
 * [isNodeEligibleForFanOut] accepts it with the local node acting as coordinator.
 *
 * Throws a [NullPointerException] if [MonitorRunnerExecutionContext.clusterService] is null.
 */
fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map<String, DiscoveryNode> {
    val clusterService = monitorCtx.clusterService!!
    val dataNodes = clusterService.state().nodes.dataNodes
    return dataNodes.filterValues { node ->
        // The version check short-circuits, so the local node is only looked up for
        // nodes that already pass it.
        if (node.version >= Version.CURRENT) {
            isNodeEligibleForFanOut(node, clusterService.localNode())
        } else {
            false
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ package org.opensearch.alerting.remote.monitors

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.Version
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
import org.opensearch.alerting.getNodes
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse
Expand Down Expand Up @@ -207,10 +206,6 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() {
}
}

/**
 * Returns the data nodes from the current cluster state whose version is at least
 * [Version.CURRENT], keyed the same way as the cluster state's data-node map.
 *
 * Throws a [NullPointerException] if [MonitorRunnerExecutionContext.clusterService] is null.
 */
private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map<String, DiscoveryNode> =
    monitorCtx.clusterService!!
        .state()
        .nodes
        .dataNodes
        .filterValues { node -> node.version >= Version.CURRENT }

private fun distributeShards(
monitorCtx: MonitorRunnerExecutionContext,
allNodes: List<String>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,22 @@ class TransportDocLevelMonitorFanOutAction
listener: ActionListener<DocLevelMonitorFanOutResponse>
) {
try {
// handle situations where coordinator node is not on the same opensearch version or
// from the same domain instance and return no-op response instead of failing entire fan-out
if (request.indexExecutionContext == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking this specific field for null seems brittle. Is there a more definitive way to see if the deserialization failed?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

listener.onResponse(
DocLevelMonitorFanOutResponse(
nodeId = clusterService.localNode().id,
executionId = request.executionId,
monitorId = "",
lastRunContexts = mutableMapOf(),
inputResults = InputRunResults(emptyList()),
triggerResults = emptyMap(),
exception = null
)
)
return
}
val monitor = request.monitor
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, Instant.now(), Instant.now())
val updatedIndexNames = request.indexExecutionContext!!.updatedIndexNames
Expand Down
Loading