Skip to content

Commit fa46b87

Browse files
Threadpool merge scheduler (#120869)
This adds a new merge scheduler implementation that uses a (new) dedicated thread pool to run the merges. This way the number of concurrent merges is limited to the number of threads in the pool (i.e. the number of processors allocated to the ES JVM). It implements dynamic IO throttling (roughly the same target IO rate for all merges, with caveats) that is adjusted based on the number of currently active (queued + running) merges. Smaller merges are always preferred to larger ones, irrespective of the index shard that they come from. The implementation also supports the per-shard "max thread count" and "max merge count" settings, the latter being used today for indexing throttling. Note that IO throttling, max merge count, and max thread count work similarly, but not identically, to their siblings in the ConcurrentMergeScheduler. The per-shard merge statistics are not affected, and the thread-pool statistics should reflect the merge ones (i.e. the thread pool's "completed" stat reflects the total number of merges, across shards, per node).
1 parent cace905 commit fa46b87

File tree

30 files changed

+2682
-60
lines changed

30 files changed

+2682
-60
lines changed

docs/changelog/120869.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120869
2+
summary: Threadpool merge scheduler
3+
area: Engine
4+
type: feature
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,40 @@
88
*/
99
package org.elasticsearch.index.engine;
1010

11+
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
12+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
1113
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1214
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1315
import org.elasticsearch.action.bulk.BulkResponse;
1416
import org.elasticsearch.action.index.IndexRequest;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.index.query.QueryBuilders;
1519
import org.elasticsearch.test.ESIntegTestCase;
1620
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
1721
import org.elasticsearch.test.ESIntegTestCase.Scope;
22+
import org.elasticsearch.threadpool.ThreadPool;
1823

1924
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
25+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
2026
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
2127
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
2228
import static org.hamcrest.Matchers.equalTo;
2329
import static org.hamcrest.Matchers.lessThan;
2430
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2531

26-
@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, scope = Scope.SUITE)
32+
@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0, scope = Scope.TEST)
2733
public class InternalEngineMergeIT extends ESIntegTestCase {
2834

35+
private boolean useThreadPoolMerging;
36+
37+
/**
 * Builds the node settings on top of the ones supplied by the superclass and
 * explicitly sets {@code USE_THREAD_POOL_MERGE_SCHEDULER_SETTING} to a randomly chosen boolean.
 * The chosen value is stored in {@link #useThreadPoolMerging} so that tests can
 * branch their assertions on it.
 *
 * NOTE(review): the field is overwritten on every invocation; this presumably relies on
 * the single-data-node cluster declared in the class-level {@code @ClusterScope} - confirm.
 *
 * @param nodeOrdinal   passed through unchanged to {@code super.nodeSettings}
 * @param otherSettings passed through unchanged to {@code super.nodeSettings}
 * @return the superclass settings plus the merge-scheduler toggle
 */
@Override
38+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
39+
// remember the random choice; testMergesUseTheMergeThreadPool reads it later
useThreadPoolMerging = randomBoolean();
40+
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
41+
settings.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), useThreadPoolMerging);
42+
return settings.build();
43+
}
44+
2945
public void testMergesHappening() throws Exception {
3046
final int numOfShards = randomIntBetween(1, 5);
3147
// some settings to keep num segments low
@@ -83,4 +99,60 @@ public void testMergesHappening() throws Exception {
8399
assertThat(count, lessThanOrEqualTo(upperNumberSegments));
84100
}
85101

102+
/**
 * Indexes random documents in bulk batches (refreshing after each batch) until the
 * primaries of the index report at least {@code minMerges} merges, then force-merges and verifies:
 * <ul>
 *   <li>no shard is left with more than one segment,</li>
 *   <li>a match-all search returns exactly the number of documents indexed,</li>
 *   <li>when {@code useThreadPoolMerging} is true, the {@code completed} count of the
 *       thread pool named {@code ThreadPool.Names.MERGE} equals the index's total merge count;
 *       otherwise no thread pool with that name appears in the node's thread pool stats.</li>
 * </ul>
 */
public void testMergesUseTheMergeThreadPool() throws Exception {
103+
final String indexName = randomIdentifier();
104+
// 1 to 3 shards; the second argument (0) is presumably the replica count - TODO confirm against indexSettings
createIndex(indexName, indexSettings(randomIntBetween(1, 3), 0).build());
105+
// monotonically increasing document id, so every indexed document is distinct
long id = 0;
106+
final int minMerges = randomIntBetween(1, 5);
107+
long totalDocs = 0;
108+
// keep indexing and refreshing until the merge stats report at least minMerges merges
109+
while (true) {
110+
int docs = randomIntBetween(100, 200);
111+
totalDocs += docs;
112+
113+
BulkRequestBuilder request = client().prepareBulk();
114+
for (int j = 0; j < docs; ++j) {
115+
request.add(
116+
new IndexRequest(indexName).id(Long.toString(id++))
117+
.source(jsonBuilder().startObject().field("l", randomLong()).endObject())
118+
);
119+
}
120+
BulkResponse response = request.get();
121+
assertNoFailures(response);
122+
refresh(indexName);
123+
// fetch only the merge section of the index stats to decide whether to stop
124+
var mergesResponse = client().admin().indices().prepareStats(indexName).clear().setMerge(true).get();
125+
var primaries = mergesResponse.getIndices().get(indexName).getPrimaries();
126+
if (primaries.merge.getTotal() >= minMerges) {
127+
break;
128+
}
129+
}
130+
131+
forceMerge();
132+
refresh(indexName);
133+
134+
// after a force merge there should only be 1 segment per shard
135+
var shardsWithMultipleSegments = getShardSegments().stream()
136+
.filter(shardSegments -> shardSegments.getSegments().size() > 1)
137+
.toList();
138+
assertTrue("there are shards with multiple segments " + shardsWithMultipleSegments, shardsWithMultipleSegments.isEmpty());
139+
// every document indexed above must still be searchable after the merges
140+
final long expectedTotalDocs = totalDocs;
141+
assertHitCount(prepareSearch(indexName).setQuery(QueryBuilders.matchAllQuery()).setTrackTotalHits(true), expectedTotalDocs);
142+
// read the index's total merge count, then the node-level thread pool stats to compare against it
// NOTE(review): unlike the stats request inside the loop, this one does not call clear() first
143+
IndicesStatsResponse indicesStats = client().admin().indices().prepareStats(indexName).setMerge(true).get();
144+
long mergeCount = indicesStats.getIndices().get(indexName).getPrimaries().merge.getTotal();
145+
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get();
146+
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
147+
148+
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
149+
if (useThreadPoolMerging) {
150+
// Optional.get() is unchecked here: a missing MERGE pool surfaces as NoSuchElementException rather than an assertion failure
assertThat(
151+
nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().get().completed(),
152+
equalTo(mergeCount)
153+
);
154+
} else {
155+
// with thread-pool merging disabled, the stats are expected to contain no pool named MERGE
assertTrue(nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().isEmpty());
156+
}
157+
}
86158
}

0 commit comments

Comments
 (0)