Skip to content

Commit 3432472

Browse files
authored
Improving random sampling performance by lazily calling getSamplingConfiguration() (#137223)
1 parent c1be09f commit 3432472

File tree

2 files changed

+49
-10
lines changed

2 files changed

+49
-10
lines changed

server/src/main/java/org/elasticsearch/ingest/SamplingService.java

Lines changed: 28 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -214,16 +214,24 @@ private void maybeSample(
214214
return;
215215
}
216216
long startTime = statsTimeSupplier.getAsLong();
217-
SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName);
218-
if (samplingConfig == null) {
219-
return;
220-
}
221-
SoftReference<SampleInfo> sampleInfoReference = samples.compute(
222-
new ProjectIndex(projectMetadata.id(), indexName),
223-
(k, v) -> v == null || v.get() == null ? new SoftReference<>(new SampleInfo(samplingConfig.maxSamples())) : v
224-
);
217+
SoftReference<SampleInfo> sampleInfoReference = samples.compute(new ProjectIndex(projectMetadata.id(), indexName), (k, v) -> {
218+
if (v == null || v.get() == null) {
219+
SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName);
220+
if (samplingConfig == null) {
221+
/*
222+
* Calls to getSamplingConfiguration() are relatively expensive. So we store the NONE object here to indicate that there
223+
* was no sampling configuration. This way we don't have to do the lookup every single time for every index that has no
224+
* sampling configuration. If a sampling configuration is added for this index, this NONE sample will be removed by
225+
* the cluster state change listener.
226+
*/
227+
return new SoftReference<>(SampleInfo.NONE);
228+
}
229+
return new SoftReference<>(new SampleInfo(samplingConfig.maxSamples()));
230+
}
231+
return v;
232+
});
225233
SampleInfo sampleInfo = sampleInfoReference.get();
226-
if (sampleInfo == null) {
234+
if (sampleInfo == null || sampleInfo == SampleInfo.NONE) {
227235
return;
228236
}
229237
SampleStats stats = sampleInfo.stats;
@@ -233,6 +241,10 @@ private void maybeSample(
233241
stats.samplesRejectedForMaxSamplesExceeded.increment();
234242
return;
235243
}
244+
SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName);
245+
if (samplingConfig == null) {
246+
return; // it was not null above, but has since become null because the index was deleted asynchronously
247+
}
236248
if (sampleInfo.getSizeInBytes() + indexRequest.source().length() > samplingConfig.maxSize().getBytes()) {
237249
stats.samplesRejectedForSize.increment();
238250
return;
@@ -496,7 +508,12 @@ private void maybeRemoveStaleSamples(ClusterChangedEvent event, ProjectId projec
496508
if (oldSampleConfigsMap.containsKey(indexName) && entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) {
497509
logger.debug("Removing sample info for {} because its configuration has changed", indexName);
498510
samples.remove(new ProjectIndex(projectId, indexName));
499-
}
511+
} else if (oldSampleConfigsMap.containsKey(indexName) == false
512+
&& samples.containsKey(new ProjectIndex(projectId, indexName))) {
513+
// There had previously been a NONE sample here. There is a real config now, so delete the NONE sample
514+
logger.debug("Removing sample info for {} because its configuration has been created", indexName);
515+
samples.remove(new ProjectIndex(projectId, indexName));
516+
}
500517
}
501518
}
502519
}
@@ -1040,6 +1057,7 @@ public SampleStats adjustForMaxSize(int maxSize) {
10401057
* This is used internally to store information about a sample in the samples Map.
10411058
*/
10421059
private static final class SampleInfo {
1060+
public static final SampleInfo NONE = new SampleInfo(0);
10431061
private final RawDocument[] rawDocuments;
10441062
/*
10451063
* This stores the maximum index in rawDocuments that has data currently. This is incremented speculatively before writing data to

server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,7 @@ public void testMaybeSample() {
5959
ProjectMetadata.Builder projectBuilder = ProjectMetadata.builder(ProjectId.DEFAULT);
6060
final ProjectId projectId = projectBuilder.getId();
6161
ProjectMetadata projectMetadata = projectBuilder.build();
62+
ClusterState originalClusterState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build();
6263
Map<String, Object> inputRawDocSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), randomAlphaOfLength(10)));
6364
final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(inputRawDocSource);
6465
samplingService.maybeSample(projectMetadata, indexRequest);
@@ -74,6 +75,26 @@ public void testMaybeSample() {
7475
)
7576
);
7677
projectMetadata = projectBuilder.build();
78+
{
79+
/*
80+
* First we ingest some docs without notifying samplingService of the cluster state change. It will have cached the fact that
81+
* there is no config for this index, and so it will not store any samples.
82+
*/
83+
int docsToSample = randomIntBetween(1, maxSize);
84+
for (int i = 0; i < docsToSample; i++) {
85+
samplingService.maybeSample(projectMetadata, indexRequest);
86+
}
87+
List<SamplingService.RawDocument> sample = samplingService.getLocalSample(projectId, indexName);
88+
assertThat(sample, empty());
89+
}
90+
// Now we notify samplingService that the cluster state has changed, and it will pick up the new sampling config
91+
samplingService.clusterChanged(
92+
new ClusterChangedEvent(
93+
"test",
94+
ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(),
95+
originalClusterState
96+
)
97+
);
7798
int docsToSample = randomIntBetween(1, maxSize);
7899
for (int i = 0; i < docsToSample; i++) {
79100
samplingService.maybeSample(projectMetadata, indexRequest);

0 commit comments

Comments
 (0)