Skip to content

Commit f893457

Browse files
authored
[ML] Modify test case to update running job (elastic#124287) (elastic#124524)
This PR makes a change to the existing Java REST test DetectionRulesIT.testCondition such that it updates detection rules for a running job. Previously it had relied on closing and re-opening the job for the update to take effect. Relates elastic/ml-cpp#2821
1 parent 9b9ee8e commit f893457

File tree

2 files changed

+132
-5
lines changed

2 files changed

+132
-5
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java

Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.search.SearchHit;
1212
import org.elasticsearch.search.sort.SortOrder;
1313
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
14+
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
1415
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
1516
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
1617
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
@@ -98,7 +99,7 @@ public void testCondition() throws Exception {
9899

99100
// push the data for the first half buckets
100101
postData(job.getId(), joinBetween(0, data.size() / 2, data));
101-
closeJob(job.getId());
102+
flushJob(job.getId(), true);
102103

103104
List<AnomalyRecord> records = getRecords(job.getId());
104105
// remove records that are not anomalies
@@ -116,18 +117,35 @@ public void testCondition() throws Exception {
116117
JobUpdate.Builder update = new JobUpdate.Builder(job.getId());
117118
update.setDetectorUpdates(Arrays.asList(new JobUpdate.DetectorUpdate(0, null, Arrays.asList(newRule))));
118119
updateJob(job.getId(), update.build());
120+
// Wait until the notification that the job was updated is indexed
121+
assertBusy(
122+
() -> assertResponse(
123+
prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).setSize(1)
124+
.addSort("timestamp", SortOrder.DESC)
125+
.setQuery(
126+
QueryBuilders.boolQuery()
127+
.filter(QueryBuilders.termQuery("job_id", job.getId()))
128+
.filter(QueryBuilders.termQuery("level", "info"))
129+
),
130+
searchResponse -> {
131+
SearchHit[] hits = searchResponse.getHits().getHits();
132+
assertThat(hits.length, equalTo(1));
133+
assertThat((String) hits[0].getSourceAsMap().get("message"), containsString("Job updated: [detectors]"));
134+
}
135+
)
136+
);
119137
}
120138

121139
// push second half
122-
openJob(job.getId());
123140
postData(job.getId(), joinBetween(data.size() / 2, data.size(), data));
124-
closeJob(job.getId());
141+
flushJob(job.getId(), true);
125142

126143
GetRecordsAction.Request recordsAfterFirstHalf = new GetRecordsAction.Request(job.getId());
127144
recordsAfterFirstHalf.setStart(String.valueOf(firstRecordTimestamp + 1));
128145
records = getRecords(recordsAfterFirstHalf);
129146
assertThat("records were " + records, (int) (records.stream().filter(r -> r.getProbability() < 0.01).count()), equalTo(1));
130147
assertThat(records.get(0).getByFieldValue(), equalTo("low"));
148+
closeJob(job.getId());
131149
}
132150

133151
public void testScope() throws Exception {
@@ -242,7 +260,7 @@ public void testScope() throws Exception {
242260
closeJob(job.getId());
243261
}
244262

245-
public void testScopeAndCondition() throws IOException {
263+
public void testScopeAndCondition() throws Exception {
246264
// We have 2 IPs and they're both safe-listed.
247265
List<String> ips = Arrays.asList("111.111.111.111", "222.222.222.222");
248266
MlFilter safeIps = MlFilter.builder("safe_ips").setItems(ips).build();
@@ -298,11 +316,112 @@ public void testScopeAndCondition() throws IOException {
298316
}
299317

300318
postData(job.getId(), joinBetween(0, data.size(), data));
301-
closeJob(job.getId());
319+
flushJob(job.getId(), true);
302320

303321
List<AnomalyRecord> records = getRecords(job.getId());
304322
assertThat(records.size(), equalTo(1));
305323
assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222"));
324+
325+
// Remove "111.111.111.111" from the "safe_ips" filter
326+
List<String> addedIps = Arrays.asList();
327+
List<String> removedIps = Arrays.asList("111.111.111.111");
328+
PutFilterAction.Response updatedFilter = updateMlFilter("safe_ips", addedIps, removedIps);
329+
// Wait until the notification that the filter was updated is indexed
330+
assertBusy(
331+
() -> assertResponse(
332+
prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).setSize(1)
333+
.addSort("timestamp", SortOrder.DESC)
334+
.setQuery(
335+
QueryBuilders.boolQuery()
336+
.filter(QueryBuilders.termQuery("job_id", job.getId()))
337+
.filter(QueryBuilders.termQuery("level", "info"))
338+
),
339+
searchResponse -> {
340+
SearchHit[] hits = searchResponse.getHits().getHits();
341+
assertThat(hits.length, equalTo(1));
342+
assertThat(
343+
(String) hits[0].getSourceAsMap().get("message"),
344+
containsString("Filter [safe_ips] has been modified; removed items: ['111.111.111.111']")
345+
);
346+
}
347+
)
348+
);
349+
MlFilter updatedSafeIps = MlFilter.builder("safe_ips").setItems(Arrays.asList("222.222.222.222")).build();
350+
assertThat(updatedFilter.getFilter(), equalTo(updatedSafeIps));
351+
352+
data.clear();
353+
// Now send anomalous count of 9 for 111.111.111.111
354+
for (int i = 0; i < 9; i++) {
355+
data.add(createIpRecord(timestamp, "111.111.111.111"));
356+
}
357+
358+
// Some more normal buckets
359+
for (int bucket = 0; bucket < 3; bucket++) {
360+
for (String ip : ips) {
361+
data.add(createIpRecord(timestamp, ip));
362+
}
363+
timestamp += TimeValue.timeValueHours(1).getMillis();
364+
}
365+
366+
postData(job.getId(), joinBetween(0, data.size(), data));
367+
flushJob(job.getId(), true);
368+
369+
records = getRecords(job.getId());
370+
assertThat(records.size(), equalTo(2));
371+
assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222"));
372+
assertThat(records.get(1).getOverFieldValue(), equalTo("111.111.111.111"));
373+
374+
{
375+
// Update detection rules such that it now applies only to actual values > 10.0
376+
DetectionRule newRule = new DetectionRule.Builder(
377+
Arrays.asList(new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.GT, 10.0))
378+
).build();
379+
JobUpdate.Builder update = new JobUpdate.Builder(job.getId());
380+
update.setDetectorUpdates(Arrays.asList(new JobUpdate.DetectorUpdate(0, null, Arrays.asList(newRule))));
381+
updateJob(job.getId(), update.build());
382+
// Wait until the notification that the job was updated is indexed
383+
assertBusy(
384+
() -> assertResponse(
385+
prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).setSize(1)
386+
.addSort("timestamp", SortOrder.DESC)
387+
.setQuery(
388+
QueryBuilders.boolQuery()
389+
.filter(QueryBuilders.termQuery("job_id", job.getId()))
390+
.filter(QueryBuilders.termQuery("level", "info"))
391+
),
392+
searchResponse -> {
393+
SearchHit[] hits = searchResponse.getHits().getHits();
394+
assertThat(hits.length, equalTo(1));
395+
assertThat((String) hits[0].getSourceAsMap().get("message"), containsString("Job updated: [detectors]"));
396+
}
397+
)
398+
);
399+
}
400+
401+
data.clear();
402+
// Now send anomalous count of 10 for 222.222.222.222
403+
for (int i = 0; i < 10; i++) {
404+
data.add(createIpRecord(timestamp, "222.222.222.222"));
405+
}
406+
407+
// Some more normal buckets
408+
for (int bucket = 0; bucket < 3; bucket++) {
409+
for (String ip : ips) {
410+
data.add(createIpRecord(timestamp, ip));
411+
}
412+
timestamp += TimeValue.timeValueHours(1).getMillis();
413+
}
414+
415+
postData(job.getId(), joinBetween(0, data.size(), data));
416+
417+
closeJob(job.getId());
418+
419+
// The anomalous records should not have changed.
420+
records = getRecords(job.getId());
421+
assertThat(records.size(), equalTo(2));
422+
assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222"));
423+
assertThat(records.get(1).getOverFieldValue(), equalTo("111.111.111.111"));
424+
306425
}
307426

308427
public void testForceTimeShiftAction() throws Exception {

x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
8080
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
8181
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
82+
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
8283
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
8384
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
8485
import org.elasticsearch.xpack.core.ml.inference.ModelAliasMetadata;
@@ -311,6 +312,13 @@ protected PutFilterAction.Response putMlFilter(MlFilter filter) {
311312
return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet();
312313
}
313314

315+
protected PutFilterAction.Response updateMlFilter(String filterId, List<String> addItems, List<String> removeItems) {
316+
UpdateFilterAction.Request request = new UpdateFilterAction.Request(filterId);
317+
request.setAddItems(addItems);
318+
request.setRemoveItems(removeItems);
319+
return client().execute(UpdateFilterAction.INSTANCE, request).actionGet();
320+
}
321+
314322
protected static List<String> fetchAllAuditMessages(String jobId) throws Exception {
315323
RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
316324
BroadcastResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();

0 commit comments

Comments
 (0)