Add shard write-load to cluster info #131496

Merged
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -104,6 +105,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
@@ -355,6 +357,62 @@ public void testNodeWriteLoadsArePresent() {
}
}

public void testShardWriteLoadsArePresent() {
// Create some indices and some write-load
final int numIndices = randomIntBetween(1, 5);
final String indexPrefix = randomIdentifier();
IntStream.range(0, numIndices).forEach(i -> {
final String indexName = indexPrefix + "_" + i;
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)).build());
IntStream.range(0, randomIntBetween(1, 500))
.forEach(j -> prepareIndex(indexName).setSource("foo", randomIdentifier(), "bar", randomIdentifier()).get());
});

final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);

// Not collecting stats yet because allocation write load stats collection is disabled by default.
{
ClusterInfoServiceUtils.refresh(clusterInfoService);
final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();
assertNotNull(shardWriteLoads);
assertTrue(shardWriteLoads.isEmpty());
}

// Turn on collection of write-load stats.
updateClusterSettings(
Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build()
);

try {
// Force a ClusterInfo refresh to run collection of the write-load stats.
ClusterInfoServiceUtils.refresh(clusterInfoService);
final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();

// Verify that each shard has write-load reported.
final ClusterState state = getInstanceFromNode(ClusterService.class).state();
assertEquals(state.projectState(ProjectId.DEFAULT).metadata().getTotalNumberOfShards(), shardWriteLoads.size());
double maximumLoadRecorded = 0;
for (IndexMetadata indexMetadata : state.projectState(ProjectId.DEFAULT).metadata()) {
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
final ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
assertTrue(shardWriteLoads.containsKey(shardId));
maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded);
}
}
// And that at least one is greater than zero
assertThat(maximumLoadRecorded, greaterThan(0.0));
Contributor: Shouldn't this assert be per index, since all the indices received writes?

Contributor Author: No, we insert between 1 and 500 documents into indices with between 1 and 3 shards, so it's possible some shards will not be written to. This is just a sanity check anyhow.

Contributor: I don't think I follow. Some shards won't receive writes, I agree. But all indices will have writes (to at least one shard), and thus maximumLoadRecorded would have a value per index.

Contributor Author: Oh sorry, you're right. For some reason I read that as per shard. Indeed it could be tightened up to be per index. I'll put up a small PR to do that.
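A sketch of how that per-index tightening might look (a hypothetical follow-up reusing the names from the test above, not part of this diff):

for (IndexMetadata indexMetadata : state.projectState(ProjectId.DEFAULT).metadata()) {
    double maximumLoadForIndex = 0;
    for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
        final ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
        assertTrue(shardWriteLoads.containsKey(shardId));
        maximumLoadForIndex = Math.max(shardWriteLoads.get(shardId), maximumLoadForIndex);
    }
    // Every index received at least one document, so its hottest shard should report a load above zero
    assertThat(maximumLoadForIndex, greaterThan(0.0));
}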

} finally {
updateClusterSettings(
Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build()
);
}
}

public void testIndexCanChangeCustomDataPath() throws Exception {
final String index = "test-custom-data-path";
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
@@ -344,6 +344,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
public static final TransportVersion PROJECT_STATE_REGISTRY_ENTRY = def(9_124_0_00);
public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);

/*
* STOP! READ THIS FIRST! No, really,
46 changes: 39 additions & 7 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
@@ -41,7 +41,7 @@

/**
* ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
* and a map of shard ids to shard sizes, see
* and a map of shard ids to shard sizes and shard write-loads, see
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map
*/
@@ -59,9 +59,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<NodeAndPath, ReservedSpace> reservedSpace;
Contributor: The class comment further up ^ could use an update at this point.
Contributor Author: Fixed in cfe5759.

final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
final Map<ShardId, Double> shardWriteLoads;

protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
@@ -85,7 +86,8 @@ public ClusterInfo(
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
Map<ShardId, Double> shardWriteLoads
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -95,6 +97,7 @@
this.reservedSpace = Map.copyOf(reservedSpace);
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
}

public ClusterInfo(StreamInput in) throws IOException {
@@ -116,6 +119,11 @@ public ClusterInfo(StreamInput in) throws IOException {
} else {
this.nodeUsageStatsForThreadPools = Map.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble);
} else {
this.shardWriteLoads = Map.of();
}
}

@Override
@@ -136,6 +144,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
}
}

/**
@@ -216,7 +227,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
return builder.endObject(); // NodeAndPath
}),
endArray() // end "reserved_sizes"
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools at this stage, to avoid
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads at this stage, to avoid
// committing to API payloads until the features are settled
);
}
@@ -255,6 +266,16 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return this.mostAvailableSpaceUsage;
}

/**
* Returns a map of shard IDs to the write-loads for use in balancing. The write-loads can be interpreted
* as the average number of threads that ingestion to the shard will consume.
* This information may be partial or missing altogether under some circumstances. The absence of a shard
* write load from the map should be interpreted as "unknown".
*/
public Map<ShardId, Double> getShardWriteLoads() {
return shardWriteLoads;
}
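For illustration, a hypothetical consumer of this getter might treat missing entries as unknown rather than zero, along these lines (a sketch only; the helper and its policy are invented here, not part of this change):

// Hypothetical helper: compares the write-loads of two shards, declining to
// rank them when either load is unknown. A load of e.g. 1.5 means ingestion
// keeps roughly 1.5 write threads busy on average for that shard.
static boolean hasHigherWriteLoad(ClusterInfo clusterInfo, ShardId a, ShardId b) {
    final Double loadA = clusterInfo.getShardWriteLoads().get(a);
    final Double loadB = clusterInfo.getShardWriteLoads().get(b);
    if (loadA == null || loadB == null) {
        return false; // absent from the map means "unknown", so make no claim
    }
    return loadA > loadB;
}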

Contributor: Update the #equals and #hashCode methods below.
Contributor Author: I don't like that we add equals and hashCode just for testing, but I updated them in cd3169f.

/**
* Returns the shard size for the given shardId or <code>null</code> if that metric is not available.
*/
@@ -331,7 +352,9 @@ public boolean equals(Object o) {
&& shardDataSetSizes.equals(that.shardDataSetSizes)
&& dataPath.equals(that.dataPath)
&& reservedSpace.equals(that.reservedSpace)
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools);
&& estimatedHeapUsages.equals(that.estimatedHeapUsages)
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
&& shardWriteLoads.equals(that.shardWriteLoads);
}

@Override
@@ -343,7 +366,9 @@ public int hashCode() {
shardDataSetSizes,
dataPath,
reservedSpace,
nodeUsageStatsForThreadPools
estimatedHeapUsages,
nodeUsageStatsForThreadPools,
shardWriteLoads
);
}

@@ -466,6 +491,7 @@ public static class Builder {
private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of();
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
private Map<ShardId, Double> shardWriteLoads = Map.of();

public ClusterInfo build() {
return new ClusterInfo(
@@ -476,7 +502,8 @@ public ClusterInfo build() {
dataPath,
reservedSpace,
estimatedHeapUsages,
nodeUsageStatsForThreadPools
nodeUsageStatsForThreadPools,
shardWriteLoads
);
}

@@ -519,5 +546,10 @@ public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadP
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
return this;
}

public Builder shardWriteLoads(Map<ShardId, Double> shardWriteLoads) {
this.shardWriteLoads = shardWriteLoads;
return this;
}
}
}
@@ -159,7 +159,8 @@ public ClusterInfo getClusterInfo() {
dataPath,
Map.of(),
estimatedHeapUsages,
nodeThreadPoolUsageStats
nodeThreadPoolUsageStats,
allocation.clusterInfo().getShardWriteLoads()
Contributor: Should this be saved as a private variable initialized in the ClusterInfoSimulator constructor, similar to estimatedHeapUsages and nodeThreadPoolUsageStats?
Contributor Author: No, we won't use it at this level in the ClusterInfoSimulator, and I don't think we'll modify it as part of the simulation either, so I think it's fine to pass it through.

);
}
}
@@ -38,6 +38,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.threadpool.ThreadPool;
@@ -215,7 +216,7 @@ void execute() {
logger.trace("starting async refresh");

try (var ignoredRefs = fetchRefs) {
maybeFetchIndicesStats(diskThresholdEnabled);
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED);
maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled);
@@ -301,7 +302,14 @@ public void onFailure(Exception e) {
private void fetchIndicesStats() {
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
indicesStatsRequest.store(true);
if (diskThresholdEnabled) {
// This returns the shard sizes on disk
indicesStatsRequest.store(true);
}
if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) {
// This returns the shard write-loads
indicesStatsRequest.indexing(true);
}
indicesStatsRequest.indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED_HIDDEN);
indicesStatsRequest.timeout(fetchTimeout);
client.admin()
@@ -350,13 +358,15 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
}

final ShardStats[] stats = indicesStatsResponse.getShards();
final Map<ShardId, Double> shardWriteLoadByIdentifierBuilder = new HashMap<>();
final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>();
final Map<ShardId, Long> shardDataSetSizeBuilder = new HashMap<>();
final Map<ClusterInfo.NodeAndShard, String> dataPath = new HashMap<>();
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders =
new HashMap<>();
buildShardLevelInfo(
adjustShardStats(stats),
shardWriteLoadByIdentifierBuilder,
shardSizeByIdentifierBuilder,
shardDataSetSizeBuilder,
dataPath,
Expand All @@ -370,7 +380,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
Map.copyOf(shardSizeByIdentifierBuilder),
Map.copyOf(shardDataSetSizeBuilder),
Map.copyOf(dataPath),
Map.copyOf(reservedSpace)
Map.copyOf(reservedSpace),
Map.copyOf(shardWriteLoadByIdentifierBuilder)
);
}

@@ -527,8 +538,6 @@ public ClusterInfo getClusterInfo() {
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
}
});
final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats = new HashMap<>();
nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeThreadPoolUsageStats.put(nodeId, nodeWriteLoad); });
Contributor Author (@nicktindall, Jul 18, 2025): This appeared to be just a copy, which already happens in the ClusterInfo constructor; I assume it's a remnant of something that was since refactored away.

return new ClusterInfo(
leastAvailableSpaceUsages,
mostAvailableSpaceUsages,
Expand All @@ -537,7 +546,8 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.dataPath,
indicesStatsSummary.reservedSpace,
estimatedHeapUsages,
nodeThreadPoolUsageStats
nodeThreadPoolUsageStatsPerNode,
indicesStatsSummary.shardWriteLoads()
);
}

@@ -567,6 +577,7 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {

static void buildShardLevelInfo(
ShardStats[] stats,
Map<ShardId, Double> shardWriteLoads,
Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizeBuilder,
Map<ClusterInfo.NodeAndShard, String> dataPathByShard,
@@ -577,25 +588,31 @@ static void buildShardLevelInfo(
dataPathByShard.put(ClusterInfo.NodeAndShard.from(shardRouting), s.getDataPath());

final StoreStats storeStats = s.getStats().getStore();
if (storeStats == null) {
continue;
}
final long size = storeStats.sizeInBytes();
final long dataSetSize = storeStats.totalDataSetSizeInBytes();
final long reserved = storeStats.reservedSizeInBytes();

final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
shardSizes.put(shardIdentifier, size);
if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) {
shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize);
if (storeStats != null) {
final long size = storeStats.sizeInBytes();
final long dataSetSize = storeStats.totalDataSetSizeInBytes();
final long reserved = storeStats.reservedSizeInBytes();

final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
shardSizes.put(shardIdentifier, size);
if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) {
shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize);
}
if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) {
final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent(
new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()),
t -> new ClusterInfo.ReservedSpace.Builder()
);
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
}
}
if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) {
final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent(
new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()),
t -> new ClusterInfo.ReservedSpace.Builder()
);
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
final IndexingStats indexingStats = s.getStats().getIndexing();
if (indexingStats != null) {
final double shardWriteLoad = indexingStats.getTotal().getPeakWriteLoad();
if (shardWriteLoad > shardWriteLoads.getOrDefault(shardRouting.shardId(), -1.0)) {
shardWriteLoads.put(shardRouting.shardId(), shardWriteLoad);
}
}
}
}
@@ -623,9 +640,10 @@ private record IndicesStatsSummary(
Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizes,
Map<ClusterInfo.NodeAndShard, String> dataPath,
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace,
Map<ShardId, Double> shardWriteLoads
) {
static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of());
static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

}