
Collect APM metrics for failure stores (#108279)

This PR adds APM metrics for failure stores. See the JavaDoc comments in `FailureStoreMetrics.java` for a detailed explanation of the individual metrics.
Niels Bauman · 1 year ago · commit 10d665ba6b
19 changed files with 781 additions and 129 deletions
  1. +428 -0   modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java
  2. +49 -41   server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java
  3. +98 -0    server/src/main/java/org/elasticsearch/action/bulk/FailureStoreMetrics.java
  4. +5 -3     server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java
  5. +61 -39   server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
  6. +2 -2     server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
  7. +19 -0    server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java
  8. +41 -7    server/src/main/java/org/elasticsearch/ingest/IngestService.java
  9. +5 -1     server/src/main/java/org/elasticsearch/node/NodeConstruction.java
  10. +2 -1    server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java
  11. +2 -1    server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
  12. +8 -6    server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
  13. +13 -8   server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
  14. +2 -1    server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java
  15. +3 -1    server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java
  16. +20 -13  server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
  17. +15 -2   server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
  18. +5 -2    server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  19. +3 -1    x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

+ 428 - 0
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/IngestFailureStoreMetricsIT.java

@@ -0,0 +1,428 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.datastreams;
+
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
+import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
+import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
+import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
+import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
+import org.elasticsearch.action.datastreams.CreateDataStreamAction;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.IngestTestPlugin;
+import org.elasticsearch.ingest.Processor;
+import org.elasticsearch.ingest.TestProcessor;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.telemetry.Measurement;
+import org.elasticsearch.telemetry.TestTelemetryPlugin;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xcontent.XContentType;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+
+/**
+ * An integration test that verifies how different paths/scenarios affect the APM metrics for failure stores.
+ */
+@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.SUITE)
+public class IngestFailureStoreMetricsIT extends ESIntegTestCase {
+
+    private static final List<String> METRICS = List.of(
+        FailureStoreMetrics.METRIC_TOTAL,
+        FailureStoreMetrics.METRIC_FAILURE_STORE,
+        FailureStoreMetrics.METRIC_REJECTED
+    );
+
+    private String template;
+    private String dataStream;
+    private String pipeline;
+
+    @Before
+    public void initializeRandomNames() {
+        template = "template-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        dataStream = "data-stream-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        pipeline = "pipeline-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        logger.info(
+            "--> running [{}] with generated names data stream [{}], template [{}] and pipeline [{}]",
+            getTestName(),
+            dataStream,
+            template,
+            pipeline
+        );
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(DataStreamsPlugin.class, CustomIngestTestPlugin.class, TestTelemetryPlugin.class, MapperExtrasPlugin.class);
+    }
+
+    public void testNoPipelineNoFailures() throws IOException {
+        putComposableIndexTemplate(true);
+        createDataStream();
+
+        int nrOfDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfDocs, null);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfDocs, dataStream);
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());
+    }
+
+    public void testFailingPipelineNoFailureStore() throws IOException {
+        putComposableIndexTemplate(false);
+        createDataStream();
+        createBasicPipeline("fail");
+
+        int nrOfSuccessfulDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfSuccessfulDocs, null);
+        int nrOfFailingDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfFailingDocs, pipeline);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfSuccessfulDocs + nrOfFailingDocs, dataStream);
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
+        assertMeasurements(
+            measurements.get(FailureStoreMetrics.METRIC_REJECTED),
+            nrOfFailingDocs,
+            dataStream,
+            FailureStoreMetrics.ErrorLocation.PIPELINE,
+            false
+        );
+    }
+
+    public void testFailingPipelineWithFailureStore() throws IOException {
+        putComposableIndexTemplate(true);
+        createDataStream();
+        createBasicPipeline("fail");
+
+        int nrOfSuccessfulDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfSuccessfulDocs, null);
+        int nrOfFailingDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfFailingDocs, pipeline);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfSuccessfulDocs + nrOfFailingDocs, dataStream);
+        assertMeasurements(
+            measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE),
+            nrOfFailingDocs,
+            dataStream,
+            FailureStoreMetrics.ErrorLocation.PIPELINE
+        );
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());
+    }
+
+    public void testShardFailureNoFailureStore() throws IOException {
+        putComposableIndexTemplate(false);
+        createDataStream();
+
+        int nrOfSuccessfulDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfSuccessfulDocs, null);
+        int nrOfFailingDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfFailingDocs, "\"foo\"", null);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfSuccessfulDocs + nrOfFailingDocs, dataStream);
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
+        assertMeasurements(
+            measurements.get(FailureStoreMetrics.METRIC_REJECTED),
+            nrOfFailingDocs,
+            dataStream,
+            FailureStoreMetrics.ErrorLocation.SHARD,
+            false
+        );
+    }
+
+    public void testShardFailureWithFailureStore() throws IOException {
+        putComposableIndexTemplate(true);
+        createDataStream();
+
+        int nrOfSuccessfulDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfSuccessfulDocs, null);
+        int nrOfFailingDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfFailingDocs, "\"foo\"", null);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfSuccessfulDocs + nrOfFailingDocs, dataStream);
+        assertMeasurements(
+            measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE),
+            nrOfFailingDocs,
+            dataStream,
+            FailureStoreMetrics.ErrorLocation.SHARD
+        );
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());
+    }
+
+    /**
+     * Make sure the rejected counter gets incremented when there are shard-level failures while trying to redirect a document to the
+     * failure store.
+     */
+    public void testRejectionFromFailureStore() throws IOException {
+        putComposableIndexTemplate(true);
+        createDataStream();
+
+        // Initialize failure store.
+        var rolloverRequest = new RolloverRequest(dataStream, null);
+        rolloverRequest.setIndicesOptions(
+            IndicesOptions.builder(rolloverRequest.indicesOptions())
+                .failureStoreOptions(opts -> opts.includeFailureIndices(true).includeRegularIndices(false))
+                .build()
+        );
+        var rolloverResponse = client().execute(RolloverAction.INSTANCE, rolloverRequest).actionGet();
+        var failureStoreIndex = rolloverResponse.getNewIndex();
+        // Add a write block to the failure store index, which causes shard-level "failures".
+        var addIndexBlockRequest = new AddIndexBlockRequest(IndexMetadata.APIBlock.WRITE, failureStoreIndex);
+        client().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest).actionGet();
+
+        int nrOfSuccessfulDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfSuccessfulDocs, null);
+        int nrOfFailingDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfFailingDocs, "\"foo\"", null);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfSuccessfulDocs + nrOfFailingDocs, dataStream);
+        assertMeasurements(
+            measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE),
+            nrOfFailingDocs,
+            dataStream,
+            FailureStoreMetrics.ErrorLocation.SHARD
+        );
+        assertMeasurements(
+            measurements.get(FailureStoreMetrics.METRIC_REJECTED),
+            nrOfFailingDocs,
+            dataStream,
+            FailureStoreMetrics.ErrorLocation.SHARD,
+            true
+        );
+    }
+
+    /**
+     * Make sure metrics get the correct <code>data_stream</code> attribute after a reroute.
+     */
+    public void testRerouteSuccessfulCorrectName() throws IOException {
+        putComposableIndexTemplate(false);
+        createDataStream();
+
+        String destination = dataStream + "-destination";
+        final var createDataStreamRequest = new CreateDataStreamAction.Request(destination);
+        assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet());
+        createReroutePipeline(destination);
+
+        int nrOfDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfDocs, pipeline);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfDocs, destination);
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());
+    }
+
+    public void testDropping() throws IOException {
+        putComposableIndexTemplate(true);
+        createDataStream();
+        createBasicPipeline("drop");
+
+        int nrOfDocs = randomIntBetween(5, 10);
+        indexDocs(dataStream, nrOfDocs, pipeline);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfDocs, dataStream);
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());
+    }
+
+    public void testDataStreamAlias() throws IOException {
+        putComposableIndexTemplate(false);
+        createDataStream();
+        var indicesAliasesRequest = new IndicesAliasesRequest();
+        indicesAliasesRequest.addAliasAction(
+            IndicesAliasesRequest.AliasActions.add().alias("some-alias").index(dataStream).writeIndex(true)
+        );
+        client().execute(TransportIndicesAliasesAction.TYPE, indicesAliasesRequest).actionGet();
+
+        int nrOfDocs = randomIntBetween(5, 10);
+        indexDocs("some-alias", nrOfDocs, null);
+
+        var measurements = collectTelemetry();
+        assertMeasurements(measurements.get(FailureStoreMetrics.METRIC_TOTAL), nrOfDocs, dataStream);
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_FAILURE_STORE).size());
+        assertEquals(0, measurements.get(FailureStoreMetrics.METRIC_REJECTED).size());
+    }
+
+    private void putComposableIndexTemplate(boolean failureStore) throws IOException {
+        TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(template);
+        request.indexTemplate(
+            ComposableIndexTemplate.builder()
+                .indexPatterns(List.of(dataStream + "*"))
+                .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, failureStore))
+                .template(new Template(null, new CompressedXContent("""
+                    {
+                      "dynamic": false,
+                      "properties": {
+                        "@timestamp": {
+                          "type": "date"
+                        },
+                        "count": {
+                            "type": "long"
+                        }
+                      }
+                    }"""), null))
+                .build()
+        );
+        client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
+    }
+
+    private void createDataStream() {
+        final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStream);
+        assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet());
+    }
+
+    private void createBasicPipeline(String processorType) {
+        createPipeline(Strings.format("\"%s\": {}", processorType));
+    }
+
+    private void createReroutePipeline(String destination) {
+        createPipeline(Strings.format("\"reroute\": {\"destination\": \"%s\"}", destination));
+    }
+
+    private void createPipeline(String processor) {
+        String pipelineDefinition = Strings.format("{\"processors\": [{%s}]}", processor);
+        BytesReference bytes = new BytesArray(pipelineDefinition);
+        clusterAdmin().putPipeline(new PutPipelineRequest(pipeline, bytes, XContentType.JSON)).actionGet();
+    }
+
+    private void indexDocs(String dataStream, int numDocs, String pipeline) {
+        indexDocs(dataStream, numDocs, "1", pipeline);
+    }
+
+    private void indexDocs(String dataStream, int numDocs, String value, String pipeline) {
+        BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        for (int i = 0; i < numDocs; i++) {
+            String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
+            bulkRequest.add(
+                new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
+                    .source(Strings.format("{\"%s\":\"%s\", \"count\": %s}", DEFAULT_TIMESTAMP_FIELD, time, value), XContentType.JSON)
+                    .setPipeline(pipeline)
+            );
+        }
+        client().bulk(bulkRequest).actionGet();
+    }
+
+    private static Map<String, List<Measurement>> collectTelemetry() {
+        Map<String, List<Measurement>> measurements = new HashMap<>();
+        for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) {
+            final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();
+
+            telemetryPlugin.collect();
+
+            for (String metricName : METRICS) {
+                measurements.put(metricName, telemetryPlugin.getLongCounterMeasurement(metricName));
+            }
+        }
+        return measurements;
+    }
+
+    private void assertMeasurements(List<Measurement> measurements, int expectedSize, String expectedDataStream) {
+        assertMeasurements(measurements, expectedSize, expectedDataStream, (Consumer<Measurement>) null);
+    }
+
+    private void assertMeasurements(
+        List<Measurement> measurements,
+        int expectedSize,
+        String expectedDataStream,
+        FailureStoreMetrics.ErrorLocation location
+    ) {
+        assertMeasurements(
+            measurements,
+            expectedSize,
+            expectedDataStream,
+            measurement -> assertEquals(location.name(), measurement.attributes().get("error_location"))
+        );
+    }
+
+    private void assertMeasurements(
+        List<Measurement> measurements,
+        int expectedSize,
+        String expectedDataStream,
+        FailureStoreMetrics.ErrorLocation location,
+        boolean failureStore
+    ) {
+        assertMeasurements(measurements, expectedSize, expectedDataStream, measurement -> {
+            assertEquals(location.name(), measurement.attributes().get("error_location"));
+            assertEquals(failureStore, measurement.attributes().get("failure_store"));
+        });
+    }
+
+    private void assertMeasurements(
+        List<Measurement> measurements,
+        int expectedSize,
+        String expectedDataStream,
+        Consumer<Measurement> customAssertion
+    ) {
+        assertEquals(expectedSize, measurements.size());
+        for (Measurement measurement : measurements) {
+            assertEquals(expectedDataStream, measurement.attributes().get("data_stream"));
+            if (customAssertion != null) {
+                customAssertion.accept(measurement);
+            }
+        }
+    }
+
+    public static class CustomIngestTestPlugin extends IngestTestPlugin {
+        @Override
+        public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
+            Map<String, Processor.Factory> processors = new HashMap<>();
+            processors.put(
+                "drop",
+                (factories, tag, description, config) -> new TestProcessor(tag, "drop", description, ingestDocument -> null)
+            );
+            processors.put("reroute", (factories, tag, description, config) -> {
+                String destination = (String) config.remove("destination");
+                return new TestProcessor(
+                    tag,
+                    "reroute",
+                    description,
+                    (Consumer<IngestDocument>) ingestDocument -> ingestDocument.reroute(destination)
+                );
+            });
+            processors.put(
+                "fail",
+                (processorFactories, tag, description, config) -> new TestProcessor(tag, "fail", description, new RuntimeException())
+            );
+            return processors;
+        }
+    }
+}

+ 49 - 41
server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

@@ -10,7 +10,9 @@ package org.elasticsearch.action.bulk;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
@@ -91,6 +93,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
     private final OriginSettingClient rolloverClient;
     private final Set<String> failureStoresToBeRolledOver = ConcurrentCollections.newConcurrentSet();
     private final Set<Integer> failedRolloverRequests = ConcurrentCollections.newConcurrentSet();
+    private final FailureStoreMetrics failureStoreMetrics;
 
     BulkOperation(
         Task task,
@@ -104,7 +107,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
         IndexNameExpressionResolver indexNameExpressionResolver,
         LongSupplier relativeTimeProvider,
         long startTimeNanos,
-        ActionListener<BulkResponse> listener
+        ActionListener<BulkResponse> listener,
+        FailureStoreMetrics failureStoreMetrics
     ) {
         this(
             task,
@@ -120,7 +124,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
             startTimeNanos,
             listener,
             new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()),
-            new FailureStoreDocumentConverter()
+            new FailureStoreDocumentConverter(),
+            failureStoreMetrics
         );
     }
 
@@ -138,7 +143,8 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
         long startTimeNanos,
         ActionListener<BulkResponse> listener,
         ClusterStateObserver observer,
-        FailureStoreDocumentConverter failureStoreDocumentConverter
+        FailureStoreDocumentConverter failureStoreDocumentConverter,
+        FailureStoreMetrics failureStoreMetrics
     ) {
         super(listener);
         this.task = task;
@@ -156,6 +162,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
         this.observer = observer;
         this.failureStoreDocumentConverter = failureStoreDocumentConverter;
         this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
+        this.failureStoreMetrics = failureStoreMetrics;
     }
 
     @Override
@@ -437,17 +444,11 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
                 for (int idx = 0; idx < bulkShardResponse.getResponses().length; idx++) {
                     // We zip the requests and responses together so that we can identify failed documents and potentially store them
                     BulkItemResponse bulkItemResponse = bulkShardResponse.getResponses()[idx];
+                    BulkItemRequest bulkItemRequest = bulkShardRequest.items()[idx];
 
                     if (bulkItemResponse.isFailed()) {
-                        BulkItemRequest bulkItemRequest = bulkShardRequest.items()[idx];
                         assert bulkItemRequest.id() == bulkItemResponse.getItemId() : "Bulk items were returned out of order";
-
-                        DataStream failureStoreReference = getRedirectTarget(bulkItemRequest.request(), getClusterState().metadata());
-                        if (failureStoreReference != null) {
-                            maybeMarkFailureStoreForRollover(failureStoreReference);
-                            var cause = bulkItemResponse.getFailure().getCause();
-                            addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreReference.getName());
-                        }
+                        processFailure(bulkItemRequest, bulkItemResponse.getFailure().getCause());
                         addFailure(bulkItemResponse);
                     } else {
                         bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
@@ -464,11 +465,7 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
                     final String indexName = request.index();
                     DocWriteRequest<?> docWriteRequest = request.request();
 
-                    DataStream failureStoreReference = getRedirectTarget(docWriteRequest, getClusterState().metadata());
-                    if (failureStoreReference != null) {
-                        maybeMarkFailureStoreForRollover(failureStoreReference);
-                        addDocumentToRedirectRequests(request, e, failureStoreReference.getName());
-                    }
+                    processFailure(request, e);
                     addFailure(docWriteRequest, request.id(), indexName, e);
                 }
                 completeShardOperation();
@@ -479,45 +476,56 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
                 clusterState = null;
                 releaseOnFinish.close();
             }
+
+            private void processFailure(BulkItemRequest bulkItemRequest, Exception cause) {
+                var errorType = ElasticsearchException.getExceptionName(ExceptionsHelper.unwrapCause(cause));
+                DocWriteRequest<?> docWriteRequest = bulkItemRequest.request();
+                DataStream failureStoreCandidate = getRedirectTargetCandidate(docWriteRequest, getClusterState().metadata());
+                // If the candidate is not null, the BulkItemRequest targets a data stream, but we'll still have to check if
+                // it has the failure store enabled.
+                if (failureStoreCandidate != null) {
+                    // Do not redirect documents to a failure store that were already headed to one.
+                    var isFailureStoreDoc = docWriteRequest instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore();
+                    if (isFailureStoreDoc == false && failureStoreCandidate.isFailureStoreEnabled()) {
+                        // Redirect to failure store.
+                        maybeMarkFailureStoreForRollover(failureStoreCandidate);
+                        addDocumentToRedirectRequests(bulkItemRequest, cause, failureStoreCandidate.getName());
+                        failureStoreMetrics.incrementFailureStore(
+                            bulkItemRequest.index(),
+                            errorType,
+                            FailureStoreMetrics.ErrorLocation.SHARD
+                        );
+                    } else {
+                        // If we can't redirect to a failure store (because either the data stream doesn't have the failure store enabled
+                        // or this request was already targeting a failure store), we increment the rejected counter.
+                        failureStoreMetrics.incrementRejected(
+                            bulkItemRequest.index(),
+                            errorType,
+                            FailureStoreMetrics.ErrorLocation.SHARD,
+                            isFailureStoreDoc
+                        );
+                    }
+                }
+            }
         });
     }
 
     /**
-     * Determines if the write request can be redirected if it fails. Write requests can be redirected IFF they are targeting a data stream
-     * with a failure store and are not already redirected themselves. If the document can be redirected, the data stream name to use for
-     * the redirection is returned.
+     * Tries to find a <i>candidate</i> redirect target for this write request. A candidate redirect target is a data stream that may or
+     * may not have the failure store enabled.
      *
      * @param docWriteRequest the write request to check
      * @param metadata cluster state metadata for resolving index abstractions
-     * @return a data stream if the write request points to a data stream that has the failure store enabled, or {@code null} if it does not
+     * @return a data stream if the write request points to a data stream, or {@code null} if it does not
      */
-    private static DataStream getRedirectTarget(DocWriteRequest<?> docWriteRequest, Metadata metadata) {
+    private static DataStream getRedirectTargetCandidate(DocWriteRequest<?> docWriteRequest, Metadata metadata) {
         // Feature flag guard
         if (DataStream.isFailureStoreFeatureFlagEnabled() == false) {
             return null;
         }
-        // Do not resolve a failure store for documents that were already headed to one
-        if (docWriteRequest instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore()) {
-            return null;
-        }
         // If there is no index abstraction, then the request is using a pattern of some sort, which data streams do not support
         IndexAbstraction ia = metadata.getIndicesLookup().get(docWriteRequest.index());
-        if (ia == null) {
-            return null;
-        }
-        if (ia.isDataStreamRelated()) {
-            // The index abstraction could be an alias. Alias abstractions (even for data streams) only keep track of which _index_ they
-            // will write to, not which _data stream_.
-            // We work backward to find the data stream from the concrete write index to cover this case.
-            Index concreteIndex = ia.getWriteIndex();
-            IndexAbstraction writeIndexAbstraction = metadata.getIndicesLookup().get(concreteIndex.getName());
-            DataStream parentDataStream = writeIndexAbstraction.getParentDataStream();
-            if (parentDataStream != null && parentDataStream.isFailureStoreEnabled()) {
-                // Keep the data stream name around to resolve the redirect to failure store if the shard level request fails.
-                return parentDataStream;
-            }
-        }
-        return null;
+        return DataStream.resolveDataStream(ia, metadata);
     }
 
     /**

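Condensed, the shard-level bookkeeping that processFailure implements above boils down to the following decision. This is a hypothetical restatement for readability, not code from the commit; the counters referenced in the comments are defined in FailureStoreMetrics.java below:

    DataStream candidate = getRedirectTargetCandidate(docWriteRequest, getClusterState().metadata());
    if (candidate != null) {
        // The request targets a data stream; check whether a redirect is possible.
        boolean isFailureStoreDoc = docWriteRequest instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore();
        if (isFailureStoreDoc == false && candidate.isFailureStoreEnabled()) {
            // Redirect the document and count it in es.data_stream.ingest.documents.failure_store.total.
        } else {
            // No redirect possible: count it in es.data_stream.ingest.documents.rejected.total.
        }
    }
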
+ 98 - 0
server/src/main/java/org/elasticsearch/action/bulk/FailureStoreMetrics.java

@@ -0,0 +1,98 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.telemetry.metric.LongCounter;
+import org.elasticsearch.telemetry.metric.MeterRegistry;
+
+import java.util.Map;
+
+/**
+ * A class containing APM metrics for failure stores. See the JavaDoc on the individual methods for an explanation of what they track.
+ * General notes:
+ * <ul>
+ *     <li>When a document is rerouted in a pipeline, the destination data stream is used for the metric attribute(s).</li>
+ * </ul>
+ */
+public class FailureStoreMetrics {
+
+    public static final FailureStoreMetrics NOOP = new FailureStoreMetrics(MeterRegistry.NOOP);
+
+    public static final String METRIC_TOTAL = "es.data_stream.ingest.documents.total";
+    public static final String METRIC_FAILURE_STORE = "es.data_stream.ingest.documents.failure_store.total";
+    public static final String METRIC_REJECTED = "es.data_stream.ingest.documents.rejected.total";
+
+    private final LongCounter totalCounter;
+    private final LongCounter failureStoreCounter;
+    private final LongCounter rejectedCounter;
+
+    public FailureStoreMetrics(MeterRegistry meterRegistry) {
+        totalCounter = meterRegistry.registerLongCounter(METRIC_TOTAL, "total number of documents that were sent to a data stream", "unit");
+        failureStoreCounter = meterRegistry.registerLongCounter(
+            METRIC_FAILURE_STORE,
+            "number of documents that got redirected to the failure store",
+            "unit"
+        );
+        rejectedCounter = meterRegistry.registerLongCounter(METRIC_REJECTED, "number of documents that were rejected", "unit");
+    }
+
+    /**
+     * This counter tracks the number of documents that we <i>tried</i> to index into a data stream. This includes documents
+     * that were dropped by a pipeline. This counter will only be incremented once for every incoming document (even when it gets
+     * redirected to the failure store and/or gets rejected).
+     * @param dataStream the name of the data stream
+     */
+    public void incrementTotal(String dataStream) {
+        totalCounter.incrementBy(1, Map.of("data_stream", dataStream));
+    }
+
+    /**
+     * This counter tracks the number of documents that we <i>tried</i> to store into a failure store. This includes both pipeline and
+     * shard-level failures.
+     * @param dataStream the name of the data stream
+     * @param errorType the error type (i.e. the name of the exception that was thrown)
+     * @param errorLocation where this failure occurred
+     */
+    public void incrementFailureStore(String dataStream, String errorType, ErrorLocation errorLocation) {
+        failureStoreCounter.incrementBy(
+            1,
+            Map.of("data_stream", dataStream, "error_type", errorType, "error_location", errorLocation.name())
+        );
+    }
+
+    /**
+     * This counter tracks the number of documents that failed to get stored in Elasticsearch. That is, any document that did not get
+     * stored in the data stream or in its failure store.
+     * @param dataStream the name of the data stream
+     * @param errorType the error type (i.e. the name of the exception that was thrown)
+     * @param errorLocation where this failure occurred
+     * @param failureStore whether this failure occurred while trying to ingest into a failure store (<code>true</code>) or in the data
+     * stream itself (<code>false</code>)
+     */
+    public void incrementRejected(String dataStream, String errorType, ErrorLocation errorLocation, boolean failureStore) {
+        rejectedCounter.incrementBy(
+            1,
+            Map.of(
+                "data_stream",
+                dataStream,
+                "error_type",
+                errorType,
+                "error_location",
+                errorLocation.name(),
+                "failure_store",
+                failureStore
+            )
+        );
+    }
+
+    public enum ErrorLocation {
+        PIPELINE,
+        SHARD;
+    }
+}
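
For reference, a minimal usage sketch. In this commit the instance is built during node construction and injected into TransportBulkAction and IngestService; the data stream name and error type below are made-up examples:

    // Hypothetical caller; production code passes the node's real MeterRegistry instead of NOOP.
    FailureStoreMetrics metrics = new FailureStoreMetrics(MeterRegistry.NOOP);
    metrics.incrementTotal("logs-app-default"); // one document sent to the data stream
    metrics.incrementFailureStore("logs-app-default", "mapper_parsing_exception", FailureStoreMetrics.ErrorLocation.SHARD);
    metrics.incrementRejected("logs-app-default", "mapper_parsing_exception", FailureStoreMetrics.ErrorLocation.SHARD, false);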

+ 5 - 3
server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

@@ -222,7 +222,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
             original.numberOfActions(),
             () -> bulkRequestModifier,
             bulkRequestModifier::markItemAsDropped,
-            (indexName) -> shouldStoreFailure(indexName, metadata, threadPool.absoluteTimeInMillis()),
+            (indexName) -> resolveFailureStore(indexName, metadata, threadPool.absoluteTimeInMillis()),
             bulkRequestModifier::markItemForFailureStore,
             bulkRequestModifier::markItemAsFailed,
             (originalThread, exception) -> {
@@ -274,13 +274,15 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
     /**
      * Determines if an index name is associated with either an existing data stream or a template
      * for one that has the failure store enabled.
+     *
      * @param indexName The index name to check.
      * @param metadata Cluster state metadata.
      * @param epochMillis A timestamp to use when resolving date math in the index name.
      * @return true if this is not a simulation, and the given index name corresponds to a data stream with a failure store
-     * or if it matches a template that has a data stream failure store enabled.
+     * or if it matches a template that has a data stream failure store enabled. Returns false if the index name corresponds to a
+     * data stream, but it doesn't have the failure store enabled. Returns null when it doesn't correspond to a data stream.
      */
-    protected abstract boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis);
+    protected abstract Boolean resolveFailureStore(String indexName, Metadata metadata, long epochMillis);
 
     /**
      * Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions.  Upserts are

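Callers are expected to branch on all three states of the returned Boolean. A minimal sketch mirroring the JavaDoc above (hypothetical handler code; IngestService does exactly this in its pipeline exception handler further down):

    Boolean resolution = resolveFailureStore(indexName, metadata, epochMillis);
    if (resolution == null) {
        // The target is not a data stream: failure store accounting does not apply.
    } else if (resolution) {
        // Data stream with an enabled failure store: redirect the failed document there.
    } else {
        // Data stream without a failure store: the document counts as rejected.
    }
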
+ 61 - 39
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

@@ -42,7 +42,6 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.features.FeatureService;
-import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexingPressure;
 import org.elasticsearch.index.VersionType;
@@ -57,7 +56,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.Executor;
@@ -82,6 +80,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
     private final NodeClient client;
     private final IndexNameExpressionResolver indexNameExpressionResolver;
     private final OriginSettingClient rolloverClient;
+    private final FailureStoreMetrics failureStoreMetrics;
 
     @Inject
     public TransportBulkAction(
@@ -94,7 +93,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
         ActionFilters actionFilters,
         IndexNameExpressionResolver indexNameExpressionResolver,
         IndexingPressure indexingPressure,
-        SystemIndices systemIndices
+        SystemIndices systemIndices,
+        FailureStoreMetrics failureStoreMetrics
     ) {
         this(
             threadPool,
@@ -107,7 +107,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
             indexNameExpressionResolver,
             indexingPressure,
             systemIndices,
-            threadPool::relativeTimeInNanos
+            threadPool::relativeTimeInNanos,
+            failureStoreMetrics
         );
     }
 
@@ -122,7 +123,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
         IndexNameExpressionResolver indexNameExpressionResolver,
         IndexingPressure indexingPressure,
         SystemIndices systemIndices,
-        LongSupplier relativeTimeProvider
+        LongSupplier relativeTimeProvider,
+        FailureStoreMetrics failureStoreMetrics
     ) {
         this(
             TYPE,
@@ -137,7 +139,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
             indexNameExpressionResolver,
             indexingPressure,
             systemIndices,
-            relativeTimeProvider
+            relativeTimeProvider,
+            failureStoreMetrics
         );
     }
 
@@ -154,7 +157,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
         IndexNameExpressionResolver indexNameExpressionResolver,
         IndexingPressure indexingPressure,
         SystemIndices systemIndices,
-        LongSupplier relativeTimeProvider
+        LongSupplier relativeTimeProvider,
+        FailureStoreMetrics failureStoreMetrics
     ) {
         super(
             bulkAction,
@@ -173,6 +177,7 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
         this.client = client;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
         this.rolloverClient = new OriginSettingClient(client, LAZY_ROLLOVER_ORIGIN);
+        this.failureStoreMetrics = failureStoreMetrics;
     }
 
     public static <Response extends ReplicationResponse & WriteResponse> ActionListener<BulkResponse> unwrappingSingleItemBulkResponse(
@@ -199,6 +204,8 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
         ActionListener<BulkResponse> listener,
         long relativeStartTimeNanos
     ) {
+        trackIndexRequests(bulkRequest);
+
         Map<String, CreateIndexRequest> indicesToAutoCreate = new HashMap<>();
         Set<String> dataStreamsToBeRolledOver = new HashSet<>();
         Set<String> failureStoresToBeRolledOver = new HashSet<>();
@@ -216,6 +223,27 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
         );
     }
 
+    /**
+     * Track the number of index requests in our APM metrics. We'll track almost all docs here (pipeline or no pipeline,
+     * failure store or original), but some docs don't reach this place (dropped and rejected docs), so we increment for those docs in
+     * different places.
+     */
+    private void trackIndexRequests(BulkRequest bulkRequest) {
+        final Metadata metadata = clusterService.state().metadata();
+        for (DocWriteRequest<?> request : bulkRequest.requests) {
+            if (request instanceof IndexRequest == false) {
+                continue;
+            }
+            String resolvedIndexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index());
+            IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(resolvedIndexName);
+            DataStream dataStream = DataStream.resolveDataStream(indexAbstraction, metadata);
+            // We only track index requests into data streams.
+            if (dataStream != null) {
+                failureStoreMetrics.incrementTotal(dataStream.getName());
+            }
+        }
+    }
+
     /**
      * Determine all the targets (i.e. indices, data streams, failure stores) that require an action before we can proceed with the bulk
      * request. Indices might need to be created, and data streams and failure stores might need to be rolled over when they're marked
@@ -535,29 +563,29 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
             indexNameExpressionResolver,
             relativeTimeNanosProvider,
             startTimeNanos,
-            listener
+            listener,
+            failureStoreMetrics
         ).run();
     }
 
     /**
-     * Determines if an index name is associated with either an existing data stream or a template
-     * for one that has the failure store enabled.
-     * @param indexName The index name to check.
-     * @param metadata Cluster state metadata.
-     * @param epochMillis A timestamp to use when resolving date math in the index name.
-     * @return true if the given index name corresponds to a data stream with a failure store,
-     * or if it matches a template that has a data stream failure store enabled.
+     * See {@link #resolveFailureStore(String, Metadata, long)}
      */
-    static boolean shouldStoreFailureInternal(String indexName, Metadata metadata, long epochMillis) {
-        return DataStream.isFailureStoreFeatureFlagEnabled()
-            && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or(
-                () -> resolveFailureStoreFromTemplate(indexName, metadata)
-            ).orElse(false);
+    // Visibility for testing
+    static Boolean resolveFailureInternal(String indexName, Metadata metadata, long epochMillis) {
+        if (DataStream.isFailureStoreFeatureFlagEnabled() == false) {
+            return null;
+        }
+        var resolution = resolveFailureStoreFromMetadata(indexName, metadata, epochMillis);
+        if (resolution != null) {
+            return resolution;
+        }
+        return resolveFailureStoreFromTemplate(indexName, metadata);
     }
 
     @Override
-    protected boolean shouldStoreFailure(String indexName, Metadata metadata, long time) {
-        return shouldStoreFailureInternal(indexName, metadata, time);
+    protected Boolean resolveFailureStore(String indexName, Metadata metadata, long time) {
+        return resolveFailureInternal(indexName, metadata, time);
     }
 
     /**
@@ -567,30 +595,24 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
      * @param epochMillis A timestamp to use when resolving date math in the index name.
      * @return true if the given index name corresponds to an existing data stream with a failure store enabled.
      */
-    private static Optional<Boolean> resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) {
+    private static Boolean resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) {
         if (indexName == null) {
-            return Optional.empty();
+            return null;
         }
 
         // Get index abstraction, resolving date math if it exists
         IndexAbstraction indexAbstraction = metadata.getIndicesLookup()
             .get(IndexNameExpressionResolver.resolveDateMathExpression(indexName, epochMillis));
-
-        // We only store failures if the failure is being written to a data stream,
-        // not when directly writing to backing indices/failure stores
         if (indexAbstraction == null || indexAbstraction.isDataStreamRelated() == false) {
-            return Optional.empty();
+            return null;
         }
 
-        // Locate the write index for the abstraction, and check if it has a data stream associated with it.
-        // This handles alias resolution as well as data stream resolution.
-        Index writeIndex = indexAbstraction.getWriteIndex();
-        assert writeIndex != null : "Could not resolve write index for resource [" + indexName + "]";
-        IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName());
-        DataStream targetDataStream = writeAbstraction.getParentDataStream();
+        // We only store failures if the failure is being written to a data stream,
+        // not when directly writing to backing indices/failure stores
+        DataStream targetDataStream = DataStream.resolveDataStream(indexAbstraction, metadata);
 
         // We will store the failure if the write target belongs to a data stream with a failure store.
-        return Optional.of(targetDataStream != null && targetDataStream.isFailureStoreEnabled());
+        return targetDataStream != null && targetDataStream.isFailureStoreEnabled();
     }
 
     /**
@@ -599,9 +621,9 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
      * @param metadata Cluster state metadata.
      * @return true if the given index name corresponds to an index template with a data stream failure store enabled.
      */
-    private static Optional<Boolean> resolveFailureStoreFromTemplate(String indexName, Metadata metadata) {
+    private static Boolean resolveFailureStoreFromTemplate(String indexName, Metadata metadata) {
         if (indexName == null) {
-            return Optional.empty();
+            return null;
         }
 
         // Check to see if the index name matches any templates such that an index would have been attributed
@@ -612,11 +634,11 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
             ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(template);
             if (composableIndexTemplate.getDataStreamTemplate() != null) {
                 // Check if the data stream has the failure store enabled
-                return Optional.of(composableIndexTemplate.getDataStreamTemplate().hasFailureStore());
+                return composableIndexTemplate.getDataStreamTemplate().hasFailureStore();
             }
         }
 
         // Could not locate a failure store via template
-        return Optional.empty();
+        return null;
     }
 }

+ 2 - 2
server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

@@ -166,8 +166,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
     }
 
     @Override
-    protected boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) {
+    protected Boolean resolveFailureStore(String indexName, Metadata metadata, long epochMillis) {
         // A simulate bulk request should not change any persistent state in the system, so we never write to the failure store
-        return false;
+        return null;
     }
 }

+ 19 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

@@ -1376,6 +1376,25 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
         }
     }
 
+    /**
+     * Resolve the index abstraction to a data stream. This handles alias resolution as well as data stream resolution. Note that this
+     * does <b>not</b> resolve a concrete backing index to its parent data stream.
+     */
+    public static DataStream resolveDataStream(IndexAbstraction indexAbstraction, Metadata metadata) {
+        // We do not consider concrete indices - only data streams and data stream aliases.
+        if (indexAbstraction == null || indexAbstraction.isDataStreamRelated() == false) {
+            return null;
+        }
+
+        // Locate the write index for the abstraction, and check if it has a data stream associated with it.
+        Index writeIndex = indexAbstraction.getWriteIndex();
+        if (writeIndex == null) {
+            return null;
+        }
+        IndexAbstraction writeAbstraction = metadata.getIndicesLookup().get(writeIndex.getName());
+        return writeAbstraction.getParentDataStream();
+    }
+
     /**
      * Modifies the passed Instant object to be used as a bound for a timestamp field in TimeSeries. It needs to be called in both backing
      * index construction (rollover) and index selection for doc insertion. Failure to do so may lead to errors due to document timestamps

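A short sketch of the resolution this helper performs (hypothetical names; per the JavaDoc, a concrete backing index does not resolve to its parent data stream):

    IndexAbstraction ia = metadata.getIndicesLookup().get("some-alias"); // a data stream, or a data stream alias
    DataStream ds = DataStream.resolveDataStream(ia, metadata);
    // ds is the parent data stream of the abstraction's write index, or null when the
    // abstraction is missing, not data stream related, or has no write index.
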
+ 41 - 7
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -11,6 +11,7 @@ package org.elasticsearch.ingest;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.util.Strings;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
@@ -18,6 +19,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
@@ -88,6 +90,7 @@ import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.IntConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -117,6 +120,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     private volatile Map<String, PipelineHolder> pipelines = Map.of();
     private final ThreadPool threadPool;
     private final IngestMetric totalMetrics = new IngestMetric();
+    private final FailureStoreMetrics failureStoreMetrics;
     private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
     private volatile ClusterState state;
 
@@ -190,7 +194,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         List<IngestPlugin> ingestPlugins,
         Client client,
         MatcherWatchdog matcherWatchdog,
-        DocumentParsingProvider documentParsingProvider
+        DocumentParsingProvider documentParsingProvider,
+        FailureStoreMetrics failureStoreMetrics
     ) {
         this.clusterService = clusterService;
         this.scriptService = scriptService;
@@ -212,6 +217,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         );
         this.threadPool = threadPool;
         this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR);
+        this.failureStoreMetrics = failureStoreMetrics;
     }
 
     /**
@@ -228,6 +234,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         this.taskQueue = ingestService.taskQueue;
         this.pipelines = ingestService.pipelines;
         this.state = ingestService.state;
+        this.failureStoreMetrics = ingestService.failureStoreMetrics;
     }
 
     private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
@@ -691,7 +698,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
      * @param actionRequests The collection of requests to be processed.
      * @param onDropped A callback executed when a document is dropped by a pipeline.
      *                  Accepts the slot in the collection of requests that the document occupies.
-     * @param shouldStoreFailure A predicate executed on each ingest failure to determine if the
+     * @param resolveFailureStore A function executed on each ingest failure to determine if the
      *                           failure should be stored somewhere.
      * @param onStoreFailure A callback executed when a document fails ingest but the failure should
      *                       be persisted elsewhere. Accepts the slot in the collection of requests
@@ -709,7 +716,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         final int numberOfActionRequests,
         final Iterable<DocWriteRequest<?>> actionRequests,
         final IntConsumer onDropped,
-        final Predicate<String> shouldStoreFailure,
+        final Function<String, Boolean> resolveFailureStore,
         final TriConsumer<Integer, String, Exception> onStoreFailure,
         final BiConsumer<Integer, Exception> onFailure,
         final BiConsumer<Thread, Exception> onCompletion,
@@ -794,7 +801,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                             }
                         );
 
-                        executePipelines(pipelines, indexRequest, ingestDocument, shouldStoreFailure, documentListener);
+                        executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener);
                         indexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
                         assert actionRequest.index() != null;
 
@@ -885,7 +892,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         final PipelineIterator pipelines,
         final IndexRequest indexRequest,
         final IngestDocument ingestDocument,
-        final Predicate<String> shouldStoreFailure,
+        final Function<String, Boolean> resolveFailureStore,
         final ActionListener<IngestPipelinesExecutionResult> listener
     ) {
         assert pipelines.hasNext();
@@ -898,9 +905,22 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         ingestDocument.resetReroute();
         final String originalIndex = indexRequest.indices()[0];
         final Consumer<Exception> exceptionHandler = (Exception e) -> {
-            if (shouldStoreFailure.test(originalIndex)) {
+            String errorType = ElasticsearchException.getExceptionName(ExceptionsHelper.unwrapCause(e));
+            // If `failureStoreResolution` is true, we store the failure. If it's false, the target is a data stream,
+            // but it doesn't have the failure store enabled. If it's null, the target isn't a data stream.
+            Boolean failureStoreResolution = resolveFailureStore.apply(originalIndex);
+            if (failureStoreResolution != null && failureStoreResolution) {
+                failureStoreMetrics.incrementFailureStore(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE);
                 listener.onResponse(IngestPipelinesExecutionResult.failAndStoreFor(originalIndex, e));
             } else {
+                if (failureStoreResolution != null) {
+                    // If this document targeted a data stream that didn't have the failure store enabled, we increment
+                    // the rejected counter.
+                    // We also increment the total counter because this request will not reach the code that increments
+                    // the total counter for non-rejected documents.
+                    failureStoreMetrics.incrementTotal(originalIndex);
+                    failureStoreMetrics.incrementRejected(originalIndex, errorType, FailureStoreMetrics.ErrorLocation.PIPELINE, false);
+                }
                 listener.onFailure(e);
             }
         };
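
The three-way return of `resolveFailureStore` is easy to misread in diff form: `true` means redirect the failure to the failure store, `false` means the target is a data stream without a failure store (count it as rejected), and `null` means the target is not a data stream (no failure-store metrics at all). Below is a minimal, self-contained Java sketch of that branching. All names here are illustrative: the map-backed resolver stands in for the production resolution logic, and the `Metrics` interface stubs the `FailureStoreMetrics` counters with prints.

import java.util.Map;
import java.util.function.Function;

public class ResolveFailureStoreSketch {
    // Stand-in for FailureStoreMetrics; the real class records APM counters.
    interface Metrics {
        void incrementTotal(String target);
        void incrementFailureStore(String target, String errorType, String location);
        void incrementRejected(String target, String errorType, String location, boolean failureStore);
    }

    public static void main(String[] args) {
        // Hypothetical resolver: true = data stream with failure store enabled,
        // false = data stream without one, null = not a data stream.
        Map<String, Boolean> targets = Map.of("logs-app-default", true, "metrics-app-default", false);
        Function<String, Boolean> resolveFailureStore = targets::get;

        Metrics metrics = new Metrics() {
            public void incrementTotal(String t) { System.out.println("total++ for " + t); }
            public void incrementFailureStore(String t, String e, String l) { System.out.println("failure_store++ for " + t); }
            public void incrementRejected(String t, String e, String l, boolean fs) { System.out.println("rejected++ for " + t); }
        };

        for (String index : new String[] { "logs-app-default", "metrics-app-default", "plain-index" }) {
            Boolean resolution = resolveFailureStore.apply(index);
            if (resolution != null && resolution) {
                // Failure will be redirected to the failure store.
                metrics.incrementFailureStore(index, "mapping_exception", "PIPELINE");
            } else if (resolution != null) {
                // Data stream without a failure store: count total here too, because
                // the rejected document never reaches the usual total-counting code.
                metrics.incrementTotal(index);
                metrics.incrementRejected(index, "mapping_exception", "PIPELINE", false);
            }
            // null: plain index or template, no data-stream metrics recorded.
        }
    }
}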
@@ -928,6 +948,20 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                 }
 
                 if (keep == false) {
+                    // We only increment the total counter for dropped docs here, because these docs don't reach the code
+                    // that ordinarily takes care of that.
+                    // We reuse `resolveFailureStore` here to determine whether the index request targets a data stream,
+                    // because we only want to track these metrics for data streams.
+                    Boolean failureStoreResolution = resolveFailureStore.apply(originalIndex);
+                    if (failureStoreResolution != null) {
+                        // Get index abstraction, resolving date math if it exists
+                        IndexAbstraction indexAbstraction = state.metadata()
+                            .getIndicesLookup()
+                            .get(IndexNameExpressionResolver.resolveDateMathExpression(originalIndex, threadPool.absoluteTimeInMillis()));
+                        DataStream dataStream = DataStream.resolveDataStream(indexAbstraction, state.metadata());
+                        String dataStreamName = dataStream != null ? dataStream.getName() : originalIndex;
+                        failureStoreMetrics.incrementTotal(dataStreamName);
+                    }
                     listener.onResponse(IngestPipelinesExecutionResult.DISCARD_RESULT);
                     return; // document dropped!
                 }
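
One non-obvious step in the dropped-document hunk above is the date-math resolution: a request can target an expression such as `<logs-{now/d}>` rather than a concrete name, so the name must be resolved before the data stream can be looked up and the total counter attributed correctly. Here is a toy resolver sketch, assuming only the default `{now/d}` rounding with a `yyyy.MM.dd` pattern in UTC; the production `IndexNameExpressionResolver.resolveDateMathExpression` additionally handles custom date formats, time zones, and escaping.

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateMathSketch {
    // Toy date-math resolution: supports only "<name-{now/d}>" expressions.
    static String resolve(String expression, long nowMillis) {
        if (expression.startsWith("<") == false) {
            return expression; // no date math, use the name as-is
        }
        String day = DateTimeFormatter.ofPattern("yyyy.MM.dd")
            .withZone(ZoneOffset.UTC)
            .format(Instant.ofEpochMilli(nowMillis));
        return expression.replace("<", "").replace("{now/d}>", day);
    }

    public static void main(String[] args) {
        // Prints "logs-foo-2024.05.06" for a 2024-05-06T00:00:00Z timestamp.
        System.out.println(resolve("<logs-foo-{now/d}>", 1714953600000L));
    }
}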
@@ -1019,7 +1053,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                 }
 
                 if (newPipelines.hasNext()) {
-                    executePipelines(newPipelines, indexRequest, ingestDocument, shouldStoreFailure, listener);
+                    executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener);
                 } else {
                     // update the index request's source and (potentially) cache the timestamp for TSDB
                     updateIndexRequestSource(indexRequest, ingestDocument);

+ 5 - 1
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction;
 import org.elasticsearch.action.admin.indices.template.reservedstate.ReservedComposableIndexTemplateAction;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.action.ingest.ReservedPipelineAction;
 import org.elasticsearch.action.search.SearchExecutionStatsCollector;
@@ -659,6 +660,7 @@ class NodeConstruction {
 
         modules.bindToInstance(DocumentParsingProvider.class, documentParsingProvider);
 
+        FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry());
         final IngestService ingestService = new IngestService(
             clusterService,
             threadPool,
@@ -668,7 +670,8 @@ class NodeConstruction {
             pluginsService.filterPlugins(IngestPlugin.class).toList(),
             client,
             IngestService.createGrokThreadWatchdog(environment, threadPool),
-            documentParsingProvider
+            documentParsingProvider,
+            failureStoreMetrics
         );
 
         SystemIndices systemIndices = createSystemIndices(settings);
@@ -1154,6 +1157,7 @@ class NodeConstruction {
             b.bind(FileSettingsService.class).toInstance(fileSettingsService);
             b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
             b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService);
+            b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics);
         });
 
         if (ReadinessService.enabled(environment)) {
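
Every test-file hunk from here down threads `FailureStoreMetrics.NOOP` through the new constructor parameter instead of a registry-backed instance, so none of these tests needs telemetry plumbing. Below is a self-contained sketch of the null-object pattern this relies on, with illustrative names (the real constant is defined against the telemetry meter registry in `FailureStoreMetrics.java`):

public class NoopMetricsSketch {
    interface Counter {
        void increment(String target);
    }

    // Production flavor: would record to an APM meter registry.
    static final class RecordingCounter implements Counter {
        public void increment(String target) { System.out.println("recorded " + target); }
    }

    // Null object: same interface, does nothing; a safe default for tests.
    static final Counter NOOP = target -> {};

    public static void main(String[] args) {
        Counter inTests = NOOP;
        inTests.increment("logs-app-default"); // no-op, nothing to mock or verify
        new RecordingCounter().increment("logs-app-default");
    }
}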

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java

@@ -1164,7 +1164,8 @@ public class BulkOperationTests extends ESTestCase {
             timeZero,
             listener,
             observer,
-            failureStoreDocumentConverter
+            failureStoreDocumentConverter,
+            FailureStoreMetrics.NOOP
         );
     }
 

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

@@ -130,7 +130,8 @@ public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCa
             mock(ActionFilters.class),
             indexNameExpressionResolver,
             new IndexingPressure(Settings.EMPTY),
-            EmptySystemIndices.INSTANCE
+            EmptySystemIndices.INSTANCE,
+            FailureStoreMetrics.NOOP
         ) {
             @Override
             void executeBulk(

+ 8 - 6
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

@@ -69,7 +69,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
-import java.util.function.Predicate;
+import java.util.function.Function;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.sameInstance;
@@ -110,7 +110,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
 
     /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
     @Captor
-    ArgumentCaptor<Predicate<String>> redirectPredicate;
+    ArgumentCaptor<Function<String, Boolean>> redirectPredicate;
     @Captor
     ArgumentCaptor<TriConsumer<Integer, String, Exception>> redirectHandler;
     @Captor
@@ -155,7 +155,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
                 new ActionFilters(Collections.emptySet()),
                 TestIndexNameExpressionResolver.newInstance(),
                 new IndexingPressure(SETTINGS),
-                EmptySystemIndices.INSTANCE
+                EmptySystemIndices.INSTANCE,
+                FailureStoreMetrics.NOOP
             );
         }
 
@@ -410,9 +411,10 @@ public class TransportBulkActionIngestTests extends ESTestCase {
         Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
         failureHandler.getValue().accept(0, exception); // have an exception for our one index request
         indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
-        assertTrue(redirectPredicate.getValue().test(WITH_FAILURE_STORE_ENABLED + "-1")); // ensure redirects on failure store data stream
-        assertFalse(redirectPredicate.getValue().test(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices
-        assertFalse(redirectPredicate.getValue().test("index")); // no redirects for non-existant indices with no templates
+        // ensure redirects on failure store data stream
+        assertTrue(redirectPredicate.getValue().apply(WITH_FAILURE_STORE_ENABLED + "-1"));
+        assertNull(redirectPredicate.getValue().apply(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices
+        assertNull(redirectPredicate.getValue().apply("index")); // no redirects for non-existent indices with no templates
         redirectHandler.getValue().apply(2, WITH_FAILURE_STORE_ENABLED + "-1", exception); // exception and redirect for request 3 (slot 2)
         completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); // all ingestion completed
         assertTrue(action.isExecuted);

+ 13 - 8
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

@@ -71,6 +71,7 @@ import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assume.assumeThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -103,7 +104,8 @@ public class TransportBulkActionTests extends ESTestCase {
                 new ActionFilters(Collections.emptySet()),
                 new Resolver(),
                 new IndexingPressure(Settings.EMPTY),
-                EmptySystemIndices.INSTANCE
+                EmptySystemIndices.INSTANCE,
+                FailureStoreMetrics.NOOP
             );
         }
 
@@ -417,13 +419,16 @@ public class TransportBulkActionTests extends ESTestCase {
             .build();
 
         // Data stream with failure store should store failures
-        assertThat(TransportBulkAction.shouldStoreFailureInternal(dataStreamWithFailureStore, metadata, testTime), is(true));
+        assertThat(TransportBulkAction.resolveFailureInternal(dataStreamWithFailureStore, metadata, testTime), is(true));
         // Data stream without failure store should not
-        assertThat(TransportBulkAction.shouldStoreFailureInternal(dataStreamWithoutFailureStore, metadata, testTime), is(false));
+        assertThat(TransportBulkAction.resolveFailureInternal(dataStreamWithoutFailureStore, metadata, testTime), is(false));
         // An index should not be considered for failure storage
-        assertThat(TransportBulkAction.shouldStoreFailureInternal(backingIndex1.getIndex().getName(), metadata, testTime), is(false));
+        assertThat(TransportBulkAction.resolveFailureInternal(backingIndex1.getIndex().getName(), metadata, testTime), is(nullValue()));
         // even if that index is itself a failure store
-        assertThat(TransportBulkAction.shouldStoreFailureInternal(failureStoreIndex1.getIndex().getName(), metadata, testTime), is(false));
+        assertThat(
+            TransportBulkAction.resolveFailureInternal(failureStoreIndex1.getIndex().getName(), metadata, testTime),
+            is(nullValue())
+        );
     }
 
     public void testResolveFailureStoreFromTemplate() throws Exception {
@@ -454,11 +459,11 @@ public class TransportBulkActionTests extends ESTestCase {
             .build();
 
         // Data stream with failure store should store failures
-        assertThat(TransportBulkAction.shouldStoreFailureInternal(dsTemplateWithFailureStore + "-1", metadata, testTime), is(true));
+        assertThat(TransportBulkAction.resolveFailureInternal(dsTemplateWithFailureStore + "-1", metadata, testTime), is(true));
         // Data stream without failure store should not
-        assertThat(TransportBulkAction.shouldStoreFailureInternal(dsTemplateWithoutFailureStore + "-1", metadata, testTime), is(false));
+        assertThat(TransportBulkAction.resolveFailureInternal(dsTemplateWithoutFailureStore + "-1", metadata, testTime), is(false));
         // An index template should not be considered for failure storage
-        assertThat(TransportBulkAction.shouldStoreFailureInternal(indexTemplate + "-1", metadata, testTime), is(false));
+        assertThat(TransportBulkAction.resolveFailureInternal(indexTemplate + "-1", metadata, testTime), is(nullValue()));
     }
 
     private BulkRequest buildBulkRequest(List<String> indices) {

+ 2 - 1
server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java

@@ -254,7 +254,8 @@ public class TransportBulkActionTookTests extends ESTestCase {
                 indexNameExpressionResolver,
                 new IndexingPressure(Settings.EMPTY),
                 EmptySystemIndices.INSTANCE,
-                relativeTimeProvider
+                relativeTimeProvider,
+                FailureStoreMetrics.NOOP
             );
         }
     }

+ 3 - 1
server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.Build;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
@@ -90,7 +91,8 @@ public class ReservedPipelineActionTests extends ESTestCase {
             Collections.singletonList(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE,
+            FailureStoreMetrics.NOOP
         );
         Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
         assertTrue(factories.containsKey("set"));

+ 20 - 13
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
@@ -88,9 +89,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.IntConsumer;
 import java.util.function.LongSupplier;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils.executeAndAssertSuccessful;
@@ -152,7 +153,8 @@ public class IngestServiceTests extends ESTestCase {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE,
+            FailureStoreMetrics.NOOP
         );
         Map<String, Processor.Factory> factories = ingestService.getProcessorFactories();
         assertTrue(factories.containsKey("foo"));
@@ -172,7 +174,8 @@ public class IngestServiceTests extends ESTestCase {
                 List.of(DUMMY_PLUGIN, DUMMY_PLUGIN),
                 client,
                 null,
-                DocumentParsingProvider.EMPTY_INSTANCE
+                DocumentParsingProvider.EMPTY_INSTANCE,
+                FailureStoreMetrics.NOOP
             )
         );
         assertTrue(e.getMessage(), e.getMessage().contains("already registered"));
@@ -189,7 +192,8 @@ public class IngestServiceTests extends ESTestCase {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE,
+            FailureStoreMetrics.NOOP
         );
         final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
             .source(Map.of())
@@ -1665,7 +1669,7 @@ public class IngestServiceTests extends ESTestCase {
             .setFinalPipeline("_id2");
         doThrow(new RuntimeException()).when(processor)
             .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
-        final Predicate<String> redirectCheck = (idx) -> indexRequest.index().equals(idx);
+        final Function<String, Boolean> redirectCheck = (idx) -> indexRequest.index().equals(idx);
         @SuppressWarnings("unchecked")
         final TriConsumer<Integer, String, Exception> redirectHandler = mock(TriConsumer.class);
         @SuppressWarnings("unchecked")
@@ -1722,7 +1726,7 @@ public class IngestServiceTests extends ESTestCase {
             .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
         doThrow(new RuntimeException()).when(processor)
             .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Map.of()), any());
-        final Predicate<String> redirectPredicate = (idx) -> indexRequest.index().equals(idx);
+        final Function<String, Boolean> redirectCheck = (idx) -> indexRequest.index().equals(idx);
         @SuppressWarnings("unchecked")
         final TriConsumer<Integer, String, Exception> redirectHandler = mock(TriConsumer.class);
         @SuppressWarnings("unchecked")
@@ -1733,7 +1737,7 @@ public class IngestServiceTests extends ESTestCase {
             1,
             List.of(indexRequest),
             indexReq -> {},
-            redirectPredicate,
+            redirectCheck,
             redirectHandler,
             failureHandler,
             completionHandler,
@@ -1826,9 +1830,9 @@ public class IngestServiceTests extends ESTestCase {
         for (int i = 0; i < numRequest; i++) {
             IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none");
             indexRequest.source(xContentType, "field1", "value1");
-            boolean shouldListExecutedPiplines = randomBoolean();
-            executedPipelinesExpected.add(shouldListExecutedPiplines);
-            indexRequest.setListExecutedPipelines(shouldListExecutedPiplines);
+            boolean shouldListExecutedPipelines = randomBoolean();
+            executedPipelinesExpected.add(shouldListExecutedPipelines);
+            indexRequest.setListExecutedPipelines(shouldListExecutedPipelines);
             bulkRequest.add(indexRequest);
         }
 
@@ -2320,7 +2324,8 @@ public class IngestServiceTests extends ESTestCase {
             List.of(testPlugin),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE,
+            FailureStoreMetrics.NOOP
         );
         ingestService.addIngestClusterStateListener(ingestClusterStateListener);
 
@@ -2675,7 +2680,8 @@ public class IngestServiceTests extends ESTestCase {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE,
+            FailureStoreMetrics.NOOP
         );
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
 
@@ -2974,7 +2980,8 @@ public class IngestServiceTests extends ESTestCase {
             }),
             client,
             null,
-            documentParsingProvider
+            documentParsingProvider,
+            FailureStoreMetrics.NOOP
         );
         if (randomBoolean()) {
             /*

+ 15 - 2
server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.ingest;
 
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.action.bulk.SimulateBulkRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -115,11 +116,23 @@ public class SimulateIngestServiceTests extends ESTestCase {
         ThreadPool threadPool = mock(ThreadPool.class);
         when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
         when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
-        return new IngestService(mock(ClusterService.class), threadPool, null, null, null, List.of(new IngestPlugin() {
+        var ingestPlugin = new IngestPlugin() {
             @Override
             public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) {
                 return processors;
             }
-        }), client, null, DocumentParsingProvider.EMPTY_INSTANCE);
+        };
+        return new IngestService(
+            mock(ClusterService.class),
+            threadPool,
+            null,
+            null,
+            null,
+            List.of(ingestPlugin),
+            client,
+            null,
+            DocumentParsingProvider.EMPTY_INSTANCE,
+            FailureStoreMetrics.NOOP
+        );
     }
 }

+ 5 - 2
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -45,6 +45,7 @@ import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAct
 import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.bulk.TransportShardBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
@@ -2395,14 +2396,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
                             Collections.emptyList(),
                             client,
                             null,
-                            DocumentParsingProvider.EMPTY_INSTANCE
+                            DocumentParsingProvider.EMPTY_INSTANCE,
+                            FailureStoreMetrics.NOOP
                         ),
                         mockFeatureService,
                         client,
                         actionFilters,
                         indexNameExpressionResolver,
                         new IndexingPressure(settings),
-                        EmptySystemIndices.INSTANCE
+                        EmptySystemIndices.INSTANCE,
+                        FailureStoreMetrics.NOOP
                     )
                 );
                 final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(

+ 3 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.bulk.FailureStoreMetrics;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -136,7 +137,8 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
             Collections.singletonList(SKINNY_INGEST_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE
+            DocumentParsingProvider.EMPTY_INSTANCE,
+            FailureStoreMetrics.NOOP
         );
     }