Переглянути джерело

handle errors when evaluating if conditions in processors (#52543)

Dan Hermann 5 роки тому
батько
коміт
d793036f50

+ 44 - 2
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java

@@ -18,11 +18,13 @@
  */
 package org.elasticsearch.ingest.common;
 
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.ingest.IngestStats;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.MockScriptPlugin;
@@ -31,12 +33,13 @@ import org.elasticsearch.test.InternalTestCluster;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 // Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor
 // ends up being copied into this test.
@@ -56,13 +59,52 @@ public class IngestRestartIT extends ESIntegTestCase {
     public static class CustomScriptPlugin extends MockScriptPlugin {
         @Override
         protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
-            return Collections.singletonMap("my_script", ctx -> {
+            return Map.of("my_script", ctx -> {
                 ctx.put("z", 0);
                 return null;
+            }, "throwing_script", ctx -> {
+                throw new RuntimeException("this script always fails");
             });
         }
     }
 
+    public void testFailureInConditionalProcessor() {
+        internalCluster().ensureAtLeastNumDataNodes(1);
+        internalCluster().startMasterOnlyNode();
+        final String pipelineId = "foo";
+        client().admin().cluster().preparePutPipeline(pipelineId,
+            new BytesArray("{\n" +
+                "  \"processors\" : [\n" +
+                "  {\"set\" : {\"field\": \"any_field\", \"value\": \"any_value\"}},\n" +
+                "  {\"set\" : {" + "" +
+                "    \"if\" : " + "{\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"throwing_script\"}," +
+                "    \"field\": \"any_field2\"," +
+                "    \"value\": \"any_value2\"}" +
+                "  }\n" +
+                "  ]\n" +
+                "}"), XContentType.JSON).get();
+
+        Exception e = expectThrows(
+            Exception.class,
+            () ->
+                client().prepareIndex("index").setId("1")
+                    .setSource("x", 0)
+                    .setPipeline(pipelineId)
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                    .get()
+        );
+        assertTrue(e.getMessage().contains("this script always fails"));
+
+        NodesStatsResponse r = client().admin().cluster().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get();
+        int nodeCount = r.getNodes().size();
+        for (int k = 0; k < nodeCount; k++) {
+            List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
+            for (IngestStats.ProcessorStat st : stats) {
+                assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
+            }
+        }
+    }
+
     public void testScriptDisabled() throws Exception {
         String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10);
         String pipelineIdWithScript = pipelineIdWithoutScript + "_script";

+ 9 - 1
server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

@@ -68,7 +68,15 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
 
     @Override
     public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
-        if (evaluate(ingestDocument)) {
+        final boolean matches;
+        try {
+            matches = evaluate(ingestDocument);
+        } catch (Exception e) {
+            handler.accept(null, e);
+            return;
+        }
+
+        if (matches) {
             final long startTimeInNanos = relativeTimeProvider.getAsLong();
             metric.preIngest();
             processor.execute(ingestDocument, (result, e) -> {

+ 4 - 2
server/src/main/java/org/elasticsearch/ingest/Processor.java

@@ -47,12 +47,14 @@ public interface Processor {
      * otherwise just overwrite {@link #execute(IngestDocument)}.
      */
     default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
+        final IngestDocument result;
         try {
-            IngestDocument result = execute(ingestDocument);
-            handler.accept(result, null);
+            result = execute(ingestDocument);
         } catch (Exception e) {
             handler.accept(null, e);
+            return;
         }
+        handler.accept(result, null);
     }
 
     /**