diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java index 14de086c1bf2b..661baaa640641 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java @@ -51,6 +51,8 @@ public void testRerouteIsCalledWhenHotSpotAppears() { internalCluster().startMasterOnlyNode(settings); final String dataNodeOne = internalCluster().startDataOnlyNode(settings); final String dataNodeTwo = internalCluster().startDataOnlyNode(settings); + // Maintain a third node so that there's always at least one non-hot-spotting node that can receive shards. + internalCluster().startDataOnlyNode(settings); // Unmodified cluster info should detect no hot-spotting nodes MockLog.awaitLogger( @@ -60,7 +62,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() { "no hot-spots detected", WriteLoadConstraintMonitor.class.getCanonicalName(), Level.TRACE, - "No hot-spotting nodes detected" + "No hot-spotting write nodes detected" ) ); @@ -76,7 +78,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() { WriteLoadConstraintMonitor.class.getCanonicalName(), Level.DEBUG, Strings.format(""" - Nodes [[%s]] are hot-spotting, of 3 total cluster nodes. Reroute for hot-spotting has never previously been called. \ + Nodes [[%s]] are hot-spotting, of 4 total cluster nodes. Reroute for hot-spotting has never previously been called. \ Previously hot-spotting nodes are [0 nodes]. The write thread pool queue latency threshold is [%s]. \ Triggering reroute. """, getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis)) @@ -111,7 +113,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() { WriteLoadConstraintMonitor.class.getCanonicalName(), Level.DEBUG, Strings.format(""" - Nodes [[*]] are hot-spotting, of 3 total cluster nodes. \ + Nodes [[*]] are hot-spotting, of 4 total cluster nodes. \ Reroute for hot-spotting was last called [*] ago. Previously hot-spotting nodes are [[%s]]. \ The write thread pool queue latency threshold is [%s]. Triggering reroute. """, getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis)) @@ -130,7 +132,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() { "no hot-spots detected", WriteLoadConstraintMonitor.class.getCanonicalName(), Level.TRACE, - "No hot-spotting nodes detected" + "No hot-spotting write nodes detected" ) ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java index c2deff8608593..d3a8a8e80b31a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -26,6 +27,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -75,18 +77,32 @@ public void onNewInfo(ClusterInfo clusterInfo) { logger.trace("processing new cluster info"); final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size(); - final Set nodeIdsExceedingLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes); + final Set writeNodeIdsExceedingQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes); + AtomicBoolean haveWriteNodesBelowQueueLatencyThreshold = new AtomicBoolean(false); clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> { + if (state.getNodes().get(nodeId).getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) { + // Search nodes are not expected to have write load hot-spots and are not considered for shard relocation. + // TODO (ES-13314): consider stateful data tiers + return; + } final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap() .get(ThreadPool.Names.WRITE); assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]"; - if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() > writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) { - nodeIdsExceedingLatencyThreshold.add(nodeId); + if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) { + writeNodeIdsExceedingQueueLatencyThreshold.add(nodeId); + } else { + haveWriteNodesBelowQueueLatencyThreshold.set(true); } }); - if (nodeIdsExceedingLatencyThreshold.isEmpty()) { - logger.trace("No hot-spotting nodes detected"); + if (writeNodeIdsExceedingQueueLatencyThreshold.isEmpty()) { + logger.trace("No hot-spotting write nodes detected"); + return; + } + if (haveWriteNodesBelowQueueLatencyThreshold.get() == false) { + logger.debug(""" + Nodes [{}] are above the queue latency threshold, but there are no write nodes below the threshold. \ + Cannot rebalance shards.""", nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold)); return; } @@ -98,14 +114,14 @@ public void onNewInfo(ClusterInfo clusterInfo) { // We know that there is at least one hot-spotting node if we've reached this code. Now check whether there are any new hot-spots // or hot-spots that are persisting and need further balancing work. if (haveCalledRerouteRecently == false - || Sets.difference(nodeIdsExceedingLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) { + || Sets.difference(writeNodeIdsExceedingQueueLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) { if (logger.isDebugEnabled()) { logger.debug( """ Nodes [{}] are hot-spotting, of {} total cluster nodes. Reroute for hot-spotting {}. \ Previously hot-spotting nodes are [{}]. The write thread pool queue latency threshold is [{}]. Triggering reroute. """, - nodeSummary(nodeIdsExceedingLatencyThreshold), + nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold), state.nodes().size(), lastRerouteTimeMillis == 0 ? "has never previously been called" @@ -124,7 +140,7 @@ public void onNewInfo(ClusterInfo clusterInfo) { ) ); lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong(); - lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold; + lastSetOfHotSpottedNodes = writeNodeIdsExceedingQueueLatencyThreshold; } else { logger.debug( "Not calling reroute because we called reroute [{}] ago and there are no new hot spots", diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java index 210ca90750e5c..72e6c99bb0e56 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ClusterSettings; @@ -28,6 +29,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,7 +39,6 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -47,7 +48,6 @@ import static org.mockito.Mockito.verifyNoInteractions; public class WriteLoadConstraintMonitorTests extends ESTestCase { - public void testRerouteIsCalledWhenAHotSpotIsDetected() { final TestState testState = createRandomTestStateThatWillTriggerReroute(); final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( @@ -147,7 +147,7 @@ public void testRerouteIsNotCalledWhenNoNodesAreHotSpotting() { "don't reroute due to no nodes hot-spotting", WriteLoadConstraintMonitor.class.getCanonicalName(), Level.TRACE, - "No hot-spotting nodes detected" + "No hot-spotting write nodes detected" ) ); @@ -164,6 +164,47 @@ public void testRerouteIsNotCalledWhenNoNodesAreHotSpotting() { } } + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG", + reason = "ensure we're skipping reroute for the right reason" + ) + public void testRerouteIsNotCalledInAnAllNodesAreHotSpottingCluster() { + final int numberOfIndexNodes = randomIntBetween(1, 5); + final TestState testState = createTestStateWithNumberOfNodesAndHotSpots( + numberOfIndexNodes, + randomIntBetween(1, 5), // Search nodes should not be considered to address write load hot-spots. + numberOfIndexNodes + ); + final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( + testState.clusterSettings, + testState.currentTimeSupplier, + () -> testState.clusterState, + testState.mockRerouteService + ); + try (MockLog mockLog = MockLog.capture(WriteLoadConstraintMonitor.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "don't reroute when all nodes are hot-spotting", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + "Nodes * are above the queue latency threshold, but there are no write nodes below the threshold. " + + "Cannot rebalance shards." + ) + ); + + writeLoadConstraintMonitor.onNewInfo( + createClusterInfoWithHotSpots( + testState.clusterState, + numberOfIndexNodes, + testState.latencyThresholdMillis, + testState.highUtilizationThresholdPercent + ) + ); + mockLog.assertAllExpectationsMatched(); + verifyNoInteractions(testState.mockRerouteService); + } + } + @TestLogging( value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG", reason = "ensure we're skipping reroute for the right reason" @@ -236,35 +277,36 @@ public void testRerouteIsCalledBeforeMinimumIntervalHasPassedIfNewNodesBecomeHot verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any()); reset(testState.mockRerouteService); - assertThat( - "Test setup should leave at least two nodes not hot-spotted", - testState.clusterInfo.getNodeUsageStatsForThreadPools().size() - testState.clusterInfo.getNodeUsageStatsForThreadPools() - .values() - .stream() - .filter(stats -> nodeExceedsQueueLatencyThreshold(stats, testState.latencyThresholdMillis)) - .count(), - greaterThanOrEqualTo(2L) - ); - // Now update cluster info to add another hot-spotted node final AtomicBoolean thresholdIncreased = new AtomicBoolean(false); - var nodeUsageStatsWithExtraHotSpot = Maps.transformValues(testState.clusterInfo.getNodeUsageStatsForThreadPools(), stats -> { - if (thresholdIncreased.get() == false && nodeExceedsQueueLatencyThreshold(stats, testState.latencyThresholdMillis) == false) { + Map nodeUsageStatsWithExtraHotSpot = new HashMap<>(); + for (var entry : testState.clusterInfo.getNodeUsageStatsForThreadPools().entrySet()) { + if (thresholdIncreased.get() == false + && nonSearchNodeBelowQueueLatencyThreshold( + testState.clusterState, + entry.getKey(), + entry.getValue(), + testState.latencyThresholdMillis + )) { thresholdIncreased.set(true); - return new NodeUsageStatsForThreadPools( - stats.nodeId(), - Maps.transformValues( - stats.threadPoolUsageStatsMap(), - tpStats -> new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - tpStats.totalThreadPoolThreads(), - tpStats.averageThreadPoolUtilization(), - testState.latencyThresholdMillis + randomLongBetween(1, 100_000) + nodeUsageStatsWithExtraHotSpot.put( + entry.getKey(), + new NodeUsageStatsForThreadPools( + entry.getKey(), + Maps.transformValues( + entry.getValue().threadPoolUsageStatsMap(), + tpStats -> new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + tpStats.totalThreadPoolThreads(), + tpStats.averageThreadPoolUtilization(), + testState.latencyThresholdMillis + randomLongBetween(1, 100_000) + ) ) ) ); + } else { + nodeUsageStatsWithExtraHotSpot.put(entry.getKey(), entry.getValue()); } - return stats; - }); + } // Advance the clock by less than the re-route interval currentTimeMillis.addAndGet(randomLongBetween(0, minimumInterval.millis() - 1)); @@ -274,37 +316,55 @@ public void testRerouteIsCalledBeforeMinimumIntervalHasPassedIfNewNodesBecomeHot verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any()); } - private boolean nodeExceedsQueueLatencyThreshold(NodeUsageStatsForThreadPools nodeUsageStats, long latencyThresholdMillis) { - return nodeUsageStats.threadPoolUsageStatsMap() - .get(ThreadPool.Names.WRITE) - .maxThreadPoolQueueLatencyMillis() > latencyThresholdMillis; + private boolean nonSearchNodeBelowQueueLatencyThreshold( + ClusterState clusterState, + String nodeId, + NodeUsageStatsForThreadPools nodeUsageStats, + long latencyThresholdMillis + ) { + return clusterState.getNodes().get(nodeId).getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE) == false + && nodeUsageStats.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE) + .maxThreadPoolQueueLatencyMillis() < latencyThresholdMillis; } private TestState createRandomTestStateThatWillTriggerReroute() { + int numberOfNodes = randomIntBetween(3, 10); + int numberOfHotSpottingNodes = numberOfNodes - 2; // Leave at least 2 non-hot-spotting nodes. + return createTestStateWithNumberOfNodesAndHotSpots(numberOfNodes, randomIntBetween(0, 5), numberOfHotSpottingNodes); + } + + private TestState createTestStateWithNumberOfNodesAndHotSpots( + int numberOfIndexNodes, + int numberOfSearchNodes, + int numberOfHotSpottingNodes + ) { + assert numberOfHotSpottingNodes <= numberOfIndexNodes; final long queueLatencyThresholdMillis = randomLongBetween(1000, 5000); final int highUtilizationThresholdPercent = randomIntBetween(70, 100); - final int numberOfNodes = randomIntBetween(3, 10); final ClusterSettings clusterSettings = createClusterSettings( WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED, queueLatencyThresholdMillis, highUtilizationThresholdPercent ); - final ClusterState state = ClusterStateCreationUtils.state( - numberOfNodes, - new String[] { randomIdentifier() }, - randomIntBetween(1, numberOfNodes) + final ClusterState state = ClusterStateCreationUtils.buildServerlessRoleNodes( + randomIdentifier(), // index name + randomIntBetween(1, numberOfIndexNodes), // num shard primaries + numberOfIndexNodes, // number of index role nodes + numberOfSearchNodes // number of search role nodes ); + final RerouteService rerouteService = mock(RerouteService.class); final ClusterInfo clusterInfo = createClusterInfoWithHotSpots( state, - randomIntBetween(1, numberOfNodes - 2), + randomIntBetween(1, numberOfHotSpottingNodes), queueLatencyThresholdMillis, highUtilizationThresholdPercent ); return new TestState( queueLatencyThresholdMillis, highUtilizationThresholdPercent, - numberOfNodes, + numberOfIndexNodes, clusterSettings, System::currentTimeMillis, state, @@ -338,9 +398,9 @@ private static ClusterSettings createClusterSettings( } /** - * Create a {@link ClusterInfo} with the specified number of hot spotting nodes, - * all other nodes will have no queue latency and have utilization below the specified - * high-utilization threshold. + * Create a {@link ClusterInfo} with the specified number of hot spotting index nodes, + * all other index nodes will have no queue latency and have utilization below the specified + * high-utilization threshold. Any search nodes in the cluster will have zero usage write load stats. * * @param state The cluster state * @param numberOfNodesHotSpotting The number of nodes that should be hot-spotting @@ -354,10 +414,25 @@ private static ClusterInfo createClusterInfoWithHotSpots( long queueLatencyThresholdMillis, int highUtilizationThresholdPercent ) { + assert numberOfNodesHotSpotting <= state.getNodes() + .stream() + .filter(node -> node.getRoles().contains(DiscoveryNodeRole.INDEX_ROLE)) + .toList() + .size() + : "Requested " + + numberOfNodesHotSpotting + + " hot spotting nodes, but there are only " + + state.getRoutingNodes().size() + + " nodes in the cluster"; + final float maxRatioForUnderUtilised = (highUtilizationThresholdPercent - 1) / 100.0f; final AtomicInteger hotSpottingNodes = new AtomicInteger(numberOfNodesHotSpotting); return ClusterInfo.builder() .nodeUsageStatsForThreadPools(state.nodes().stream().collect(Collectors.toMap(DiscoveryNode::getId, node -> { + if (node.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) { + // Search nodes are skipped for write load hot-spots. + return new NodeUsageStatsForThreadPools(node.getId(), ZERO_USAGE_THREAD_POOL_USAGE_MAP); + } if (hotSpottingNodes.getAndDecrement() > 0) { // hot-spotting node return new NodeUsageStatsForThreadPools( @@ -399,4 +474,9 @@ private record TestState( RerouteService mockRerouteService, ClusterInfo clusterInfo ) {} + + public static final Map ZERO_USAGE_THREAD_POOL_USAGE_MAP = Map.of( + ThreadPool.Names.WRITE, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(5, 0, 0) + ); } diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 36d71387df59f..43b2cef0e39df 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.AllocationId; @@ -311,7 +312,7 @@ public static ClusterState state(String index, int numberOfNodes, int numberOfPr * Creates cluster state with an index that has #(numberOfPrimaries) primary shards in the started state and no replicas. * The cluster state contains #(numberOfNodes) nodes and assigns primaries to those nodes. */ - public static ClusterState state(ProjectId projectId, String index, int numberOfNodes, int numberOfPrimaries) { + public static ClusterState state(ProjectId projectId, String indexName, int numberOfNodes, int numberOfPrimaries) { DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); Set nodes = new HashSet<>(); for (int i = 0; i < numberOfNodes; i++) { @@ -321,7 +322,57 @@ public static ClusterState state(ProjectId projectId, String index, int numberOf } discoBuilder.localNodeId(newNode(0).getId()); discoBuilder.masterNodeId(randomFrom(nodes)); - IndexMetadata indexMetadata = IndexMetadata.builder(index) + IndexState index = buildIndex(indexName, numberOfPrimaries, nodes); + + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.nodes(discoBuilder); + state.metadata( + Metadata.builder().put(ProjectMetadata.builder(projectId).put(index.indexMetadata, false)).generateClusterUuidIfNeeded() + ); + state.routingTable(GlobalRoutingTable.builder().put(projectId, RoutingTable.builder().add(index.indexRoutingTableBuilder)).build()); + return state.build(); + } + + /** + * Creates cluster state with an index that has #(numberOfPrimaries) primary shards in the started state and no replicas. The cluster + * state contains #(numberOfIndexNodes) nodes with {@link DiscoveryNodeRole#INDEX_ROLE}, assigning the primary shards to those nodes, + * and #(numberOfSearchNodes) nodes with {@link DiscoveryNodeRole#SEARCH_ROLE}. + */ + public static ClusterState buildServerlessRoleNodes( + String indexName, + int numberOfPrimaries, + int numberOfIndexNodes, + int numberOfSearchNodes + ) { + ProjectId projectId = Metadata.DEFAULT_PROJECT_ID; + DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); + Set indexNodeIds = new HashSet<>(); + for (int i = 0; i < numberOfIndexNodes; i++) { + final DiscoveryNode node = DiscoveryNodeUtils.builder("index_" + i).roles(Set.of(DiscoveryNodeRole.INDEX_ROLE)).build(); + discoBuilder = discoBuilder.add(node); + indexNodeIds.add(node.getId()); + } + for (int i = 0; i < numberOfSearchNodes; i++) { + final DiscoveryNode node = DiscoveryNodeUtils.builder("search_" + i).roles(Set.of(DiscoveryNodeRole.SEARCH_ROLE)).build(); + discoBuilder = discoBuilder.add(node); + } + discoBuilder.localNodeId(randomFrom(indexNodeIds)); + discoBuilder.masterNodeId(randomFrom(indexNodeIds)); + IndexState index = buildIndex(indexName, numberOfPrimaries, indexNodeIds); + + ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); + state.nodes(discoBuilder); + state.metadata( + Metadata.builder().put(ProjectMetadata.builder(projectId).put(index.indexMetadata, false)).generateClusterUuidIfNeeded() + ); + state.routingTable(GlobalRoutingTable.builder().put(projectId, RoutingTable.builder().add(index.indexRoutingTableBuilder)).build()); + return state.build(); + } + + private record IndexState(IndexMetadata indexMetadata, IndexRoutingTable.Builder indexRoutingTableBuilder) {} + + private static IndexState buildIndex(String indexName, int numberOfPrimaries, Set nodeIds) { + IndexMetadata indexMetadata = IndexMetadata.builder(indexName) .settings(indexSettings(IndexVersion.current(), numberOfPrimaries, 0).put(SETTING_CREATION_DATE, System.currentTimeMillis())) .build(); @@ -330,16 +381,11 @@ public static ClusterState state(ProjectId projectId, String index, int numberOf ShardId shardId = new ShardId(indexMetadata.getIndex(), i); IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId); indexShardRoutingBuilder.addShard( - TestShardRouting.newShardRouting(shardId, randomFrom(nodes), true, ShardRoutingState.STARTED) + TestShardRouting.newShardRouting(shardId, randomFrom(nodeIds), true, ShardRoutingState.STARTED) ); indexRoutingTable.addIndexShard(indexShardRoutingBuilder); } - - ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); - state.nodes(discoBuilder); - state.metadata(Metadata.builder().put(ProjectMetadata.builder(projectId).put(indexMetadata, false)).generateClusterUuidIfNeeded()); - state.routingTable(GlobalRoutingTable.builder().put(projectId, RoutingTable.builder().add(indexRoutingTable)).build()); - return state.build(); + return new IndexState(indexMetadata, indexRoutingTable); } /**