Browse Source

Introduce dedicated ingest processor exception (#48810)

Today we wrap exceptions that occur while executing an ingest processor
in an ElasticsearchException. Today, in ExceptionsHelper#unwrapCause we
only unwrap causes for exceptions that implement
ElasticsearchWrapperException, which the top-level
ElasticsearchException does not. Ultimately, this means that any
exception that occurs during processor execution does not have its cause
unwrapped, and so its status is blanket treated as a 500. This means
that while executing a bulk request with an ingest pipeline,
document-level failures that occur during a processor will cause the
status for that document to be treated as 500. Since that does not give
the client any indication that they made a mistake, it means some
clients will enter infinite retries, thinking that there is some
server-side problem that merely needs to clear. This commit addresses
this by introducing a dedicated ingest processor exception, so that its
causes can be unwrapped. While we could consider a broader change to
unwrap causes for more than just ElasticsearchWrapperExceptions, that is
a broad change with unclear implications. Since the problem of reporting
500s on client errors is a user-facing bug, we take the conservative
approach for now, and we can revisit the unwrapping in a future change.
Jason Tedor 6 years ago
parent
commit
d4e82d4020

+ 2 - 6
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java

@@ -18,12 +18,10 @@
  */
 package org.elasticsearch.ingest.common;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.script.MockScriptEngine;
@@ -109,15 +107,13 @@ public class IngestRestartIT extends ESIntegTestCase {
             .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
             .get();
 
-        ElasticsearchException exception = expectThrows(ElasticsearchException.class,
+        IllegalStateException exception = expectThrows(IllegalStateException.class,
             () -> client().prepareIndex("index").setId("2")
                 .setSource("x", 0)
                 .setPipeline(pipelineIdWithScript)
                 .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                 .get());
-        assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type")));
-        assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown")));
-        assertThat(exception.getRootCause().getMessage(),
+        assertThat(exception.getMessage(),
             equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " +
                 "[org.elasticsearch.ElasticsearchParseException: Error updating pipeline with id [" + pipelineIdWithScript + "]; " +
                 "org.elasticsearch.ElasticsearchException: java.lang.IllegalArgumentException: cannot execute [inline] scripts; " +

+ 1 - 1
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

@@ -106,5 +106,5 @@ teardown:
       id: 1
       pipeline: "outer"
       body: {}
-- match: { error.root_cause.0.type: "exception" }
+- match: { error.root_cause.0.type: "ingest_processor_exception" }
 - match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }

+ 2 - 3
modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml

@@ -339,7 +339,7 @@ teardown:
             ]
           }
   - length: { docs: 2 }
-  - match: { docs.0.error.type: "exception" }
+  - match: { docs.0.error.type: "illegal_argument_exception" }
   - match: { docs.1.doc._source.foo: "BAR" }
   - length: { docs.1.doc._ingest: 1 }
   - is_true: docs.1.doc._ingest.timestamp
@@ -653,8 +653,7 @@ teardown:
         }
 - length: { docs: 1 }
 - length: { docs.0.processor_results: 1 }
-- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
-- match: { docs.0.processor_results.0.error.caused_by.reason: "Cycle detected for pipeline: outer" }
+- match: { docs.0.processor_results.0.error.reason: "Cycle detected for pipeline: outer" }
 
 ---
 "Test verbose simulate with Pipeline Processor with Multiple Pipelines":

+ 6 - 1
server/src/main/java/org/elasticsearch/ElasticsearchException.java

@@ -1036,7 +1036,12 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
                 org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException.class,
                 org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException::new,
                 156,
-                Version.V_7_5_0);
+                Version.V_7_5_0),
+        INGEST_PROCESSOR_EXCEPTION(
+                org.elasticsearch.ingest.IngestProcessorException.class,
+                org.elasticsearch.ingest.IngestProcessorException::new,
+                157,
+                Version.V_7_6_0);
 
         final Class<? extends ElasticsearchException> exceptionClass;
         final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

+ 6 - 5
server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

@@ -143,7 +143,7 @@ public class CompoundProcessor implements Processor {
                 if (ignoreFailure) {
                     innerExecute(currentProcessor + 1, ingestDocument, handler);
                 } else {
-                    ElasticsearchException compoundProcessorException =
+                    IngestProcessorException compoundProcessorException =
                         newCompoundProcessorException(e, processor.getType(), processor.getTag());
                     if (onFailureProcessors.isEmpty()) {
                         handler.accept(null, compoundProcessorException);
@@ -207,12 +207,12 @@ public class CompoundProcessor implements Processor {
         ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
     }
 
-    private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
-        if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) {
-            return (ElasticsearchException) e;
+    private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
+        if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
+            return (IngestProcessorException) e;
         }
 
-        ElasticsearchException exception = new ElasticsearchException(e);
+        IngestProcessorException exception = new IngestProcessorException(e);
 
         if (processorType != null) {
             exception.addHeader("processor_type", processorType);
@@ -223,4 +223,5 @@ public class CompoundProcessor implements Processor {
 
         return exception;
     }
+
 }

+ 42 - 0
server/src/main/java/org/elasticsearch/ingest/IngestProcessorException.java

@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchWrapperException;
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
+/**
+ * A dedicated wrapper for exceptions encountered executing an ingest processor. The wrapper is needed as we currently only unwrap causes
+ * for instances of {@link ElasticsearchWrapperException}.
+ */
+public class IngestProcessorException extends ElasticsearchException implements ElasticsearchWrapperException {
+
+    IngestProcessorException(final Exception cause) {
+        super(cause);
+    }
+
+    public IngestProcessorException(final StreamInput in) throws IOException {
+        super(in);
+    }
+
+}

+ 2 - 0
server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

@@ -71,6 +71,7 @@ import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
 import org.elasticsearch.indices.IndexTemplateMissingException;
 import org.elasticsearch.indices.InvalidIndexTemplateException;
 import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
+import org.elasticsearch.ingest.IngestProcessorException;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.admin.indices.AliasesNotFoundException;
@@ -816,6 +817,7 @@ public class ExceptionSerializationTests extends ESTestCase {
         ids.put(154, RetentionLeaseNotFoundException.class);
         ids.put(155, ShardNotInPrimaryModeException.class);
         ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
+        ids.put(157, IngestProcessorException.class);
 
         Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
         for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

+ 5 - 5
server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

@@ -19,14 +19,14 @@
 
 package org.elasticsearch.action.ingest;
 
-import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ingest.CompoundProcessor;
 import org.elasticsearch.ingest.DropProcessor;
+import org.elasticsearch.ingest.IngestDocument;
+import org.elasticsearch.ingest.IngestProcessorException;
+import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.RandomDocumentPicks;
 import org.elasticsearch.ingest.TestProcessor;
-import org.elasticsearch.ingest.CompoundProcessor;
-import org.elasticsearch.ingest.IngestDocument;
-import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.junit.After;
@@ -258,7 +258,7 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
         assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class));
         Exception exception = simulateDocumentBaseResult.getFailure();
-        assertThat(exception, instanceOf(ElasticsearchException.class));
+        assertThat(exception, instanceOf(IngestProcessorException.class));
         assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: processor failed"));
     }
 

+ 1 - 2
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -23,7 +23,6 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
@@ -823,7 +822,7 @@ public class IngestServiceTests extends ESTestCase {
         @SuppressWarnings("unchecked")
         final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
         ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {});
-        verify(failureHandler, never()).accept(eq(0), any(ElasticsearchException.class));
+        verify(failureHandler, never()).accept(eq(0), any(IngestProcessorException.class));
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
 

+ 2 - 3
server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.ingest;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ingest.SimulateProcessorResult;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.script.MockScriptEngine;
@@ -86,7 +85,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         Exception[] holder = new Exception[1];
         trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
-        assertThat(((ElasticsearchException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
+        assertThat(((IngestProcessorException) holder[0]).getRootCause().getMessage(), equalTo(exception.getMessage()));
 
         SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
         assertThat(testProcessor.getInvokedCounter(), equalTo(1));
@@ -456,7 +455,7 @@ public class TrackingResultProcessorTests extends ESTestCase {
 
         Exception[] holder = new Exception[1];
         trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
-        ElasticsearchException exception = (ElasticsearchException) holder[0];
+        IngestProcessorException exception = (IngestProcessorException) holder[0];
         assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
         assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
     }