From e5631ac471d9e74abb946db928f1a725097f118b Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Mon, 20 Nov 2023 21:57:06 -0500 Subject: [PATCH 01/19] adding write index check for alias of target index Signed-off-by: n-dohrmann --- .../indexmanagement/transform/TransformIndexer.kt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 904c6c6cd..d6c5dad86 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -63,6 +63,13 @@ class TransformIndexer( throw TransformIndexException("Failed to create the target index") } } + if (clusterService.state().metadata.hasAlias(targetIndex)) { + // return error if no write index + val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex + if (writeIndexMetadata == null) { + throw TransformIndexException("Alias has no write index") + } + } } @Suppress("ThrowsCount", "RethrowCaughtException") From 7fbde10784ee4776288f9d07175f74c7ebf589b0 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 13:53:02 -0500 Subject: [PATCH 02/19] trying out putMapping changes Signed-off-by: n-dohrmann --- .../transform/TransformIndexer.kt | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index d6c5dad86..eb0f97fb3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -11,6 +11,8 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.action.DocWriteRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.bulk.BulkItemResponse import org.opensearch.action.bulk.BulkRequest @@ -26,6 +28,8 @@ import org.opensearch.indexmanagement.transform.settings.TransformSettings import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.core.rest.RestStatus import org.opensearch.transport.RemoteTransportException +import org.opensearch.client.Requests.putMappingRequest +import org.opensearch.action.support.master.AcknowledgedResponse @Suppress("ComplexMethod") class TransformIndexer( @@ -63,13 +67,20 @@ class TransformIndexer( throw TransformIndexException("Failed to create the target index") } } + val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex if (clusterService.state().metadata.hasAlias(targetIndex)) { - // return error if no write index - val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex + // return error if no write index with the alias if (writeIndexMetadata == null) { - throw TransformIndexException("Alias has no write index") + throw TransformIndexException("Target index alias has no write index") } } + val putMappingReq = putMappingRequest(targetIndex).source(targetFieldMappings) + val mapResp : AcknowledgedResponse = client.admin().indices().suspendUntil { + putMapping(putMappingReq) + } + if (!mapResp.isAcknowledged) { + logger.error("Target index mapping request failed") + } } @Suppress("ThrowsCount", "RethrowCaughtException") From e0274ec81bc78ea169f35512f41250f90fb9c51f Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 14:01:53 -0500 Subject: [PATCH 03/19] linting previous change Signed-off-by: n-dohrmann --- .../indexmanagement/transform/TransformIndexer.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index eb0f97fb3..2b03f0ccd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -11,7 +11,6 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.action.DocWriteRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse -import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.bulk.BulkItemResponse @@ -28,7 +27,6 @@ import org.opensearch.indexmanagement.transform.settings.TransformSettings import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.core.rest.RestStatus import org.opensearch.transport.RemoteTransportException -import org.opensearch.client.Requests.putMappingRequest import org.opensearch.action.support.master.AcknowledgedResponse @Suppress("ComplexMethod") @@ -74,8 +72,8 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = putMappingRequest(targetIndex).source(targetFieldMappings) - val mapResp : AcknowledgedResponse = client.admin().indices().suspendUntil { + val putMappingReq = PutMappingRequest(targetIndex).source(targetFieldMappings) + val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } if (!mapResp.isAcknowledged) { From f8663783d16a9b667a605af047f6e2877f911d0a Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 17:12:55 -0500 Subject: [PATCH 04/19] variable target index PR ready for discussion... Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformIndexer.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 2b03f0ccd..a913a532b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -72,7 +72,7 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = PutMappingRequest(targetIndex).source(targetFieldMappings) + val putMappingReq = PutMappingRequest(writeIndexMetadata.index.name).source(targetFieldMappings) val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } From e687f5f0177b9ad59faab768b59cf46e7714ca7e Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 17:28:00 -0500 Subject: [PATCH 05/19] linting previous commit Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformIndexer.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index a913a532b..276eab24c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -72,7 +72,9 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = PutMappingRequest(writeIndexMetadata.index.name).source(targetFieldMappings) + val putMappingReq = PutMappingRequest( + writeIndexMetadata?.index?.name ?: throw TransformIndexException("Target index alias has no write index!") + ).source(targetFieldMappings) val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } From a3586f5e930c9773dc02f0b7ce061b09f7171f42 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 17:35:51 -0500 Subject: [PATCH 06/19] reduce throw count to < 2 in createTargetIndex Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformIndexer.kt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 276eab24c..a98d098df 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -72,9 +72,7 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = PutMappingRequest( - writeIndexMetadata?.index?.name ?: throw TransformIndexException("Target index alias has no write index!") - ).source(targetFieldMappings) + val putMappingReq = PutMappingRequest(writeIndexMetadata?.index?.name).source(targetFieldMappings) val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } From 024696c49ada296e8bddd40d2054b5ddcee72252 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Mon, 20 Nov 2023 21:57:06 -0500 Subject: [PATCH 07/19] adding write index check for alias of target index Signed-off-by: n-dohrmann --- .../indexmanagement/transform/TransformIndexer.kt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 904c6c6cd..d6c5dad86 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -63,6 +63,13 @@ class TransformIndexer( throw TransformIndexException("Failed to create the target index") } } + if (clusterService.state().metadata.hasAlias(targetIndex)) { + // return error if no write index + val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex + if (writeIndexMetadata == null) { + throw TransformIndexException("Alias has no write index") + } + } } @Suppress("ThrowsCount", "RethrowCaughtException") From 3e43d363398129500fb3cdcc1ffd81cce5be2823 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 13:53:02 -0500 Subject: [PATCH 08/19] trying out putMapping changes Signed-off-by: n-dohrmann --- .../transform/TransformIndexer.kt | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index d6c5dad86..eb0f97fb3 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -11,6 +11,8 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.action.DocWriteRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.bulk.BulkItemResponse import org.opensearch.action.bulk.BulkRequest @@ -26,6 +28,8 @@ import org.opensearch.indexmanagement.transform.settings.TransformSettings import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.core.rest.RestStatus import org.opensearch.transport.RemoteTransportException +import org.opensearch.client.Requests.putMappingRequest +import org.opensearch.action.support.master.AcknowledgedResponse @Suppress("ComplexMethod") class TransformIndexer( @@ -63,13 +67,20 @@ class TransformIndexer( throw TransformIndexException("Failed to create the target index") } } + val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex if (clusterService.state().metadata.hasAlias(targetIndex)) { - // return error if no write index - val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex + // return error if no write index with the alias if (writeIndexMetadata == null) { - throw TransformIndexException("Alias has no write index") + throw TransformIndexException("Target index alias has no write index") } } + val putMappingReq = putMappingRequest(targetIndex).source(targetFieldMappings) + val mapResp : AcknowledgedResponse = client.admin().indices().suspendUntil { + putMapping(putMappingReq) + } + if (!mapResp.isAcknowledged) { + logger.error("Target index mapping request failed") + } } @Suppress("ThrowsCount", "RethrowCaughtException") From 123378b70ee60769ddbac9e08c52413e70d96cb6 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 14:01:53 -0500 Subject: [PATCH 09/19] linting previous change Signed-off-by: n-dohrmann --- .../indexmanagement/transform/TransformIndexer.kt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index eb0f97fb3..2b03f0ccd 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -11,7 +11,6 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.action.DocWriteRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse -import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.bulk.BulkItemResponse @@ -28,7 +27,6 @@ import org.opensearch.indexmanagement.transform.settings.TransformSettings import org.opensearch.indexmanagement.transform.util.TransformContext import org.opensearch.core.rest.RestStatus import org.opensearch.transport.RemoteTransportException -import org.opensearch.client.Requests.putMappingRequest import org.opensearch.action.support.master.AcknowledgedResponse @Suppress("ComplexMethod") @@ -74,8 +72,8 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = putMappingRequest(targetIndex).source(targetFieldMappings) - val mapResp : AcknowledgedResponse = client.admin().indices().suspendUntil { + val putMappingReq = PutMappingRequest(targetIndex).source(targetFieldMappings) + val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } if (!mapResp.isAcknowledged) { From ef70a7485c043932cac0389bd9354784193955ff Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 17:12:55 -0500 Subject: [PATCH 10/19] variable target index PR ready for discussion... Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformIndexer.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 2b03f0ccd..a913a532b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -72,7 +72,7 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = PutMappingRequest(targetIndex).source(targetFieldMappings) + val putMappingReq = PutMappingRequest(writeIndexMetadata.index.name).source(targetFieldMappings) val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } From 3be1246f0a9db04e40ceb6232af301e0ebeb99f1 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 17:28:00 -0500 Subject: [PATCH 11/19] linting previous commit Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformIndexer.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index a913a532b..276eab24c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -72,7 +72,9 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = PutMappingRequest(writeIndexMetadata.index.name).source(targetFieldMappings) + val putMappingReq = PutMappingRequest( + writeIndexMetadata?.index?.name ?: throw TransformIndexException("Target index alias has no write index!") + ).source(targetFieldMappings) val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } From 4138ff0d2ff1d8a1e67323787fb3c6c34254d169 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Wed, 29 Nov 2023 17:35:51 -0500 Subject: [PATCH 12/19] reduce throw count to < 2 in createTargetIndex Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformIndexer.kt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index 276eab24c..a98d098df 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -72,9 +72,7 @@ class TransformIndexer( throw TransformIndexException("Target index alias has no write index") } } - val putMappingReq = PutMappingRequest( - writeIndexMetadata?.index?.name ?: throw TransformIndexException("Target index alias has no write index!") - ).source(targetFieldMappings) + val putMappingReq = PutMappingRequest(writeIndexMetadata?.index?.name).source(targetFieldMappings) val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } From 01bd3a2d96566ada355a4ff82b8591610e611777 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Tue, 5 Dec 2023 17:48:59 -0500 Subject: [PATCH 13/19] changing alias checker control flow Signed-off-by: n-dohrmann --- .../transform/TransformIndexer.kt | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index a98d098df..a0749d672 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -65,20 +65,21 @@ class TransformIndexer( throw TransformIndexException("Failed to create the target index") } } - val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex if (clusterService.state().metadata.hasAlias(targetIndex)) { // return error if no write index with the alias + val writeIndexMetadata = clusterService.state().metadata.indicesLookup[targetIndex]!!.writeIndex if (writeIndexMetadata == null) { - throw TransformIndexException("Target index alias has no write index") + throw TransformIndexException("target_index [$targetIndex] is an alias but doesn't have write index") + } + val putMappingReq = PutMappingRequest(writeIndexMetadata?.index?.name).source(targetFieldMappings) + val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { + putMapping(putMappingReq) + } + if (!mapResp.isAcknowledged) { + logger.error("Target index mapping request failed") } } - val putMappingReq = PutMappingRequest(writeIndexMetadata?.index?.name).source(targetFieldMappings) - val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { - putMapping(putMappingReq) - } - if (!mapResp.isAcknowledged) { - logger.error("Target index mapping request failed") - } + } @Suppress("ThrowsCount", "RethrowCaughtException") From ab17632d1baa9fb81c04966b87eec059d46fa09e Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Tue, 5 Dec 2023 19:32:31 -0500 Subject: [PATCH 14/19] adding test case for aliased transform target index Signed-off-by: n-dohrmann --- .../transform/TransformIndexer.kt | 3 +- .../transform/TransformRunnerIT.kt | 48 +++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt index a0749d672..646e92cdc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt @@ -71,7 +71,7 @@ class TransformIndexer( if (writeIndexMetadata == null) { throw TransformIndexException("target_index [$targetIndex] is an alias but doesn't have write index") } - val putMappingReq = PutMappingRequest(writeIndexMetadata?.index?.name).source(targetFieldMappings) + val putMappingReq = PutMappingRequest(writeIndexMetadata.index?.name).source(targetFieldMappings) val mapResp: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingReq) } @@ -79,7 +79,6 @@ class TransformIndexer( logger.error("Target index mapping request failed") } } - } @Suppress("ThrowsCount", "RethrowCaughtException") diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index a948899ff..0d211f166 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -22,6 +22,8 @@ import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestRequest import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS +import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS import org.opensearch.script.Script import org.opensearch.script.ScriptType import org.opensearch.search.aggregations.AggregationBuilders @@ -1430,6 +1432,52 @@ class TransformRunnerIT : TransformRestTestCase() { disableTransform(transform.id) } + fun `test transform with wildcard, aliased target index`() { + validateSourceIndex("source-index") + + // create alias + val indexAlias = "wildcard_index_alias" + val resolvedTargetIndex = "resolved_target_index" + val builtSettings = Settings.builder().let { + it.put(INDEX_NUMBER_OF_REPLICAS, 1) + it.put(INDEX_NUMBER_OF_SHARDS, 1) + it + }.build() + val aliases = "\"$indexAlias\": { \"is_write_index\": true }" + createIndex(resolvedTargetIndex, builtSettings, null, aliases) + + refreshAllIndices() + + val transform = Transform( + id = "id_18", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + sourceIndex = "source-index", + targetIndex = indexAlias, + roles = emptyList(), + pageSize = 100, + groups = listOf( + Terms(sourceField = "store_and_fwd_flag", targetField = "flag") + ) + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + waitFor { assertTrue("Target transform index was not created", indexExists(resolvedTargetIndex)) } + + val metadata = waitFor { + val job = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", job.metadataId) + val transformMetadata = getTransformMetadata(job.metadataId!!) + assertEquals("Transform had not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + transformMetadata + } + } private fun getStrictMappings(): String { return """ "dynamic": "strict", From 448161bd83a92cb035cfa3b22a380e4a2aad518d Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Thu, 14 Dec 2023 16:15:13 -0500 Subject: [PATCH 15/19] quick commit before changing branches Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformRunnerIT.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 0d211f166..9cedb1596 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -1477,6 +1477,9 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Transform had not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) transformMetadata } + + // TODO - make sure we're written to the correct index! + } private fun getStrictMappings(): String { return """ From e50468b80895cc8b65a5d00ce4759b1b9c948409 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Thu, 14 Dec 2023 22:29:20 -0500 Subject: [PATCH 16/19] adding code for quick question Signed-off-by: n-dohrmann --- .../indexmanagement/transform/TransformRunnerIT.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 9cedb1596..2b259d927 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -1478,7 +1478,11 @@ class TransformRunnerIT : TransformRestTestCase() { transformMetadata } - // TODO - make sure we're written to the correct index! + // TODO - make sure we've written to the correct index! + val targetIndexMapping = client().makeRequest("GET", "/$indexAlias/_mapping") + val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> + + // how to check if the results are correctly written to the write index of the alias? } private fun getStrictMappings(): String { From 7dd35300929284d31896af308f13db2c2997f480 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Fri, 15 Dec 2023 15:46:32 -0500 Subject: [PATCH 17/19] adding to target alias transform test Signed-off-by: n-dohrmann --- .../transform/TransformRunnerIT.kt | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 2b259d927..a8a1a3495 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -1447,6 +1447,8 @@ class TransformRunnerIT : TransformRestTestCase() { createIndex(resolvedTargetIndex, builtSettings, null, aliases) refreshAllIndices() + val pickupDateTime = "tpep_pickup_datetime" + val fareAmount = "fare_amount" val transform = Transform( id = "id_18", @@ -1462,8 +1464,9 @@ class TransformRunnerIT : TransformRestTestCase() { roles = emptyList(), pageSize = 100, groups = listOf( - Terms(sourceField = "store_and_fwd_flag", targetField = "flag") - ) + Terms(sourceField = pickupDateTime, targetField = pickupDateTime) + ), + aggregations = AggregatorFactories.builder().addAggregator(AggregationBuilders.avg(fareAmount).field(fareAmount)) ).let { createTransform(it, it.id) } updateTransformStartTime(transform) @@ -1479,10 +1482,48 @@ class TransformRunnerIT : TransformRestTestCase() { } // TODO - make sure we've written to the correct index! + val sourceIndexMapping = client().makeRequest("GET", "/source-index/_mapping") + val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> val targetIndexMapping = client().makeRequest("GET", "/$indexAlias/_mapping") val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> // how to check if the results are correctly written to the write index of the alias? + val sourcePickupDate = (((sourceIndexParserMap["source-index"]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val targetPickupDate = (((targetIndexParserMap[indexAlias]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + + assertEquals(sourcePickupDate, targetPickupDate) + + val pickupDateTimeTerm = "pickupDateTerm14" + + val request = """ + { + "size": 0, + "aggs": { + "$pickupDateTimeTerm": { + "terms": { + "field": "$pickupDateTime", "order": { "_key": "asc" } + }, + "aggs": { + "avgFareAmount": { "avg": { "field": "$fareAmount" } } } + } + } + } + """ + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/source-index/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$indexAlias/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + assertTrue(transformRes.restStatus() == RestStatus.OK) + + val rawAggBuckets = (rawRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + val transformAggBuckets = (transformRes.asMap()["aggregations"] as Map>>>>)[pickupDateTimeTerm]!!["buckets"]!! + assertEquals("Different bucket sizes", rawAggBuckets.size, transformAggBuckets.size) + + // Verify the values of keys and metrics in all buckets + for (i in rawAggBuckets.indices) { + assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) + assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) + } } private fun getStrictMappings(): String { From a73894483206fdb47f4279c477c05f69ec5d2fd9 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Fri, 15 Dec 2023 16:23:22 -0500 Subject: [PATCH 18/19] adding explicit variable for sourceIndex Signed-off-by: n-dohrmann --- .../transform/TransformRunnerIT.kt | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index a8a1a3495..4a0ebf5dc 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -1433,7 +1433,8 @@ class TransformRunnerIT : TransformRestTestCase() { } fun `test transform with wildcard, aliased target index`() { - validateSourceIndex("source-index") + val sourceIndex = "source-index" + validateSourceIndex(sourceIndex) // create alias val indexAlias = "wildcard_index_alias" @@ -1459,7 +1460,7 @@ class TransformRunnerIT : TransformRestTestCase() { jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), description = "test transform", metadataId = null, - sourceIndex = "source-index", + sourceIndex = sourceIndex, targetIndex = indexAlias, roles = emptyList(), pageSize = 100, @@ -1473,22 +1474,20 @@ class TransformRunnerIT : TransformRestTestCase() { waitFor { assertTrue("Target transform index was not created", indexExists(resolvedTargetIndex)) } - val metadata = waitFor { + waitFor { val job = getTransform(transformId = transform.id) assertNotNull("Transform job doesn't have metadata set", job.metadataId) val transformMetadata = getTransformMetadata(job.metadataId!!) assertEquals("Transform had not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) - transformMetadata } - // TODO - make sure we've written to the correct index! - val sourceIndexMapping = client().makeRequest("GET", "/source-index/_mapping") + val sourceIndexMapping = client().makeRequest("GET", "/$sourceIndex/_mapping") val sourceIndexParserMap = createParser(XContentType.JSON.xContent(), sourceIndexMapping.entity.content).map() as Map> val targetIndexMapping = client().makeRequest("GET", "/$indexAlias/_mapping") val targetIndexParserMap = createParser(XContentType.JSON.xContent(), targetIndexMapping.entity.content).map() as Map> // how to check if the results are correctly written to the write index of the alias? - val sourcePickupDate = (((sourceIndexParserMap["source-index"]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] + val sourcePickupDate = (((sourceIndexParserMap[sourceIndex]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] val targetPickupDate = (((targetIndexParserMap[indexAlias]?.get("mappings") as Map)["properties"] as Map)["tpep_pickup_datetime"] as Map)["type"] assertEquals(sourcePickupDate, targetPickupDate) @@ -1509,7 +1508,7 @@ class TransformRunnerIT : TransformRestTestCase() { } } """ - var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/source-index/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) + var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIndex/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) assertTrue(rawRes.restStatus() == RestStatus.OK) var transformRes = client().makeRequest(RestRequest.Method.POST.name, "/$indexAlias/_search", emptyMap(), StringEntity(request, ContentType.APPLICATION_JSON)) @@ -1524,7 +1523,6 @@ class TransformRunnerIT : TransformRestTestCase() { assertEquals("Term pickup date bucket keys are not the same", rawAggBuckets[i]["key"], transformAggBuckets[i]["key"]) assertEquals("Avg fare amounts are not the same", rawAggBuckets[i]["avgFareAmount"], transformAggBuckets[i]["avgFareAmount"]) } - } private fun getStrictMappings(): String { return """ From 0bb9ce87246e810e8666c831f57c0b1fbfd61618 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Fri, 15 Dec 2023 16:30:27 -0500 Subject: [PATCH 19/19] adding unchecked cast suppressor to test method Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformRunnerIT.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 4a0ebf5dc..4c5c789ee 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -1432,6 +1432,7 @@ class TransformRunnerIT : TransformRestTestCase() { disableTransform(transform.id) } + @Suppress("UNCHECKED_CAST") fun `test transform with wildcard, aliased target index`() { val sourceIndex = "source-index" validateSourceIndex(sourceIndex)