Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@

package org.opensearch.commons.alerting.action

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Monitor.Companion.NO_VERSION
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.seqno.SequenceNumbers
import java.io.EOFException
import java.io.IOException
import java.time.Instant
import java.time.temporal.ChronoUnit

class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

val monitor: Monitor
val dryRun: Boolean
val monitorMetadata: MonitorMetadata
Expand All @@ -28,6 +38,141 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
val shardIds: List<ShardId>
val concreteIndicesSeenSoFar: List<String>
val workflowRunContext: WorkflowRunContext?
val hasSerializationFailed: Boolean

init {
serializationFailedFlag = false
}
companion object {
// flag flipped to true whenever a safeRead*() method fails to serialize a field correctly
private var serializationFailedFlag: Boolean = false
val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
private fun safeReadMonitor(sin: StreamInput): Monitor =
try {
Monitor.readFrom(sin)!!
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing monitor in Doc level monitor fanout request", e)
Monitor(
"failed_serde", NO_VERSION, "failed_serde", true,
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(),
DataSources(), false, false, "failed"
)
}

private fun safeReadBoolean(sin: StreamInput): Boolean =
try {
sin.readBoolean()
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing boolean in Doc level monitor fanout request", e)
false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does it affect execution if we end up with the default value for these fields? It seems like we need some way to signal that the fanout should not execute if we encounter a deserialization error like this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are not gonna use these fields. the alerting plugin will look for null check on index execution context opensearch-project/alerting#1815

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a flag to indicate serialization failed if any of the safeRead*() methods fail

}

private fun safeReadMonitorMetadata(sin: StreamInput): MonitorMetadata =
try {
MonitorMetadata.readFrom(sin)
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing monitor in Doc level monitor fanout request", e)
MonitorMetadata(
"failed_serde",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
"failed_serde",
emptyList(),
emptyMap(),
mutableMapOf()
)
}

private fun safeReadString(sin: StreamInput): String =
try {
sin.readString()
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing string in Doc level monitor fanout request", e)
""
}

private fun safeReadShardIds(sin: StreamInput): List<ShardId> =
try {
sin.readList(::ShardId)
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing shardId list in Doc level monitor fanout request", e)
emptyList()
}

private fun safeReadStringList(sin: StreamInput): List<String> =
try {
sin.readStringList()
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing string list in Doc level monitor fanout request", e)
emptyList()
}

private fun safeReadWorkflowRunContext(sin: StreamInput): WorkflowRunContext? =
try {
if (sin.readBoolean()) WorkflowRunContext(sin) else null
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing workflow context in Doc level monitor fanout request", e)
null
}

private fun safeReadIndexExecutionContext(sin: StreamInput): IndexExecutionContext? {
var indexExecutionContext: IndexExecutionContext? = null
return try {
indexExecutionContext = IndexExecutionContext(sin)
while (sin.read() != 0) {
serializationFailedFlag = true
// read and discard bytes until stream is entirely consumed
try {
sin.readByte()
} catch (_: EOFException) {
}
}
return indexExecutionContext
} catch (e: EOFException) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set serializationFailedFlag = true in this catch as well?

indexExecutionContext
} catch (e: Exception) {
serializationFailedFlag = true
log.error("Error parsing index execution context in Doc level monitor fanout request", e)
while (sin.read() != 0) {
try { // read and throw bytes until stream is entirely consumed
sin.readByte()
} catch (_: EOFException) {
}
}
null
}
}
}

private constructor(
monitor: Monitor,
dryRun: Boolean,
monitorMetadata: MonitorMetadata,
executionId: String,
indexExecutionContext: IndexExecutionContext?,
shardIds: List<ShardId>,
concreteIndicesSeenSoFar: List<String>,
workflowRunContext: WorkflowRunContext?,
hasSerializationFailed: Boolean
) : super() {
this.monitor = monitor
this.dryRun = dryRun
this.monitorMetadata = monitorMetadata
this.executionId = executionId
this.indexExecutionContext = indexExecutionContext
this.shardIds = shardIds
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
this.workflowRunContext = workflowRunContext
this.hasSerializationFailed = hasSerializationFailed ?: false
}

constructor(
monitor: Monitor,
Expand All @@ -47,21 +192,20 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
this.shardIds = shardIds
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
this.workflowRunContext = workflowRunContext
require(false == shardIds.isEmpty()) { }
this.hasSerializationFailed = false
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitor = Monitor.readFrom(sin)!!,
dryRun = sin.readBoolean(),
monitorMetadata = MonitorMetadata.readFrom(sin),
executionId = sin.readString(),
shardIds = sin.readList(::ShardId),
concreteIndicesSeenSoFar = sin.readStringList(),
workflowRunContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else { null },
indexExecutionContext = IndexExecutionContext(sin)
monitor = safeReadMonitor(sin),
dryRun = safeReadBoolean(sin),
monitorMetadata = safeReadMonitorMetadata(sin),
executionId = safeReadString(sin),
shardIds = safeReadShardIds(sin),
concreteIndicesSeenSoFar = safeReadStringList(sin),
workflowRunContext = safeReadWorkflowRunContext(sin),
indexExecutionContext = safeReadIndexExecutionContext(sin),
hasSerializationFailed = serializationFailedFlag
)

@Throws(IOException::class)
Expand All @@ -88,14 +232,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
return builder.startObject()
.field("monitor", monitor)
.field("dry_run", dryRun)
.field("execution_id", executionId)
.field("index_execution_context", indexExecutionContext)
.field("shard_ids", shardIds)
.field("concrete_indices", concreteIndicesSeenSoFar)
.field("workflow_run_context", workflowRunContext)
return builder.endObject()
.endObject()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.ActionExecutionTime
Expand Down Expand Up @@ -88,6 +91,8 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
assertEquals(sin.read(), 0)
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
}

@Test
Expand Down Expand Up @@ -150,5 +155,130 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
assertEquals(sin.read().toString(), sin.read(), 0)
}

@Test
fun `test serde failure returning dummy object instead of exception`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
val monitorMetadata = MonitorMetadata(
"test",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Monitor.NO_ID,
listOf(ActionExecutionTime("", Instant.now())),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
)
val indexExecutionContext = IndexExecutionContext(
listOf(docQuery),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("index" to mutableMapOf("1" to "1")),
"test-index",
"test-index",
listOf("test-index"),
listOf("test-index"),
listOf("test-field"),
listOf("1", "2")
)
val workflowRunContext = WorkflowRunContext(
Workflow.NO_ID,
Workflow.NO_ID,
Monitor.NO_ID,
mutableMapOf("index" to listOf("1")),
true,
listOf("finding1")
)
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
false,
monitorMetadata,
UUID.randomUUID().toString(),
indexExecutionContext,
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
listOf("test-index"),
workflowRunContext
)
val out = BytesStreamOutput()
monitor.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
assertNull(newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
assertEquals(sin.read(), -1)
}

@Test
fun `test doc level monitor fan out request as stream when there are additional bytes left to handle`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
val monitorMetadata = MonitorMetadata(
"test",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Monitor.NO_ID,
listOf(ActionExecutionTime("", Instant.now())),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
)
val indexExecutionContext = IndexExecutionContext(
listOf(docQuery),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("index" to mutableMapOf("1" to "1")),
"test-index",
"test-index",
listOf("test-index"),
listOf("test-index"),
listOf("test-field"),
listOf("1", "2")
)
val workflowRunContext = WorkflowRunContext(
Workflow.NO_ID,
Workflow.NO_ID,
Monitor.NO_ID,
mutableMapOf("index" to listOf("1")),
true
)
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
false,
monitorMetadata,
UUID.randomUUID().toString(),
indexExecutionContext,
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
listOf("test-index"),
workflowRunContext
)
val out = BytesStreamOutput()
docLevelMonitorFanOutRequest.writeTo(out)
out.writeByte(Byte.MIN_VALUE)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor)
assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId)
assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata)
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
assertEquals(sin.read(), 0)
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
}
}