Skip to content

Commit c5425a5

Browse files
Index Management Action Metrics (#1195)
* Initial integration of TelemetryAwarePlugin to ISM Signed-off-by: harycash <[email protected]> * Initial integration of TelemetryAwarePlugin to ISM Signed-off-by: harycash <[email protected]> * Initial integration of TelemetryAwarePlugin to ISM with Rollover Step Integration Signed-off-by: harycash <[email protected]> * Initial integration of TelemetryAwarePlugin to ISM with Rollover Step Integration Signed-off-by: harycash <[email protected]> * Additional actions metrics with requested changes from previous commit Signed-off-by: harycash <[email protected]> * Fixed Build Issues Signed-off-by: harycash <[email protected]> * Fixed Build Issues Signed-off-by: harycash <[email protected]> * Fixed Build Issues, Added new metric : Cumulative Latency Signed-off-by: harycash <[email protected]> * Fixed Build Issues, Added new metric : Cumulative Latency Signed-off-by: harycash <[email protected]> * Requested Changes and Addition of Metrics to all the remaining Actions Signed-off-by: harycash <[email protected]> * Updates on Action Metrics Signed-off-by: harycash <[email protected]> * Updates on Action Metrics Signed-off-by: harycash <[email protected]> * Build issues fixed Signed-off-by: harycash <[email protected]> * Build issues fixed Signed-off-by: harycash <[email protected]> --------- Signed-off-by: harycash <[email protected]> Co-authored-by: harycash <[email protected]>
1 parent 52f331f commit c5425a5

28 files changed

+1339
-73
lines changed

spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger
99
import org.opensearch.core.common.io.stream.StreamInput
1010
import org.opensearch.core.common.io.stream.StreamOutput
1111
import org.opensearch.core.common.io.stream.Writeable
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
1213
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
1314
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
1415
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
@@ -27,12 +28,109 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
2728

2829
abstract suspend fun execute(): Step
2930

30-
fun postExecute(logger: Logger): Step {
31+
fun postExecute(
32+
logger: Logger,
33+
indexManagementActionMetrics: IndexManagementActionsMetrics,
34+
step: Step,
35+
startingManagedIndexMetaData: ManagedIndexMetaData,
36+
): Step {
3137
logger.info("Finished executing $name for ${context?.metadata?.index}")
38+
val updatedStepMetaData = step.getUpdatedManagedIndexMetadata(startingManagedIndexMetaData)
39+
emitTelemetry(indexManagementActionMetrics, updatedStepMetaData, logger)
3240
this.context = null
3341
return this
3442
}
3543

44+
private fun emitTelemetry(
45+
indexManagementActionMetrics: IndexManagementActionsMetrics,
46+
updatedStepMetaData: ManagedIndexMetaData,
47+
logger: Logger,
48+
) {
49+
when (context?.metadata?.actionMetaData?.name) {
50+
IndexManagementActionsMetrics.ROLLOVER -> indexManagementActionMetrics.getActionMetrics(
51+
IndexManagementActionsMetrics.ROLLOVER,
52+
)
53+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
54+
55+
IndexManagementActionsMetrics.FORCE_MERGE -> indexManagementActionMetrics.getActionMetrics(
56+
IndexManagementActionsMetrics.FORCE_MERGE,
57+
)
58+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
59+
60+
IndexManagementActionsMetrics.DELETE -> indexManagementActionMetrics.getActionMetrics(
61+
IndexManagementActionsMetrics.DELETE,
62+
)
63+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
64+
65+
IndexManagementActionsMetrics.REPLICA_COUNT -> indexManagementActionMetrics.getActionMetrics(
66+
IndexManagementActionsMetrics.REPLICA_COUNT,
67+
)
68+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
69+
70+
IndexManagementActionsMetrics.TRANSITION -> indexManagementActionMetrics.getActionMetrics(
71+
IndexManagementActionsMetrics.TRANSITION,
72+
)
73+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
74+
75+
IndexManagementActionsMetrics.NOTIFICATION -> indexManagementActionMetrics.getActionMetrics(
76+
IndexManagementActionsMetrics.NOTIFICATION,
77+
)
78+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
79+
80+
IndexManagementActionsMetrics.CLOSE -> indexManagementActionMetrics.getActionMetrics(
81+
IndexManagementActionsMetrics.CLOSE,
82+
)
83+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
84+
85+
IndexManagementActionsMetrics.SET_INDEX_PRIORITY -> indexManagementActionMetrics.getActionMetrics(
86+
IndexManagementActionsMetrics.SET_INDEX_PRIORITY, // problem in test
87+
)
88+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
89+
90+
IndexManagementActionsMetrics.OPEN -> indexManagementActionMetrics.getActionMetrics(
91+
IndexManagementActionsMetrics.OPEN,
92+
)
93+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
94+
95+
IndexManagementActionsMetrics.MOVE_SHARD -> indexManagementActionMetrics.getActionMetrics(
96+
IndexManagementActionsMetrics.MOVE_SHARD,
97+
)
98+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
99+
100+
IndexManagementActionsMetrics.SET_READ_ONLY -> indexManagementActionMetrics.getActionMetrics(
101+
IndexManagementActionsMetrics.SET_READ_ONLY,
102+
)
103+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
104+
105+
IndexManagementActionsMetrics.SHRINK -> indexManagementActionMetrics.getActionMetrics(
106+
IndexManagementActionsMetrics.SHRINK,
107+
)
108+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
109+
110+
IndexManagementActionsMetrics.SNAPSHOT -> indexManagementActionMetrics.getActionMetrics(
111+
IndexManagementActionsMetrics.SNAPSHOT,
112+
)
113+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
114+
115+
IndexManagementActionsMetrics.ALIAS_ACTION -> indexManagementActionMetrics.getActionMetrics(
116+
IndexManagementActionsMetrics.ALIAS_ACTION,
117+
)
118+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
119+
120+
IndexManagementActionsMetrics.ALLOCATION -> indexManagementActionMetrics.getActionMetrics(
121+
IndexManagementActionsMetrics.ALLOCATION,
122+
)
123+
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
124+
125+
else -> {
126+
logger.info(
127+
"Action Metrics is not supported for this action [%s]",
128+
context?.metadata?.actionMetaData?.name,
129+
)
130+
}
131+
}
132+
}
133+
36134
abstract fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData
37135

38136
abstract fun isIdempotent(): Boolean
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics
7+
8+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AliasActionMetrics
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AllocationActionMetrics
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics
13+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.MoveShardActionMetrics
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics
16+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics
17+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics
18+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics
19+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics
20+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ShrinkActionMetrics
21+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics
22+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics
23+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
24+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
25+
import org.opensearch.telemetry.metrics.MetricsRegistry
26+
import org.opensearch.telemetry.metrics.tags.Tags
27+
28+
abstract class ActionMetrics {
29+
abstract val actionName: String
30+
31+
fun createTags(context: StepContext): Tags {
32+
val tags = Tags.create()
33+
.addTag("index_name", context.metadata.index)
34+
.addTag("policy_id", context.metadata.policyID)
35+
.addTag("node_id", context.clusterService.nodeName ?: "")
36+
.addTag("index_uuid", context.metadata.indexUuid)
37+
return tags
38+
}
39+
40+
abstract fun emitMetrics(
41+
context: StepContext,
42+
indexManagementActionsMetrics: IndexManagementActionsMetrics,
43+
stepMetaData: StepMetaData?,
44+
)
45+
}
46+
47+
class IndexManagementActionsMetrics private constructor() {
48+
private lateinit var metricsRegistry: MetricsRegistry
49+
private lateinit var actionMetricsMap: Map<String, ActionMetrics>
50+
51+
companion object {
52+
val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance }
53+
54+
const val ROLLOVER = "rollover"
55+
const val NOTIFICATION = "notification"
56+
const val FORCE_MERGE = "force_merge"
57+
const val DELETE = "delete"
58+
const val REPLICA_COUNT = "replica_count"
59+
const val TRANSITION = "transition"
60+
const val CLOSE = "close"
61+
const val SET_INDEX_PRIORITY = "set_index_priority"
62+
const val OPEN = "open"
63+
const val MOVE_SHARD = "move_shard"
64+
const val SET_READ_ONLY = "set_read_only"
65+
const val SHRINK = "shrink"
66+
const val SNAPSHOT = "snapshot"
67+
const val ALIAS_ACTION = "alias_action"
68+
const val ALLOCATION = "allocation"
69+
70+
private object HOLDER {
71+
val instance = IndexManagementActionsMetrics()
72+
}
73+
}
74+
75+
fun initialize(metricsRegistry: MetricsRegistry) {
76+
this.metricsRegistry = metricsRegistry
77+
78+
RolloverActionMetrics.instance.initializeCounters(metricsRegistry)
79+
NotificationActionMetrics.instance.initializeCounters(metricsRegistry)
80+
ForceMergeActionMetrics.instance.initializeCounters(metricsRegistry)
81+
DeleteActionMetrics.instance.initializeCounters(metricsRegistry)
82+
ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry)
83+
TransitionActionMetrics.instance.initializeCounters(metricsRegistry)
84+
CloseActionMetrics.instance.initializeCounters(metricsRegistry)
85+
SetIndexPriorityActionMetrics.instance.initializeCounters(metricsRegistry)
86+
OpenActionMetrics.instance.initializeCounters(metricsRegistry)
87+
MoveShardActionMetrics.instance.initializeCounters(metricsRegistry)
88+
SetReadOnlyActionMetrics.instance.initializeCounters(metricsRegistry)
89+
ShrinkActionMetrics.instance.initializeCounters(metricsRegistry)
90+
SnapshotActionMetrics.instance.initializeCounters(metricsRegistry)
91+
AliasActionMetrics.instance.initializeCounters(metricsRegistry)
92+
AllocationActionMetrics.instance.initializeCounters(metricsRegistry)
93+
94+
actionMetricsMap = mapOf(
95+
ROLLOVER to RolloverActionMetrics.instance,
96+
NOTIFICATION to NotificationActionMetrics.instance,
97+
FORCE_MERGE to ForceMergeActionMetrics.instance,
98+
DELETE to DeleteActionMetrics.instance,
99+
REPLICA_COUNT to ReplicaCountActionMetrics.instance,
100+
TRANSITION to TransitionActionMetrics.instance,
101+
CLOSE to CloseActionMetrics.instance,
102+
SET_INDEX_PRIORITY to SetIndexPriorityActionMetrics.instance,
103+
OPEN to OpenActionMetrics.instance,
104+
MOVE_SHARD to MoveShardActionMetrics.instance,
105+
SET_READ_ONLY to SetReadOnlyActionMetrics.instance,
106+
SHRINK to ShrinkActionMetrics.instance,
107+
SNAPSHOT to SnapshotActionMetrics.instance,
108+
ALIAS_ACTION to AliasActionMetrics.instance,
109+
ALLOCATION to AllocationActionMetrics.instance,
110+
)
111+
}
112+
113+
fun getActionMetrics(actionName: String): ActionMetrics? {
114+
return actionMetricsMap[actionName]
115+
}
116+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics
7+
8+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
13+
import org.opensearch.telemetry.metrics.Counter
14+
import org.opensearch.telemetry.metrics.MetricsRegistry
15+
16+
class AliasActionMetrics private constructor() : ActionMetrics() {
17+
override val actionName: String = IndexManagementActionsMetrics.ALIAS_ACTION
18+
lateinit var successes: Counter
19+
lateinit var failures: Counter
20+
lateinit var cumulativeLatency: Counter
21+
22+
fun initializeCounters(metricsRegistry: MetricsRegistry) {
23+
successes = metricsRegistry.createCounter("${actionName}_successes", "Alias Action Successes", "count")
24+
failures = metricsRegistry.createCounter("${actionName}_failures", "Alias Action Failures", "count")
25+
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Alias Actions", "milliseconds")
26+
}
27+
28+
companion object {
29+
val instance: AliasActionMetrics by lazy { HOLDER.instance }
30+
}
31+
32+
private object HOLDER {
33+
val instance = AliasActionMetrics()
34+
}
35+
36+
override fun emitMetrics(
37+
context: StepContext,
38+
indexManagementActionsMetrics: IndexManagementActionsMetrics,
39+
stepMetaData: StepMetaData?,
40+
) {
41+
val aliasActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALIAS_ACTION) as AliasActionMetrics
42+
val stepStatus = stepMetaData?.stepStatus
43+
if (stepStatus == StepStatus.COMPLETED) {
44+
aliasActionMetrics.successes.add(1.0, context.let { aliasActionMetrics.createTags(it) })
45+
}
46+
if (stepStatus == StepStatus.FAILED) {
47+
aliasActionMetrics.failures.add(1.0, context.let { aliasActionMetrics.createTags(it) })
48+
}
49+
val endTime = System.currentTimeMillis()
50+
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
51+
aliasActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { aliasActionMetrics.createTags(it) })
52+
}
53+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics
7+
8+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
13+
import org.opensearch.telemetry.metrics.Counter
14+
import org.opensearch.telemetry.metrics.MetricsRegistry
15+
16+
class AllocationActionMetrics private constructor() : ActionMetrics() {
17+
override val actionName: String = IndexManagementActionsMetrics.ALLOCATION
18+
lateinit var successes: Counter
19+
lateinit var failures: Counter
20+
lateinit var cumulativeLatency: Counter
21+
22+
fun initializeCounters(metricsRegistry: MetricsRegistry) {
23+
successes = metricsRegistry.createCounter("${actionName}_successes", "Allocation Action Successes", "count")
24+
failures = metricsRegistry.createCounter("${actionName}_failures", "Allocation Action Failures", "count")
25+
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Allocation Actions", "milliseconds")
26+
}
27+
28+
companion object {
29+
val instance: AllocationActionMetrics by lazy { HOLDER.instance }
30+
}
31+
32+
private object HOLDER {
33+
val instance = AllocationActionMetrics()
34+
}
35+
36+
override fun emitMetrics(
37+
context: StepContext,
38+
indexManagementActionsMetrics: IndexManagementActionsMetrics,
39+
stepMetaData: StepMetaData?,
40+
) {
41+
val allocationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALLOCATION) as AllocationActionMetrics
42+
val stepStatus = stepMetaData?.stepStatus
43+
if (stepStatus == StepStatus.COMPLETED) {
44+
allocationActionMetrics.successes.add(1.0, context.let { allocationActionMetrics.createTags(it) })
45+
}
46+
if (stepStatus == StepStatus.FAILED) {
47+
allocationActionMetrics.failures.add(1.0, context.let { allocationActionMetrics.createTags(it) })
48+
}
49+
val endTime = System.currentTimeMillis()
50+
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
51+
allocationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { allocationActionMetrics.createTags(it) })
52+
}
53+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics
7+
8+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
13+
import org.opensearch.telemetry.metrics.Counter
14+
import org.opensearch.telemetry.metrics.MetricsRegistry
15+
16+
class CloseActionMetrics private constructor() : ActionMetrics() {
17+
override val actionName: String = IndexManagementActionsMetrics.CLOSE
18+
lateinit var successes: Counter
19+
lateinit var failures: Counter
20+
lateinit var cumulativeLatency: Counter
21+
22+
fun initializeCounters(metricsRegistry: MetricsRegistry) {
23+
successes = metricsRegistry.createCounter("${actionName}_successes", "Close Action Successes", "count")
24+
failures = metricsRegistry.createCounter("${actionName}_failures", "Close Action Failures", "count")
25+
cumulativeLatency = metricsRegistry.createCounter("${actionName}_cumulative_latency", "Cumulative Latency of Close Actions", "milliseconds")
26+
}
27+
28+
companion object {
29+
val instance: CloseActionMetrics by lazy { HOLDER.instance }
30+
}
31+
32+
private object HOLDER {
33+
val instance = CloseActionMetrics()
34+
}
35+
36+
override fun emitMetrics(
37+
context: StepContext,
38+
indexManagementActionsMetrics: IndexManagementActionsMetrics,
39+
stepMetaData: StepMetaData?,
40+
) {
41+
val closeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics
42+
val stepStatus = stepMetaData?.stepStatus
43+
if (stepStatus == StepStatus.COMPLETED) {
44+
closeActionMetrics.successes.add(1.0, context.let { closeActionMetrics.createTags(it) })
45+
}
46+
if (stepStatus == StepStatus.FAILED) {
47+
closeActionMetrics.failures.add(1.0, context.let { closeActionMetrics.createTags(it) })
48+
}
49+
val endTime = System.currentTimeMillis()
50+
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
51+
closeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { closeActionMetrics.createTags(it) })
52+
}
53+
}

0 commit comments

Comments
 (0)