Skip to content

Change ThreadPoolUsage from single to list of samples #131738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.cluster.EstimatedHeapUsage;
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
import org.elasticsearch.cluster.ThreadPoolUsage;
import org.elasticsearch.cluster.ThreadPoolUsageCollector;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -79,7 +79,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Assert;

Expand All @@ -92,7 +91,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -135,11 +133,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(
InternalSettingsPlugin.class,
BogusEstimatedHeapUsagePlugin.class,
BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class
);
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusNodeThreadPoolUsageCollectorPlugin.class);
}

public void testLockTryingToDelete() throws Exception {
Expand Down Expand Up @@ -313,12 +307,11 @@ public void testHeapUsageEstimateIsPresent() {
public void testNodeWriteLoadsArePresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);
Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolStats = clusterInfoService.getClusterInfo()
.getNodeUsageStatsForThreadPools();
assertNotNull(nodeThreadPoolStats);
var writeThreadPoolUsages = clusterInfoService.getClusterInfo().getWriteThreadPoolUsages();
assertNotNull(writeThreadPoolUsages);
/** Not collecting stats yet because allocation write load stats collection is disabled by default.
* see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */
assertTrue(nodeThreadPoolStats.isEmpty());
assertTrue(writeThreadPoolUsages.isEmpty());

// Enable collection for node write loads.
updateClusterSettings(
Expand All @@ -332,23 +325,20 @@ public void testNodeWriteLoadsArePresent() {
try {
// Force a ClusterInfo refresh to run collection of the node thread pool usage stats.
ClusterInfoServiceUtils.refresh(clusterInfoService);
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools();
writeThreadPoolUsages = clusterInfoService.getClusterInfo().getWriteThreadPoolUsages();

/** Verify that each node has usage stats reported. The test {@link BogusNodeUsageStatsForThreadPoolsCollector} implementation
/** Verify that each node has usage stats reported. The test {@link BogusNodeThreadPoolUsageCollector} implementation
* generates random usage values */
ClusterState state = getInstanceFromNode(ClusterService.class).state();
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
assertEquals(state.nodes().size(), writeThreadPoolUsages.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(nodeThreadPoolStats.containsKey(node.getId()));
NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId());
assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId()));
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools
.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
assertNotNull(writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
assertTrue(writeThreadPoolUsages.containsKey(node.getId()));
var sample = writeThreadPoolUsages.get(node.getId()).samples().getFirst();
assertNotNull(sample);
assertThat(sample.timeNano(), greaterThanOrEqualTo(0L));
assertThat(sample.totalThreads(), greaterThanOrEqualTo(0));
assertThat(sample.averageUtilization(), greaterThanOrEqualTo(0.0f));
assertThat(sample.averageQueueLatency(), greaterThanOrEqualTo(0L));
}
} finally {
updateClusterSettings(
Expand Down Expand Up @@ -995,48 +985,46 @@ public ClusterService getClusterService() {
}

/**
* A simple {@link NodeUsageStatsForThreadPoolsCollector} implementation that creates and returns random
* {@link NodeUsageStatsForThreadPools} for each node in the cluster.
* A simple {@link ThreadPoolUsageCollector} implementation that creates and returns random
* {@link ThreadPoolUsage} for each node in the cluster.
* <p>
* Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the
* plugin system can pick it up and use it for the test set-up.
*/
public static class BogusNodeUsageStatsForThreadPoolsCollector implements NodeUsageStatsForThreadPoolsCollector {
public static class BogusNodeThreadPoolUsageCollector implements ThreadPoolUsageCollector {

private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin;
private final BogusNodeThreadPoolUsageCollectorPlugin plugin;

public BogusNodeUsageStatsForThreadPoolsCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) {
public BogusNodeThreadPoolUsageCollector(BogusNodeThreadPoolUsageCollectorPlugin plugin) {
this.plugin = plugin;
}

@Override
public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
public void collectUsageStats(String threadPoolName, ActionListener<Map<String, ThreadPoolUsage>> listener) {
ActionListener.completeWith(
listener,
() -> plugin.getClusterService()
.state()
.nodes()
.stream()
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId())))
);
}

private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) {
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
randomNonNegativeInt(),
randomFloat(),
randomNonNegativeLong()
.collect(
Collectors.toUnmodifiableMap(
DiscoveryNode::getId,
node -> new ThreadPoolUsage(
List.of(
new ThreadPoolUsage.Sample(randomNonNegativeLong(), randomInt(), randomFloat(), randomNonNegativeLong())
)
)
)
)
);
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> statsForThreadPools = new HashMap<>();
statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats);
return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools);
}
}

/**
* Make a plugin to gain access to the {@link ClusterService} instance.
*/
public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin {
public static class BogusNodeThreadPoolUsageCollectorPlugin extends Plugin implements ClusterPlugin {

private final SetOnce<ClusterService> clusterService = new SetOnce<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
# License v3.0 only", or the "Server Side Public License, v 1".
#

org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageStatsForThreadPoolsCollector
org.elasticsearch.index.shard.IndexShardIT$BogusNodeThreadPoolUsageCollector
32 changes: 16 additions & 16 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
final Map<String, ThreadPoolUsage> writeThreadPoolUsages;
final Map<ShardId, Double> shardWriteLoads;

protected ClusterInfo() {
Expand All @@ -75,7 +75,7 @@ protected ClusterInfo() {
* @param dataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param estimatedHeapUsages estimated heap usage broken down by node
* @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node
* @param writeThreadPoolUsages node-level usage stats (operational load) broken down by node
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
Expand All @@ -86,7 +86,7 @@ public ClusterInfo(
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
Map<String, ThreadPoolUsage> writeThreadPoolUsages,
Map<ShardId, Double> shardWriteLoads
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
Expand All @@ -96,7 +96,7 @@ public ClusterInfo(
this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace);
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
this.writeThreadPoolUsages = Map.copyOf(writeThreadPoolUsages);
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
}

Expand All @@ -115,9 +115,9 @@ public ClusterInfo(StreamInput in) throws IOException {
this.estimatedHeapUsages = Map.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new);
this.writeThreadPoolUsages = in.readImmutableMap(ThreadPoolUsage::fromStreamInput);
} else {
this.nodeUsageStatsForThreadPools = Map.of();
this.writeThreadPoolUsages = Map.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble);
Expand All @@ -142,7 +142,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable);
out.writeMap(this.writeThreadPoolUsages, StreamOutput::writeWriteable);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
Expand Down Expand Up @@ -244,10 +244,10 @@ public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
}

/**
* Returns a map containing thread pool usage stats for each node, keyed by node ID.
* Returns a map containing write thread pool usage stats for each node, keyed by node ID.
*/
public Map<String, NodeUsageStatsForThreadPools> getNodeUsageStatsForThreadPools() {
return nodeUsageStatsForThreadPools;
public Map<String, ThreadPoolUsage> getWriteThreadPoolUsages() {
return writeThreadPoolUsages;
}

/**
Expand Down Expand Up @@ -353,7 +353,7 @@ public boolean equals(Object o) {
&& dataPath.equals(that.dataPath)
&& reservedSpace.equals(that.reservedSpace)
&& estimatedHeapUsages.equals(that.estimatedHeapUsages)
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
&& writeThreadPoolUsages.equals(that.writeThreadPoolUsages)
&& shardWriteLoads.equals(that.shardWriteLoads);
}

Expand All @@ -367,7 +367,7 @@ public int hashCode() {
dataPath,
reservedSpace,
estimatedHeapUsages,
nodeUsageStatsForThreadPools,
writeThreadPoolUsages,
shardWriteLoads
);
}
Expand Down Expand Up @@ -490,7 +490,7 @@ public static class Builder {
private Map<NodeAndShard, String> dataPath = Map.of();
private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of();
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
private Map<String, ThreadPoolUsage> writeThreadPoolUsages = Map.of();
private Map<ShardId, Double> shardWriteLoads = Map.of();

public ClusterInfo build() {
Expand All @@ -502,7 +502,7 @@ public ClusterInfo build() {
dataPath,
reservedSpace,
estimatedHeapUsages,
nodeUsageStatsForThreadPools,
writeThreadPoolUsages,
shardWriteLoads
);
}
Expand Down Expand Up @@ -542,8 +542,8 @@ public Builder estimatedHeapUsages(Map<String, EstimatedHeapUsage> estimatedHeap
return this;
}

public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools) {
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
public Builder writeThreadPoolUsages(Map<String, ThreadPoolUsage> writeThreadPoolUsages) {
this.writeThreadPoolUsages = writeThreadPoolUsages;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ClusterInfoSimulator {
private final Map<ShardId, Long> shardDataSetSizes;
private final Map<NodeAndShard, String> dataPath;
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
private final Map<String, ThreadPoolUsage> writeThreadPoolUsages;

public ClusterInfoSimulator(RoutingAllocation allocation) {
this.allocation = allocation;
Expand All @@ -44,7 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
this.writeThreadPoolUsages = allocation.clusterInfo().getWriteThreadPoolUsages();
}

/**
Expand Down Expand Up @@ -159,7 +159,7 @@ public ClusterInfo getClusterInfo() {
dataPath,
Map.of(),
estimatedHeapUsages,
nodeThreadPoolUsageStats,
writeThreadPoolUsages,
allocation.clusterInfo().getShardWriteLoads()
);
}
Expand Down
Loading