Browse Source

Fix failure store pipeline-level failure recording issues (#111802)

Adds additional tests and fixes some edge cases related to rerouting documents in ingest and persisting their failures to failure stores.
---------

Co-authored-by: Niels Bauman <33722607+nielsbauman@users.noreply.github.com>
James Baiera 1 year ago
parent
commit
656b5db3d1

+ 430 - 1
modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_failure_store_redirection.yml

@@ -318,4 +318,433 @@ teardown:
         index: .fs-destination-*
   - length:   { hits.hits: 1  }
   - match: { hits.hits.0._index: "/\\.fs-destination-data-stream-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
-  - match: { hits.hits.0._source.document.index: 'destination-data-stream' }
+  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
+
+---
+"Failure redirects to original failure store during index change if self referenced":
+  - requires:
+      cluster_features: [ "gte_v8.15.0" ]
+      reason: "data stream failure stores REST structure changed in 8.15+"
+      test_runner_features: [ allowed_warnings, contains ]
+
+  - do:
+      ingest.put_pipeline:
+        id: "failing_pipeline"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "set": {
+                  "field": "_index",
+                  "value": "logs-elsewhere"
+                }
+              },
+              {
+                "script": {
+                  "source": "ctx.object.data = ctx.object"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream:
+            failure_store: true
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 1
+              index:
+                default_pipeline: "failing_pipeline"
+
+  - do:
+      index:
+        index: logs-foobar
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          object:
+            data:
+              field: 'someValue'
+
+  - do:
+      indices.get_data_stream:
+        name: logs-foobar
+  - match: { data_streams.0.name: logs-foobar }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+
+  - do:
+      search:
+        index: logs-foobar
+        body: { query: { match_all: { } } }
+  - length: { hits.hits: 0 }
+
+  - do:
+      search:
+        index: .fs-logs-foobar-*
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
+  - exists: hits.hits.0._source.@timestamp
+  - not_exists: hits.hits.0._source.foo
+  - not_exists: hits.hits.0._source.document.id
+  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
+  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
+  - match: { hits.hits.0._source.document.source.object.data.field: 'someValue' }
+  - match: { hits.hits.0._source.error.type: 'illegal_argument_exception' }
+  - contains: { hits.hits.0._source.error.message: 'Failed to generate the source document for ingest pipeline' }
+  - contains: { hits.hits.0._source.error.stack_trace: 'Failed to generate the source document for ingest pipeline' }
+  - match: { hits.hits.0._source.error.pipeline_trace.0: 'failing_pipeline' }
+  - match: { hits.hits.0._source.error.pipeline: 'failing_pipeline' }
+
+  - do:
+      indices.delete_data_stream:
+        name: logs-foobar
+  - is_true: acknowledged
+
+  - do:
+      indices.delete:
+        index: .fs-logs-foobar-*
+  - is_true: acknowledged
+
+---
+"Failure redirects to original failure store during index change if final pipeline changes target":
+  - requires:
+      cluster_features: [ "gte_v8.15.0" ]
+      reason: "data stream failure stores REST structure changed in 8.15+"
+      test_runner_features: [ allowed_warnings, contains ]
+
+  - do:
+      ingest.put_pipeline:
+        id: "change_index_pipeline"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "set": {
+                  "field": "_index",
+                  "value": "logs-elsewhere"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream:
+            failure_store: true
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 1
+              index:
+                final_pipeline: "change_index_pipeline"
+
+  - do:
+      index:
+        index: logs-foobar
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          foo: bar
+
+  - do:
+      indices.get_data_stream:
+        name: logs-foobar
+  - match: { data_streams.0.name: logs-foobar }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+
+  - do:
+      search:
+        index: logs-foobar
+        body: { query: { match_all: { } } }
+  - length: { hits.hits: 0 }
+
+  - do:
+      search:
+        index: .fs-logs-foobar-*
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
+  - exists: hits.hits.0._source.@timestamp
+  - not_exists: hits.hits.0._source.foo
+  - not_exists: hits.hits.0._source.document.id
+  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
+  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
+  - match: { hits.hits.0._source.document.source.foo: 'bar' }
+  - match: { hits.hits.0._source.error.type: 'illegal_state_exception' }
+  - contains: { hits.hits.0._source.error.message: "final pipeline [change_index_pipeline] can't change the target index" }
+  - contains: { hits.hits.0._source.error.stack_trace: "final pipeline [change_index_pipeline] can't change the target index" }
+  - match: { hits.hits.0._source.error.pipeline_trace.0: 'change_index_pipeline' }
+  - match: { hits.hits.0._source.error.pipeline: 'change_index_pipeline' }
+
+  - do:
+      indices.delete_data_stream:
+        name: logs-foobar
+  - is_true: acknowledged
+
+  - do:
+      indices.delete:
+        index: .fs-logs-foobar-*
+  - is_true: acknowledged
+
+---
+"Failure redirects to correct failure store when index loop is detected":
+  - requires:
+      cluster_features: [ "gte_v8.15.0" ]
+      reason: "data stream failure stores REST structure changed in 8.15+"
+      test_runner_features: [ allowed_warnings, contains ]
+
+  - do:
+      ingest.put_pipeline:
+        id: "send_to_destination"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "reroute": {
+                  "tag": "reroute-tag-1",
+                  "destination": "destination-data-stream"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "send_back_to_original"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "reroute": {
+                  "tag": "reroute-tag-2",
+                  "destination": "logs-foobar"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream:
+            failure_store: true
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 1
+              index:
+                default_pipeline: "send_to_destination"
+
+  - do:
+      allowed_warnings:
+        - "index template [destination_logs_template] has index patterns [destination-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [destination_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: destination_logs_template
+        body:
+          index_patterns: destination-*
+          data_stream:
+            failure_store: true
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 1
+              index:
+                default_pipeline: "send_back_to_original"
+
+  - do:
+      index:
+        index: logs-foobar
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          foo: bar
+
+
+  - do:
+      indices.get_data_stream:
+        name: destination-data-stream
+  - match: { data_streams.0.name: destination-data-stream }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-destination-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-destination-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+
+  - do:
+      search:
+        index: destination-data-stream
+        body: { query: { match_all: { } } }
+  - length: { hits.hits: 0 }
+
+  - do:
+      search:
+        index: .fs-destination-data-stream-*
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._index: "/\\.fs-destination-data-stream-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
+  - exists: hits.hits.0._source.@timestamp
+  - not_exists: hits.hits.0._source.foo
+  - not_exists: hits.hits.0._source.document.id
+  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
+  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
+  - match: { hits.hits.0._source.document.source.foo: 'bar' }
+  - match: { hits.hits.0._source.error.type: 'illegal_state_exception' }
+  - contains: { hits.hits.0._source.error.message: 'index cycle detected' }
+  - contains: { hits.hits.0._source.error.stack_trace: 'index cycle detected' }
+  - match: { hits.hits.0._source.error.pipeline_trace.0: 'send_back_to_original' }
+  - match: { hits.hits.0._source.error.pipeline: 'send_back_to_original' }
+
+  - do:
+      indices.delete_data_stream:
+        name: destination-data-stream
+  - is_true: acknowledged
+
+  - do:
+      indices.delete:
+        index: .fs-destination-data-stream-*
+  - is_true: acknowledged
+
+---
+"Failure redirects to correct failure store when pipeline loop is detected":
+  - requires:
+      cluster_features: [ "gte_v8.15.0" ]
+      reason: "data stream failure stores REST structure changed in 8.15+"
+      test_runner_features: [ allowed_warnings, contains ]
+
+  - do:
+      ingest.put_pipeline:
+        id: "step_1"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "pipeline": {
+                  "tag": "step-1",
+                  "name": "step_2"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      ingest.put_pipeline:
+        id: "step_2"
+        body: >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "pipeline": {
+                  "tag": "step-2",
+                  "name": "step_1"
+                }
+              }
+            ]
+          }
+  - match: { acknowledged: true }
+
+  - do:
+      allowed_warnings:
+        - "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
+      indices.put_index_template:
+        name: generic_logs_template
+        body:
+          index_patterns: logs-*
+          data_stream:
+            failure_store: true
+          template:
+            settings:
+              number_of_shards: 1
+              number_of_replicas: 1
+              index:
+                default_pipeline: "step_1"
+
+  - do:
+      index:
+        index: logs-foobar
+        refresh: true
+        body:
+          '@timestamp': '2020-12-12'
+          foo: bar
+
+  - do:
+      indices.get_data_stream:
+        name: logs-foobar
+  - match: { data_streams.0.name: logs-foobar }
+  - match: { data_streams.0.timestamp_field.name: '@timestamp' }
+  - length: { data_streams.0.indices: 1 }
+  - match: { data_streams.0.indices.0.index_name: '/\.ds-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+  - match: { data_streams.0.failure_store.enabled: true }
+  - length: { data_streams.0.failure_store.indices: 1 }
+  - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-logs-foobar-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
+
+  - do:
+      search:
+        index: logs-foobar
+        body: { query: { match_all: { } } }
+  - length: { hits.hits: 0 }
+
+  - do:
+      search:
+        index: .fs-logs-foobar-*
+  - length: { hits.hits: 1 }
+  - match: { hits.hits.0._index: "/\\.fs-logs-foobar-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
+  - exists: hits.hits.0._source.@timestamp
+  - not_exists: hits.hits.0._source.foo
+  - not_exists: hits.hits.0._source.document.id
+  - match: { hits.hits.0._source.document.index: 'logs-foobar' }
+  - match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
+  - match: { hits.hits.0._source.document.source.foo: 'bar' }
+  - match: { hits.hits.0._source.error.type: 'graph_structure_exception' }
+  - contains: { hits.hits.0._source.error.message: 'Cycle detected for pipeline: step_1' }
+  - contains: { hits.hits.0._source.error.stack_trace: 'Cycle detected for pipeline: step_1' }
+  - match: { hits.hits.0._source.error.pipeline_trace.0: 'step_1' }
+  - match: { hits.hits.0._source.error.pipeline_trace.1: 'step_2' }
+  - match: { hits.hits.0._source.error.pipeline: 'step_2' }
+  - match: { hits.hits.0._source.error.processor_tag: 'step-2' }
+  - match: { hits.hits.0._source.error.processor_type: 'pipeline' }
+
+  - do:
+      indices.delete_data_stream:
+        name: logs-foobar
+  - is_true: acknowledged
+
+  - do:
+      indices.delete:
+        index: .fs-logs-foobar-*
+  - is_true: acknowledged

+ 6 - 0
server/src/main/java/org/elasticsearch/ElasticsearchException.java

@@ -1927,6 +1927,12 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
             ResourceAlreadyUploadedException::new,
             181,
             TransportVersions.ADD_RESOURCE_ALREADY_UPLOADED_EXCEPTION
+        ),
+        INGEST_PIPELINE_EXCEPTION(
+            org.elasticsearch.ingest.IngestPipelineException.class,
+            org.elasticsearch.ingest.IngestPipelineException::new,
+            182,
+            TransportVersions.INGEST_PIPELINE_EXCEPTION_ADDED
         );
 
         final Class<? extends ElasticsearchException> exceptionClass;

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -188,6 +188,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_SINGLE_VALUE_QUERY_SOURCE = def(8_718_00_0);
     public static final TransportVersion ESQL_ORIGINAL_INDICES = def(8_719_00_0);
     public static final TransportVersion ML_INFERENCE_EIS_INTEGRATION_ADDED = def(8_720_00_0);
+    public static final TransportVersion INGEST_PIPELINE_EXCEPTION_ADDED = def(8_721_00_0);
     /*
      * STOP! READ THIS FIRST! No, really,
      *        ____ _____ ___  ____  _        ____  _____    _    ____    _____ _   _ ___ ____    _____ ___ ____  ____ _____ _

+ 5 - 9
server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocumentConverter.java

@@ -70,20 +70,14 @@ public class FailureStoreDocumentConverter {
         Supplier<Long> timeSupplier
     ) throws IOException {
         return new IndexRequest().index(targetIndexName)
-            .source(createSource(source, exception, targetIndexName, timeSupplier))
+            .source(createSource(source, exception, timeSupplier))
             .opType(DocWriteRequest.OpType.CREATE)
             .setWriteToFailureStore(true);
     }
 
-    private static XContentBuilder createSource(
-        IndexRequest source,
-        Exception exception,
-        String targetIndexName,
-        Supplier<Long> timeSupplier
-    ) throws IOException {
+    private static XContentBuilder createSource(IndexRequest source, Exception exception, Supplier<Long> timeSupplier) throws IOException {
         Objects.requireNonNull(source, "source must not be null");
         Objects.requireNonNull(exception, "exception must not be null");
-        Objects.requireNonNull(targetIndexName, "targetIndexName must not be null");
         Objects.requireNonNull(timeSupplier, "timeSupplier must not be null");
         Throwable unwrapped = ExceptionsHelper.unwrapCause(exception);
         XContentBuilder builder = JsonXContent.contentBuilder();
@@ -98,7 +92,9 @@ public class FailureStoreDocumentConverter {
                 if (source.routing() != null) {
                     builder.field("routing", source.routing());
                 }
-                builder.field("index", targetIndexName);
+                if (source.index() != null) {
+                    builder.field("index", source.index());
+                }
                 // Unmapped source field
                 builder.startObject("source");
                 {

+ 37 - 0
server/src/main/java/org/elasticsearch/ingest/IngestPipelineException.java

@@ -0,0 +1,37 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchWrapperException;
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.ingest.CompoundProcessor.PIPELINE_ORIGIN_EXCEPTION_HEADER;
+
+/**
+ * A dedicated wrapper for exceptions encountered while executing an ingest pipeline. Unlike {@link IngestProcessorException}, this
+ * exception indicates an issue with the overall pipeline execution, either due to a mid-process validation problem or another
+ * non-processor-level issue with the execution. The wrapper is needed as we currently only unwrap causes for instances of
+ * {@link ElasticsearchWrapperException}.
+ */
+public class IngestPipelineException extends ElasticsearchException implements ElasticsearchWrapperException {
+
+    IngestPipelineException(final String pipeline, final Exception cause) {
+        super(cause);
+        this.addHeader(PIPELINE_ORIGIN_EXCEPTION_HEADER, List.of(pipeline));
+    }
+
+    public IngestPipelineException(final StreamInput in) throws IOException {
+        super(in);
+    }
+
+}

+ 31 - 21
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -944,14 +944,17 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                     // An IllegalArgumentException can be thrown when an ingest processor creates a source map that is self-referencing.
                     // In that case, we catch and wrap the exception, so we can include more details
                     exceptionHandler.accept(
-                        new IllegalArgumentException(
-                            format(
-                                "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]",
-                                pipelineId,
-                                indexRequest.index(),
-                                indexRequest.id()
-                            ),
-                            ex
+                        new IngestPipelineException(
+                            pipelineId,
+                            new IllegalArgumentException(
+                                format(
+                                    "Failed to generate the source document for ingest pipeline [%s] for document [%s/%s]",
+                                    pipelineId,
+                                    indexRequest.index(),
+                                    indexRequest.id()
+                                ),
+                                ex
+                            )
                         )
                     );
                     return; // document failed!
@@ -963,14 +966,18 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                 if (Objects.equals(originalIndex, newIndex) == false) {
                     // final pipelines cannot change the target index (either directly or by way of a reroute)
                     if (isFinalPipeline) {
+                        logger.info("Service stack: [{}]", ingestDocument.getPipelineStack());
                         exceptionHandler.accept(
-                            new IllegalStateException(
-                                format(
-                                    "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]",
-                                    pipelineId,
-                                    originalIndex,
-                                    newIndex,
-                                    indexRequest.id()
+                            new IngestPipelineException(
+                                pipelineId,
+                                new IllegalStateException(
+                                    format(
+                                        "final pipeline [%s] can't change the target index (from [%s] to [%s]) for document [%s]",
+                                        pipelineId,
+                                        originalIndex,
+                                        newIndex,
+                                        indexRequest.id()
+                                    )
                                 )
                             )
                         );
@@ -983,12 +990,15 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         List<String> indexCycle = new ArrayList<>(ingestDocument.getIndexHistory());
                         indexCycle.add(newIndex);
                         exceptionHandler.accept(
-                            new IllegalStateException(
-                                format(
-                                    "index cycle detected while processing pipeline [%s] for document [%s]: %s",
-                                    pipelineId,
-                                    indexRequest.id(),
-                                    indexCycle
+                            new IngestPipelineException(
+                                pipelineId,
+                                new IllegalStateException(
+                                    format(
+                                        "index cycle detected while processing pipeline [%s] for document [%s]: %s",
+                                        pipelineId,
+                                        indexRequest.id(),
+                                        indexCycle
+                                    )
                                 )
                             )
                         );

+ 2 - 0
server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

@@ -67,6 +67,7 @@ import org.elasticsearch.indices.recovery.PeerRecoveryNotFound;
 import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
 import org.elasticsearch.indices.recovery.RecoveryCommitTooNewException;
 import org.elasticsearch.ingest.GraphStructureException;
+import org.elasticsearch.ingest.IngestPipelineException;
 import org.elasticsearch.ingest.IngestProcessorException;
 import org.elasticsearch.persistent.NotPersistentTaskNodeException;
 import org.elasticsearch.persistent.PersistentTaskNodeNotAssignedException;
@@ -834,6 +835,7 @@ public class ExceptionSerializationTests extends ESTestCase {
         ids.put(179, NotPersistentTaskNodeException.class);
         ids.put(180, PersistentTaskNodeNotAssignedException.class);
         ids.put(181, ResourceAlreadyUploadedException.class);
+        ids.put(182, IngestPipelineException.class);
 
         Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
         for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

+ 1 - 1
server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentConverterTests.java

@@ -83,7 +83,7 @@ public class FailureStoreDocumentConverterTests extends ESTestCase {
 
         assertThat(ObjectPath.eval("document.id", convertedRequest.sourceAsMap()), is(equalTo("1")));
         assertThat(ObjectPath.eval("document.routing", convertedRequest.sourceAsMap()), is(equalTo("fake_routing")));
-        assertThat(ObjectPath.eval("document.index", convertedRequest.sourceAsMap()), is(equalTo(targetIndexName)));
+        assertThat(ObjectPath.eval("document.index", convertedRequest.sourceAsMap()), is(equalTo("original_index")));
         assertThat(ObjectPath.eval("document.source.key", convertedRequest.sourceAsMap()), is(equalTo("value")));
 
         assertThat(ObjectPath.eval("error.type", convertedRequest.sourceAsMap()), is(equalTo("exception")));