Merged. Changes from 8 commits.
File: NodeUsageStatsForThreadPools.java
@@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Map;
@@ -26,6 +27,11 @@
*/
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap) implements Writeable {

public static final Map<String, ThreadPoolUsageStats> ZERO_USAGE_THREAD_POOL_USAGE_MAP = Map.of(
ThreadPool.Names.WRITE,
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(5, 0, 0)
);
Contributor: This looks like test code in production classes?

Contributor Author: Moved it to the test file 👍

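For reference, a minimal sketch of where the constant might land after that move (class name, visibility, and the meaning of the constructor arguments are assumptions; at this commit the diff still shows the constant on the production record, with the test importing it statically):

```java
// Hypothetical sketch: the zero-usage fixture relocated into the test class.
public class WriteLoadConstraintMonitorTests extends ESTestCase {

    // A write pool with threads but no load: 5 threads, 0 utilization,
    // 0 ms max queue latency (argument order assumed from the diff above).
    private static final Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> ZERO_USAGE_THREAD_POOL_USAGE_MAP =
        Map.of(ThreadPool.Names.WRITE, new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(5, 0, 0));

    // ... test methods ...
}
```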

public NodeUsageStatsForThreadPools(StreamInput in) throws IOException {
this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new));
}
File: WriteLoadConstraintMonitor.java
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@@ -75,18 +76,32 @@ public void onNewInfo(ClusterInfo clusterInfo) {
logger.trace("processing new cluster info");

final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
final Set<String> nodeIdsExceedingLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
final Set<String> nodeIdsExceedingQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
final Set<String> nodeIdsBelowQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
Member: Nit: We don't really need to capture the nodeId. A boolean flag should do the job?

Contributor Author: Nope, don't appear to need a Set. Swapped 👍 Thanks

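A minimal sketch of the suggested simplification (shape assumed, names taken from the surrounding diff): the ids of nodes exceeding the threshold are still collected because they feed nodeSummary(...) and lastSetOfHotSpottedNodes, but the "below threshold" side only needs to answer "is there anywhere to move shards to?", so a flag suffices. An AtomicBoolean (or a one-element array) is needed because forEach takes a lambda:

```java
final Set<String> nodeIdsExceedingQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
final AtomicBoolean anyWriteNodeBelowThreshold = new AtomicBoolean(false);
clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
    // (search-node skip and null assert elided for brevity)
    final var writeStats = usageStats.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
    if (writeStats.maxThreadPoolQueueLatencyMillis() >= writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
        nodeIdsExceedingQueueLatencyThreshold.add(nodeId);
    } else {
        anyWriteNodeBelowThreshold.set(true);
    }
});
if (anyWriteNodeBelowThreshold.get() == false) {
    return; // every write node is hot-spotting; nowhere to rebalance to
}
```
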
clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
if (state.getNodes().get(nodeId).getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) {
// Search nodes are not expected to have write load hot-spots and are not considered for shard relocation.
// TODO (ES-13314): consider stateful data tiers
return;
}
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]";
if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() > writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
nodeIdsExceedingLatencyThreshold.add(nodeId);
if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
nodeIdsExceedingQueueLatencyThreshold.add(nodeId);
} else {
nodeIdsBelowQueueLatencyThreshold.add(nodeId);
}
});

if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
logger.trace("No hot-spotting nodes detected");
if (nodeIdsExceedingQueueLatencyThreshold.isEmpty()) {
logger.trace("No hot-spotting write nodes detected");
return;
}
if (nodeIdsBelowQueueLatencyThreshold.isEmpty()) {
logger.debug("""
Nodes [{}] are above the queue latency threshold, but there are no write nodes below the threshold. \
Cannot rebalance shards.""", nodeSummary(nodeIdsExceedingQueueLatencyThreshold));
return;
}

@@ -98,14 +113,14 @@ public void onNewInfo(ClusterInfo clusterInfo) {
// We know that there is at least one hot-spotting node if we've reached this code. Now check whether there are any new hot-spots
// or hot-spots that are persisting and need further balancing work.
if (haveCalledRerouteRecently == false
|| Sets.difference(nodeIdsExceedingLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) {
|| Sets.difference(nodeIdsExceedingQueueLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) {
if (logger.isDebugEnabled()) {
logger.debug(
"""
Nodes [{}] are hot-spotting, of {} total cluster nodes. Reroute for hot-spotting {}. \
Previously hot-spotting nodes are [{}]. The write thread pool queue latency threshold is [{}]. Triggering reroute.
""",
nodeSummary(nodeIdsExceedingLatencyThreshold),
nodeSummary(nodeIdsExceedingQueueLatencyThreshold),
state.nodes().size(),
lastRerouteTimeMillis == 0
? "has never previously been called"
@@ -124,7 +139,7 @@ public void onNewInfo(ClusterInfo clusterInfo) {
)
);
lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong();
lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold;
lastSetOfHotSpottedNodes = nodeIdsExceedingQueueLatencyThreshold;
} else {
logger.debug(
"Not calling reroute because we called reroute [{}] ago and there are no new hot spots",
File: WriteLoadConstraintMonitorTests.java
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -36,6 +37,7 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ZERO_USAGE_THREAD_POOL_USAGE_MAP;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.ArgumentMatchers.any;
@@ -147,7 +149,7 @@ public void testRerouteIsNotCalledWhenNoNodesAreHotSpotting() {
"don't reroute due to no nodes hot-spotting",
WriteLoadConstraintMonitor.class.getCanonicalName(),
Level.TRACE,
"No hot-spotting nodes detected"
"No hot-spotting write nodes detected"
)
);

@@ -164,6 +166,47 @@ public void testRerouteIsNotCalledWhenNoNodesAreHotSpotting() {
}
}

@TestLogging(
value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG",
reason = "ensure we're skipping reroute for the right reason"
)
public void testRerouteIsNotCalledInAnAllNodesAreHotSpottingCluster() {
final int numberOfIndexNodes = randomIntBetween(1, 5);
final TestState testState = createTestStateWithNumberOfNodesAndHotSpots(
numberOfIndexNodes,
randomIntBetween(1, 5), // Search nodes should not be considered to address write load hot-spots.
numberOfIndexNodes
);
final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor(
testState.clusterSettings,
testState.currentTimeSupplier,
() -> testState.clusterState,
testState.mockRerouteService
);
try (MockLog mockLog = MockLog.capture(WriteLoadConstraintMonitor.class)) {
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"don't reroute when all nodes are hot-spotting",
WriteLoadConstraintMonitor.class.getCanonicalName(),
Level.DEBUG,
"Nodes * are above the queue latency threshold, but there are no write nodes below the threshold. "
+ "Cannot rebalance shards."
)
);

writeLoadConstraintMonitor.onNewInfo(
createClusterInfoWithHotSpots(
testState.clusterState,
numberOfIndexNodes,
testState.latencyThresholdMillis,
testState.highUtilizationThresholdPercent
)
);
mockLog.assertAllExpectationsMatched();
verifyNoInteractions(testState.mockRerouteService);
}
}

@TestLogging(
value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG",
reason = "ensure we're skipping reroute for the right reason"
@@ -281,30 +324,41 @@ private boolean nodeExceedsQueueLatencyThreshold(NodeUsageStatsForThreadPools no
}

private TestState createRandomTestStateThatWillTriggerReroute() {
int numberOfNodes = randomIntBetween(3, 10);
int numberOfHotSpottingNodes = numberOfNodes - 2; // Leave at least 1 non-hot-spotting node.
return createTestStateWithNumberOfNodesAndHotSpots(numberOfNodes, randomIntBetween(0, 5), numberOfHotSpottingNodes);
}

private TestState createTestStateWithNumberOfNodesAndHotSpots(
int numberOfIndexNodes,
int numberOfSearchNodes,
int numberOfHotSpottingNodes
) {
final long queueLatencyThresholdMillis = randomLongBetween(1000, 5000);
nicktindall (Contributor), Oct 22, 2025: Nit: should we assert that numberOfHotSpottingNodes <= numberOfIndexNodes?

Contributor Author: Done 👍

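A one-line sketch of the requested guard (placement in createTestStateWithNumberOfNodesAndHotSpots assumed; the diff further down adds a similar assertion inside createClusterInfoWithHotSpots):

```java
// Fail fast if a test asks for more hot-spotting nodes than there are index nodes.
assert numberOfHotSpottingNodes <= numberOfIndexNodes
    : "Requested " + numberOfHotSpottingNodes + " hot-spotting nodes but only " + numberOfIndexNodes + " index nodes";
```
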
final int highUtilizationThresholdPercent = randomIntBetween(70, 100);
final int numberOfNodes = randomIntBetween(3, 10);
final ClusterSettings clusterSettings = createClusterSettings(
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED,
queueLatencyThresholdMillis,
highUtilizationThresholdPercent
);
final ClusterState state = ClusterStateCreationUtils.state(
numberOfNodes,
new String[] { randomIdentifier() },
randomIntBetween(1, numberOfNodes)
final ClusterState state = ClusterStateCreationUtils.buildServerlessRoleNodes(
randomIdentifier(), // index name
randomIntBetween(1, numberOfIndexNodes), // num shard primaries
numberOfIndexNodes, // number of index role nodes
numberOfSearchNodes // number of search role nodes
);

final RerouteService rerouteService = mock(RerouteService.class);
final ClusterInfo clusterInfo = createClusterInfoWithHotSpots(
state,
randomIntBetween(1, numberOfNodes - 2),
randomIntBetween(1, numberOfHotSpottingNodes),
queueLatencyThresholdMillis,
highUtilizationThresholdPercent
);
return new TestState(
queueLatencyThresholdMillis,
highUtilizationThresholdPercent,
numberOfNodes,
numberOfIndexNodes,
clusterSettings,
System::currentTimeMillis,
state,
@@ -338,9 +392,9 @@ private static ClusterSettings createClusterSettings(
}

/**
* Create a {@link ClusterInfo} with the specified number of hot spotting nodes,
* all other nodes will have no queue latency and have utilization below the specified
* high-utilization threshold.
* Create a {@link ClusterInfo} with the specified number of hot spotting index nodes,
* all other index nodes will have no queue latency and have utilization below the specified
* high-utilization threshold. Any search nodes in the cluster will have zero usage write load stats.
*
* @param state The cluster state
* @param numberOfNodesHotSpotting The number of nodes that should be hot-spotting
@@ -354,10 +408,25 @@ private static ClusterInfo createClusterInfoWithHotSpots(
long queueLatencyThresholdMillis,
int highUtilizationThresholdPercent
) {
assert numberOfNodesHotSpotting <= state.getNodes()
.stream()
.filter(node -> node.getRoles().contains(DiscoveryNodeRole.INDEX_ROLE))
.toList()
.size()
: "Requested "
+ numberOfNodesHotSpotting
+ " hot spotting nodes, but there are only "
+ state.getRoutingNodes().size()
+ " nodes in the cluster";

final float maxRatioForUnderUtilised = (highUtilizationThresholdPercent - 1) / 100.0f;
final AtomicInteger hotSpottingNodes = new AtomicInteger(numberOfNodesHotSpotting);
return ClusterInfo.builder()
.nodeUsageStatsForThreadPools(state.nodes().stream().collect(Collectors.toMap(DiscoveryNode::getId, node -> {
if (node.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) {
// Search nodes are skipped for write load hot-spots.
return new NodeUsageStatsForThreadPools(node.getId(), ZERO_USAGE_THREAD_POOL_USAGE_MAP);
}
if (hotSpottingNodes.getAndDecrement() > 0) {
// hot-spotting node
return new NodeUsageStatsForThreadPools(
File: ClusterStateCreationUtils.java
@@ -17,6 +17,7 @@
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
@@ -311,7 +312,7 @@ public static ClusterState state(String index, int numberOfNodes, int numberOfPrimaries) {
* Creates cluster state with an index that has #(numberOfPrimaries) primary shards in the started state and no replicas.
* The cluster state contains #(numberOfNodes) nodes and assigns primaries to those nodes.
*/
public static ClusterState state(ProjectId projectId, String index, int numberOfNodes, int numberOfPrimaries) {
public static ClusterState state(ProjectId projectId, String indexName, int numberOfNodes, int numberOfPrimaries) {
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Set<String> nodes = new HashSet<>();
for (int i = 0; i < numberOfNodes; i++) {
@@ -321,7 +322,56 @@ public static ClusterState state(ProjectId projectId, String index, int numberOfNodes, int numberOfPrimaries) {
}
discoBuilder.localNodeId(newNode(0).getId());
discoBuilder.masterNodeId(randomFrom(nodes));
IndexMetadata indexMetadata = IndexMetadata.builder(index)
IndexState index = buildIndex(indexName, numberOfPrimaries, nodes);

ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metadata(
Metadata.builder().put(ProjectMetadata.builder(projectId).put(index.indexMetadata, false)).generateClusterUuidIfNeeded()
);
state.routingTable(GlobalRoutingTable.builder().put(projectId, RoutingTable.builder().add(index.indexRoutingTableBuilder)).build());
return state.build();
}

/**
* Creates cluster state with an index that has #(numberOfPrimaries) primary shards in the started state and no replicas. The cluster
* state contains #(numberOfIndexNodes) nodes with {@link DiscoveryNodeRole#INDEX_ROLE}, assigning the primary shards to those nodes,
* and #(numberOfSearchNodes) nodes with {@link DiscoveryNodeRole#SEARCH_ROLE}.
*/
public static ClusterState buildServerlessRoleNodes(
Contributor Author: @zhubotang-wq this new utility method might be useful in your work, too, if you didn't already build something.
String indexName,
int numberOfPrimaries,
int numberOfIndexNodes,
int numberOfSearchNodes
) {
ProjectId projectId = Metadata.DEFAULT_PROJECT_ID;
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Set<String> indexNodeIds = new HashSet<>();
for (int i = 0; i < numberOfIndexNodes; i++) {
final DiscoveryNode node = DiscoveryNodeUtils.builder("index_" + i).roles(Set.of(DiscoveryNodeRole.INDEX_ROLE)).build();
discoBuilder = discoBuilder.add(node);
indexNodeIds.add(node.getId());
}
for (int i = 0; i < numberOfSearchNodes; i++) {
final DiscoveryNode node = DiscoveryNodeUtils.builder("search_" + i).roles(Set.of(DiscoveryNodeRole.SEARCH_ROLE)).build();
discoBuilder = discoBuilder.add(node);
}
discoBuilder.localNodeId(randomFrom(indexNodeIds));
discoBuilder.masterNodeId(randomFrom(indexNodeIds));
IndexState index = buildIndex(indexName, numberOfPrimaries, indexNodeIds);

ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metadata(
Metadata.builder().put(ProjectMetadata.builder(projectId).put(index.indexMetadata, false)).generateClusterUuidIfNeeded()
);
state.routingTable(GlobalRoutingTable.builder().put(projectId, RoutingTable.builder().add(index.indexRoutingTableBuilder)).build());
return state.build();
}
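
For reference, a quick usage sketch of the new helper (argument values are illustrative; they mirror the call site in WriteLoadConstraintMonitorTests above):

```java
// Three index-role nodes holding two started primaries, plus two search-role nodes.
ClusterState state = ClusterStateCreationUtils.buildServerlessRoleNodes(
    "my-index", // index name
    2,          // number of primary shards (started, no replicas)
    3,          // number of DiscoveryNodeRole.INDEX_ROLE nodes
    2           // number of DiscoveryNodeRole.SEARCH_ROLE nodes
);
```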

public record IndexState(IndexMetadata indexMetadata, IndexRoutingTable.Builder indexRoutingTableBuilder) {}
nicktindall (Contributor), Oct 22, 2025: Nit: could this record be private?

Contributor Author: Done 👍

private static IndexState buildIndex(String indexName, int numberOfPrimaries, Set<String> nodeIds) {
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(indexSettings(IndexVersion.current(), numberOfPrimaries, 0).put(SETTING_CREATION_DATE, System.currentTimeMillis()))
.build();

@@ -330,16 +380,11 @@ public static ClusterState state(ProjectId projectId, String index, int numberOfNodes, int numberOfPrimaries) {
ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId);
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRouting(shardId, randomFrom(nodes), true, ShardRoutingState.STARTED)
TestShardRouting.newShardRouting(shardId, randomFrom(nodeIds), true, ShardRoutingState.STARTED)
);
indexRoutingTable.addIndexShard(indexShardRoutingBuilder);
}

ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
state.nodes(discoBuilder);
state.metadata(Metadata.builder().put(ProjectMetadata.builder(projectId).put(indexMetadata, false)).generateClusterUuidIfNeeded());
state.routingTable(GlobalRoutingTable.builder().put(projectId, RoutingTable.builder().add(indexRoutingTable)).build());
return state.build();
return new IndexState(indexMetadata, indexRoutingTable);
}

/**
Expand All @@ -357,6 +402,8 @@ public static ClusterState state(int numberOfNodes, String[] indices, int number
* and assigns primaries to those nodes.
*/
public static ClusterState state(ProjectId projectId, int numberOfNodes, String[] indices, int numberOfPrimaries) {
assert numberOfPrimaries <= numberOfNodes
: "Requested " + numberOfPrimaries + " primary shards, but only " + numberOfNodes + " nodes";
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Set<String> nodes = new HashSet<>();
for (int i = 0; i < numberOfNodes; i++) {