浏览代码

Add ingest processor existence helper method (#45156)

This commit adds a helper method to the ingest service allowing it to
inspect a pipeline by id and verify the existence of a processor in the
pipeline. This work exposed a potential bug in that some processors
contain inner processors that are passed in at instantiation. These
processors needed a common way to expose their inner processors, so the
WrappingProcessor was created in order to expose the inner processor.
Michael Basnight 6 年之前
父节点
当前提交
4ec68ec2e6

+ 4 - 2
modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java

@@ -28,6 +28,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
+
+import org.elasticsearch.ingest.WrappingProcessor;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.ScriptService;
 
 
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
@@ -43,7 +45,7 @@ import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;
  *
  *
  * Note that this processor is experimental.
  * Note that this processor is experimental.
  */
  */
-public final class ForEachProcessor extends AbstractProcessor {
+public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {
 
 
     public static final String TYPE = "foreach";
     public static final String TYPE = "foreach";
 
 
@@ -97,7 +99,7 @@ public final class ForEachProcessor extends AbstractProcessor {
         return field;
         return field;
     }
     }
 
 
-    Processor getProcessor() {
+    public Processor getInnerProcessor() {
         return processor;
         return processor;
     }
     }
 
 

+ 2 - 2
modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java

@@ -49,7 +49,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
         ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
         assertThat(forEachProcessor, Matchers.notNullValue());
         assertThat(forEachProcessor, Matchers.notNullValue());
         assertThat(forEachProcessor.getField(), equalTo("_field"));
         assertThat(forEachProcessor.getField(), equalTo("_field"));
-        assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
+        assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor));
         assertFalse(forEachProcessor.isIgnoreMissing());
         assertFalse(forEachProcessor.isIgnoreMissing());
     }
     }
 
 
@@ -66,7 +66,7 @@ public class ForEachProcessorFactoryTests extends ESTestCase {
         ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
         ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
         assertThat(forEachProcessor, Matchers.notNullValue());
         assertThat(forEachProcessor, Matchers.notNullValue());
         assertThat(forEachProcessor.getField(), equalTo("_field"));
         assertThat(forEachProcessor.getField(), equalTo("_field"));
-        assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
+        assertThat(forEachProcessor.getInnerProcessor(), Matchers.sameInstance(processor));
         assertTrue(forEachProcessor.isIgnoreMissing());
         assertTrue(forEachProcessor.isIgnoreMissing());
     }
     }
 
 

+ 2 - 2
server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

@@ -37,7 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
 import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
-public class ConditionalProcessor extends AbstractProcessor {
+public class ConditionalProcessor extends AbstractProcessor implements WrappingProcessor {
 
 
     private static final Map<String, String> DEPRECATIONS =
     private static final Map<String, String> DEPRECATIONS =
             Map.of("_type", "[types removal] Looking up doc types [_type] in scripts is deprecated.");
             Map.of("_type", "[types removal] Looking up doc types [_type] in scripts is deprecated.");
@@ -90,7 +90,7 @@ public class ConditionalProcessor extends AbstractProcessor {
                 new DeprecationMap(ingestDocument.getSourceAndMetadata(), DEPRECATIONS, "conditional-processor")));
                 new DeprecationMap(ingestDocument.getSourceAndMetadata(), DEPRECATIONS, "conditional-processor")));
     }
     }
 
 
-    Processor getProcessor() {
+    public Processor getInnerProcessor() {
         return processor;
         return processor;
     }
     }
 
 

+ 35 - 1
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -388,7 +388,7 @@ public class IngestService implements ClusterStateApplier {
     static String getProcessorName(Processor processor){
     static String getProcessorName(Processor processor){
         // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
         // conditionals are implemented as wrappers around the real processor, so get the real processor for the correct type for the name
         if(processor instanceof ConditionalProcessor){
         if(processor instanceof ConditionalProcessor){
-            processor = ((ConditionalProcessor) processor).getProcessor();
+            processor = ((ConditionalProcessor) processor).getInnerProcessor();
         }
         }
         StringBuilder sb = new StringBuilder(5);
         StringBuilder sb = new StringBuilder(5);
         sb.append(processor.getType());
         sb.append(processor.getType());
@@ -561,6 +561,40 @@ public class IngestService implements ClusterStateApplier {
         }
         }
     }
     }
 
 
+    /**
+     * Determine if a pipeline contains a processor class within it by introspecting all of the processors within the pipeline.
+     * @param pipelineId the pipeline to inspect
+     * @param clazz the Processor class to look for
+     * @return True if the pipeline contains an instance of the Processor class passed in
+     */
+    public boolean hasProcessor(String pipelineId, Class<? extends Processor> clazz) {
+        Pipeline pipeline = getPipeline(pipelineId);
+        if (pipeline == null) {
+            return false;
+        }
+
+        for (Processor processor: pipeline.flattenAllProcessors()) {
+            if (clazz.isAssignableFrom(processor.getClass())) {
+                return true;
+            }
+
+            while (processor instanceof WrappingProcessor) {
+                WrappingProcessor wrappingProcessor = (WrappingProcessor) processor;
+                if (clazz.isAssignableFrom(wrappingProcessor.getInnerProcessor().getClass())) {
+                    return true;
+                }
+                processor = wrappingProcessor.getInnerProcessor();
+                // break in the case of self referencing processors in the event a processor author creates a
+                // wrapping processor that has its inner processor refer to itself.
+                if (wrappingProcessor == processor) {
+                    break;
+                }
+            }
+        }
+
+        return false;
+    }
+
     private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
     private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
         String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
         String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
         String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
         String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";

+ 2 - 2
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -49,8 +49,8 @@ public final class TrackingResultProcessor implements Processor {
                 if (conditionalProcessor.evaluate(ingestDocument) == false) {
                 if (conditionalProcessor.evaluate(ingestDocument) == false) {
                     return ingestDocument;
                     return ingestDocument;
                 }
                 }
-                if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
-                    processor = conditionalProcessor.getProcessor();
+                if (conditionalProcessor.getInnerProcessor() instanceof PipelineProcessor) {
+                    processor = conditionalProcessor.getInnerProcessor();
                 }
                 }
             }
             }
             if (processor instanceof PipelineProcessor) {
             if (processor instanceof PipelineProcessor) {

+ 33 - 0
server/src/main/java/org/elasticsearch/ingest/WrappingProcessor.java

@@ -0,0 +1,33 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest;
+
+/**
+ * A srapping processor is one that encapsulates an inner processor, or a processor that the wrapped processor enacts upon. All processors
+ * that contain an "inner" processor should implement this interface, such that the actual processor can be obtained.
+ */
+public interface WrappingProcessor extends Processor {
+
+    /**
+     * Method for retrieving the inner processor from a wrapped processor.
+     * @return the inner processor
+     */
+    Processor getInnerProcessor();
+}

+ 50 - 0
server/src/test/java/org/elasticsearch/ingest/FakeProcessor.java

@@ -0,0 +1,50 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest;
+
+import java.util.function.Consumer;
+
+class FakeProcessor implements Processor {
+    private String type;
+    private String tag;
+    private Consumer<IngestDocument> executor;
+
+    FakeProcessor(String type, String tag, Consumer<IngestDocument> executor) {
+        this.type = type;
+        this.tag = tag;
+        this.executor = executor;
+    }
+
+    @Override
+    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+        executor.accept(ingestDocument);
+        return ingestDocument;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public String getTag() {
+        return tag;
+    }
+}

+ 93 - 34
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -43,10 +43,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.plugins.IngestPlugin;
+import org.elasticsearch.script.MockScriptEngine;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -64,6 +70,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
 
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.emptySet;
@@ -263,6 +270,89 @@ public class IngestServiceTests extends ESTestCase {
         ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
         ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
     }
     }
 
 
+    public void testHasProcessor() throws Exception {
+        IngestService ingestService = createWithProcessors();
+        String id = "_id";
+        Pipeline pipeline = ingestService.getPipeline(id);
+        assertThat(pipeline, nullValue());
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+
+        PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray(
+            "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," +
+                "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"),
+            XContentType.JSON);
+        ClusterState previousClusterState = clusterState;
+        clusterState = IngestService.innerPut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+        pipeline = ingestService.getPipeline(id);
+        assertThat(pipeline, notNullValue());
+
+        assertTrue(ingestService.hasProcessor(id, Processor.class));
+        assertTrue(ingestService.hasProcessor(id, WrappingProcessorImpl.class));
+        assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class));
+        assertTrue(ingestService.hasProcessor(id, FakeProcessor.class));
+
+        assertFalse(ingestService.hasProcessor(id, ConditionalProcessor.class));
+    }
+
+    public void testHasProcessorComplexConditional() throws Exception {
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        String scriptName = "conditionalScript";
+        ScriptService scriptService = new ScriptService(Settings.builder().build(),
+            Collections.singletonMap(
+                Script.DEFAULT_SCRIPT_LANG,
+                new MockScriptEngine(
+                    Script.DEFAULT_SCRIPT_LANG,
+                    Collections.singletonMap(
+                        scriptName, ctx -> {
+                            ctx.get("_type");
+                            return true;
+                        }
+                    ),
+                    Collections.emptyMap()
+                )
+            ),
+            new HashMap<>(ScriptModule.CORE_CONTEXTS)
+        );
+
+        Map<String, Processor.Factory> processors = new HashMap<>();
+        processors.put("complexSet", (factories, tag, config) -> {
+            String field = (String) config.remove("field");
+            String value = (String) config.remove("value");
+
+            return new ConditionalProcessor(randomAlphaOfLength(10),
+                new Script(
+                    ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
+                    scriptName, Collections.emptyMap()), scriptService,
+                new ConditionalProcessor(randomAlphaOfLength(10) + "-nested",
+                    new Script(
+                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
+                        scriptName, Collections.emptyMap()), scriptService,
+                    new FakeProcessor("complexSet", tag, (ingestDocument) -> ingestDocument.setFieldValue(field, value))));
+        });
+
+        IngestService ingestService = createWithProcessors(processors);
+        String id = "_id";
+        Pipeline pipeline = ingestService.getPipeline(id);
+        assertThat(pipeline, nullValue());
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+
+        PutPipelineRequest putRequest = new PutPipelineRequest(id,
+            new BytesArray("{\"processors\": [{\"complexSet\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON);
+        ClusterState previousClusterState = clusterState;
+        clusterState = IngestService.innerPut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+        pipeline = ingestService.getPipeline(id);
+        assertThat(pipeline, notNullValue());
+
+        assertTrue(ingestService.hasProcessor(id, Processor.class));
+        assertTrue(ingestService.hasProcessor(id, WrappingProcessor.class));
+        assertTrue(ingestService.hasProcessor(id, FakeProcessor.class));
+        assertTrue(ingestService.hasProcessor(id, ConditionalProcessor.class));
+
+        assertFalse(ingestService.hasProcessor(id, WrappingProcessorImpl.class));
+    }
+
     public void testCrud() throws Exception {
     public void testCrud() throws Exception {
         IngestService ingestService = createWithProcessors();
         IngestService ingestService = createWithProcessors();
         String id = "_id";
         String id = "_id";
@@ -946,7 +1036,7 @@ public class IngestServiceTests extends ESTestCase {
         assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
         assertThat(IngestService.getProcessorName(processor), equalTo(name + ":" + tag));
 
 
         ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class);
         ConditionalProcessor conditionalProcessor = mock(ConditionalProcessor.class);
-        when(conditionalProcessor.getProcessor()).thenReturn(processor);
+        when(conditionalProcessor.getInnerProcessor()).thenReturn(processor);
         assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag));
         assertThat(IngestService.getProcessorName(conditionalProcessor), equalTo(name + ":" + tag));
 
 
         PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
         PipelineProcessor pipelineProcessor = mock(PipelineProcessor.class);
@@ -1012,42 +1102,11 @@ public class IngestServiceTests extends ESTestCase {
         processors.put("set", (factories, tag, config) -> {
         processors.put("set", (factories, tag, config) -> {
             String field = (String) config.remove("field");
             String field = (String) config.remove("field");
             String value = (String) config.remove("value");
             String value = (String) config.remove("value");
-            return new Processor() {
-                @Override
-                public IngestDocument execute(IngestDocument ingestDocument) {
-                    ingestDocument.setFieldValue(field, value);
-                    return ingestDocument;
-                }
-
-                @Override
-                public String getType() {
-                    return "set";
-                }
-
-                @Override
-                public String getTag() {
-                    return tag;
-                }
-            };
+            return new FakeProcessor("set", tag, (ingestDocument) ->ingestDocument.setFieldValue(field, value));
         });
         });
         processors.put("remove", (factories, tag, config) -> {
         processors.put("remove", (factories, tag, config) -> {
             String field = (String) config.remove("field");
             String field = (String) config.remove("field");
-            return new Processor() {
-                @Override
-                public IngestDocument execute(IngestDocument ingestDocument) {
-                    ingestDocument.removeField(field);
-                    return ingestDocument;
-                }
-
-                @Override
-                public String getType() {
-                    return "remove";
-                }
-
-                @Override
-                public String getTag() {
-                    return tag;
-                }
+            return new WrappingProcessorImpl("remove", tag, (ingestDocument -> ingestDocument.removeField(field))) {
             };
             };
         });
         });
         return createWithProcessors(processors);
         return createWithProcessors(processors);

+ 51 - 0
server/src/test/java/org/elasticsearch/ingest/WrappingProcessorImpl.java

@@ -0,0 +1,51 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest;
+
+import java.util.function.Consumer;
+
+class WrappingProcessorImpl extends FakeProcessor implements WrappingProcessor {
+
+    WrappingProcessorImpl(String type, String tag, Consumer<IngestDocument> executor) {
+        super(type, tag, executor);
+    }
+
+    @Override
+    public Processor getInnerProcessor() {
+        String theType = getType();
+        String theTag = getTag();
+        return new Processor() {
+            @Override
+            public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+                return ingestDocument;
+            }
+
+            @Override
+            public String getType() {
+                return theType;
+            }
+
+            @Override
+            public String getTag() {
+                return theTag;
+            }
+        };
+    }
+}