Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@

package org.opensearch.commons.alerting.action

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.model.Alert.Companion.NO_VERSION
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.seqno.SequenceNumbers
import java.io.EOFException
import java.io.IOException
import java.time.Instant
import java.time.temporal.ChronoUnit

class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

val monitor: Monitor
val dryRun: Boolean
val monitorMetadata: MonitorMetadata
Expand All @@ -28,6 +38,11 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
val shardIds: List<ShardId>
val concreteIndicesSeenSoFar: List<String>
val workflowRunContext: WorkflowRunContext?
val hasSerializationFailed: Boolean

companion object {
val log = LogManager.getLogger(DocLevelMonitorFanOutRequest::class.java)
}

constructor(
monitor: Monitor,
Expand All @@ -47,21 +62,70 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {
this.shardIds = shardIds
this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar
this.workflowRunContext = workflowRunContext
require(false == shardIds.isEmpty()) { }
this.hasSerializationFailed = false
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitor = Monitor.readFrom(sin)!!,
dryRun = sin.readBoolean(),
monitorMetadata = MonitorMetadata.readFrom(sin),
executionId = sin.readString(),
shardIds = sin.readList(::ShardId),
concreteIndicesSeenSoFar = sin.readStringList(),
workflowRunContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else { null },
indexExecutionContext = IndexExecutionContext(sin)
constructor(sin: StreamInput) : super() {
var monitorSerializationSucceeded = true
var parsedMonitor = getDummyMonitor()
var parsedDryRun = false
var parsedMonitorMetadata: MonitorMetadata = MonitorMetadata(
"failed_serde",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
"failed_serde",
emptyList(),
emptyMap(),
mutableMapOf()
)
var parsedShardIds: List<ShardId> = emptyList()
var parsedConcreteIndicesSeenSoFar = mutableListOf<String>()
var parsedExecutionId: String = ""
var parsedWorkflowContext: WorkflowRunContext? = null
var parsedIndexExecutionContext: IndexExecutionContext? = null
try {
parsedMonitor = Monitor(sin)
parsedDryRun = sin.readBoolean()
parsedMonitorMetadata = MonitorMetadata.readFrom(sin)
parsedExecutionId = sin.readString()
parsedShardIds = sin.readList(::ShardId)
parsedConcreteIndicesSeenSoFar = sin.readStringList()
parsedWorkflowContext = if (sin.readBoolean()) {
WorkflowRunContext(sin)
} else {
null
}
parsedIndexExecutionContext = IndexExecutionContext(sin)
} catch (e: Exception) {
log.error("Error parsing monitor in Doc level monitor fanout request", e)
monitorSerializationSucceeded = false
log.info("Force consuming stream in Doc level monitor fanout request")
while (sin.read() != 0) {
// read and discard bytes until stream is entirely consumed
try {
sin.readByte()
} catch (_: EOFException) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log here as well?

}
}
}

this.monitor = parsedMonitor
this.dryRun = parsedDryRun
this.monitorMetadata = parsedMonitorMetadata
this.executionId = parsedExecutionId
this.shardIds = parsedShardIds
this.concreteIndicesSeenSoFar = parsedConcreteIndicesSeenSoFar
this.workflowRunContext = parsedWorkflowContext
this.indexExecutionContext = parsedIndexExecutionContext
this.hasSerializationFailed = false == monitorSerializationSucceeded
}

private fun getDummyMonitor() = Monitor(
"failed_serde", NO_VERSION, "failed_serde", true,
IntervalSchedule(1, ChronoUnit.MINUTES), Instant.now(), Instant.now(), "",
null, NO_SCHEMA_VERSION, emptyList(), emptyList(), emptyMap(),
DataSources(), false, false, "failed"
)

@Throws(IOException::class)
Expand All @@ -88,14 +152,14 @@ class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject {

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
return builder.startObject()
.field("monitor", monitor)
.field("dry_run", dryRun)
.field("execution_id", executionId)
.field("index_execution_context", indexExecutionContext)
.field("shard_ids", shardIds)
.field("concrete_indices", concreteIndicesSeenSoFar)
.field("workflow_run_context", workflowRunContext)
return builder.endObject()
.endObject()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.ActionExecutionTime
Expand Down Expand Up @@ -88,6 +90,8 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
assertEquals(sin.read(), 0)
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
}

@Test
Expand Down Expand Up @@ -150,5 +154,65 @@ class DocLevelMonitorFanOutRequestTests {
assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext)
assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds)
assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext)
assertFalse(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
assertEquals(sin.read().toString(), sin.read(), 0)
}

@Test
fun `test serde failure returning dummy object instead of exception`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("return true"))
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
val monitorMetadata = MonitorMetadata(
"test",
SequenceNumbers.UNASSIGNED_SEQ_NO,
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Monitor.NO_ID,
listOf(ActionExecutionTime("", Instant.now())),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001")
)
val indexExecutionContext = IndexExecutionContext(
listOf(docQuery),
mutableMapOf("index" to mutableMapOf("1" to "1")),
mutableMapOf("index" to mutableMapOf("1" to "1")),
"test-index",
"test-index",
listOf("test-index"),
listOf("test-index"),
listOf("test-field"),
listOf("1", "2")
)
val workflowRunContext = WorkflowRunContext(
Workflow.NO_ID,
Workflow.NO_ID,
Monitor.NO_ID,
mutableMapOf("index" to listOf("1")),
true
)
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
false,
monitorMetadata,
UUID.randomUUID().toString(),
indexExecutionContext,
listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)),
listOf("test-index"),
workflowRunContext
)
val out = BytesStreamOutput()
out.writeString(UUID.randomUUID().toString())
docLevelMonitorFanOutRequest.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin)
assertEquals(sin.read(), 0)
assertTrue(newDocLevelMonitorFanOutRequest.hasSerializationFailed)
}
}