Browse Source

Enable failure store for log data streams (#131261)

eyalkoren 2 months ago
parent
commit
3eab53cb4d

+ 13 - 0
docs/changelog/131261.yaml

@@ -0,0 +1,13 @@
+pr: 131261
+summary: Enable Failure Store for new logs-*-* data streams
+area: Data streams
+type: feature
+issues:
+ - 131105
+highlight:
+  title: Enable Failure Store for new logs data streams
+  body: |-
+    The [Failure Store](docs-content://manage-data/data-store/data-streams/failure-store.md) is now enabled by default for new logs data streams matching the pattern `logs-*-*`. This means that such data streams will now store invalid documents in a
+    dedicated failure index instead of rejecting them, allowing better visibility and control over data quality issues without loosing data. This can be [enabled manually](docs-content://manage-data/data-store/data-streams/failure-store.md#set-up-failure-store-existing) for existing data streams. 
+    Note: With the failure store enabled, the http response code clients receive when indexing invalid documents will change from `400 Bad Request` to `201 Created`, with an additional response attribute `"failure_store" : "used"`. 
+  notable: true

+ 68 - 0
modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamIT.java

@@ -16,6 +16,7 @@ import java.util.Map;
 
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.is;
@@ -740,6 +741,73 @@ public class LogsDataStreamIT extends AbstractDataStreamIT {
         assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty());
         assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty());
     }
     }
 
 
+    @SuppressWarnings("unchecked")
+    public void testFailureStoreWithInvalidFieldType() throws Exception {
+        String dataStreamName = "logs-app-with-failure-store";
+        createDataStream(client, dataStreamName);
+
+        indexDoc(client, dataStreamName, """
+            {
+              "@timestamp": "2023-11-30T12:00:00Z",
+              "message": "This is a valid message"
+            }
+            """);
+
+        // invalid document (message as an object instead of string)
+        indexDoc(client, dataStreamName, """
+            {
+              "@timestamp": "2023-11-30T12:01:00Z",
+              "message": {
+                "nested": "This should fail because message should be a string"
+              }
+            }
+            """);
+
+        refreshAllIndices();
+
+        Request dsInfoRequest = new Request("GET", "/_data_stream/" + dataStreamName);
+        Map<String, Object> dsInfoResponse = entityAsMap(client.performRequest(dsInfoRequest));
+        List<Map<String, Object>> dataStreams = (List<Map<String, Object>>) dsInfoResponse.get("data_streams");
+        Map<String, Object> dataStream = dataStreams.getFirst();
+        Map<String, Object> failureStoreInfo = (Map<String, Object>) dataStream.get("failure_store");
+        assertNotNull(failureStoreInfo);
+        assertThat(failureStoreInfo.get("enabled"), is(true));
+        List<Map<String, Object>> failureIndices = (List<Map<String, Object>>) failureStoreInfo.get("indices");
+
+        assertThat(failureIndices, not(empty()));
+        String failureIndex = (String) failureIndices.getFirst().get("index_name");
+        assertThat(failureIndex, matchesRegex("\\.fs-" + dataStreamName + "-.*"));
+
+        // query the failure store index
+        Request failureStoreQuery = new Request("GET", "/" + failureIndex + "/_search");
+        failureStoreQuery.setJsonEntity("""
+            {
+              "query": {
+                "match_all": {}
+              }
+            }
+            """);
+        Map<String, Object> failureStoreResponse = entityAsMap(client.performRequest(failureStoreQuery));
+        Map<String, Object> hits = (Map<String, Object>) failureStoreResponse.get("hits");
+        List<Map<String, Object>> hitsList = (List<Map<String, Object>>) hits.get("hits");
+
+        // Verify the failed document is in the failure store
+        assertThat(hitsList.size(), is(1));
+        Map<String, Object> failedDoc = (Map<String, Object>) hitsList.getFirst().get("_source");
+        Map<String, Object> document = (Map<String, Object>) failedDoc.get("document");
+        assertNotNull(document);
+        Map<String, Object> source = (Map<String, Object>) document.get("source");
+        assertNotNull(source);
+        Map<String, Object> message = (Map<String, Object>) source.get("message");
+        assertNotNull(message);
+        assertThat(message.get("nested"), equalTo("This should fail because message should be a string"));
+        Map<String, Object> error = (Map<String, Object>) failedDoc.get("error");
+        assertNotNull(error);
+        assertEquals("document_parsing_exception", error.get("type"));
+        String errorMessage = (String) error.get("message");
+        assertThat(errorMessage, containsString("failed to parse field [message] of type [match_only_text] in document with id"));
+    }
+
     @Override
     @Override
     protected String indexTemplateName() {
     protected String indexTemplateName() {
         return "logs";
         return "logs";

+ 8 - 1
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

@@ -28,6 +28,8 @@ public class DataStreamFeatures implements FeatureSpecification {
 
 
     public static final NodeFeature LOGS_STREAM_FEATURE = new NodeFeature("logs_stream");
     public static final NodeFeature LOGS_STREAM_FEATURE = new NodeFeature("logs_stream");
 
 
+    public static final NodeFeature FAILURE_STORE_IN_LOG_DATA_STREAMS = new NodeFeature("logs_data_streams.failure_store.enabled");
+
     @Override
     @Override
     public Set<NodeFeature> getFeatures() {
     public Set<NodeFeature> getFeatures() {
         return Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
         return Set.of(DataStream.DATA_STREAM_FAILURE_STORE_FEATURE);
@@ -35,6 +37,11 @@ public class DataStreamFeatures implements FeatureSpecification {
 
 
     @Override
     @Override
     public Set<NodeFeature> getTestFeatures() {
     public Set<NodeFeature> getTestFeatures() {
-        return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX, LOGS_STREAM_FEATURE);
+        return Set.of(
+            DATA_STREAM_FAILURE_STORE_TSDB_FIX,
+            DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX,
+            LOGS_STREAM_FEATURE,
+            FAILURE_STORE_IN_LOG_DATA_STREAMS
+        );
     }
     }
 }
 }

+ 79 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/260_logs_failure_store.yml

@@ -0,0 +1,79 @@
+---
+setup:
+  - requires:
+      cluster_features: [ "logs_data_streams.failure_store.enabled" ]
+      reason: "failure store became enabled by default for log data streams in 9.2.0"
+
+  - do:
+      indices.create_data_stream:
+        name: logs-app-default
+---
+teardown:
+  - do:
+      indices.delete_data_stream:
+        name: logs-app-default
+        ignore: 404
+
+---
+"Test logs-*-* data streams have failure store enabled by default":
+  # index a valid document (string message)
+  - do:
+      index:
+        index: logs-app-default
+        refresh: true
+        body:
+          '@timestamp': '2023-01-01T12:00:00Z'
+          host:
+            name: 'server-01'
+          severity: 'INFO'
+          message: "Application started successfully"
+  - match: { result: created }
+
+  - do:
+      indices.get_data_stream:
+        name: logs-app-default
+  - match: { data_streams.0.name: logs-app-default }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 0 }
+
+  # index a document with (object message, causing a mapping conflict)
+  - do:
+      index:
+        index: logs-app-default
+        refresh: true
+        body:
+          '@timestamp': '2023-01-01T12:01:00Z'
+          host:
+            name: 'server-02'
+          severity: 'ERROR'
+          message:
+            struct:
+              value: 42
+  - match: { result: 'created' }
+  - match: { failure_store: used}
+
+  - do:
+      indices.get_data_stream:
+        name: logs-app-default
+  - length: { data_streams.0.failure_store.indices: 1 }
+
+  - do:
+      search:
+        index: logs-app-default::data
+        body:
+          query:
+            match_all: {}
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._source.severity: "INFO" }
+  - match: { hits.hits.0._source.message: "Application started successfully" }
+
+  - do:
+      search:
+        index: logs-app-default::failures
+        body:
+          query:
+            match_all: {}
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._source.document.source.message.struct.value: 42 }
+  - match: { hits.hits.0._source.error.type: "document_parsing_exception" }

+ 8 - 8
x-pack/plugin/core/src/javaRestTest/java/org/elasticsearch/xpack/core/DataStreamRestIT.java

@@ -70,20 +70,20 @@ public class DataStreamRestIT extends ESRestTestCase {
         assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(0));
         assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(0));
         assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0));
         assertThat(failureStoreStats.get("failure_indices_count"), equalTo(0));
         assertBusy(() -> {
         assertBusy(() -> {
-            Map<?, ?> logsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/logs").get("index_templates")).get(0);
-            assertThat(logsTemplate, notNullValue());
-            assertThat(logsTemplate.get("name"), equalTo("logs"));
-            assertThat(((Map<?, ?>) logsTemplate.get("index_template")).get("data_stream"), notNullValue());
+            Map<?, ?> syntheticsTemplate = (Map<?, ?>) ((List<?>) getLocation("/_index_template/synthetics").get("index_templates")).get(0);
+            assertThat(syntheticsTemplate, notNullValue());
+            assertThat(syntheticsTemplate.get("name"), equalTo("synthetics"));
+            assertThat(((Map<?, ?>) syntheticsTemplate.get("index_template")).get("data_stream"), notNullValue());
         });
         });
         putFailureStoreTemplate();
         putFailureStoreTemplate();
 
 
         // Create a data stream
         // Create a data stream
-        Request indexRequest = new Request("POST", "/logs-mysql-default/_doc");
+        Request indexRequest = new Request("POST", "/synthetics-myapp-default/_doc");
         indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
         indexRequest.setJsonEntity("{\"@timestamp\": \"2020-01-01\"}");
         client().performRequest(indexRequest);
         client().performRequest(indexRequest);
 
 
         // Roll over the data stream
         // Roll over the data stream
-        Request rollover = new Request("POST", "/logs-mysql-default/_rollover");
+        Request rollover = new Request("POST", "/synthetics-myapp-default/_rollover");
         client().performRequest(rollover);
         client().performRequest(rollover);
 
 
         // Create failure store data stream
         // Create failure store data stream
@@ -105,10 +105,10 @@ public class DataStreamRestIT extends ESRestTestCase {
         assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(1));
         assertThat(failureStoreStats.get("effectively_enabled_count"), equalTo(1));
         assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1));
         assertThat(failureStoreStats.get("failure_indices_count"), equalTo(1));
 
 
-        // Enable the failure store for logs-mysql-default using the cluster setting...
+        // Enable the failure store for synthetics-myapp-default using the cluster setting...
         updateClusterSettings(
         updateClusterSettings(
             Settings.builder()
             Settings.builder()
-                .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "logs-mysql-default")
+                .put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "synthetics-myapp-default")
                 .build()
                 .build()
         );
         );
         // ...and assert that it counts towards effectively_enabled_count but not explicitly_enabled_count:
         // ...and assert that it counts towards effectively_enabled_count but not explicitly_enabled_count:

+ 5 - 0
x-pack/plugin/core/template-resources/src/main/resources/logs@settings.json

@@ -14,6 +14,11 @@
         },
         },
         "default_pipeline": "logs@default-pipeline"
         "default_pipeline": "logs@default-pipeline"
       }
       }
+    },
+    "data_stream_options": {
+      "failure_store": {
+        "enabled": true
+      }
     }
     }
   },
   },
   "_meta": {
   "_meta": {

+ 1 - 1
x-pack/plugin/stack/src/main/java/org/elasticsearch/xpack/stack/StackTemplateRegistry.java

@@ -37,7 +37,7 @@ public class StackTemplateRegistry extends IndexTemplateRegistry {
 
 
     // The stack template registry version. This number must be incremented when we make changes
     // The stack template registry version. This number must be incremented when we make changes
     // to built-in templates.
     // to built-in templates.
-    public static final int REGISTRY_VERSION = 16;
+    public static final int REGISTRY_VERSION = 17;
 
 
     public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version";
     public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version";
     public static final Setting<Boolean> STACK_TEMPLATES_ENABLED = Setting.boolSetting(
     public static final Setting<Boolean> STACK_TEMPLATES_ENABLED = Setting.boolSetting(

+ 1 - 1
x-pack/plugin/stack/src/yamlRestTest/resources/rest-api-spec/test/stack/10_basic.yml

@@ -276,7 +276,6 @@ setup:
           data_stream.namespace: "namespace1"
           data_stream.namespace: "namespace1"
 
 
  - do:
  - do:
-      catch: bad_request
       index:
       index:
         index: logs-dataset0-namespace1
         index: logs-dataset0-namespace1
         body:
         body:
@@ -284,6 +283,7 @@ setup:
           data_stream.type: "metrics"
           data_stream.type: "metrics"
           data_stream.dataset: "dataset0"
           data_stream.dataset: "dataset0"
           data_stream.namespace: "namespace1"
           data_stream.namespace: "namespace1"
+ - match: { failure_store: used }
 
 
  - do:
  - do:
       catch: bad_request
       catch: bad_request