Browse Source

Add `ingest` information to the cluster info endpoint (#96328)

Pablo Alcantar Morales 2 years ago
parent
commit
244da063ca

+ 6 - 0
docs/changelog/96328.yaml

@@ -0,0 +1,6 @@
+pr: 96328
+summary: Add `ingest` information to the cluster info endpoint
+area: Stats
+type: enhancement
+issues:
+ - 95392

+ 119 - 3
docs/reference/cluster/cluster-info.asciidoc

@@ -33,9 +33,14 @@ You can use the Cluster Info API to retrieve information of a cluster.
 A comma-separated list of the following options:
 +
 --
+`_all`::
+All the information available. Can not be mixed with other targets.
+
 `http`::
 HTTP connection information.
 
+`ingest`::
+Ingest information.
 --
 
 [role="child_attributes"]
@@ -126,6 +131,114 @@ Cumulative size in bytes of all requests from this client.
 ======
 
 
+[[cluster-info-api-response-body-ingest]]
+`ingest`::
+(object)
+Contains ingest information for the cluster.
++
+.Properties of `ingest`
+[%collapsible%open]
+======
+`total`::
+(object)
+Contains information about ingest operations for the cluster.
++
+.Properties of `total`
+[%collapsible%open]
+=======
+`count`::
+(integer)
+Total number of documents ingested across the cluster.
+
+`time`::
+(<<time-units,time value>>)
+Total time spent preprocessing ingest documents across the cluster.
+
+`time_in_millis`::
+(integer)
+Total time, in milliseconds, spent preprocessing ingest documents across the cluster.
+
+`current`::
+(integer)
+Total number of documents currently being ingested.
+
+`failed`::
+(integer)
+Total number of failed ingest operations across the cluster.
+=======
+
+`pipelines`::
+(object)
+Contains information about ingest pipelines for the cluster.
++
+.Properties of `pipelines`
+[%collapsible%open]
+=======
+`<pipeline_id>`::
+(object)
+Contains information about the ingest pipeline.
++
+.Properties of `<pipeline_id>`
+[%collapsible%open]
+========
+`count`::
+(integer)
+Number of documents preprocessed by the ingest pipeline.
+
+`time`::
+(<<time-units,time value>>)
+Total time spent preprocessing documents in the ingest pipeline.
+
+`time_in_millis`::
+(integer)
+Total time, in milliseconds, spent preprocessing documents in the ingest
+pipeline.
+
+`failed`::
+(integer)
+Total number of failed operations for the ingest pipeline.
+
+`processors`::
+(array of objects)
+Contains information for the ingest processors for the ingest pipeline.
++
+.Properties of `processors`
+[%collapsible%open]
+=========
+`<processor>`::
+(object)
+Contains information for the ingest processor.
++
+.Properties of `<processor>`
+[%collapsible%open]
+==========
+`count`::
+(integer)
+Number of documents transformed by the processor.
+
+`time`::
+(<<time-units,time value>>)
+Time spent by the processor transforming documents.
+
+`time_in_millis`::
+(integer)
+Time, in milliseconds, spent by the processor transforming documents.
+
+`current`::
+(integer)
+Number of documents currently being transformed by the processor.
+
+`failed`::
+(integer)
+Number of failed operations for the processor.
+==========
+=========
+========
+=======
+======
+
+
+
 [[cluster-info-api-example]]
 ==== {api-examples-title}
 
@@ -133,10 +246,13 @@ Cumulative size in bytes of all requests from this client.
 ----
 # returns all stats info of the cluster
 GET /_info/_all
-----
 
-[source,console]
-----
 # returns the http info of the cluster
 GET /_info/http
+
+# returns the http info of the cluster
+GET /_info/ingest
+
+# returns the http and ingest info of the cluster
+GET /_info/http,ingest
 ----

+ 76 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/15_info_ingest.yml

@@ -0,0 +1,76 @@
+---
+setup:
+  - skip:
+      version: " - 8.8.99"
+      reason: "/_info/ingest only available from v8.9"
+
+---
+teardown:
+  - do:
+      ingest.delete_pipeline:
+        id: "ingest_info_pipeline"
+        ignore: 404
+
+  - do:
+      indices.delete:
+        index: "ingest_info_index"
+        ignore_unavailable: true
+
+---
+"Cluster ingest information":
+  - do:
+      ingest.put_pipeline:
+        id: "ingest_info_pipeline"
+        body:  >
+          {
+            "description": "_description",
+            "processors": [
+              {
+                "set" : {
+                  "field": "pipeline",
+                  "value": "pipeline"
+                }
+              }
+            ]
+          }
+
+  - do:
+      bulk:
+        refresh: true
+        index: ingest_info_index
+        body:
+          - '{"create": {"pipeline" : "ingest_info_pipeline"}}'
+          - '{"some-field": "some-value"}'
+          - '{"create": {"pipeline" : "ingest_info_pipeline"}}'
+          - '{"some-field": "another-value"}'
+
+  - do:
+      cluster.info:
+        target: [ ingest ]
+
+  - is_true: cluster_name
+
+  # Summary ingest section
+  - is_true: ingest.total
+  - gte: { ingest.total.count: 2 }
+  - gte: { ingest.total.time_in_millis: 0 }
+  # next 2 conditions _should_ be 0, but because these yaml tests are sharing the same test cluster, other tests could
+  # pollute the information.
+  - gte: { ingest.total.current: 0 }
+  - gte: { ingest.total.failed: 0 }
+
+  # Pipelines section
+  - is_true: ingest.pipelines.ingest_info_pipeline
+  - gte: { ingest.pipelines.ingest_info_pipeline.count: 2 }
+  - gte: { ingest.pipelines.ingest_info_pipeline.time_in_millis: 0 }
+  - match: { ingest.pipelines.ingest_info_pipeline.current: 0 }
+  - match: { ingest.pipelines.ingest_info_pipeline.failed: 0 }
+
+  # Processors section
+  - is_true: ingest.pipelines.ingest_info_pipeline.processors.0.set
+  - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.type: "set" }
+  - is_true: ingest.pipelines.ingest_info_pipeline.processors.0.set.stats
+  - gte: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.count: 2 }
+  - gte: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.time_in_millis: 0 }
+  - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.current: 0 }
+  - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.failed: 0 }

+ 38 - 0
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.info/10_info_all.yml

@@ -0,0 +1,38 @@
+---
+setup:
+  - skip:
+      version: " - 8.8.99"
+      reason: "/_info/_all only available from v8.9"
+
+---
+"Cluster Info _all":
+  - do:
+      cluster.info:
+        target: [ _all ]
+
+  # this tests only checks that the target exists, to check the structure of them, we have specific tests
+  - is_true: cluster_name
+  - is_true: http
+  - is_true: ingest
+
+---
+"Cluster Info fails when mixing _all with other targets":
+  - do:
+      catch: bad_request
+      cluster.info:
+        target: [ _all, ingest ]
+
+  - match: { status: 400 }
+  - match: { error.type: illegal_argument_exception }
+  - match: { error.reason: "request [/_info/_all,ingest] contains _all and individual target [_all,ingest]" }
+
+---
+"Cluster Info fails with an invalid target":
+  - do:
+      catch: bad_request
+      cluster.info:
+        target: [ ingest, invalid_target ]
+
+  - match: { status: 400 }
+  - match: { error.type: illegal_argument_exception }
+  - match: { error.reason: "request [/_info/ingest,invalid_target] contains unrecognized target: [invalid_target]" }

+ 1 - 1
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/info.http/10_info_http.yml → rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.info/20_info_http.yml

@@ -1,5 +1,5 @@
 ---
-"HTTP Stats":
+"Cluster HTTP Info":
   - skip:
       version: " - 8.8.99"
       reason: "/_info/http only available from v8.9"

+ 69 - 12
server/src/main/java/org/elasticsearch/ingest/IngestStats.java

@@ -23,6 +23,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -34,6 +35,19 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
         Writeable,
         ChunkedToXContent {
 
+    private static final Comparator<PipelineStat> PIPELINE_STAT_COMPARATOR = (p1, p2) -> {
+        final Stats p2Stats = p2.stats;
+        final Stats p1Stats = p1.stats;
+        final int ingestTimeCompare = Long.compare(p2Stats.ingestTimeInMillis, p1Stats.ingestTimeInMillis);
+        if (ingestTimeCompare == 0) {
+            return Long.compare(p2Stats.ingestCount, p1Stats.ingestCount);
+        } else {
+            return ingestTimeCompare;
+        }
+    };
+
+    public static final IngestStats IDENTITY = new IngestStats(Stats.IDENTITY, List.of(), Map.of());
+
     /**
      * @param totalStats - The total stats for Ingest. This is logically the sum of all pipeline stats,
      *                   and pipeline stats are logically the sum of the processor stats.
@@ -41,16 +55,7 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
      * @param processorStats - The per-processor stats for a given pipeline. A map keyed by the pipeline identifier.
      */
     public IngestStats {
-        pipelineStats = pipelineStats.stream().sorted((p1, p2) -> {
-            final IngestStats.Stats p2Stats = p2.stats;
-            final IngestStats.Stats p1Stats = p1.stats;
-            final int ingestTimeCompare = Long.compare(p2Stats.ingestTimeInMillis, p1Stats.ingestTimeInMillis);
-            if (ingestTimeCompare == 0) {
-                return Long.compare(p2Stats.ingestCount, p1Stats.ingestCount);
-            } else {
-                return ingestTimeCompare;
-            }
-        }).toList();
+        pipelineStats = pipelineStats.stream().sorted(PIPELINE_STAT_COMPARATOR).toList();
     }
 
     /**
@@ -153,11 +158,30 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
         );
     }
 
+    public static IngestStats merge(IngestStats first, IngestStats second) {
+        return new IngestStats(
+            Stats.merge(first.totalStats, second.totalStats),
+            PipelineStat.merge(first.pipelineStats, second.pipelineStats),
+            merge(first.processorStats, second.processorStats)
+        );
+    }
+
+    static Map<String, List<ProcessorStat>> merge(Map<String, List<ProcessorStat>> first, Map<String, List<ProcessorStat>> second) {
+        var totalsPerPipelineProcessor = new HashMap<String, List<ProcessorStat>>();
+
+        first.forEach((pipelineId, stats) -> totalsPerPipelineProcessor.merge(pipelineId, stats, ProcessorStat::merge));
+        second.forEach((pipelineId, stats) -> totalsPerPipelineProcessor.merge(pipelineId, stats, ProcessorStat::merge));
+
+        return totalsPerPipelineProcessor;
+    }
+
     public record Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount)
         implements
             Writeable,
             ToXContentFragment {
 
+        public static final Stats IDENTITY = new Stats(0, 0, 0, 0);
+
         /**
          * Read from a stream.
          */
@@ -181,6 +205,15 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
             builder.field("failed", ingestFailedCount);
             return builder;
         }
+
+        static Stats merge(Stats first, Stats second) {
+            return new Stats(
+                first.ingestCount + second.ingestCount,
+                first.ingestTimeInMillis + second.ingestTimeInMillis,
+                first.ingestCurrent + second.ingestCurrent,
+                first.ingestFailedCount + second.ingestFailedCount
+            );
+        }
     }
 
     /**
@@ -216,10 +249,34 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
     /**
      * Container for pipeline stats.
      */
-    public record PipelineStat(String pipelineId, Stats stats) {}
+    public record PipelineStat(String pipelineId, Stats stats) {
+        static List<PipelineStat> merge(List<PipelineStat> first, List<PipelineStat> second) {
+            var totalsPerPipeline = new HashMap<String, Stats>();
+
+            first.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps.stats, Stats::merge));
+            second.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps.stats, Stats::merge));
+
+            return totalsPerPipeline.entrySet()
+                .stream()
+                .map(v -> new PipelineStat(v.getKey(), v.getValue()))
+                .sorted(PIPELINE_STAT_COMPARATOR)
+                .toList();
+        }
+    }
 
     /**
      * Container for processor stats.
      */
-    public record ProcessorStat(String name, String type, Stats stats) {}
+    public record ProcessorStat(String name, String type, Stats stats) {
+
+        // The list of ProcessorStats has *always* stats for each processor (even if processor was executed or not), so it's safe to zip
+        // both lists using a common index iterator.
+        private static List<ProcessorStat> merge(List<ProcessorStat> first, List<ProcessorStat> second) {
+            var merged = new ArrayList<ProcessorStat>();
+            for (var i = 0; i < first.size(); i++) {
+                merged.add(new ProcessorStat(first.get(i).name, first.get(i).type, Stats.merge(first.get(i).stats, second.get(i).stats)));
+            }
+            return merged;
+        }
+    }
 }

+ 11 - 3
server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java

@@ -8,7 +8,6 @@
 
 package org.elasticsearch.rest.action.info;
 
-import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -18,6 +17,7 @@ import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
 import org.elasticsearch.http.HttpStats;
+import org.elasticsearch.ingest.IngestStats;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
@@ -36,13 +36,21 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.HTTP;
+import static org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.INGEST;
 import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
 
 public class RestClusterInfoAction extends BaseRestHandler {
 
     static final Map<String, Function<NodesStatsResponse, ChunkedToXContent>> RESPONSE_MAPPER = Map.of(
-        NodesInfoRequest.Metric.HTTP.metricName(),
-        nodesStatsResponse -> nodesStatsResponse.getNodes().stream().map(NodeStats::getHttp).reduce(HttpStats.IDENTITY, HttpStats::merge)
+        HTTP.metricName(),
+        nodesStatsResponse -> nodesStatsResponse.getNodes().stream().map(NodeStats::getHttp).reduce(HttpStats.IDENTITY, HttpStats::merge),
+        //
+        INGEST.metricName(),
+        nodesStatsResponse -> nodesStatsResponse.getNodes()
+            .stream()
+            .map(NodeStats::getIngestStats)
+            .reduce(IngestStats.IDENTITY, IngestStats::merge)
     );
     static final Set<String> AVAILABLE_TARGETS = RESPONSE_MAPPER.keySet();
 

+ 119 - 0
server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

@@ -13,10 +13,13 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
 public class IngestStatsTests extends ESTestCase {
 
     public void testSerialization() throws IOException {
@@ -28,6 +31,110 @@ public class IngestStatsTests extends ESTestCase {
         assertIngestStats(ingestStats, serializedStats);
     }
 
+    public void testStatsMerge() {
+        var first = randomStats();
+        var second = randomStats();
+        assertEquals(
+            new IngestStats.Stats(
+                first.ingestCount() + second.ingestCount(),
+                first.ingestTimeInMillis() + second.ingestTimeInMillis(),
+                first.ingestCurrent() + second.ingestCurrent(),
+                first.ingestFailedCount() + second.ingestFailedCount()
+            ),
+            IngestStats.Stats.merge(first, second)
+        );
+    }
+
+    public void testPipelineStatsMerge() {
+        var first = List.of(
+            randomPipelineStat("pipeline-1"),
+            randomPipelineStat("pipeline-1"),
+            randomPipelineStat("pipeline-2"),
+            randomPipelineStat("pipeline-3"),
+            randomPipelineStat("pipeline-5")
+        );
+        var second = List.of(
+            randomPipelineStat("pipeline-2"),
+            randomPipelineStat("pipeline-1"),
+            randomPipelineStat("pipeline-4"),
+            randomPipelineStat("pipeline-3")
+        );
+
+        assertThat(
+            IngestStats.PipelineStat.merge(first, second),
+            containsInAnyOrder(
+                new IngestStats.PipelineStat("pipeline-1", merge(first.get(0).stats(), first.get(1).stats(), second.get(1).stats())),
+                new IngestStats.PipelineStat("pipeline-2", merge(first.get(2).stats(), second.get(0).stats())),
+                new IngestStats.PipelineStat("pipeline-3", merge(first.get(3).stats(), second.get(3).stats())),
+                new IngestStats.PipelineStat("pipeline-4", second.get(2).stats()),
+                new IngestStats.PipelineStat("pipeline-5", first.get(4).stats())
+            )
+        );
+    }
+
+    public void testProcessorStatsMerge() {
+        {
+            var first = Map.of("pipeline-1", randomPipelineProcessorStats());
+            assertEquals(IngestStats.merge(Map.of(), first), first);
+            assertEquals(IngestStats.merge(first, Map.of()), first);
+        }
+        {
+            var first = Map.of(
+                "pipeline-1",
+                randomPipelineProcessorStats(),
+                "pipeline-2",
+                randomPipelineProcessorStats(),
+                "pipeline-3",
+                randomPipelineProcessorStats()
+            );
+            var second = Map.of(
+                "pipeline-2",
+                randomPipelineProcessorStats(),
+                "pipeline-3",
+                randomPipelineProcessorStats(),
+                "pipeline-1",
+                randomPipelineProcessorStats()
+            );
+
+            assertEquals(
+                IngestStats.merge(first, second),
+                Map.of(
+                    "pipeline-1",
+                    expectedPipelineProcessorStats(first.get("pipeline-1"), second.get("pipeline-1")),
+                    "pipeline-2",
+                    expectedPipelineProcessorStats(first.get("pipeline-2"), second.get("pipeline-2")),
+                    "pipeline-3",
+                    expectedPipelineProcessorStats(first.get("pipeline-3"), second.get("pipeline-3"))
+                )
+            );
+        }
+    }
+
+    private static List<IngestStats.ProcessorStat> expectedPipelineProcessorStats(
+        List<IngestStats.ProcessorStat> first,
+        List<IngestStats.ProcessorStat> second
+    ) {
+        return List.of(
+            new IngestStats.ProcessorStat("proc-1", "type-1", merge(first.get(0).stats(), second.get(0).stats())),
+            new IngestStats.ProcessorStat("proc-1", "type-2", merge(first.get(1).stats(), second.get(1).stats())),
+            new IngestStats.ProcessorStat("proc-2", "type-1", merge(first.get(2).stats(), second.get(2).stats())),
+            new IngestStats.ProcessorStat("proc-3", "type-4", merge(first.get(3).stats(), second.get(3).stats()))
+        );
+    }
+
+    private static List<IngestStats.ProcessorStat> randomPipelineProcessorStats() {
+        return List.of(
+            randomProcessorStat("proc-1", "type-1"),
+            randomProcessorStat("proc-1", "type-2"),
+            randomProcessorStat("proc-2", "type-1"),
+            randomProcessorStat("proc-3", "type-4")
+        );
+    }
+
+    private static IngestStats.Stats merge(IngestStats.Stats... stats) {
+        return Arrays.stream(stats).reduce(IngestStats.Stats.IDENTITY, IngestStats.Stats::merge);
+    }
+
     private static List<IngestStats.PipelineStat> createPipelineStats() {
         IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3));
         IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297));
@@ -98,4 +205,16 @@ public class IngestStatsTests extends ESTestCase {
             .map(IngestStats.PipelineStat::stats)
             .orElse(null);
     }
+
+    private static IngestStats.ProcessorStat randomProcessorStat(String name, String type) {
+        return new IngestStats.ProcessorStat(name, type, randomStats());
+    }
+
+    private static IngestStats.PipelineStat randomPipelineStat(String id) {
+        return new IngestStats.PipelineStat(id, randomStats());
+    }
+
+    private static IngestStats.Stats randomStats() {
+        return new IngestStats.Stats(randomLong(), randomLong(), randomLong(), randomLong());
+    }
 }