Browse Source

Add an IngestService stats test (#93120)

Joe Gallo 2 years ago
parent
commit
21c8a56a6e

+ 3 - 5
server/src/internalClusterTest/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java → server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java

@@ -5,13 +5,14 @@
  * in compliance with, at your election, the Elastic License 2.0 or the Server
  * Side Public License, v 1.
  */
-package org.elasticsearch.action.ingest;
+package org.elasticsearch.ingest;
 
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -21,9 +22,6 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.IngestDocument;
-import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.plugins.IngestPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -51,7 +49,7 @@ import static org.hamcrest.Matchers.equalTo;
  * executes asynchronously. The result of the operation should be the same and also the order in which the
  * bulk responses are returned should be the same as how the corresponding index requests were defined.
  */
-public class AsyncIngestProcessorIT extends ESSingleNodeTestCase {
+public class IngestAsyncProcessorIT extends ESSingleNodeTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {

+ 108 - 0
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -56,6 +56,7 @@ import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptModule;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.script.TemplateScript;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.MockLogAppender;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -1389,6 +1390,113 @@ public class IngestServiceTests extends ESTestCase {
         }
     }
 
+    public void testIngestAndPipelineStats() throws Exception {
+        final Processor processor = mock(Processor.class);
+        when(processor.getType()).thenReturn("mock");
+        when(processor.getTag()).thenReturn("mockTag");
+        when(processor.isAsync()).thenReturn(true);
+
+        // avoid returning null and dropping the document
+        doAnswer(args -> {
+            @SuppressWarnings("unchecked")
+            BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
+            handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
+            return null;
+        }).when(processor).execute(any(IngestDocument.class), any());
+
+        // mock up an ingest service for returning a pipeline, this is used by the pipeline processor
+        final Pipeline[] pipelineToReturn = new Pipeline[1];
+        final IngestService pipelineIngestService = mock(IngestService.class);
+        when(pipelineIngestService.getPipeline(anyString())).thenAnswer(inv -> pipelineToReturn[0]);
+
+        IngestService ingestService = createWithProcessors(
+            Map.of(
+                "pipeline",
+                (factories, tag, description, config) -> new PipelineProcessor(tag, description, (params) -> new TemplateScript(params) {
+                    @Override
+                    public String execute() {
+                        return "_id3";
+                    } // this pipeline processor will always execute the '_id3' processor
+                }, false, pipelineIngestService),
+                "mock",
+                (factories, tag, description, config) -> processor
+            )
+        );
+
+        {
+            // all zeroes since nothing has executed
+            final IngestStats ingestStats = ingestService.stats();
+            assertThat(ingestStats.getPipelineStats().size(), equalTo(0));
+            assertStats(ingestStats.getTotalStats(), 0, 0, 0);
+        }
+
+        // put some pipelines, and now there are pipeline and processor stats, too
+        PutPipelineRequest putRequest1 = new PutPipelineRequest(
+            "_id1",
+            new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
+            XContentType.JSON
+        );
+        // n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below
+        PutPipelineRequest putRequest2 = new PutPipelineRequest(
+            "_id2",
+            new BytesArray("{\"processors\": [{\"pipeline\" : {}}]}"),
+            XContentType.JSON
+        );
+        PutPipelineRequest putRequest3 = new PutPipelineRequest(
+            "_id3",
+            new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
+            XContentType.JSON
+        );
+        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
+        ClusterState previousClusterState = clusterState;
+        clusterState = executePut(putRequest1, clusterState);
+        clusterState = executePut(putRequest2, clusterState);
+        clusterState = executePut(putRequest3, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+
+        // hook up the mock ingest service to return pipeline3 when asked by the pipeline processor
+        pipelineToReturn[0] = ingestService.getPipeline("_id3");
+
+        {
+            final IngestStats ingestStats = ingestService.stats();
+            assertThat(ingestStats.getPipelineStats().size(), equalTo(3));
+
+            // total
+            assertStats(ingestStats.getTotalStats(), 0, 0, 0);
+            // pipeline
+            assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 0, 0, 0);
+            assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 0, 0, 0);
+            assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 0, 0, 0);
+            // processor
+            assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0);
+            assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0);
+            assertProcessorStats(0, ingestStats, "_id3", 0, 0, 0);
+        }
+
+        // put a single document through ingest processing
+        final IndexRequest indexRequest = new IndexRequest("_index");
+        indexRequest.setPipeline("_id1").setFinalPipeline("_id2");
+        indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
+        ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
+
+        {
+            final IngestStats ingestStats = ingestService.stats();
+            assertThat(ingestStats.getPipelineStats().size(), equalTo(3));
+
+            // total
+            // see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2
+            // assertStats(ingestStats.getTotalStats(), 1, 0, 0);
+            // pipeline
+            assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0);
+            assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0);
+            assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 1, 0, 0);
+            // processor
+            assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0);
+            assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0);
+            assertProcessorStats(0, ingestStats, "_id3", 1, 0, 0);
+        }
+    }
+
     public void testStats() throws Exception {
         final Processor processor = mock(Processor.class);
         final Processor processorFailure = mock(Processor.class);