Skip to content

Commit 3eab53c

Browse files
authored
Enable failure store for log data streams (#131261)
1 parent aa58fc7 commit 3eab53c

File tree

8 files changed

+183
-11
lines changed

8 files changed

+183
-11
lines changed

docs/changelog/131261.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
pr: 131261
2+
summary: Enable Failure Store for new logs-*-* data streams
3+
area: Data streams
4+
type: feature
5+
issues:
6+
- 131105
7+
highlight:
8+
title: Enable Failure Store for new logs data streams
9+
body: |-
10+
The [Failure Store](docs-content://manage-data/data-store/data-streams/failure-store.md) is now enabled by default for new logs data streams matching the pattern `logs-*-*`. This means that such data streams will now store invalid documents in a
11+
dedicated failure index instead of rejecting them, allowing better visibility and control over data quality issues without losing data. This can be [enabled manually](docs-content://manage-data/data-store/data-streams/failure-store.md#set-up-failure-store-existing) for existing data streams.
12+
Note: With the failure store enabled, the HTTP response code clients receive when indexing invalid documents will change from `400 Bad Request` to `201 Created`, with an additional response attribute `"failure_store" : "used"`.
13+
notable: true

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamIT.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import static org.hamcrest.Matchers.contains;
1818
import static org.hamcrest.Matchers.containsInAnyOrder;
19+
import static org.hamcrest.Matchers.containsString;
1920
import static org.hamcrest.Matchers.empty;
2021
import static org.hamcrest.Matchers.equalTo;
2122
import static org.hamcrest.Matchers.is;
@@ -740,6 +741,73 @@ public void testIgnoreDynamicBeyondLimit() throws Exception {
740741
assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty());
741742
}
742743

744+
@SuppressWarnings("unchecked")
745+
public void testFailureStoreWithInvalidFieldType() throws Exception {
746+
String dataStreamName = "logs-app-with-failure-store";
747+
createDataStream(client, dataStreamName);
748+
749+
indexDoc(client, dataStreamName, """
750+
{
751+
"@timestamp": "2023-11-30T12:00:00Z",
752+
"message": "This is a valid message"
753+
}
754+
""");
755+
756+
// invalid document (message as an object instead of string)
757+
indexDoc(client, dataStreamName, """
758+
{
759+
"@timestamp": "2023-11-30T12:01:00Z",
760+
"message": {
761+
"nested": "This should fail because message should be a string"
762+
}
763+
}
764+
""");
765+
766+
refreshAllIndices();
767+
768+
Request dsInfoRequest = new Request("GET", "/_data_stream/" + dataStreamName);
769+
Map<String, Object> dsInfoResponse = entityAsMap(client.performRequest(dsInfoRequest));
770+
List<Map<String, Object>> dataStreams = (List<Map<String, Object>>) dsInfoResponse.get("data_streams");
771+
Map<String, Object> dataStream = dataStreams.getFirst();
772+
Map<String, Object> failureStoreInfo = (Map<String, Object>) dataStream.get("failure_store");
773+
assertNotNull(failureStoreInfo);
774+
assertThat(failureStoreInfo.get("enabled"), is(true));
775+
List<Map<String, Object>> failureIndices = (List<Map<String, Object>>) failureStoreInfo.get("indices");
776+
777+
assertThat(failureIndices, not(empty()));
778+
String failureIndex = (String) failureIndices.getFirst().get("index_name");
779+
assertThat(failureIndex, matchesRegex("\\.fs-" + dataStreamName + "-.*"));
780+
781+
// query the failure store index
782+
Request failureStoreQuery = new Request("GET", "/" + failureIndex + "/_search");
783+
failureStoreQuery.setJsonEntity("""
784+
{
785+
"query": {
786+
"match_all": {}
787+
}
788+
}
789+
""");
790+
Map<String, Object> failureStoreResponse = entityAsMap(client.performRequest(failureStoreQuery));
791+
Map<String, Object> hits = (Map<String, Object>) failureStoreResponse.get("hits");
792+
List<Map<String, Object>> hitsList = (List<Map<String, Object>>) hits.get("hits");
793+
794+
// Verify the failed document is in the failure store
795+
assertThat(hitsList.size(), is(1));
796+
Map<String, Object> failedDoc = (Map<String, Object>) hitsList.getFirst().get("_source");
797+
Map<String, Object> document = (Map<String, Object>) failedDoc.get("document");
798+
assertNotNull(document);
799+
Map<String, Object> source = (Map<String, Object>) document.get("source");
800+
assertNotNull(source);
801+
Map<String, Object> message = (Map<String, Object>) source.get("message");
802+
assertNotNull(message);
803+
assertThat(message.get("nested"), equalTo("This should fail because message should be a string"));
804+
Map<String, Object> error = (Map<String, Object>) failedDoc.get("error");
805+
assertNotNull(error);
806+
assertEquals("document_parsing_exception", error.get("type"));
807+
String errorMessage = (String) error.get("message");
808+
assertThat(errorMessage, containsString("failed to parse field [message] of type [match_only_text] in document with id"));
809+
}
810+
743811
@Override
744812
protected String indexTemplateName() {
745813
return "logs";

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,20 @@ public class DataStreamFeatures implements FeatureSpecification {
2828

2929
public static final NodeFeature LOGS_STREAM_FEATURE = new NodeFeature("logs_stream");
3030

31+
public static final NodeFeature FAILURE_STORE_IN_LOG_DATA_STREAMS = new NodeFeature("logs_data_streams.failure_store.enabled");
32+
3133
@Override
3234
public Set<NodeFeature> getFeatures() {
3335
return Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
3436
}
3537

3638
@Override
3739
public Set<NodeFeature> getTestFeatures() {
38-
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX, LOGS_STREAM_FEATURE);
40+
return Set.of(
41+
DATA_STREAM_FAILURE_STORE_TSDB_FIX,
42+
DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX,
43+
LOGS_STREAM_FEATURE,
44+
FAILURE_STORE_IN_LOG_DATA_STREAMS
45+
);
3946
}
4047
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
---
2+
setup:
3+
- requires:
4+
cluster_features: [ "logs_data_streams.failure_store.enabled" ]
5+
reason: "failure store became enabled by default for log data streams in 9.2.0"
6+
7+
- do:
8+
indices.create_data_stream:
9+
name: logs-app-default
10+
---
11+
teardown:
12+
- do:
13+
indices.delete_data_stream:
14+
name: logs-app-default
15+
ignore: 404
16+
17+
---
18+
"Test logs-*-* data streams have failure store enabled by default":
19+
# index a valid document (string message)
20+
- do:
21+
index:
22+
index: logs-app-default
23+
refresh: true
24+
body:
25+
'@timestamp': '2023-01-01T12:00:00Z'
26+
host:
27+
name: 'server-01'
28+
severity: 'INFO'
29+
message: "Application started successfully"
30+
- match: { result: created }
31+
32+
- do:
33+
indices.get_data_stream:
34+
name: logs-app-default
35+
- match: { data_streams.0.name: logs-app-default }
36+
- length: { data_streams.0.indices: 1 }
37+
- match: { data_streams.0.failure_store.enabled: true }
38+
- length: { data_streams.0.failure_store.indices: 0 }
39+
40+
# index an invalid document (object message, causing a mapping conflict)
41+
- do:
42+
index:
43+
index: logs-app-default
44+
refresh: true
45+
body:
46+
'@timestamp': '2023-01-01T12:01:00Z'
47+
host:
48+
name: 'server-02'
49+
severity: 'ERROR'
50+
message:
51+
struct:
52+
value: 42
53+
- match: { result: 'created' }
54+
- match: { failure_store: used}
55+
56+
- do:
57+
indices.get_data_stream:
58+
name: logs-app-default
59+
- length: { data_streams.0.failure_store.indices: 1 }
60+
61+
- do:
62+
search:
63+
index: logs-app-default::data
64+
body:
65+
query:
66+
match_all: {}
67+
- length: { hits.hits: 1 }
68+
- match: { hits.hits.0._source.severity: "INFO" }
69+
- match: { hits.hits.0._source.message: "Application started successfully" }
70+
71+
- do:
72+
search:
73+
index: logs-app-default::failures
74+
body:
75+
query:
76+
match_all: {}
77+
- length: { hits.hits: 1 }
78+
- match: { hits.hits.0._source.document.source.message.struct.value: 42 }
79+
- match: { hits.hits.0._source.error.type: "document_parsing_exception" }

x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,20 @@ public void testDSXpackUsage() throws Exception {
7070
assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(0));
7171
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0));
7272
assertBusy(() -> {
73-
Map<?, ?> logsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/logs").get("index_templates")).get(0);
74-
assertThat(logsTemplate, notNullValue());
75-
assertThat(logsTemplate.get("name"), equalTo("logs"));
76-
assertThat(((Map<?, ?>) logsTemplate.get("index_template")).get("data_stream"), notNullValue());
73+
Map<?, ?> syntheticsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/synthetics").get("index_templates")).get(0);
74+
assertThat(syntheticsTemplate, notNullValue());
75+
assertThat(syntheticsTemplate.get("name"), equalTo("synthetics"));
76+
assertThat(((Map<?, ?>) syntheticsTemplate.get("index_template")).get("data_stream"), notNullValue());
7777
});
7878
putFailureStoreTemplate();
7979

8080
// Create a data stream
81-
Request indexRequest = new Request("POST", "/logs-mysql-default/_doc");
81+
Request indexRequest = new Request("POST", "/synthetics-myapp-default/_doc");
8282
indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
8383
client().performRequest(indexRequest);
8484

8585
// Roll over the data stream
86-
Request rollover = new Request("POST", "/logs-mysql-default/_rollover");
86+
Request rollover = new Request("POST", "/synthetics-myapp-default/_rollover");
8787
client().performRequest(rollover);
8888

8989
// Create failure store data stream
@@ -105,10 +105,10 @@ public void testDSXpackUsage() throws Exception {
105105
assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(1));
106106
assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1));
107107

108-
// Enable the failure store for logs-mysql-default using the cluster setting...
108+
// Enable the failure store for synthetics-myapp-default using the cluster setting...
109109
updateClusterSettings(
110110
Settings.builder()
111-
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "logs-mysql-default")
111+
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "synthetics-myapp-default")
112112
.build()
113113
);
114114
// ...and assert that it counts towards effectively_enabled_count but not explicitly_enabled_count:

x-pack/plugin/core/template-resources/src/main/resources/[email protected]

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
},
1515
"default_pipeline": "logs@default-pipeline"
1616
}
17+
},
18+
"data_stream_options": {
19+
"failure_store": {
20+
"enabled": true
21+
}
1722
}
1823
},
1924
"_meta": {

x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class StackTemplateRegistry extends IndexTemplateRegistry {
3737

3838
// The stack template registry version. This number must be incremented when we make changes
3939
// to built-in templates.
40-
public static final int REGISTRY_VERSION = 16;
40+
public static final int REGISTRY_VERSION = 17;
4141

4242
public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version";
4343
public static final Setting<Boolean> STACK_TEMPLATES_ENABLED = Setting.boolSetting(

x-pack/plugin/stack/src/yamlRestTest/resources/rest-api-spec/test/stack/10_basic.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,14 +276,14 @@ setup:
276276
data_stream.namespace: "namespace1"
277277

278278
- do:
279-
catch: bad_request
280279
index:
281280
index: logs-dataset0-namespace1
282281
body:
283282
"@timestamp": "2020-01-01"
284283
data_stream.type: "metrics"
285284
data_stream.dataset: "dataset0"
286285
data_stream.namespace: "namespace1"
286+
- match: { failure_store: used }
287287

288288
- do:
289289
catch: bad_request

0 commit comments

Comments
 (0)