From 6d1093f7c64a616b24d998b45da5293364572ded Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 18 Jul 2025 13:22:26 +1000 Subject: [PATCH 01/11] Add shard write-load to cluster info --- .../index/shard/IndexShardIT.java | 58 +++++++++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/cluster/ClusterInfo.java | 31 +++++++- .../cluster/ClusterInfoSimulator.java | 3 +- .../cluster/InternalClusterInfoService.java | 70 ++++++++++++------- .../cluster/ClusterInfoTests.java | 12 +++- .../elasticsearch/cluster/DiskUsageTests.java | 10 ++- .../DesiredBalanceReconcilerTests.java | 1 + .../decider/DiskThresholdDeciderTests.java | 12 +++- .../DiskThresholdDeciderUnitTests.java | 3 + 10 files changed, 168 insertions(+), 33 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 25ae21964ba0e..5508c501b30a2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.routing.RecoverySource; @@ -104,6 +105,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; @@ -355,6 +357,62 @@ public void testNodeWriteLoadsArePresent() { } } + public void testShardWriteLoadsArePresent() { + // Create some indices and some write-load + final int numIndices = randomIntBetween(1, 5); + final String indexPrefix = randomIdentifier(); + IntStream.range(0, numIndices).forEach(i -> { + final String indexName = indexPrefix + "_" + i; + createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)).build()); + IntStream.range(0, randomIntBetween(1, 500)) + .forEach(j -> prepareIndex(indexName).setSource("foo", randomIdentifier(), "bar", randomIdentifier()).get()); + }); + + final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + + // Not collecting stats yet because allocation write load stats collection is disabled by default. + { + ClusterInfoServiceUtils.refresh(clusterInfoService); + final Map shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads(); + assertNotNull(shardWriteLoads); + assertTrue(shardWriteLoads.isEmpty()); + } + + // Enable collection for node write loads. + updateClusterSettings( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ); + + try { + // Force a ClusterInfo refresh to run collection of the node thread pool usage stats. + ClusterInfoServiceUtils.refresh(clusterInfoService); + final Map shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads(); + + // Verify that each shard has write-load reported. + final ClusterState state = getInstanceFromNode(ClusterService.class).state(); + assertEquals(state.projectState(ProjectId.DEFAULT).metadata().getTotalNumberOfShards(), shardWriteLoads.size()); + double maximumLoadRecorded = 0; + for (IndexMetadata indexMetadata : state.projectState(ProjectId.DEFAULT).metadata()) { + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + final ShardId shardId = new ShardId(indexMetadata.getIndex(), i); + assertTrue(shardWriteLoads.containsKey(shardId)); + maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded); + } + } + // And that at least one is greater than zero + assertThat(maximumLoadRecorded, greaterThan(0.0)); + } finally { + updateClusterSettings( + Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build() + ); + } + } + public void testIndexCanChangeCustomDataPath() throws Exception { final String index = "test-custom-data-path"; final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10)); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 11a0103cd22e0..d5aa9e78a8c8a 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -342,6 +342,7 @@ static TransportVersion def(int id) { public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00); public static final TransportVersion ESQL_CATEGORIZE_OPTIONS = def(9_122_0_00); public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00); + public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_124_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 6d11700500c24..802bd1a83fca7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -59,9 +59,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map reservedSpace; final Map estimatedHeapUsages; final Map nodeUsageStatsForThreadPools; + final Map shardWriteLoads; protected ClusterInfo() { - this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } /** @@ -85,7 +86,8 @@ public ClusterInfo( Map dataPath, Map reservedSpace, Map estimatedHeapUsages, - Map nodeUsageStatsForThreadPools + Map nodeUsageStatsForThreadPools, + Map shardWriteLoads ) { this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); @@ -95,6 +97,7 @@ public ClusterInfo( this.reservedSpace = Map.copyOf(reservedSpace); this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages); this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools); + this.shardWriteLoads = Map.copyOf(shardWriteLoads); } public ClusterInfo(StreamInput in) throws IOException { @@ -116,6 +119,11 @@ public ClusterInfo(StreamInput in) throws IOException { } else { this.nodeUsageStatsForThreadPools = Map.of(); } + if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) { + this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble); + } else { + this.shardWriteLoads = Map.of(); + } } @Override @@ -255,6 +263,16 @@ public Map getNodeMostAvailableDiskUsages() { return this.mostAvailableSpaceUsage; } + /** + * Returns a map of shard IDs to the write-loads for use in balancing. The write-loads can be interpreted + * as the average number of threads that ingestion to the shard will consume. + * This information may be partial or missing altogether under some circumstances. The absence of a shard + * write load from the map should be interpreted as "unknown". + */ + public Map getShardWriteLoads() { + return shardWriteLoads; + } + /** * Returns the shard size for the given shardId or null if that metric is not available. */ @@ -466,6 +484,7 @@ public static class Builder { private Map reservedSpace = Map.of(); private Map estimatedHeapUsages = Map.of(); private Map nodeUsageStatsForThreadPools = Map.of(); + private Map shardWriteLoads = Map.of(); public ClusterInfo build() { return new ClusterInfo( @@ -476,7 +495,8 @@ public ClusterInfo build() { dataPath, reservedSpace, estimatedHeapUsages, - nodeUsageStatsForThreadPools + nodeUsageStatsForThreadPools, + shardWriteLoads ); } @@ -519,5 +539,10 @@ public Builder nodeUsageStatsForThreadPools(Map shardWriteLoads) { + this.shardWriteLoads = shardWriteLoads; + return this; + } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 7e995404191d6..fd9c62daebd29 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -159,7 +159,8 @@ public ClusterInfo getClusterInfo() { dataPath, Map.of(), estimatedHeapUsages, - nodeThreadPoolUsageStats + nodeThreadPoolUsageStats, + allocation.clusterInfo().getShardWriteLoads() ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 89394c8fa8ba8..fbe449d908727 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.threadpool.ThreadPool; @@ -215,7 +216,7 @@ void execute() { logger.trace("starting async refresh"); try (var ignoredRefs = fetchRefs) { - maybeFetchIndicesStats(diskThresholdEnabled); + maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED); maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled); maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled); maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled); @@ -301,7 +302,14 @@ public void onFailure(Exception e) { private void fetchIndicesStats() { final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.clear(); - indicesStatsRequest.store(true); + if (diskThresholdEnabled) { + // This returns the shard sizes on disk + indicesStatsRequest.store(true); + } + if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) { + // This returns the shard write-loads + indicesStatsRequest.indexing(true); + } indicesStatsRequest.indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED_HIDDEN); indicesStatsRequest.timeout(fetchTimeout); client.admin() @@ -350,6 +358,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { } final ShardStats[] stats = indicesStatsResponse.getShards(); + final Map shardWriteLoadByIdentifierBuilder = new HashMap<>(); final Map shardSizeByIdentifierBuilder = new HashMap<>(); final Map shardDataSetSizeBuilder = new HashMap<>(); final Map dataPath = new HashMap<>(); @@ -357,6 +366,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { new HashMap<>(); buildShardLevelInfo( adjustShardStats(stats), + shardWriteLoadByIdentifierBuilder, shardSizeByIdentifierBuilder, shardDataSetSizeBuilder, dataPath, @@ -370,7 +380,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { Map.copyOf(shardSizeByIdentifierBuilder), Map.copyOf(shardDataSetSizeBuilder), Map.copyOf(dataPath), - Map.copyOf(reservedSpace) + Map.copyOf(reservedSpace), + Map.copyOf(shardWriteLoadByIdentifierBuilder) ); } @@ -527,8 +538,6 @@ public ClusterInfo getClusterInfo() { estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage)); } }); - final Map nodeThreadPoolUsageStats = new HashMap<>(); - nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeThreadPoolUsageStats.put(nodeId, nodeWriteLoad); }); return new ClusterInfo( leastAvailableSpaceUsages, mostAvailableSpaceUsages, @@ -537,7 +546,8 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.dataPath, indicesStatsSummary.reservedSpace, estimatedHeapUsages, - nodeThreadPoolUsageStats + Map.copyOf(nodeThreadPoolUsageStatsPerNode), + indicesStatsSummary.shardWriteLoads() ); } @@ -567,6 +577,7 @@ public void addListener(Consumer clusterInfoConsumer) { static void buildShardLevelInfo( ShardStats[] stats, + Map shardWriteLoads, Map shardSizes, Map shardDataSetSizeBuilder, Map dataPathByShard, @@ -577,25 +588,31 @@ static void buildShardLevelInfo( dataPathByShard.put(ClusterInfo.NodeAndShard.from(shardRouting), s.getDataPath()); final StoreStats storeStats = s.getStats().getStore(); - if (storeStats == null) { - continue; - } - final long size = storeStats.sizeInBytes(); - final long dataSetSize = storeStats.totalDataSetSizeInBytes(); - final long reserved = storeStats.reservedSizeInBytes(); - - final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting); - logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved); - shardSizes.put(shardIdentifier, size); - if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) { - shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize); + if (storeStats != null) { + final long size = storeStats.sizeInBytes(); + final long dataSetSize = storeStats.totalDataSetSizeInBytes(); + final long reserved = storeStats.reservedSizeInBytes(); + + final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting); + logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved); + shardSizes.put(shardIdentifier, size); + if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) { + shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize); + } + if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) { + final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent( + new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()), + t -> new ClusterInfo.ReservedSpace.Builder() + ); + reservedSpaceBuilder.add(shardRouting.shardId(), reserved); + } } - if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) { - final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent( - new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()), - t -> new ClusterInfo.ReservedSpace.Builder() - ); - reservedSpaceBuilder.add(shardRouting.shardId(), reserved); + final IndexingStats indexingStats = s.getStats().getIndexing(); + if (indexingStats != null) { + final double shardWriteLoad = indexingStats.getTotal().getPeakWriteLoad(); + if (shardWriteLoad > shardWriteLoads.getOrDefault(shardRouting.shardId(), -1.0)) { + shardWriteLoads.put(shardRouting.shardId(), shardWriteLoad); + } } } } @@ -623,9 +640,10 @@ private record IndicesStatsSummary( Map shardSizes, Map shardDataSetSizes, Map dataPath, - Map reservedSpace + Map reservedSpace, + Map shardWriteLoads ) { - static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of()); + static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 814aa102ce284..e0e749aaa2360 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -44,10 +44,20 @@ public static ClusterInfo randomClusterInfo() { randomRoutingToDataPath(), randomReservedSpace(), randomNodeHeapUsage(), - randomNodeUsageStatsForThreadPools() + randomNodeUsageStatsForThreadPools(), + randomShardWriteLoad() ); } + private static Map randomShardWriteLoad() { + final int numEntries = randomIntBetween(0, 128); + final Map builder = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + builder.put(randomShardId(), randomDouble()); + } + return builder; + } + private static Map randomNodeHeapUsage() { int numEntries = randomIntBetween(0, 128); Map nodeHeapUsage = new HashMap<>(numEntries); diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 80c2395ae9644..3eacc0c4fcec0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -135,9 +135,17 @@ public void testFillShardLevelInfo() { 0 ) }; Map shardSizes = new HashMap<>(); + HashMap shardWriteLoads = new HashMap<>(); Map shardDataSetSizes = new HashMap<>(); Map routingToPath = new HashMap<>(); - InternalClusterInfoService.buildShardLevelInfo(stats, shardSizes, shardDataSetSizes, routingToPath, new HashMap<>()); + InternalClusterInfoService.buildShardLevelInfo( + stats, + shardWriteLoads, + shardSizes, + shardDataSetSizes, + routingToPath, + new HashMap<>() + ); assertThat( shardSizes, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 37646d376f8fd..1f8d59a958bfe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -621,6 +621,7 @@ public void testUnassignedAllocationPredictsDiskUsage() { ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), + ImmutableOpenMap.of(), ImmutableOpenMap.of() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index f85c2678e04e7..c4ca84e6e977f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1406,7 +1406,17 @@ static class DevNullClusterInfo extends ClusterInfo { Map shardSizes, Map reservedSpace ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of(), Map.of()); + super( + leastAvailableSpaceUsage, + mostAvailableSpaceUsage, + shardSizes, + Map.of(), + Map.of(), + reservedSpace, + Map.of(), + Map.of(), + Map.of() + ); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 117afe0cec877..debb4343931d7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -110,6 +110,7 @@ public void testCanAllocateUsesMaxAvailableSpace() { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -183,6 +184,7 @@ private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroo Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -330,6 +332,7 @@ private void doTestCanRemainUsesLeastAvailableSpace(boolean testMaxHeadroom) { shardRoutingMap, Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( From 4fcea56f26a14965d7ad246f4f0f9f3776a71f02 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 18 Jul 2025 13:34:18 +1000 Subject: [PATCH 02/11] Fix ClusterInfo serialization --- .../src/main/java/org/elasticsearch/cluster/ClusterInfo.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 802bd1a83fca7..d1e2387d44053 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -144,6 +144,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) { out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable); } + if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) { + out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble); + } } /** From db99e30c8d5664f16a155cdfe73d7de411292ac6 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 18 Jul 2025 13:36:08 +1000 Subject: [PATCH 03/11] Remove redundant copy --- .../org/elasticsearch/cluster/InternalClusterInfoService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index fbe449d908727..d4ecec83ebc8c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -546,7 +546,7 @@ public ClusterInfo getClusterInfo() { indicesStatsSummary.dataPath, indicesStatsSummary.reservedSpace, estimatedHeapUsages, - Map.copyOf(nodeThreadPoolUsageStatsPerNode), + nodeThreadPoolUsageStatsPerNode, indicesStatsSummary.shardWriteLoads() ); } From b24aa2319d65a1387b386a0c65e85732b08b7cd4 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 18 Jul 2025 13:38:55 +1000 Subject: [PATCH 04/11] Fix build break --- .../xpack/autoscaling/storage/ReactiveStorageDeciderService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index 6c4066a447b67..c76a88b0da2f9 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -961,6 +961,7 @@ private ExtendedClusterInfo(Map extraShardSizes, ClusterInfo info) Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); this.delegate = info; From 529cba3b8b6aecab1426a27b758115e61629732a Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Fri, 18 Jul 2025 14:08:13 +1000 Subject: [PATCH 05/11] Note that we're not rendering shard write loads yet --- server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index d1e2387d44053..58f8993e3c529 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -227,7 +227,7 @@ public Iterator toXContentChunked(ToXContent.Params params return builder.endObject(); // NodeAndPath }), endArray() // end "reserved_sizes" - // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools at this stage, to avoid + // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads at this stage, to avoid // committing to API payloads until the features are settled ); } From 1bb0203870542ed90ea37f263c7f248af0a361c7 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 21 Jul 2025 13:27:04 +1000 Subject: [PATCH 06/11] Apply suggestion from @DiannaHohensee Co-authored-by: Dianna Hohensee --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 5508c501b30a2..f93ef1f427256 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -378,7 +378,7 @@ public void testShardWriteLoadsArePresent() { assertTrue(shardWriteLoads.isEmpty()); } - // Enable collection for node write loads. + // Turn on collection of write load stats. updateClusterSettings( Settings.builder() .put( From cd3169fabdc21c00193a89b9b6f3d06c4e7a0a4f Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 21 Jul 2025 13:38:18 +1000 Subject: [PATCH 07/11] Add missing fields to equals/hashCode --- .../main/java/org/elasticsearch/cluster/ClusterInfo.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 58f8993e3c529..5c322fe31e6ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -352,7 +352,9 @@ public boolean equals(Object o) { && shardDataSetSizes.equals(that.shardDataSetSizes) && dataPath.equals(that.dataPath) && reservedSpace.equals(that.reservedSpace) - && nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools); + && estimatedHeapUsages.equals(that.estimatedHeapUsages) + && nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools) + && shardWriteLoads.equals(that.shardWriteLoads); } @Override @@ -364,7 +366,9 @@ public int hashCode() { shardDataSetSizes, dataPath, reservedSpace, - nodeUsageStatsForThreadPools + estimatedHeapUsages, + nodeUsageStatsForThreadPools, + shardWriteLoads ); } From 45020c5859e4cd7488de69fb19659ee8a815f4c2 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 21 Jul 2025 13:48:47 +1000 Subject: [PATCH 08/11] Type --- .../src/test/java/org/elasticsearch/cluster/DiskUsageTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 3eacc0c4fcec0..4eb03d700d6aa 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -135,7 +135,7 @@ public void testFillShardLevelInfo() { 0 ) }; Map shardSizes = new HashMap<>(); - HashMap shardWriteLoads = new HashMap<>(); + Map shardWriteLoads = new HashMap<>(); Map shardDataSetSizes = new HashMap<>(); Map routingToPath = new HashMap<>(); InternalClusterInfoService.buildShardLevelInfo( From c5ceb92c00840dd2f6b47693d7689833a0c01def Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 21 Jul 2025 14:02:41 +1000 Subject: [PATCH 09/11] Test that shard write load is extracted --- .../elasticsearch/cluster/DiskUsageTests.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 4eb03d700d6aa..3551971f0daa0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.StoreStats; @@ -107,6 +108,7 @@ public void testFillShardLevelInfo() { Path test0Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("0"); CommonStats commonStats0 = new CommonStats(); commonStats0.store = new StoreStats(100, 101, 0L); + commonStats0.indexing = randomIndexingStats(); ShardRouting test_1 = ShardRouting.newUnassigned( new ShardId(index, 1), false, @@ -119,8 +121,10 @@ public void testFillShardLevelInfo() { Path test1Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("1"); CommonStats commonStats1 = new CommonStats(); commonStats1.store = new StoreStats(1000, 1001, 0L); + commonStats1.indexing = randomIndexingStats(); CommonStats commonStats2 = new CommonStats(); commonStats2.store = new StoreStats(1000, 999, 0L); + commonStats2.indexing = randomIndexingStats(); ShardStats[] stats = new ShardStats[] { new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0, null, null, null, false, 0), new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1, null, null, null, false, 0), @@ -166,6 +170,41 @@ public void testFillShardLevelInfo() { hasEntry(ClusterInfo.NodeAndShard.from(test_1), test1Path.getParent().getParent().getParent().toAbsolutePath().toString()) ) ); + + assertThat( + shardWriteLoads, + equalTo( + Map.of( + test_0.shardId(), + commonStats0.indexing.getTotal().getPeakWriteLoad(), + test_1.shardId(), + Math.max(commonStats1.indexing.getTotal().getPeakWriteLoad(), commonStats2.indexing.getTotal().getPeakWriteLoad()) + ) + ) + ); + } + + private IndexingStats randomIndexingStats() { + return new IndexingStats( + new IndexingStats.Stats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomMillisUpToYear9999(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomBoolean(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomDoubleBetween(0d, 10d, true), + randomDoubleBetween(0d, 10d, true) + ) + ); } public void testLeastAndMostAvailableDiskSpace() { From 78866599b245e419eb57720a70ab0af748790806 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 21 Jul 2025 14:14:25 +1000 Subject: [PATCH 10/11] Fix copy/paste comment --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index f93ef1f427256..5d389ad5ef11a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -378,7 +378,7 @@ public void testShardWriteLoadsArePresent() { assertTrue(shardWriteLoads.isEmpty()); } - // Turn on collection of write load stats. + // Turn on collection of write-load stats. updateClusterSettings( Settings.builder() .put( @@ -389,7 +389,7 @@ public void testShardWriteLoadsArePresent() { ); try { - // Force a ClusterInfo refresh to run collection of the node thread pool usage stats. + // Force a ClusterInfo refresh to run collection of the write-load stats. ClusterInfoServiceUtils.refresh(clusterInfoService); final Map shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads(); From cfe5759c59ae1f39ce26a3bfc31ed9dea93abb18 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 21 Jul 2025 14:24:49 +1000 Subject: [PATCH 11/11] Add write loads to class comment --- server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 5c322fe31e6ca..33172e30fb107 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -41,7 +41,7 @@ /** * ClusterInfo is an object representing a map of nodes to {@link DiskUsage} - * and a map of shard ids to shard sizes, see + * and a map of shard ids to shard sizes and shard write-loads, see * InternalClusterInfoService.shardIdentifierFromRouting(String) * for the key used in the shardSizes map */