Răsfoiți Sursa

Change the ingest simulate api to not include dropped documents (#44161)

If documents are dropped by the `drop` processor then
these documents are returned as a `null` value in the response.

=== Example

Create pipeline:

```
PUT _ingest/pipeline/droppipeline
{
    "processors": [
        {
            "set": {
                "field": "bla",
                "value": "val"
            }
        },
        {
            "drop": {}
        }
    ]
}
```

Simulate request:

POST _ingest/pipeline/droppipeline/_simulate
{
    "docs": [
        {
            "_source": {
                "message": "text"
            }
        }
    ]
}

Response:

```
{
    "docs": [
        null
    ]
}
```

Response if verbose is enabled:

```
{
    "docs": [
        {
            "processor_results": [
                {
                    "doc": {
                        "_index": "_index",
                        "_type": "_doc",
                        "_id": "_id",
                        "_source": {
                            "message": "text",
                            "bla": "val"
                        },
                        "_ingest": {
                            "timestamp": "2019-07-10T11:07:10.758315Z"
                        }
                    }
                },
                null
            ]
        }
    ]
}
```

Closes #36150

* Abort pipeline simulation in verbose mode when document has been dropped
by drop processor
Martijn van Groningen 6 ani în urmă
părinte
comite
a8c5fc728b

+ 34 - 12
server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResult.java

@@ -19,6 +19,7 @@
 package org.elasticsearch.action.ingest;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -66,12 +67,16 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
     }
 
     public SimulateDocumentBaseResult(IngestDocument ingestDocument) {
-        this.ingestDocument = new WriteableIngestDocument(ingestDocument);
-        failure = null;
+        if (ingestDocument != null) {
+            this.ingestDocument = new WriteableIngestDocument(ingestDocument);
+        } else {
+            this.ingestDocument = null;
+        }
+        this.failure = null;
     }
 
     public SimulateDocumentBaseResult(Exception failure) {
-        ingestDocument = null;
+        this.ingestDocument = null;
         this.failure = failure;
     }
 
@@ -79,23 +84,35 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
      * Read from a stream.
      */
     public SimulateDocumentBaseResult(StreamInput in) throws IOException {
-        if (in.readBoolean()) {
-            ingestDocument = null;
+        // TODO: s/V_8_0_0/V_7_4_0 when change has been backported
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
             failure = in.readException();
+            ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new);
         } else {
-            ingestDocument = new WriteableIngestDocument(in);
-            failure = null;
+            if (in.readBoolean()) {
+                ingestDocument = null;
+                failure = in.readException();
+            } else {
+                ingestDocument = new WriteableIngestDocument(in);
+                failure = null;
+            }
         }
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        if (failure == null) {
-            out.writeBoolean(false);
-            ingestDocument.writeTo(out);
-        } else {
-            out.writeBoolean(true);
+        // TODO: s/V_8_0_0/V_7_4_0 when change has been backported
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
             out.writeException(failure);
+            out.writeOptionalWriteable(ingestDocument);
+        } else {
+            if (failure == null) {
+                out.writeBoolean(false);
+                ingestDocument.writeTo(out);
+            } else {
+                out.writeBoolean(true);
+                out.writeException(failure);
+            }
         }
     }
 
@@ -112,6 +129,11 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        if (failure == null && ingestDocument == null) {
+            builder.nullValue();
+            return builder;
+        }
+
         builder.startObject();
         if (failure == null) {
             ingestDocument.toXContent(builder, params);

+ 2 - 2
server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

@@ -55,8 +55,8 @@ class SimulateExecutionService {
             }
         } else {
             try {
-                pipeline.execute(ingestDocument);
-                return new SimulateDocumentBaseResult(ingestDocument);
+                IngestDocument result = pipeline.execute(ingestDocument);
+                return new SimulateDocumentBaseResult(result);
             } catch (Exception e) {
                 return new SimulateDocumentBaseResult(e);
             }

+ 11 - 11
server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java

@@ -105,28 +105,23 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
         this(processorTag, null, failure);
     }
 
+    public SimulateProcessorResult(String processorTag) {
+        this(processorTag, null, null);
+    }
+
     /**
      * Read from a stream.
      */
     SimulateProcessorResult(StreamInput in) throws IOException {
         this.processorTag = in.readString();
-        if (in.readBoolean()) {
-            this.ingestDocument = new WriteableIngestDocument(in);
-        } else {
-            this.ingestDocument = null;
-        }
+        this.ingestDocument = in.readOptionalWriteable(WriteableIngestDocument::new);
         this.failure = in.readException();
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(processorTag);
-        if (ingestDocument == null) {
-            out.writeBoolean(false);
-        } else {
-            out.writeBoolean(true);
-            ingestDocument.writeTo(out);
-        }
+        out.writeOptionalWriteable(ingestDocument);
         out.writeException(failure);
     }
 
@@ -147,6 +142,11 @@ public class SimulateProcessorResult implements Writeable, ToXContentObject {
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        if (processorTag == null && failure == null && ingestDocument == null) {
+            builder.nullValue();
+            return builder;
+        }
+
         builder.startObject();
 
         if (processorTag != null) {

+ 7 - 2
server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

@@ -74,8 +74,13 @@ public final class TrackingResultProcessor implements Processor {
                     verbosePipelineProcessor);
                 ingestDocument.executePipeline(verbosePipeline);
             } else {
-                processor.execute(ingestDocument);
-                processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
+                IngestDocument result = processor.execute(ingestDocument);
+                if (result != null) {
+                    processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
+                } else {
+                    processorResultList.add(new SimulateProcessorResult(processor.getTag()));
+                    return null;
+                }
             }
         } catch (Exception e) {
             if (ignoreFailure) {

+ 54 - 8
server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

@@ -20,14 +20,15 @@
 package org.elasticsearch.action.ingest;
 
 import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.ingest.DropProcessor;
+import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.RandomDocumentPicks;
 import org.elasticsearch.ingest.TestProcessor;
 import org.elasticsearch.ingest.CompoundProcessor;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.threadpool.TestThreadPool;
 import org.junit.After;
 import org.junit.Before;
 
@@ -38,6 +39,7 @@ import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocumen
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
@@ -45,17 +47,13 @@ public class SimulateExecutionServiceTests extends ESTestCase {
 
     private final Integer version = randomBoolean() ? randomInt() : null;
 
-    private ThreadPool threadPool;
+    private TestThreadPool threadPool;
     private SimulateExecutionService executionService;
     private IngestDocument ingestDocument;
 
     @Before
     public void setup() {
-        threadPool = new ThreadPool(
-                Settings.builder()
-                        .put("node.name", getClass().getName())
-                        .build()
-        );
+        threadPool = new TestThreadPool(SimulateExecutionServiceTests.class.getSimpleName());
         executionService = new SimulateExecutionService(threadPool);
         ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
     }
@@ -213,4 +211,52 @@ public class SimulateExecutionServiceTests extends ESTestCase {
         assertThat(exception, instanceOf(ElasticsearchException.class));
         assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed"));
     }
+
+    public void testDropDocument() {
+        TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
+        Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of());
+        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
+
+        SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, false);
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertThat(actualItemResponse, instanceOf(SimulateDocumentBaseResult.class));
+        SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
+        assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
+        assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
+    }
+
+    public void testDropDocumentVerbose() {
+        TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
+        Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of());
+        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
+
+        SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
+        SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
+        assertThat(verboseResult.getProcessorResults().size(), equalTo(2));
+        assertThat(verboseResult.getProcessorResults().get(0).getIngestDocument(), notNullValue());
+        assertThat(verboseResult.getProcessorResults().get(0).getFailure(), nullValue());
+        assertThat(verboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
+        assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
+    }
+
+    public void testDropDocumentVerboseExtraProcessor() {
+        TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value"));
+        Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, Map.of());
+        TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value"));
+        Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
+
+        SimulateDocumentResult actualItemResponse = executionService.executeDocument(pipeline, ingestDocument, true);
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertThat(processor3.getInvokedCounter(), equalTo(0));
+        assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
+        SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
+        assertThat(verboseResult.getProcessorResults().size(), equalTo(2));
+        assertThat(verboseResult.getProcessorResults().get(0).getIngestDocument(), notNullValue());
+        assertThat(verboseResult.getProcessorResults().get(0).getFailure(), nullValue());
+        assertThat(verboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
+        assertThat(verboseResult.getProcessorResults().get(1).getFailure(), nullValue());
+    }
+
 }