Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertV2Expirer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_HISTORY_WRITE_INDEX
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.cluster.ClusterChangedEvent
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.AlertV2.Companion.EXPIRATION_TIME_FIELD
import org.opensearch.core.action.ActionListener
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.BulkByScrollResponse
import org.opensearch.index.reindex.DeleteByQueryAction
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.threadpool.Scheduler
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.client.Client
import java.time.Instant
import java.util.concurrent.TimeUnit

private val logger = LogManager.getLogger(AlertV2Expirer::class.java)

class AlertV2Expirer(
private val client: Client,
private val threadPool: ThreadPool,
private val clusterService: ClusterService,
) : ClusterStateListener {

init {
clusterService.addListener(this)
}

@Volatile private var isClusterManager = false

private var alertIndexInitialized = false

private var alertHistoryIndexInitialized = false

private var scheduledAlertsV2CheckAndExpire: Scheduler.Cancellable? = null

private val executorName = ThreadPool.Names.MANAGEMENT

private val checkForExpirationInterval = TimeValue(1L, TimeUnit.MINUTES)

override fun clusterChanged(event: ClusterChangedEvent) {
if (this.isClusterManager != event.localNodeClusterManager()) {
this.isClusterManager = event.localNodeClusterManager()
if (this.isClusterManager) {
onManager()
} else {
offManager()
}
}

alertIndexInitialized = event.state().routingTable().hasIndex(ALERT_INDEX)
alertHistoryIndexInitialized = event.state().metadata().hasAlias(ALERT_HISTORY_WRITE_INDEX)
}

fun onManager() {
try {
// try to sweep current AlertV2s for expiration immediately as we might be restarting the cluster
expireAlertV2s()
// schedule expiration checks and expirations to happen repeatedly at some interval
scheduledAlertsV2CheckAndExpire = threadPool
.scheduleWithFixedDelay({ expireAlertV2s() }, checkForExpirationInterval, executorName)
} catch (e: Exception) {
// This should be run on cluster startup
logger.error(
"Error sweeping AlertV2s for expiration. This cannot be done until clustermanager node is restarted.",
e
)
}
}

fun offManager() {
scheduledAlertsV2CheckAndExpire?.cancel()
}

private fun expireAlertV2s() {
if (!areAlertsIndicesPresent()) {
return
}

try {
val deleteByQuery = QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD)
.lte(Instant.now().toEpochMilli())

DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(ALL_ALERT_INDEX_PATTERN)
.filter(deleteByQuery)
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) {
logger.info("Deleted ${response.deleted} expired alerts")
}
override fun onFailure(e: Exception) {
logger.error("Failed to delete expired alerts", e)
}
}
)
} catch (e: Exception) {
logger.error("Error during alert cleanup", e)
}
}

private fun areAlertsIndicesPresent(): Boolean {
return alertIndexInitialized || alertHistoryIndexInitialized
}
}
109 changes: 62 additions & 47 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,20 @@ import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsV2Handler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction
import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorV2Action
import org.opensearch.alerting.resthandler.RestExecuteMonitorV2Action
import org.opensearch.alerting.resthandler.RestGetAlertsV2Action
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexAlertingCommentAction
import org.opensearch.alerting.resthandler.RestGetMonitorV2Action
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.resthandler.RestIndexMonitorV2Action
import org.opensearch.alerting.resthandler.RestSearchMonitorV2Action
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
Expand All @@ -63,26 +49,31 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteMonitorV2Action
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportExecuteMonitorV2Action
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetMonitorV2Action
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexAlertingCommentAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexMonitorV2Action
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.transport.TransportSearchMonitorV2Action
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
Expand All @@ -103,6 +94,7 @@ import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorV2
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
Expand Down Expand Up @@ -157,6 +149,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val OPEN_SEARCH_DASHBOARDS_USER_AGENT = "OpenSearch-Dashboards"
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
@JvmField val MONITOR_V2_BASE_URI = "/_plugins/_alerting/v2/monitors"
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
Expand All @@ -169,7 +162,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val COMMENTS_BASE_URI = "/_plugins/_alerting/comments"

@JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow")
@JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow", "monitor_v2")
}

lateinit var runner: MonitorRunnerService
Expand All @@ -182,6 +175,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
lateinit var alertV2Expirer: AlertV2Expirer
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
Expand All @@ -194,35 +188,46 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
nodesInCluster: Supplier<DiscoveryNodes>
): List<RestHandler> {
return listOf(
RestGetMonitorAction(),
RestDeleteMonitorAction(),
RestIndexMonitorAction(),
RestIndexWorkflowAction(),
RestSearchMonitorAction(settings, clusterService),
RestExecuteMonitorAction(),
RestExecuteWorkflowAction(),
RestAcknowledgeAlertAction(),
RestAcknowledgeChainedAlertAction(),
RestScheduledJobStatsHandler("_alerting"),
RestSearchEmailAccountAction(),
RestGetEmailAccountAction(),
RestSearchEmailGroupAction(),
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction(),
RestGetWorkflowAlertsAction(),
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
RestIndexAlertingCommentAction(),
RestSearchAlertingCommentAction(),
RestDeleteAlertingCommentAction(),
// Alerting V1
// RestGetMonitorAction(),
// RestDeleteMonitorAction(),
// RestIndexMonitorAction(),
// RestIndexWorkflowAction(),
// RestSearchMonitorAction(settings, clusterService),
// RestExecuteMonitorAction(),
// RestExecuteWorkflowAction(),
// RestAcknowledgeAlertAction(),
// RestAcknowledgeChainedAlertAction(),
// RestScheduledJobStatsHandler("_alerting"),
// RestSearchEmailAccountAction(),
// RestGetEmailAccountAction(),
// RestSearchEmailGroupAction(),
// RestGetEmailGroupAction(),
// RestGetDestinationsAction(),
// RestGetAlertsAction(),
// RestGetWorkflowAlertsAction(),
// RestGetFindingsAction(),
// RestGetWorkflowAction(),
// RestDeleteWorkflowAction(),
// RestGetRemoteIndexesAction(),
// RestIndexAlertingCommentAction(),
// RestSearchAlertingCommentAction(),
// RestDeleteAlertingCommentAction(),

// Alerting V2
RestIndexMonitorV2Action(),
RestExecuteMonitorV2Action(),
RestDeleteMonitorV2Action(),
RestGetMonitorV2Action(),
RestSearchMonitorV2Action(settings, clusterService),
RestGetAlertsV2Action(),
RestScheduledJobStatsV2Handler()
)
}

override fun getActions(): List<ActionPlugin.ActionHandler<out ActionRequest, out ActionResponse>> {
return listOf(
// Alerting V1
ActionPlugin.ActionHandler(ScheduledJobsStatsAction.INSTANCE, ScheduledJobsStatsTransportAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_MONITOR_ACTION_TYPE, TransportIndexMonitorAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_MONITOR_ACTION_TYPE, TransportGetMonitorAction::class.java),
Expand All @@ -249,13 +254,21 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java),

// Alerting V2
ActionPlugin.ActionHandler(AlertingActions.INDEX_MONITOR_V2_ACTION_TYPE, TransportIndexMonitorV2Action::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_MONITOR_V2_ACTION_TYPE, TransportGetMonitorV2Action::class.java),
ActionPlugin.ActionHandler(AlertingActions.SEARCH_MONITORS_V2_ACTION_TYPE, TransportSearchMonitorV2Action::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_MONITOR_V2_ACTION_TYPE, TransportDeleteMonitorV2Action::class.java),
ActionPlugin.ActionHandler(AlertingActions.EXECUTE_MONITOR_V2_ACTION_TYPE, TransportExecuteMonitorV2Action::class.java),
)
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(
Monitor.XCONTENT_REGISTRY,
MonitorV2.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
Expand Down Expand Up @@ -322,6 +335,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
alertV2Expirer = AlertV2Expirer(client, threadPool, clusterService)
this.threadPool = threadPool
this.clusterService = clusterService

Expand Down Expand Up @@ -349,6 +363,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
commentsIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
alertV2Expirer,
lockService,
alertService,
triggerService
Expand Down
Loading
Loading