소스 검색

Adjust failure store to work with TSDS (#114307) (#115294)

In this PR we add a test and we fix the issues we encountered when we
enabled the failure store for TSDS and logsdb.

**Logsdb** Logsdb worked out of the box, so we just added the test that
indexes with a bulk request a couple of documents and tests how they are
ingested.

**TSDS** Here it was a bit trickier. We encountered the following
issues:

- TSDS requires a timestamp to determine the write index of the data stream meaning the failure happens earlier than we have anticipated so far. We added a special exception to detect this case and we treat it accordingly.
- The template of a TSDS data stream sets certain settings that we do not want to have in the failure store index. We added an allowlist that gets applied before we add the necessary index settings. 

Furthermore, we added a test case to capture this.
Mary Gouseti 1 년 전
부모
커밋
21312daadf

+ 2 - 1
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

@@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
 import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
 import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
 import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
@@ -170,7 +171,7 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase {
             var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
             time = randomBoolean() ? endTime : endTime.plusSeconds(randomIntBetween(1, 99));
             indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
-            expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
+            expectThrows(IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class, () -> client().index(indexRequest).actionGet());
         }
 
         // Fetch UpdateTimeSeriesRangeService and increment time range of latest backing index:

+ 6 - 0
modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java

@@ -26,6 +26,7 @@ import java.util.Set;
 public class DataStreamFeatures implements FeatureSpecification {
 
     public static final NodeFeature DATA_STREAM_LIFECYCLE = new NodeFeature("data_stream.lifecycle");
+    public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");
 
     @Override
     public Map<NodeFeature, Version> getHistoricalFeatures() {
@@ -41,4 +42,9 @@ public class DataStreamFeatures implements FeatureSpecification {
             DataStreamGlobalRetention.GLOBAL_RETENTION                       // Added in 8.14
         );
     }
+
+    @Override
+    public Set<NodeFeature> getTestFeatures() {
+        return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
+    }
 }

+ 101 - 0
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/150_tsdb.yml

@@ -182,6 +182,107 @@ index without timestamp:
         body:
           - '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
 
+---
+TSDB failures go to failure store:
+  - requires:
+      cluster_features: ["data_stream.failure_store.tsdb_fix"]
+      reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store."
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template2
+        body:
+          index_patterns: [ "fs-k8s*" ]
+          data_stream:
+            failure_store: true
+          template:
+            settings:
+              index:
+                mode: time_series
+                number_of_replicas: 1
+                number_of_shards: 2
+                routing_path: [ metricset, time_series_dimension ]
+                time_series:
+                  start_time: 2021-04-28T00:00:00Z
+                  end_time: 2021-04-29T00:00:00Z
+            mappings:
+              properties:
+                "@timestamp":
+                  type: date
+                metricset:
+                  type: keyword
+                  time_series_dimension: true
+                k8s:
+                  properties:
+                    pod:
+                      properties:
+                        uid:
+                          type: keyword
+                          time_series_dimension: true
+                        name:
+                          type: keyword
+                        ip:
+                          type: ip
+                        network:
+                          properties:
+                            tx:
+                              type: long
+                            rx:
+                              type: long
+  - do:
+      index:
+        index: fs-k8s
+        body:
+          - '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+  - match: { result : "created"}
+  - match: { failure_store : "used"}
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{ "create": { "_index": "fs-k8s"} }'
+          - '{"@timestamp":"2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+          - '{ "create": { "_index": "k8s"} }'
+          - '{ "@timestamp": "2021-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+          - '{ "create": { "_index": "fs-k8s"} }'
+          - '{ "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+          - '{ "create": { "_index": "fs-k8s"} }'
+          - '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+          - '{ "create": { "_index": "k8s"} }'
+          - '{"metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+          - '{ "create": { "_index": "k8s"} }'
+          - '{ "@timestamp":"2000-04-28T01:00:00ZZ", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
+  - is_true: errors
+
+  # Successfully indexed to backing index
+  - match: { items.0.create._index: '/\.ds-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { items.0.create.status: 201 }
+  - is_false: items.0.create.failure_store
+  - match: { items.1.create._index: '/\.ds-k8s-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { items.1.create.status: 201 }
+  - is_false: items.1.create.failure_store
+
+  # Successfully indexed to failure store
+  - match: { items.2.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { items.2.create.status: 201 }
+  - match: { items.2.create.failure_store: used }
+  - match: { items.3.create._index: '/\.fs-fs-k8s-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { items.3.create.status: 201 }
+  - match: { items.3.create.failure_store: used }
+
+  # Rejected, eligible to go to failure store, but failure store not enabled
+  - match: { items.4.create._index: 'k8s' }
+  - match: { items.4.create.status: 400 }
+  - match: { items.4.create.error.type: timestamp_error }
+  - match: { items.4.create.failure_store: not_enabled }
+  - match: { items.4.create._index: 'k8s' }
+  - match: { items.4.create.status: 400 }
+  - match: { items.4.create.error.type: timestamp_error }
+  - match: { items.4.create.failure_store: not_enabled }
+
 ---
 index without timestamp with pipeline:
   - do:

+ 1 - 1
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

@@ -879,7 +879,7 @@ teardown:
   # Successfully indexed to backing index
   - match: { items.0.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
   - match: { items.0.create.status: 201 }
-  - is_false: items.1.create.failure_store
+  - is_false: items.0.create.failure_store
 
   # Rejected but not eligible to go to failure store
   - match: { items.1.create._index: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

+ 12 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java

@@ -35,6 +35,7 @@ public class CreateIndexClusterStateUpdateRequest {
     private ResizeType resizeType;
     private boolean copySettings;
     private SystemDataStreamDescriptor systemDataStreamDescriptor;
+    private boolean isFailureIndex = false;
 
     private Settings settings = Settings.EMPTY;
 
@@ -102,6 +103,11 @@ public class CreateIndexClusterStateUpdateRequest {
         return this;
     }
 
+    public CreateIndexClusterStateUpdateRequest isFailureIndex(boolean isFailureIndex) {
+        this.isFailureIndex = isFailureIndex;
+        return this;
+    }
+
     public String cause() {
         return cause;
     }
@@ -168,6 +174,10 @@ public class CreateIndexClusterStateUpdateRequest {
         return dataStreamName;
     }
 
+    public boolean isFailureIndex() {
+        return isFailureIndex;
+    }
+
     public CreateIndexClusterStateUpdateRequest dataStreamName(String dataStreamName) {
         this.dataStreamName = dataStreamName;
         return this;
@@ -228,6 +238,8 @@ public class CreateIndexClusterStateUpdateRequest {
             + systemDataStreamDescriptor
             + ", matchingTemplate="
             + matchingTemplate
+            + ", isFailureIndex="
+            + isFailureIndex
             + '}';
     }
 }

+ 7 - 0
server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

@@ -320,6 +320,12 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
                     shard -> new ArrayList<>()
                 );
                 shardRequests.add(bulkItemRequest);
+            } catch (DataStream.TimestampError timestampError) {
+                IndexDocFailureStoreStatus failureStoreStatus = processFailure(bulkItemRequest, clusterState, timestampError);
+                if (IndexDocFailureStoreStatus.USED.equals(failureStoreStatus) == false) {
+                    String name = ia != null ? ia.getName() : docWriteRequest.index();
+                    addFailureAndDiscardRequest(docWriteRequest, bulkItemRequest.id(), name, timestampError, failureStoreStatus);
+                }
             } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
                 String name = ia != null ? ia.getName() : docWriteRequest.index();
                 var failureStoreStatus = isFailureStoreRequest(docWriteRequest)
@@ -545,6 +551,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
                 boolean added = addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName());
                 if (added) {
                     failureStoreMetrics.incrementFailureStore(bulkItemRequest.index(), errorType, FailureStoreMetrics.ErrorLocation.SHARD);
+                    return IndexDocFailureStoreStatus.USED;
                 } else {
                     failureStoreMetrics.incrementRejected(
                         bulkItemRequest.index(),

+ 20 - 4
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -1343,7 +1343,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
                         + "]"
                 )
                 .collect(Collectors.joining());
-            throw new IllegalArgumentException(
+            throw new TimestampError(
                 "the document timestamp ["
                     + timestampAsString
                     + "] is outside of ranges of currently writable indices ["
@@ -1405,10 +1405,10 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             } else if (rawTimestamp instanceof String sTimestamp) {
                 return DateFormatters.from(TIMESTAMP_FORMATTER.parse(sTimestamp), TIMESTAMP_FORMATTER.locale()).toInstant();
             } else {
-                throw new IllegalArgumentException("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error");
+                throw new TimestampError("timestamp [" + rawTimestamp + "] type [" + rawTimestamp.getClass() + "] error");
             }
         } catch (Exception e) {
-            throw new IllegalArgumentException("Error get data stream timestamp field: " + e.getMessage(), e);
+            throw new TimestampError("Error get data stream timestamp field: " + e.getMessage(), e);
         }
     }
 
@@ -1432,7 +1432,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
                 );
             };
         } catch (Exception e) {
-            throw new IllegalArgumentException("Error extracting data stream timestamp field: " + e.getMessage(), e);
+            throw new TimestampError("Error extracting data stream timestamp field: " + e.getMessage(), e);
         }
     }
 
@@ -1741,4 +1741,20 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
             );
         }
     }
+
+    /**
+     * This is a specialised error to capture that a document does not have a valid timestamp
+     * to index a document. It is mainly applicable for TSDS data streams because they need the timestamp
+     * to determine the write index.
+     */
+    public static class TimestampError extends IllegalArgumentException {
+
+        public TimestampError(String message, Exception cause) {
+            super(message, cause);
+        }
+
+        public TimestampError(String message) {
+            super(message);
+        }
+    }
 }

+ 41 - 6
server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinition.java

@@ -9,6 +9,7 @@
 
 package org.elasticsearch.cluster.metadata;
 
+import org.elasticsearch.cluster.routing.allocation.DataTier;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -19,6 +20,8 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * A utility class that contains the mappings and settings logic for failure store indices that are a part of data streams.
@@ -26,12 +29,30 @@ import java.io.IOException;
 public class DataStreamFailureStoreDefinition {
 
     public static final String FAILURE_STORE_REFRESH_INTERVAL_SETTING_NAME = "data_streams.failure_store.refresh_interval";
+    public static final String INDEX_FAILURE_STORE_VERSION_SETTING_NAME = "index.failure_store.version";
     public static final Settings DATA_STREAM_FAILURE_STORE_SETTINGS;
+    // Only a subset of user configurable settings is applicable for a failure index. Here we have an
+    // allowlist that will filter all other settings out.
+    public static final Set<String> SUPPORTED_USER_SETTINGS = Set.of(
+        DataTier.TIER_PREFERENCE,
+        IndexMetadata.SETTING_INDEX_HIDDEN,
+        INDEX_FAILURE_STORE_VERSION_SETTING_NAME,
+        IndexMetadata.SETTING_NUMBER_OF_SHARDS,
+        IndexMetadata.SETTING_NUMBER_OF_REPLICAS,
+        IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS,
+        IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(),
+        IndexMetadata.LIFECYCLE_NAME
+    );
+    public static final Set<String> SUPPORTED_USER_SETTINGS_PREFIXES = Set.of(
+        IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + ".",
+        IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + ".",
+        IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "."
+    );
     public static final CompressedXContent DATA_STREAM_FAILURE_STORE_MAPPING;
 
     public static final int FAILURE_STORE_DEFINITION_VERSION = 1;
     public static final Setting<Integer> FAILURE_STORE_DEFINITION_VERSION_SETTING = Setting.intSetting(
-        "index.failure_store.version",
+        INDEX_FAILURE_STORE_VERSION_SETTING_NAME,
         0,
         Setting.Property.IndexScope
     );
@@ -40,11 +61,6 @@ public class DataStreamFailureStoreDefinition {
         DATA_STREAM_FAILURE_STORE_SETTINGS = Settings.builder()
             // Always start with the hidden settings for a backing index.
             .put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
-            // Override any pipeline settings on the failure store to not use any
-            // specified by the data stream template. Default pipelines are very much
-            // meant for the backing indices only.
-            .putNull(IndexSettings.DEFAULT_PIPELINE.getKey())
-            .putNull(IndexSettings.FINAL_PIPELINE.getKey())
             .put(FAILURE_STORE_DEFINITION_VERSION_SETTING.getKey(), FAILURE_STORE_DEFINITION_VERSION)
             .build();
 
@@ -199,4 +215,23 @@ public class DataStreamFailureStoreDefinition {
         }
         return builder;
     }
+
+    /**
+     * Removes the unsupported by the failure store settings from the settings provided.
+     * ATTENTION: This method should be applied BEFORE we set the necessary settings for an index
+     * @param builder the settings builder that is going to be updated
+     * @return the original settings builder, with the unsupported settings removed.
+     */
+    public static Settings.Builder filterUserDefinedSettings(Settings.Builder builder) {
+        if (builder.keys().isEmpty() == false) {
+            Set<String> existingKeys = new HashSet<>(builder.keys());
+            for (String setting : existingKeys) {
+                if (SUPPORTED_USER_SETTINGS.contains(setting) == false
+                    && SUPPORTED_USER_SETTINGS_PREFIXES.stream().anyMatch(setting::startsWith) == false) {
+                    builder.remove(setting);
+                }
+            }
+        }
+        return builder;
+    }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

@@ -425,7 +425,8 @@ public class MetadataCreateDataStreamService {
             .nameResolvedInstant(nameResolvedInstant)
             .performReroute(false)
             .setMatchingTemplate(template)
-            .settings(indexSettings);
+            .settings(indexSettings)
+            .isFailureIndex(true);
 
         try {
             currentState = metadataCreateIndexService.applyCreateIndexRequest(

+ 5 - 2
server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

@@ -983,6 +983,7 @@ public class MetadataCreateIndexService {
             final Settings templateAndRequestSettings = Settings.builder().put(combinedTemplateSettings).put(request.settings()).build();
 
             final IndexMode templateIndexMode = Optional.of(request)
+                .filter(r -> r.isFailureIndex() == false)
                 .map(CreateIndexClusterStateUpdateRequest::matchingTemplate)
                 .map(metadata::retrieveIndexModeFromTemplate)
                 .orElse(null);
@@ -1038,11 +1039,13 @@ public class MetadataCreateIndexService {
 
             // Finally, we actually add the explicit defaults prior to the template settings and the
             // request settings, so that the precedence goes:
-            // Explicit Defaults -> Template -> Request -> Necessary Settings (# of shards, uuid, etc)
+            // Explicit Defaults -> Template -> Request -> Filter out failure store settings -> Necessary Settings (# of shards, uuid, etc)
             indexSettingsBuilder.put(additionalIndexSettings.build());
             indexSettingsBuilder.put(templateSettings.build());
         }
-
+        if (request.isFailureIndex()) {
+            DataStreamFailureStoreDefinition.filterUserDefinedSettings(indexSettingsBuilder);
+        }
         // now, put the request settings, so they override templates
         indexSettingsBuilder.put(requestSettings.build());
 

+ 73 - 0
server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreDefinitionTests.java

@@ -0,0 +1,73 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.cluster.metadata;
+
+import org.elasticsearch.cluster.routing.allocation.DataTier;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.elasticsearch.cluster.metadata.DataStreamFailureStoreDefinition.INDEX_FAILURE_STORE_VERSION_SETTING_NAME;
+import static org.hamcrest.Matchers.equalTo;
+
+public class DataStreamFailureStoreDefinitionTests extends ESTestCase {
+
+    public void testSettingsFiltering() {
+        // Empty
+        Settings.Builder builder = Settings.builder();
+        Settings.Builder expectedBuilder = Settings.builder();
+        assertThat(DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys(), equalTo(expectedBuilder.keys()));
+
+        // All supported settings
+        builder.put(INDEX_FAILURE_STORE_VERSION_SETTING_NAME, 3)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(DataTier.TIER_PREFERENCE, "data_cold")
+            .put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
+            .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-10")
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
+            .put(IndexMetadata.LIFECYCLE_NAME, "my-policy")
+            .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4))
+            .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4))
+            .put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4));
+        // We expect no changes
+        expectedBuilder = Settings.builder().put(builder.build());
+        assertThat(DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys(), equalTo(expectedBuilder.keys()));
+
+        // Remove unsupported settings
+        String randomSetting = randomAlphaOfLength(10);
+        builder.put(INDEX_FAILURE_STORE_VERSION_SETTING_NAME, 3)
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            .put(DataTier.TIER_PREFERENCE, "data_cold")
+            .put(IndexMetadata.SETTING_INDEX_HIDDEN, true)
+            .put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-10")
+            .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
+            .put(IndexMetadata.LIFECYCLE_NAME, "my-policy")
+            .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4))
+            .put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4))
+            .put(IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "." + randomAlphaOfLength(4), randomAlphaOfLength(4))
+            .put(IndexSettings.MODE.getKey(), randomFrom(IndexMode.values()))
+            .put(randomSetting, randomAlphaOfLength(10));
+        // We expect no changes
+        expectedBuilder = Settings.builder().put(builder.build());
+        assertThat(
+            DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys().size(),
+            equalTo(expectedBuilder.keys().size() - 2)
+        );
+        assertThat(
+            DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys().contains(IndexSettings.MODE.getKey()),
+            equalTo(false)
+        );
+        assertThat(DataStreamFailureStoreDefinition.filterUserDefinedSettings(builder).keys().contains(randomSetting), equalTo(false));
+    }
+
+}

+ 99 - 0
x-pack/plugin/logsdb/src/yamlRestTest/resources/rest-api-spec/test/20_failure_store.yml

@@ -0,0 +1,99 @@
+---
+teardown:
+  - do:
+      indices.delete_data_stream:
+        name: my-logs-fs
+        ignore: 404
+
+  - do:
+      indices.delete_index_template:
+        name: template
+        ignore: 404
+
+  - do:
+      indices.delete_data_stream:
+        name: my-logs-db
+        ignore: 404
+  - do:
+      indices.delete_index_template:
+        name: template1
+        ignore: 404
+
+---
+Test failure store with logsdb:
+  - requires:
+      test_runner_features: [ capabilities, allowed_warnings ]
+      capabilities:
+        - method: PUT
+          path: /{index}
+          capabilities: [ logsdb_index_mode ]
+        - method: POST
+          path: /_bulk
+          capabilities: [ 'failure_store_status' ]
+        - method: PUT
+          path: /_bulk
+          capabilities: [ 'failure_store_status' ]
+      reason: "Support for 'logsdb' index mode & failure status capability required"
+
+  - do:
+      allowed_warnings:
+        - "index template [my-template] has index patterns [my-logs-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template
+        body:
+          index_patterns: ["my-logs-fs*"]
+          data_stream:
+            failure_store: true
+          template:
+            settings:
+              index:
+                mode: logsdb
+                number_of_replicas: 1
+                number_of_shards: 2
+  - do:
+      allowed_warnings:
+        - "index template [my-template2] has index patterns [my-logs-db*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
+      indices.put_index_template:
+        name: my-template2
+        body:
+          index_patterns: [ "my-logs-db*" ]
+          data_stream: {}
+          template:
+            settings:
+              index:
+                mode: logsdb
+                number_of_replicas: 1
+                number_of_shards: 2
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - '{ "create": { "_index": "my-logs-fs"} }'
+          - '{"@timestamp":"2019-08-06T12:09:12.375Z", "log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer"}'
+          - '{ "create": { "_index": "my-logs-db"} }'
+          - '{ "@timestamp": "2022-01-01", "log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer" }'
+          - '{ "create": { "_index": "my-logs-fs"} }'
+          - '{"log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer"}'
+          - '{ "create": { "_index": "my-logs-db"} }'
+          - '{"log.level": "INFO", "message":"Tomcat started on port(s): 8080 (http) with context path ''", "service.name":"spring-petclinic","process.thread.name":"restartedMain","log.logger":"org.springframework.boot.web.embedded.tomcat.TomcatWebServer"}'
+  - is_true: errors
+
+  # Successfully indexed to backing index
+  - match: { items.0.create._index: '/\.ds-my-logs-fs-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { items.0.create.status: 201 }
+  - is_false: items.0.create.failure_store
+  - match: { items.1.create._index: '/\.ds-my-logs-db-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { items.1.create.status: 201 }
+  - is_false: items.1.create.failure_store
+
+  # Successfully indexed to failure store
+  - match: { items.2.create._index: '/\.fs-my-logs-fs-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
+  - match: { items.2.create.status: 201 }
+  - match: { items.2.create.failure_store: used }
+
+  # Rejected, eligible to go to failure store, but failure store not enabled
+  - match: { items.3.create._index: '/\.ds-my-logs-db-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { items.3.create.status: 400 }
+  - match: { items.3.create.error.type: document_parsing_exception }
+  - match: { items.3.create.failure_store: not_enabled }