diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 5d389ad5ef11a..9d9ec7c667266 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -24,8 +24,8 @@ import org.elasticsearch.cluster.EstimatedHeapUsage; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; -import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; -import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; +import org.elasticsearch.cluster.ThreadPoolUsage; +import org.elasticsearch.cluster.ThreadPoolUsageCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -79,7 +79,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.InternalSettingsPlugin; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentType; import org.junit.Assert; @@ -92,7 +91,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -135,11 +133,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList( - InternalSettingsPlugin.class, - BogusEstimatedHeapUsagePlugin.class, - BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class - ); + return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusNodeThreadPoolUsageCollectorPlugin.class); } public void testLockTryingToDelete() throws Exception { @@ -313,12 +307,11 @@ public void testHeapUsageEstimateIsPresent() { public void testNodeWriteLoadsArePresent() { InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); ClusterInfoServiceUtils.refresh(clusterInfoService); - Map nodeThreadPoolStats = clusterInfoService.getClusterInfo() - .getNodeUsageStatsForThreadPools(); - assertNotNull(nodeThreadPoolStats); + var writeThreadPoolUsages = clusterInfoService.getClusterInfo().getWriteThreadPoolUsages(); + assertNotNull(writeThreadPoolUsages); /** Not collecting stats yet because allocation write load stats collection is disabled by default. * see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */ - assertTrue(nodeThreadPoolStats.isEmpty()); + assertTrue(writeThreadPoolUsages.isEmpty()); // Enable collection for node write loads. updateClusterSettings( @@ -332,23 +325,20 @@ public void testNodeWriteLoadsArePresent() { try { // Force a ClusterInfo refresh to run collection of the node thread pool usage stats. ClusterInfoServiceUtils.refresh(clusterInfoService); - nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools(); + writeThreadPoolUsages = clusterInfoService.getClusterInfo().getWriteThreadPoolUsages(); - /** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation + /** Verify that each node has usage stats reported. The test {@link BogusNodeThreadPoolUsageCollector} implementation * generates random usage values */ ClusterState state = getInstanceFromNode(ClusterService.class).state(); - assertEquals(state.nodes().size(), nodeThreadPoolStats.size()); + assertEquals(state.nodes().size(), writeThreadPoolUsages.size()); for (DiscoveryNode node : state.nodes()) { - assertTrue(nodeThreadPoolStats.containsKey(node.getId())); - NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId()); - assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId())); - NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools - .threadPoolUsageStatsMap() - .get(ThreadPool.Names.WRITE); - assertNotNull(writeThreadPoolStats); - assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); - assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); - assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); + assertTrue(writeThreadPoolUsages.containsKey(node.getId())); + var sample = writeThreadPoolUsages.get(node.getId()).samples().getFirst(); + assertNotNull(sample); + assertThat(sample.timeNano(), greaterThanOrEqualTo(0L)); + assertThat(sample.totalThreads(), greaterThanOrEqualTo(0)); + assertThat(sample.averageUtilization(), greaterThanOrEqualTo(0.0f)); + assertThat(sample.averageQueueLatency(), greaterThanOrEqualTo(0L)); } } finally { updateClusterSettings( @@ -995,48 +985,46 @@ public ClusterService getClusterService() { } /** - * A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random - * {@link NodeUsageStatsForThreadPools} for each node in the cluster. + * A simple {@link ThreadPoolUsageCollector} implementation that creates and returns random + * {@link ThreadPoolUsage} for each node in the cluster. *

* Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the * plugin system can pick it up and use it for the test set-up. */ - public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { + public static class BogusNodeThreadPoolUsageCollector implements ThreadPoolUsageCollector { - private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin; + private final BogusNodeThreadPoolUsageCollectorPlugin plugin; - public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) { + public BogusNodeThreadPoolUsageCollector(BogusNodeThreadPoolUsageCollectorPlugin plugin) { this.plugin = plugin; } @Override - public void collectUsageStats(ActionListener> listener) { + public void collectUsageStats(String threadPoolName, ActionListener> listener) { ActionListener.completeWith( listener, () -> plugin.getClusterService() .state() .nodes() .stream() - .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId()))) - ); - } - - private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) { - NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - randomNonNegativeInt(), - randomFloat(), - randomNonNegativeLong() + .collect( + Collectors.toUnmodifiableMap( + DiscoveryNode::getId, + node -> new ThreadPoolUsage( + List.of( + new ThreadPoolUsage.Sample(randomNonNegativeLong(), randomInt(), randomFloat(), randomNonNegativeLong()) + ) + ) + ) + ) ); - Map statsForThreadPools = new HashMap<>(); - statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); - return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools); } } /** * Make a plugin to gain access to the {@link ClusterService} instance. */ - public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin { + public static class BogusNodeThreadPoolUsageCollectorPlugin extends Plugin implements ClusterPlugin { private final SetOnce clusterService = new SetOnce<>(); diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ThreadPoolUsageCollector similarity index 84% rename from server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector rename to server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ThreadPoolUsageCollector index 787ce436c3ca6..09cd2496399c1 100644 --- a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector +++ b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ThreadPoolUsageCollector @@ -7,4 +7,4 @@ # License v3.0 only", or the "Server Side Public License, v 1". # -org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageStatsForThreadPoolsCollector +org.elasticsearch.index.shard.IndexShardIT$BogusNodeThreadPoolUsageCollector diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 33172e30fb107..907c5a1552c52 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -58,7 +58,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map dataPath; final Map reservedSpace; final Map estimatedHeapUsages; - final Map nodeUsageStatsForThreadPools; + final Map writeThreadPoolUsages; final Map shardWriteLoads; protected ClusterInfo() { @@ -75,7 +75,7 @@ protected ClusterInfo() { * @param dataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path * @param estimatedHeapUsages estimated heap usage broken down by node - * @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node + * @param writeThreadPoolUsages node-level usage stats (operational load) broken down by node * @see #shardIdentifierFromRouting */ public ClusterInfo( @@ -86,7 +86,7 @@ public ClusterInfo( Map dataPath, Map reservedSpace, Map estimatedHeapUsages, - Map nodeUsageStatsForThreadPools, + Map writeThreadPoolUsages, Map shardWriteLoads ) { this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); @@ -96,7 +96,7 @@ public ClusterInfo( this.dataPath = Map.copyOf(dataPath); this.reservedSpace = Map.copyOf(reservedSpace); this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages); - this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools); + this.writeThreadPoolUsages = Map.copyOf(writeThreadPoolUsages); this.shardWriteLoads = Map.copyOf(shardWriteLoads); } @@ -115,9 +115,9 @@ public ClusterInfo(StreamInput in) throws IOException { this.estimatedHeapUsages = Map.of(); } if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { - this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new); + this.writeThreadPoolUsages = in.readImmutableMap(ThreadPoolUsage::fromStreamInput); } else { - this.nodeUsageStatsForThreadPools = Map.of(); + this.writeThreadPoolUsages = Map.of(); } if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) { this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble); @@ -142,7 +142,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable); } if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { - out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable); + out.writeMap(this.writeThreadPoolUsages, StreamOutput::writeWriteable); } if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) { out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble); @@ -244,10 +244,10 @@ public Map getEstimatedHeapUsages() { } /** - * Returns a map containing thread pool usage stats for each node, keyed by node ID. + * Returns a map containing write thread pool usage stats for each node, keyed by node ID. */ - public Map getNodeUsageStatsForThreadPools() { - return nodeUsageStatsForThreadPools; + public Map getWriteThreadPoolUsages() { + return writeThreadPoolUsages; } /** @@ -353,7 +353,7 @@ public boolean equals(Object o) { && dataPath.equals(that.dataPath) && reservedSpace.equals(that.reservedSpace) && estimatedHeapUsages.equals(that.estimatedHeapUsages) - && nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools) + && writeThreadPoolUsages.equals(that.writeThreadPoolUsages) && shardWriteLoads.equals(that.shardWriteLoads); } @@ -367,7 +367,7 @@ public int hashCode() { dataPath, reservedSpace, estimatedHeapUsages, - nodeUsageStatsForThreadPools, + writeThreadPoolUsages, shardWriteLoads ); } @@ -490,7 +490,7 @@ public static class Builder { private Map dataPath = Map.of(); private Map reservedSpace = Map.of(); private Map estimatedHeapUsages = Map.of(); - private Map nodeUsageStatsForThreadPools = Map.of(); + private Map writeThreadPoolUsages = Map.of(); private Map shardWriteLoads = Map.of(); public ClusterInfo build() { @@ -502,7 +502,7 @@ public ClusterInfo build() { dataPath, reservedSpace, estimatedHeapUsages, - nodeUsageStatsForThreadPools, + writeThreadPoolUsages, shardWriteLoads ); } @@ -542,8 +542,8 @@ public Builder estimatedHeapUsages(Map estimatedHeap return this; } - public Builder nodeUsageStatsForThreadPools(Map nodeUsageStatsForThreadPools) { - this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools; + public Builder writeThreadPoolUsages(Map writeThreadPoolUsages) { + this.writeThreadPoolUsages = writeThreadPoolUsages; return this; } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index fd9c62daebd29..45d813179dd5f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -34,7 +34,7 @@ public class ClusterInfoSimulator { private final Map shardDataSetSizes; private final Map dataPath; private final Map estimatedHeapUsages; - private final Map nodeThreadPoolUsageStats; + private final Map writeThreadPoolUsages; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -44,7 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) { this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages(); - this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + this.writeThreadPoolUsages = allocation.clusterInfo().getWriteThreadPoolUsages(); } /** @@ -159,7 +159,7 @@ public ClusterInfo getClusterInfo() { dataPath, Map.of(), estimatedHeapUsages, - nodeThreadPoolUsageStats, + writeThreadPoolUsages, allocation.clusterInfo().getShardWriteLoads() ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index d4ecec83ebc8c..f3b0c727cf752 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -95,7 +95,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile boolean diskThresholdEnabled; private volatile boolean estimatedHeapThresholdEnabled; - private volatile WriteLoadDeciderStatus writeLoadConstraintEnabled; + private volatile boolean writeLoadConstraintEnabled; private volatile TimeValue updateFrequency; private volatile TimeValue fetchTimeout; @@ -103,7 +103,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private volatile Map mostAvailableSpaceUsages; private volatile Map maxHeapPerNode; private volatile Map estimatedHeapUsagePerNode; - private volatile Map nodeThreadPoolUsageStatsPerNode; + private volatile Map writeThreadPoolUsagePerNode; private volatile IndicesStatsSummary indicesStatsSummary; private final ThreadPool threadPool; @@ -113,7 +113,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private final Object mutex = new Object(); private final List> nextRefreshListeners = new ArrayList<>(); private final EstimatedHeapUsageCollector estimatedHeapUsageCollector; - private final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector; + private final ThreadPoolUsageCollector threadPoolUsageCollector; private AsyncRefresh currentRefresh; private RefreshScheduler refreshScheduler; @@ -125,18 +125,18 @@ public InternalClusterInfoService( ThreadPool threadPool, Client client, EstimatedHeapUsageCollector estimatedHeapUsageCollector, - NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector + ThreadPoolUsageCollector nodeUsageStatsForThreadPoolsCollector ) { this.leastAvailableSpaceUsages = Map.of(); this.mostAvailableSpaceUsages = Map.of(); this.maxHeapPerNode = Map.of(); this.estimatedHeapUsagePerNode = Map.of(); - this.nodeThreadPoolUsageStatsPerNode = Map.of(); + this.writeThreadPoolUsagePerNode = Map.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; this.threadPool = threadPool; this.client = client; this.estimatedHeapUsageCollector = estimatedHeapUsageCollector; - this.nodeUsageStatsForThreadPoolsCollector = nodeUsageStatsForThreadPoolsCollector; + this.threadPoolUsageCollector = nodeUsageStatsForThreadPoolsCollector; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); @@ -164,7 +164,7 @@ private void setEstimatedHeapThresholdEnabled(boolean estimatedHeapThresholdEnab } private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus writeLoadConstraintEnabled) { - this.writeLoadConstraintEnabled = writeLoadConstraintEnabled; + this.writeLoadConstraintEnabled = writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED; } private void setFetchTimeout(TimeValue fetchTimeout) { @@ -216,10 +216,10 @@ void execute() { logger.trace("starting async refresh"); try (var ignoredRefs = fetchRefs) { - maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED); + maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); - maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled); + maybeFetchWriteThreadPoolUsage(writeLoadConstraintEnabled); } } @@ -258,28 +258,28 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) { } } - private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) { - if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) { + private void maybeFetchWriteThreadPoolUsage(boolean shouldFetch) { + if (shouldFetch) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchNodesUsageStatsForThreadPools(); + fetchWriteThreadPoolUsage(); } } else { logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled"); - nodeThreadPoolUsageStatsPerNode = Map.of(); + writeThreadPoolUsagePerNode = Map.of(); } } - private void fetchNodesUsageStatsForThreadPools() { - nodeUsageStatsForThreadPoolsCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() { + private void fetchWriteThreadPoolUsage() { + threadPoolUsageCollector.collectUsageStats(ThreadPool.Names.WRITE, ActionListener.releaseAfter(new ActionListener<>() { @Override - public void onResponse(Map writeLoads) { - nodeThreadPoolUsageStatsPerNode = writeLoads; + public void onResponse(Map threadPoolUsage) { + writeThreadPoolUsagePerNode = threadPoolUsage; } @Override public void onFailure(Exception e) { logger.warn("failed to fetch write load estimates for nodes", e); - nodeThreadPoolUsageStatsPerNode = Map.of(); + writeThreadPoolUsagePerNode = Map.of(); } }, fetchRefs.acquire())); } @@ -306,7 +306,7 @@ private void fetchIndicesStats() { // This returns the shard sizes on disk indicesStatsRequest.store(true); } - if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) { + if (writeLoadConstraintEnabled) { // This returns the shard write-loads indicesStatsRequest.indexing(true); } @@ -546,7 +546,7 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.dataPath, indicesStatsSummary.reservedSpace, estimatedHeapUsages, - nodeThreadPoolUsageStatsPerNode, + writeThreadPoolUsagePerNode, indicesStatsSummary.shardWriteLoads() ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java deleted file mode 100644 index 5e84f29af8412..0000000000000 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.cluster; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; - -import java.io.IOException; -import java.util.Map; -import java.util.Objects; - -/** - * Record of a node's thread pool usage stats (operation load). Maps thread pool stats by thread pool name. - * - * @param nodeId The node ID. - * @param threadPoolUsageStatsMap A map of thread pool name ({@link org.elasticsearch.threadpool.ThreadPool.Names}) to the thread pool's - * usage stats ({@link ThreadPoolUsageStats}). - */ -public record NodeUsageStatsForThreadPools(String nodeId, Map threadPoolUsageStatsMap) implements Writeable { - - public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { - this(in.readString(), in.readMap(ThreadPoolUsageStats::new)); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(this.nodeId); - out.writeMap(threadPoolUsageStatsMap, StreamOutput::writeWriteable); - } - - @Override - public int hashCode() { - return Objects.hash(nodeId, threadPoolUsageStatsMap); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o; - for (var entry : other.threadPoolUsageStatsMap.entrySet()) { - var loadStats = threadPoolUsageStatsMap.get(entry.getKey()); - if (loadStats == null || loadStats.equals(entry.getValue()) == false) { - return false; - } - } - return true; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=["); - for (var entry : threadPoolUsageStatsMap.entrySet()) { - builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}"); - } - builder.append("]}"); - return builder.toString(); - } - - /** - * Record of usage stats for a thread pool. - * - * @param totalThreadPoolThreads Total number of threads in the thread pool. - * @param averageThreadPoolUtilization Percent of thread pool threads that are in use, averaged over some period of time. - * @param averageThreadPoolQueueLatencyMillis How much time tasks spend in the thread pool queue. Zero if there is nothing being queued - * in the write thread pool. - */ - public record ThreadPoolUsageStats( - int totalThreadPoolThreads, - float averageThreadPoolUtilization, - long averageThreadPoolQueueLatencyMillis - ) implements Writeable { - - public ThreadPoolUsageStats(StreamInput in) throws IOException { - this(in.readVInt(), in.readFloat(), in.readVLong()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(this.totalThreadPoolThreads); - out.writeFloat(this.averageThreadPoolUtilization); - out.writeVLong(this.averageThreadPoolQueueLatencyMillis); - } - - @Override - public int hashCode() { - return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, averageThreadPoolQueueLatencyMillis); - } - - @Override - public String toString() { - return "[totalThreadPoolThreads=" - + totalThreadPoolThreads - + ", averageThreadPoolUtilization=" - + averageThreadPoolUtilization - + ", averageThreadPoolQueueLatencyMillis=" - + averageThreadPoolQueueLatencyMillis - + "]"; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ThreadPoolUsageStats other = (ThreadPoolUsageStats) o; - return totalThreadPoolThreads == other.totalThreadPoolThreads - && averageThreadPoolUtilization == other.averageThreadPoolUtilization - && averageThreadPoolQueueLatencyMillis == other.averageThreadPoolQueueLatencyMillis; - } - - } // ThreadPoolUsageStats - -} diff --git a/server/src/main/java/org/elasticsearch/cluster/ThreadPoolUsage.java b/server/src/main/java/org/elasticsearch/cluster/ThreadPoolUsage.java new file mode 100644 index 0000000000000..29be090387114 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/ThreadPoolUsage.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.List; + +/** + * A collection of {@link Sample}s on a specific Node and ThreadPool. + */ +public record ThreadPoolUsage(List samples) implements Writeable { + + static ThreadPoolUsage fromStreamInput(StreamInput in) throws IOException { + return new ThreadPoolUsage(in.readCollectionAsImmutableList(Sample::fromStreamInput)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(samples); + } + + /** + * Record of usage stats for a thread pool. + * + * @param timeNano Sample time from System.nanoTime() + * @param totalThreads Total number of threads in the thread pool. + * @param averageUtilization Percent of thread pool threads that are in use, averaged over some period of time. + * @param averageQueueLatency How much time tasks spend in the thread pool queue. Zero if there is nothing being queued in the write + * thread pool. + */ + public record Sample(long timeNano, int totalThreads, float averageUtilization, long averageQueueLatency) implements Writeable { + + public static Sample fromStreamInput(StreamInput in) throws IOException { + return new Sample(in.readVLong(), in.readVInt(), in.readFloat(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(timeNano); + out.writeVInt(totalThreads); + out.writeFloat(averageUtilization); + out.writeVLong(averageQueueLatency); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java b/server/src/main/java/org/elasticsearch/cluster/ThreadPoolUsageCollector.java similarity index 73% rename from server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java rename to server/src/main/java/org/elasticsearch/cluster/ThreadPoolUsageCollector.java index e302a4abed559..f15028c4c85d1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPoolsCollector.java +++ b/server/src/main/java/org/elasticsearch/cluster/ThreadPoolUsageCollector.java @@ -18,16 +18,19 @@ *

* Results are returned as a map of node ID to node usage stats. */ -public interface NodeUsageStatsForThreadPoolsCollector { +public interface ThreadPoolUsageCollector { /** * This will be used when there is no NodeUsageLoadCollector available. */ - NodeUsageStatsForThreadPoolsCollector EMPTY = listener -> listener.onResponse(Map.of()); + static ThreadPoolUsageCollector noop() { + return (name, listener) -> listener.onResponse(Map.of()); + } /** - * Collects the write load estimates from the cluster. + * Collects usage for given thread-pool and sampling setting. * * @param listener The listener to receive the write load results. */ - void collectUsageStats(ActionListener> listener); + void collectUsageStats(String threadPoolName, ActionListener> listener); + } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index cba02ed207b81..e91cc7b9e8505 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -29,11 +29,11 @@ public enum WriteLoadDeciderStatus { */ DISABLED, /** - * Only the low-threshold is enabled (write-load will not trigger rebalance) + * Enables only for shard (re)allocations, but not balancing. */ - LOW_ONLY, + ALLOCATOR_ONLY, /** - * The decider is enabled + * Enables for allocations and balancing. */ ENABLED } diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 326002c7d346c..caeaf6d7cb8be 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -14,7 +14,7 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.EstimatedHeapUsageCollector; import org.elasticsearch.cluster.InternalClusterInfoService; -import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; +import org.elasticsearch.cluster.ThreadPoolUsageCollector; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -80,9 +80,9 @@ ClusterInfoService newClusterInfoService( EstimatedHeapUsageCollector.class, () -> EstimatedHeapUsageCollector.EMPTY ); - final NodeUsageStatsForThreadPoolsCollector nodeUsageStatsForThreadPoolsCollector = pluginsService.loadSingletonServiceProvider( - NodeUsageStatsForThreadPoolsCollector.class, - () -> NodeUsageStatsForThreadPoolsCollector.EMPTY + final ThreadPoolUsageCollector nodeUsageStatsForThreadPoolsCollector = pluginsService.loadSingletonServiceProvider( + ThreadPoolUsageCollector.class, + ThreadPoolUsageCollector::noop ); final InternalClusterInfoService service = new InternalClusterInfoService( settings, diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index e0e749aaa2360..79183356ed99e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -13,9 +13,9 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.threadpool.ThreadPool; import java.util.HashMap; +import java.util.List; import java.util.Map; public class ClusterInfoTests extends AbstractWireSerializingTestCase { @@ -44,7 +44,7 @@ public static ClusterInfo randomClusterInfo() { randomRoutingToDataPath(), randomReservedSpace(), randomNodeHeapUsage(), - randomNodeUsageStatsForThreadPools(), + randomWriteThreadPoolUsage(), randomShardWriteLoad() ); } @@ -74,19 +74,17 @@ private static Map randomNodeHeapUsage() { return nodeHeapUsage; } - private static Map randomNodeUsageStatsForThreadPools() { + private static Map randomWriteThreadPoolUsage() { int numEntries = randomIntBetween(0, 128); - Map nodeUsageStatsForThreadPools = new HashMap<>(numEntries); + Map nodeUsageStatsForThreadPools = new HashMap<>(numEntries); for (int i = 0; i < numEntries; i++) { String nodeIdKey = randomAlphaOfLength(32); - NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolUsageStats = - new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(/* totalThreadPoolThreads= */ randomIntBetween(1, 16), - /* averageThreadPoolUtilization= */ randomFloat(), - /* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) - ); - Map usageStatsForThreadPools = new HashMap<>(); - usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); - nodeUsageStatsForThreadPools.put(ThreadPool.Names.WRITE, new NodeUsageStatsForThreadPools(nodeIdKey, usageStatsForThreadPools)); + final var sample = new ThreadPoolUsage.Sample(/* timeNano */ randomNonNegativeLong(), + /* totalThreads= */ randomIntBetween(1, 16), + /* averageUtilization= */ randomFloat(), + /* averageQueueLatency= */ randomLongBetween(0, 50000) + ); + nodeUsageStatsForThreadPools.put(nodeIdKey, new ThreadPoolUsage(List.of(sample))); } return nodeUsageStatsForThreadPools; } diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 6e80e0d087993..8f6745cbf7c98 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -44,6 +44,7 @@ import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -84,9 +85,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final EstimatedHeapUsageCollector mockEstimatedHeapUsageCollector = spy(new StubEstimatedEstimatedHeapUsageCollector()); - final NodeUsageStatsForThreadPoolsCollector mockNodeUsageStatsForThreadPoolsCollector = spy( - new StubNodeUsageStatsForThreadPoolsCollector() - ); + final ThreadPoolUsageCollector mockNodeUsageStatsForThreadPoolsCollector = spy(new StubThreadPoolUsageCollector()); final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( settings, clusterService, @@ -138,7 +137,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval - verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(any()); + verify(mockNodeUsageStatsForThreadPoolsCollector).collectUsageStats(eq(ThreadPool.Names.WRITE), any()); } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -164,12 +163,12 @@ public void collectClusterHeapUsage(ActionListener> listener) } /** - * Simple for test {@link NodeUsageStatsForThreadPoolsCollector} implementation that returns an empty map of nodeId string to - * {@link NodeUsageStatsForThreadPools}. + * Simple for test {@link ThreadPoolUsageCollector} implementation that returns an empty map of nodeId string to + * {@link ThreadPoolUsage}. */ - private static class StubNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector { + private static class StubThreadPoolUsageCollector implements ThreadPoolUsageCollector { @Override - public void collectUsageStats(ActionListener> listener) { + public void collectUsageStats(String threadPoolName, ActionListener> listener) { listener.onResponse(Map.of()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 6b6136c6c861b..564a67d3b05dd 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -43,7 +43,7 @@ public static class TestPlugin extends Plugin {} private volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, NodeUsageStatsForThreadPoolsCollector.EMPTY); + super(settings, clusterService, threadPool, client, EstimatedHeapUsageCollector.EMPTY, ThreadPoolUsageCollector.noop()); } public void setDiskUsageFunctionAndRefresh(BiFunction diskUsageFn) {