
Restrict Indexing To Child Streams When Streams Is Enabled (#132011)

* Update BulkRequestModifier to allow wrapping multiple times while preserving ingest time taken

* Modify BulkResponse to have an equals method and update ingest tests to not depend on same-instance assertions

This prevents issues when wrapping responses during ingest

* Add new StreamType enum along with logic to check if that stream type is enabled in the cluster

* Modify IngestService to prevent documents from being re-routed into child streams

* Modify TransportAbstractBulkAction to prevent indexing into child streams

* Additional tests for new indexing restrictions

* Apply suggestion from @szybia

Co-authored-by: Szymon Bialkowski <szybia@tuta.io>

* Apply suggestions from code review

Co-authored-by: Szymon Bialkowski <szybia@tuta.io>

* Additional PR changes and cleanup

* Additional PR changes to improve performance and readability further

* Update docs/changelog/132011.yaml

* Added additional documentation on bulk modifier wrap methods

* PR Changes

* Use of failure store is now wrapped in cluster feature check

---------

Co-authored-by: Szymon Bialkowski <szybia@tuta.io>
Luke Whiting · 2 months ago · commit 12a57e0f9f

+ 5 - 0
docs/changelog/132011.yaml

@@ -0,0 +1,5 @@
+pr: 132011
+summary: Restrict Indexing To Child Streams When Streams Is Enabled
+area: Data streams
+type: enhancement
+issues: []

+ 3 - 1
modules/streams/build.gradle

@@ -20,7 +20,7 @@ esplugin {
 
 restResources {
   restApi {
-    include '_common', 'streams'
+    include '_common', 'streams', "bulk", "index", "ingest", "indices", "delete_by_query", "search"
   }
 }
 
@@ -38,4 +38,6 @@ artifacts {
 
 dependencies {
   testImplementation project(path: ':test:test-clusters')
+  clusterModules project(':modules:ingest-common')
+  clusterModules project(':modules:reindex')
 }

+ 6 - 1
modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java

@@ -29,7 +29,12 @@ public class StreamsYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
     }
 
     @ClassRule
-    public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build();
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
+        .module("streams")
+        .module("ingest-common")
+        .module("reindex")
+        .feature(FeatureFlag.LOGS_STREAM)
+        .build();
 
     @Override
     protected String getTestRestCluster() {

+ 156 - 0
modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml

@@ -0,0 +1,156 @@
+---
+"Check User Can't Write To Substream Directly":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "logs.foo" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }
+
+---
+"Check User Can't Write To Substream Directly With Single Doc":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      catch: bad_request
+      index:
+        index: logs.foo
+        id: "1"
+        body:
+          foo: bar
+  - match: { error.type: "illegal_argument_exception" }
+  - match: { error.reason: "Direct writes to child streams are prohibited. Index directly into the [logs] stream instead" }
+
+---
+"Check Bulk Index With Reroute Processor To Substream Is Rejected":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      ingest.put_pipeline:
+        id: "reroute-to-logs-foo"
+        body:
+          processors:
+            - reroute:
+                destination: "logs.foo"
+  - do:
+      indices.create:
+        index: "bad-index"
+        body:
+          settings:
+            index.default_pipeline: "reroute-to-logs-foo"
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "bad-index" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Pipeline [reroute-to-logs-foo] can't change the target index (from [bad-index] to [logs] child stream [logs.foo]) History: [bad-index]" }
+
+---
+"Check Bulk Index With Script Processor To Substream Is Rejected":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      ingest.put_pipeline:
+        id: "script-to-logs-foo"
+        body:
+          processors:
+            - script:
+                source: "ctx._index = 'logs.foo'"
+  - do:
+      indices.create:
+        index: "bad-index-script"
+        body:
+          settings:
+            index.default_pipeline: "script-to-logs-foo"
+  - do:
+      bulk:
+        body: |
+          { "index": { "_index": "bad-index-script" } }
+          { "foo": "bar" }
+  - match: { errors: true }
+  - match: { items.0.index.status: 400 }
+  - match: { items.0.index.error.type: "illegal_argument_exception" }
+  - match: { items.0.index.error.reason: "Pipeline [script-to-logs-foo] can't change the target index (from [bad-index-script] to [logs] child stream [logs.foo]) History: [bad-index-script]" }
+
+---
+"Check Delete By Query Directly On Substream After Reroute Succeeds":
+  - do:
+      streams.logs_enable: { }
+  - is_true: acknowledged
+
+  - do:
+      streams.status: { }
+  - is_true: logs.enabled
+
+  - do:
+      ingest.put_pipeline:
+        id: "reroute-to-logs-foo-success"
+        body:
+          processors:
+            - reroute:
+                destination: "logs.foo"
+  - do:
+      indices.create:
+        index: "logs"
+        body:
+          settings:
+            index.default_pipeline: "reroute-to-logs-foo-success"
+  - do:
+      bulk:
+        refresh: true
+        body: |
+          { "index": { "_index": "logs" } }
+          { "foo": "bar", "baz": "qux" }
+  - match: { errors: false }
+  - match: { items.0.index.status: 201 }
+
+  - do:
+      delete_by_query:
+        index: logs.foo
+        refresh: true
+        body:
+          query:
+            match:
+              foo: "bar"
+  - match: { deleted: 1 }
+  - match: { total: 1 }
+
+  - do:
+      search:
+        index: logs.foo
+        body:
+          query:
+            match_all: {}
+  - match: { hits.total.value: 0 }

+ 65 - 24
server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java

@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -105,47 +106,87 @@ final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
      * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
      * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
      * service with the results returned from running the remaining write operations.
+     * <br>
+     * Use this method when you want the ingest time to be taken from the actual {@link BulkResponse} such as if you are wrapping
+     * a response multiple times and wish to preserve an already calculated ingest time.
      *
-     * @param ingestTookInMillis Time elapsed for ingestion to be passed to final result.
-     * @param actionListener The action listener that expects the final bulk response.
-     * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
+     * @param actionListener the listener to wrap
+     * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
+     */
+    ActionListener<BulkResponse> wrapActionListenerIfNeeded(ActionListener<BulkResponse> actionListener) {
+        if (itemResponses.isEmpty()) {
+            return actionListener;
+        } else {
+            return doWrapActionListenerIfNeeded(BulkResponse::getIngestTookInMillis, actionListener);
+        }
+    }
+
+    /**
+     * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
+     * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
+     * service with the results returned from running the remaining write operations.
+     * <br>
+     * This variant is used when the ingest time is already known and should be explicitly set in the final response,
+     * rather than extracted from the {@link BulkResponse}.
+     *
+     * @param ingestTookInMillis the ingest time in milliseconds to use in the final response
+     * @param actionListener the listener to wrap
+     * @return a wrapped listener that merges ingest and bulk results, or the original listener if no items were dropped/failed
      */
     ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
         if (itemResponses.isEmpty()) {
             return actionListener.map(
                 response -> new BulkResponse(
                     response.getItems(),
-                    response.getTook().getMillis(),
+                    response.getTookInMillis(),
                     ingestTookInMillis,
                     response.getIncrementalState()
                 )
             );
         } else {
-            return actionListener.map(response -> {
-                // these items are the responses from the subsequent bulk request, their 'slots'
-                // are not correct for this response we're building
-                final BulkItemResponse[] bulkResponses = response.getItems();
+            return doWrapActionListenerIfNeeded(ignoredResponse -> ingestTookInMillis, actionListener);
+        }
+    }
+
+    /**
+     * If documents were dropped or failed in ingest, this method wraps the action listener that will be notified when the
+     * updated bulk operation is completed. The wrapped listener combines the dropped and failed document results from the ingest
+     * service with the results returned from running the remaining write operations.
+     *
+     * @param ingestTimeProviderFunction A function to provide the ingest time taken for this response
+     * @param actionListener The action listener that expects the final bulk response.
+     * @return An action listener that combines ingest failure results with the results from writing the remaining documents.
+     */
+    private ActionListener<BulkResponse> doWrapActionListenerIfNeeded(
+        Function<BulkResponse, Long> ingestTimeProviderFunction,
+        ActionListener<BulkResponse> actionListener
+    ) {
+        return actionListener.map(response -> {
+            // these items are the responses from the subsequent bulk request, their 'slots'
+            // are not correct for this response we're building
+            final BulkItemResponse[] bulkResponses = response.getItems();
 
-                final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
+            final BulkItemResponse[] allResponses = new BulkItemResponse[bulkResponses.length + itemResponses.size()];
 
-                // the item responses are from the original request, so their slots are correct.
-                // these are the responses for requests that failed early and were not passed on to the subsequent bulk.
-                for (BulkItemResponse item : itemResponses) {
-                    allResponses[item.getItemId()] = item;
-                }
+            // the item responses are from the original request, so their slots are correct.
+            // these are the responses for requests that failed early and were not passed on to the subsequent bulk.
+            for (BulkItemResponse item : itemResponses) {
+                allResponses[item.getItemId()] = item;
+            }
 
-                // use the original slots for the responses from the bulk
-                for (int i = 0; i < bulkResponses.length; i++) {
-                    allResponses[originalSlots.get(i)] = bulkResponses[i];
-                }
+            // use the original slots for the responses from the bulk
+            for (int i = 0; i < bulkResponses.length; i++) {
+                allResponses[originalSlots.get(i)] = bulkResponses[i];
+            }
 
-                if (Assertions.ENABLED) {
-                    assertResponsesAreCorrect(bulkResponses, allResponses);
-                }
+            if (Assertions.ENABLED) {
+                assertResponsesAreCorrect(bulkResponses, allResponses);
+            }
 
-                return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
-            });
-        }
+            var ingestTookInMillis = ingestTimeProviderFunction.apply(response);
+
+            return new BulkResponse(allResponses, response.getTook().getMillis(), ingestTookInMillis, response.getIncrementalState());
+        });
     }
 
     private void assertResponsesAreCorrect(BulkItemResponse[] bulkResponses, BulkItemResponse[] allResponses) {
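
Note on the two wrap variants above: they differ only in where the ingest time comes from. A minimal sketch of the intended call pattern, assuming a caller in the same org.elasticsearch.action.bulk package; every identifier other than the modifier's own methods is a placeholder, not code from this commit:

    // Where the ingest time has just been measured, stamp it onto the final response explicitly.
    ActionListener<BulkResponse> withIngestTime =
        bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);

    // Where a response is wrapped again later (as TransportAbstractBulkAction now does for
    // child-stream rejections), the new overload reads the ingest time back out of the
    // BulkResponse via BulkResponse::getIngestTookInMillis, so a previously calculated value
    // is preserved rather than overwritten. In the commit these two wraps happen in
    // different components, not back to back as shown here.
    ActionListener<BulkResponse> rewrapped =
        bulkRequestModifier.wrapActionListenerIfNeeded(withIngestTime);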

+ 48 - 3
server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

@@ -26,15 +26,18 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.ComponentTemplate;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.ingest.IngestService;
@@ -69,6 +72,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
     protected final Executor coordinationExecutor;
     protected final Executor systemCoordinationExecutor;
     private final ActionType<BulkResponse> bulkAction;
+    protected final FeatureService featureService;
 
     public TransportAbstractBulkAction(
         ActionType<BulkResponse> action,
@@ -81,7 +85,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
         IndexingPressure indexingPressure,
         SystemIndices systemIndices,
         ProjectResolver projectResolver,
-        LongSupplier relativeTimeNanosProvider
+        LongSupplier relativeTimeNanosProvider,
+        FeatureService featureService
     ) {
         super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
         this.threadPool = threadPool;
@@ -93,6 +98,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
         this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
         this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
         this.ingestForwarder = new IngestActionForwarder(transportService);
+        this.featureService = featureService;
         clusterService.addStateApplier(this.ingestForwarder);
         this.relativeTimeNanosProvider = relativeTimeNanosProvider;
         this.bulkAction = action;
@@ -396,8 +402,47 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
         ActionListener<BulkResponse> listener
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();
-        if (applyPipelines(task, bulkRequest, executor, listener) == false) {
-            doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
+
+        // Validate child stream writes before processing pipelines
+        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
+        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
+
+        DocWriteRequest<?> req;
+        int i = -1;
+        while (bulkRequestModifier.hasNext()) {
+            req = bulkRequestModifier.next();
+            i++;
+
+            for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(projectMetadata)) {
+                if (req instanceof IndexRequest ir && streamType.matchesStreamPrefix(req.index()) && ir.isPipelineResolved() == false) {
+                    IllegalArgumentException e = new IllegalArgumentException(
+                        "Direct writes to child streams are prohibited. Index directly into the ["
+                            + streamType.getStreamName()
+                            + "] stream instead"
+                    );
+                    Boolean failureStoreEnabled = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
+
+                    if (featureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)) {
+                        if (Boolean.TRUE.equals(failureStoreEnabled)) {
+                            bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
+                        } else if (Boolean.FALSE.equals(failureStoreEnabled)) {
+                            bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_ENABLED);
+                        } else {
+                            bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
+                        }
+                    } else {
+                        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
+                    }
+
+                    break;
+                }
+            }
+        }
+
+        var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
+
+        if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) {
+            doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos);
         }
     }
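
The failure-store handling in the new validation loop is a three-way decision on a nullable Boolean. A simplified, self-contained restatement (illustrative only; the enum and method below are not part of the commit):

    // Hypothetical summary of how a rejected child-stream write is routed.
    enum Outcome { FAILURE_STORE, FAIL_NOT_ENABLED, FAIL_UNKNOWN }

    static Outcome route(boolean clusterHasFailureStoreFeature, Boolean failureStoreEnabled) {
        if (clusterHasFailureStoreFeature == false) {
            return Outcome.FAIL_UNKNOWN;            // feature not on every node: plain failure
        }
        if (Boolean.TRUE.equals(failureStoreEnabled)) {
            return Outcome.FAILURE_STORE;           // redirect the rejected document
        }
        if (Boolean.FALSE.equals(failureStoreEnabled)) {
            return Outcome.FAIL_NOT_ENABLED;        // failure store resolved but not enabled
        }
        return Outcome.FAIL_UNKNOWN;                // resolveFailureStore returned null
    }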
 

+ 2 - 3
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -86,7 +86,6 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
     private final OriginSettingClient rolloverClient;
     private final FailureStoreMetrics failureStoreMetrics;
     private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings;
-    private final FeatureService featureService;
 
     @Inject
     public TransportBulkAction(
@@ -187,7 +186,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
             indexingPressure,
             systemIndices,
             projectResolver,
-            relativeTimeProvider
+            relativeTimeProvider,
+            featureService
         );
         this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings;
         Objects.requireNonNull(relativeTimeProvider);
@@ -195,7 +195,6 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
         this.failureStoreMetrics = failureStoreMetrics;
-        this.featureService = featureService;
     }
 
     public static <Response extends ReplicationResponse & WriteResponse> ActionListener<BulkResponse> unwrappingSingleItemBulkResponse(

+ 5 - 2
server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

@@ -36,6 +36,7 @@ import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.index.IndexSettingProvider;
 import org.elasticsearch.index.IndexSettingProviders;
@@ -102,7 +103,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
         ProjectResolver projectResolver,
         IndicesService indicesService,
         NamedXContentRegistry xContentRegistry,
-        IndexSettingProviders indexSettingProviders
+        IndexSettingProviders indexSettingProviders,
+        FeatureService featureService
     ) {
         super(
             SimulateBulkAction.INSTANCE,
@@ -115,7 +117,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
             indexingPressure,
             systemIndices,
             projectResolver,
-            threadPool::relativeTimeInNanos
+            threadPool::relativeTimeInNanos,
+            featureService
         );
         this.indicesService = indicesService;
         this.xContentRegistry = xContentRegistry;

+ 54 - 0
server/src/main/java/org/elasticsearch/common/streams/StreamType.java

@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.streams;
+
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.metadata.StreamsMetadata;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public enum StreamType {
+
+    LOGS("logs");
+
+    private final String streamName;
+
+    StreamType(String streamName) {
+        this.streamName = streamName;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public boolean streamTypeIsEnabled(ProjectMetadata projectMetadata) {
+        StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY);
+        return switch (this) {
+            case LOGS -> metadata.isLogsEnabled();
+        };
+    }
+
+    public boolean matchesStreamPrefix(String indexName) {
+        if (indexName == null) {
+            return false;
+        }
+        return indexName.startsWith(streamName + ".");
+    }
+
+    public static Set<StreamType> getEnabledStreamTypesForProject(ProjectMetadata projectMetadata) {
+        return Arrays.stream(values())
+            .filter(t -> t.streamTypeIsEnabled(projectMetadata))
+            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+    }
+
+}
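
A few illustrative cases for the prefix check, following directly from the startsWith logic above (index names are examples, not taken from the commit):

    assert StreamType.LOGS.matchesStreamPrefix("logs.foo");             // child stream: direct writes rejected
    assert StreamType.LOGS.matchesStreamPrefix("logs") == false;        // the parent stream itself is fine
    assert StreamType.LOGS.matchesStreamPrefix("logstash-1") == false;  // no "logs." prefix, unrelated index
    assert StreamType.LOGS.matchesStreamPrefix(null) == false;          // null index names are tolerated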

+ 24 - 0
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -57,6 +57,7 @@ import org.elasticsearch.common.logging.DeprecationCategory;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -1282,6 +1283,29 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         return; // document failed!
                     }
 
+                    for (StreamType streamType : StreamType.getEnabledStreamTypesForProject(project)) {
+                        if (streamType.matchesStreamPrefix(newIndex)
+                            && ingestDocument.getIndexHistory().contains(streamType.getStreamName()) == false) {
+                            exceptionHandler.accept(
+                                new IngestPipelineException(
+                                    pipelineId,
+                                    new IllegalArgumentException(
+                                        format(
+                                            "Pipeline [%s] can't change the target index (from [%s] to [%s] child stream [%s]) "
+                                                + "History: [%s]",
+                                            pipelineId,
+                                            originalIndex,
+                                            streamType.getStreamName(),
+                                            newIndex,
+                                            String.join(", ", ingestDocument.getIndexHistory())
+                                        )
+                                    )
+                                )
+                            );
+                            return; // document failed!
+                        }
+                    }
+
                     // add the index to the document's index history, and check for cycles in the visited indices
                     boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false;
                     if (cycle) {
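
The guard added here only lets a pipeline target a child stream when the parent stream name is already in the document's index history. A simplified, self-contained restatement of that condition (illustrative only; the helper below is not part of the commit):

    // Rerouting into a child of an enabled stream is allowed only when the document
    // has already passed through the parent stream.
    static boolean childStreamRerouteAllowed(String newIndex, String parentStream, List<String> indexHistory) {
        boolean targetsChildStream = newIndex != null && newIndex.startsWith(parentStream + ".");
        return targetsChildStream == false || indexHistory.contains(parentStream);
    }

    // childStreamRerouteAllowed("logs.foo", "logs", List.of("logs"))      -> true  (reroute from the parent stream)
    // childStreamRerouteAllowed("logs.foo", "logs", List.of("bad-index")) -> false (rejected, as in the YAML tests above)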

+ 7 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java

@@ -17,6 +17,7 @@ import org.elasticsearch.action.ingest.SimulateIndexResponse;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
 import org.elasticsearch.cluster.metadata.ProjectId;
@@ -30,6 +31,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.index.IndexSettingProviders;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
@@ -79,6 +81,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
     private ClusterService clusterService;
     private TestThreadPool threadPool;
     private IndicesService indicesService;
+    private FeatureService mockFeatureService;
 
     private TestTransportSimulateBulkAction bulkAction;
 
@@ -96,7 +99,8 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
                 TestProjectResolvers.DEFAULT_PROJECT_ONLY,
                 indicesService,
                 NamedXContentRegistry.EMPTY,
-                new IndexSettingProviders(Set.of())
+                new IndexSettingProviders(Set.of()),
+                mockFeatureService
             );
         }
     }
@@ -126,6 +130,8 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
         transportService.acceptIncomingRequests();
         indicesService = mock(IndicesService.class);
         bulkAction = new TestTransportSimulateBulkAction();
+        mockFeatureService = mock(FeatureService.class);
+        when(mockFeatureService.clusterHasFeature(clusterService.state(), DataStream.DATA_STREAM_FAILURE_STORE_FEATURE)).thenReturn(false);
     }
 
     @After