Browse Source

Added ingest statistics to node stats API

The ingest stats include the following statistics:
* `ingest.total.count`- The total number of document ingested during the lifetime of this node
* `ingest.total.time_in_millis` - The total time spent on ingest preprocessing documents during the lifetime of this node
* `ingest.total.current` - The total number of documents currently being ingested.
* `ingest.total.failed` - The total number ingest preprocessing operations failed during the lifetime of this node

Also these stats are returned on a per pipeline basis.
Martijn van Groningen 9 years ago
parent
commit
2fa33d5c47
19 changed files with 469 additions and 41 deletions
  1. 18 2
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java
  2. 16 0
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java
  3. 8 0
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java
  4. 2 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
  5. 1 1
      core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
  6. 1 1
      core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java
  7. 9 0
      core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
  8. 9 0
      core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java
  9. 171 0
      core/src/main/java/org/elasticsearch/ingest/IngestStats.java
  10. 118 23
      core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java
  11. 6 3
      core/src/main/java/org/elasticsearch/node/service/NodeService.java
  12. 2 1
      core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java
  13. 6 6
      core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
  14. 40 1
      core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java
  15. 23 0
      docs/reference/cluster/nodes-stats.asciidoc
  16. 3 0
      docs/reference/ingest/ingest-node.asciidoc
  17. 34 0
      rest-api-spec/src/main/resources/rest-api-spec/test/ingest/70_bulk.yaml
  18. 1 1
      test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java
  19. 1 1
      test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

+ 18 - 2
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java

@@ -31,6 +31,7 @@ import org.elasticsearch.discovery.DiscoveryStats;
 import org.elasticsearch.http.HttpStats;
 import org.elasticsearch.indices.NodeIndicesStats;
 import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
+import org.elasticsearch.ingest.IngestStats;
 import org.elasticsearch.monitor.fs.FsInfo;
 import org.elasticsearch.monitor.jvm.JvmStats;
 import org.elasticsearch.monitor.os.OsStats;
@@ -81,6 +82,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
     @Nullable
     private DiscoveryStats discoveryStats;
 
+    @Nullable
+    private IngestStats ingestStats;
+
     NodeStats() {
     }
 
@@ -89,7 +93,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
                      @Nullable FsInfo fs, @Nullable TransportStats transport, @Nullable HttpStats http,
                      @Nullable AllCircuitBreakerStats breaker,
                      @Nullable ScriptStats scriptStats,
-                     @Nullable DiscoveryStats discoveryStats) {
+                     @Nullable DiscoveryStats discoveryStats,
+                     @Nullable IngestStats ingestStats) {
         super(node);
         this.timestamp = timestamp;
         this.indices = indices;
@@ -103,6 +108,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         this.breaker = breaker;
         this.scriptStats = scriptStats;
         this.discoveryStats = discoveryStats;
+        this.ingestStats = ingestStats;
     }
 
     public long getTimestamp() {
@@ -187,6 +193,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         return this.discoveryStats;
     }
 
+    @Nullable
+    public IngestStats getIngestStats() {
+        return ingestStats;
+    }
+
     public static NodeStats readNodeStats(StreamInput in) throws IOException {
         NodeStats nodeInfo = new NodeStats();
         nodeInfo.readFrom(in);
@@ -224,7 +235,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         breaker = AllCircuitBreakerStats.readOptionalAllCircuitBreakerStats(in);
         scriptStats = in.readOptionalStreamable(ScriptStats::new);
         discoveryStats = in.readOptionalStreamable(() -> new DiscoveryStats(null));
-
+        ingestStats = in.readOptionalWritable(IngestStats.PROTO);
     }
 
     @Override
@@ -282,6 +293,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
         out.writeOptionalStreamable(breaker);
         out.writeOptionalStreamable(scriptStats);
         out.writeOptionalStreamable(discoveryStats);
+        out.writeOptionalWriteable(ingestStats);
     }
 
     @Override
@@ -337,6 +349,10 @@ public class NodeStats extends BaseNodeResponse implements ToXContent {
             getDiscoveryStats().toXContent(builder, params);
         }
 
+        if (getIngestStats() != null) {
+            getIngestStats().toXContent(builder, params);
+        }
+
         return builder;
     }
 }

+ 16 - 0
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

@@ -42,6 +42,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
     private boolean breaker;
     private boolean script;
     private boolean discovery;
+    private boolean ingest;
 
     public NodesStatsRequest() {
     }
@@ -69,6 +70,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         this.breaker = true;
         this.script = true;
         this.discovery = true;
+        this.ingest = true;
         return this;
     }
 
@@ -87,6 +89,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         this.breaker = false;
         this.script = false;
         this.discovery = false;
+        this.ingest = false;
         return this;
     }
 
@@ -250,6 +253,17 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         return this;
     }
 
+    public boolean ingest() {
+        return ingest;
+    }
+
+    /**
+     * Should ingest statistics be returned.
+     */
+    public NodesStatsRequest ingest(boolean ingest) {
+        this.ingest = ingest;
+        return this;
+    }
 
     @Override
     public void readFrom(StreamInput in) throws IOException {
@@ -265,6 +279,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         breaker = in.readBoolean();
         script = in.readBoolean();
         discovery = in.readBoolean();
+        ingest = in.readBoolean();
     }
 
     @Override
@@ -281,6 +296,7 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         out.writeBoolean(breaker);
         out.writeBoolean(script);
         out.writeBoolean(discovery);
+        out.writeBoolean(ingest);
     }
 
 }

+ 8 - 0
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequestBuilder.java

@@ -137,4 +137,12 @@ public class NodesStatsRequestBuilder extends NodesOperationRequestBuilder<Nodes
         request.discovery(discovery);
         return this;
     }
+
+    /**
+     * Should ingest statistics be returned.
+     */
+    public NodesStatsRequestBuilder ingest(boolean ingest) {
+        request.ingest(ingest);
+        return this;
+    }
 }

+ 2 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

@@ -80,7 +80,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
     protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
         NodesStatsRequest request = nodeStatsRequest.request;
         return nodeService.stats(request.indices(), request.os(), request.process(), request.jvm(), request.threadPool(),
-                request.fs(), request.transport(), request.http(), request.breaker(), request.script(), request.discovery());
+                request.fs(), request.transport(), request.http(), request.breaker(), request.script(), request.discovery(),
+                request.ingest());
     }
 
     @Override

+ 1 - 1
core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

@@ -99,7 +99,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
     @Override
     protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
         NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, true, false, true, false);
-        NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, true, false, false, false, false, false);
+        NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, true, false, false, false, false, false, false);
         List<ShardStats> shardsStats = new ArrayList<>();
         for (IndexService indexService : indicesService) {
             for (IndexShard indexShard : indexService) {

+ 1 - 1
core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java

@@ -112,7 +112,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
                 logger.error("failed to execute pipeline for a bulk request", throwable);
                 listener.onFailure(throwable);
             } else {
-                long ingestTookInMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - ingestStartTimeInNanos, TimeUnit.NANOSECONDS);
+                long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
                 BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
                 ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
                 if (bulkRequest.requests().isEmpty()) {

+ 9 - 0
core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.geo.builders.ShapeBuilder;
 import org.elasticsearch.common.text.Text;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
+import org.elasticsearch.ingest.IngestStats;
 import org.elasticsearch.search.rescore.RescoreBuilder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.search.aggregations.AggregatorBuilder;
@@ -552,6 +553,14 @@ public abstract class StreamInput extends InputStream {
         }
     }
 
+    public <T extends Writeable> T readOptionalWritable(T prototype) throws IOException {
+        if (readBoolean()) {
+            return (T) prototype.readFrom(this);
+        } else {
+            return null;
+        }
+    }
+
     public <T extends Throwable> T readThrowable() throws IOException {
         if (readBoolean()) {
             int key = readVInt();

+ 9 - 0
core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -520,6 +520,15 @@ public abstract class StreamOutput extends OutputStream {
         }
     }
 
+    public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
+        if (writeable != null) {
+            writeBoolean(true);
+            writeable.writeTo(this);
+        } else {
+            writeBoolean(false);
+        }
+    }
+
     public void writeThrowable(Throwable throwable) throws IOException {
         if (throwable == null) {
             writeBoolean(false);

+ 171 - 0
core/src/main/java/org/elasticsearch/ingest/IngestStats.java

@@ -0,0 +1,171 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.ingest;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class IngestStats implements Writeable<IngestStats>, ToXContent {
+
+    public final static IngestStats PROTO = new IngestStats(null, null);
+
+    private final Stats totalStats;
+    private final Map<String, Stats> statsPerPipeline;
+
+    public IngestStats(Stats totalStats, Map<String, Stats> statsPerPipeline) {
+        this.totalStats = totalStats;
+        this.statsPerPipeline = statsPerPipeline;
+    }
+
+    /**
+     * @return The accumulated stats for all pipelines
+     */
+    public Stats getTotalStats() {
+        return totalStats;
+    }
+
+    /**
+     * @return The stats on a per pipeline basis
+     */
+    public Map<String, Stats> getStatsPerPipeline() {
+        return statsPerPipeline;
+    }
+
+    @Override
+    public IngestStats readFrom(StreamInput in) throws IOException {
+        Stats totalStats = Stats.PROTO.readFrom(in);
+        totalStats.readFrom(in);
+        int size = in.readVInt();
+        Map<String, Stats> statsPerPipeline = new HashMap<>(size);
+        for (int i = 0; i < size; i++) {
+            Stats stats = Stats.PROTO.readFrom(in);
+            statsPerPipeline.put(in.readString(), stats);
+            stats.readFrom(in);
+        }
+        return new IngestStats(totalStats, statsPerPipeline);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        totalStats.writeTo(out);
+        out.writeVLong(statsPerPipeline.size());
+        for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
+            out.writeString(entry.getKey());
+            entry.getValue().writeTo(out);
+        }
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("ingest");
+        builder.startObject("total");
+        totalStats.toXContent(builder, params);
+        builder.endObject();
+        builder.startObject("pipelines");
+        for (Map.Entry<String, Stats> entry : statsPerPipeline.entrySet()) {
+            builder.startObject(entry.getKey());
+            entry.getValue().toXContent(builder, params);
+            builder.endObject();
+        }
+        builder.endObject();
+        builder.endObject();
+        return builder;
+    }
+
+    public static class Stats implements Writeable<Stats>, ToXContent {
+
+        private final static Stats PROTO = new Stats(0, 0, 0, 0);
+
+        private final long ingestCount;
+        private final long ingestTimeInMillis;
+        private final long ingestCurrent;
+        private final long ingestFailedCount;
+
+        public Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount) {
+            this.ingestCount = ingestCount;
+            this.ingestTimeInMillis = ingestTimeInMillis;
+            this.ingestCurrent = ingestCurrent;
+            this.ingestFailedCount = ingestFailedCount;
+        }
+
+        /**
+         * @return The total number of executed ingest preprocessing operations.
+         */
+        public long getIngestCount() {
+            return ingestCount;
+        }
+
+        /**
+         *
+         * @return The total time spent of ingest preprocessing in millis.
+         */
+        public long getIngestTimeInMillis() {
+            return ingestTimeInMillis;
+        }
+
+        /**
+         * @return The total number of ingest preprocessing operations currently executing.
+         */
+        public long getIngestCurrent() {
+            return ingestCurrent;
+        }
+
+        /**
+         * @return The total number of ingest preprocessing operations that have failed.
+         */
+        public long getIngestFailedCount() {
+            return ingestFailedCount;
+        }
+
+        @Override
+        public Stats readFrom(StreamInput in) throws IOException {
+            long ingestCount = in.readVLong();
+            long ingestTimeInMillis = in.readVLong();
+            long ingestCurrent = in.readVLong();
+            long ingestFailedCount = in.readVLong();
+            return new Stats(ingestCount, ingestTimeInMillis, ingestCurrent, ingestFailedCount);
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(ingestCount);
+            out.writeVLong(ingestTimeInMillis);
+            out.writeVLong(ingestCurrent);
+            out.writeVLong(ingestFailedCount);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field("count", ingestCount);
+            builder.timeValueField("time_in_millis", "time", ingestTimeInMillis, TimeUnit.MILLISECONDS);
+            builder.field("current", ingestCurrent);
+            builder.field("failed", ingestFailedCount);
+            return builder;
+        }
+    }
+}

+ 118 - 23
core/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java

@@ -19,23 +19,36 @@
 
 package org.elasticsearch.ingest;
 
+import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.metrics.CounterMetric;
+import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.ingest.core.IngestDocument;
 import org.elasticsearch.ingest.core.Pipeline;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
-public class PipelineExecutionService {
+public class PipelineExecutionService implements ClusterStateListener {
 
     private final PipelineStore store;
     private final ThreadPool threadPool;
 
+    private final StatsHolder totalStats = new StatsHolder();
+    private volatile Map<String, StatsHolder> statsHolderPerPipeline = Collections.emptyMap();
+
     public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) {
         this.store = store;
         this.threadPool = threadPool;
@@ -89,29 +102,85 @@ public class PipelineExecutionService {
         });
     }
 
+    public IngestStats stats() {
+        Map<String, StatsHolder> statsHolderPerPipeline = this.statsHolderPerPipeline;
+
+        Map<String, IngestStats.Stats> statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size());
+        for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
+            statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
+        }
+
+        return new IngestStats(totalStats.createStats(), statsPerPipeline);
+    }
+
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        IngestMetadata ingestMetadata = event.state().getMetaData().custom(IngestMetadata.TYPE);
+        if (ingestMetadata != null) {
+            updatePipelineStats(ingestMetadata);
+        }
+    }
+
+    void updatePipelineStats(IngestMetadata ingestMetadata) {
+        boolean changed = false;
+        Map<String, StatsHolder> newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline);
+        for (String pipeline : newStatsPerPipeline.keySet()) {
+            if (ingestMetadata.getPipelines().containsKey(pipeline) == false) {
+                newStatsPerPipeline.remove(pipeline);
+                changed = true;
+            }
+        }
+        for (String pipeline : ingestMetadata.getPipelines().keySet()) {
+            if (newStatsPerPipeline.containsKey(pipeline) == false) {
+                newStatsPerPipeline.put(pipeline, new StatsHolder());
+                changed = true;
+            }
+        }
+
+        if (changed) {
+            statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline);
+        }
+    }
+
     private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
-        String index = indexRequest.index();
-        String type = indexRequest.type();
-        String id = indexRequest.id();
-        String routing = indexRequest.routing();
-        String parent = indexRequest.parent();
-        String timestamp = indexRequest.timestamp();
-        String ttl = indexRequest.ttl() == null ? null : indexRequest.ttl().toString();
-        Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
-        IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, timestamp, ttl, sourceAsMap);
-        pipeline.execute(ingestDocument);
-
-        Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
-        //it's fine to set all metadata fields all the time, as ingest document holds their starting values
-        //before ingestion, which might also get modified during ingestion.
-        indexRequest.index(metadataMap.get(IngestDocument.MetaData.INDEX));
-        indexRequest.type(metadataMap.get(IngestDocument.MetaData.TYPE));
-        indexRequest.id(metadataMap.get(IngestDocument.MetaData.ID));
-        indexRequest.routing(metadataMap.get(IngestDocument.MetaData.ROUTING));
-        indexRequest.parent(metadataMap.get(IngestDocument.MetaData.PARENT));
-        indexRequest.timestamp(metadataMap.get(IngestDocument.MetaData.TIMESTAMP));
-        indexRequest.ttl(metadataMap.get(IngestDocument.MetaData.TTL));
-        indexRequest.source(ingestDocument.getSourceAndMetadata());
+        long startTimeInNanos = System.nanoTime();
+        // the pipeline specific stat holder may not exist and that is fine:
+        // (e.g. the pipeline may have been removed while we're ingesting a document
+        Optional<StatsHolder> pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId()));
+        try {
+            totalStats.preIngest();
+            pipelineStats.ifPresent(StatsHolder::preIngest);
+            String index = indexRequest.index();
+            String type = indexRequest.type();
+            String id = indexRequest.id();
+            String routing = indexRequest.routing();
+            String parent = indexRequest.parent();
+            String timestamp = indexRequest.timestamp();
+            String ttl = indexRequest.ttl() == null ? null : indexRequest.ttl().toString();
+            Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
+            IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, timestamp, ttl, sourceAsMap);
+            pipeline.execute(ingestDocument);
+
+            Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
+            //it's fine to set all metadata fields all the time, as ingest document holds their starting values
+            //before ingestion, which might also get modified during ingestion.
+            indexRequest.index(metadataMap.get(IngestDocument.MetaData.INDEX));
+            indexRequest.type(metadataMap.get(IngestDocument.MetaData.TYPE));
+            indexRequest.id(metadataMap.get(IngestDocument.MetaData.ID));
+            indexRequest.routing(metadataMap.get(IngestDocument.MetaData.ROUTING));
+            indexRequest.parent(metadataMap.get(IngestDocument.MetaData.PARENT));
+            indexRequest.timestamp(metadataMap.get(IngestDocument.MetaData.TIMESTAMP));
+            indexRequest.ttl(metadataMap.get(IngestDocument.MetaData.TTL));
+            indexRequest.source(ingestDocument.getSourceAndMetadata());
+        } catch (Exception e) {
+            totalStats.ingestFailed();
+            pipelineStats.ifPresent(StatsHolder::ingestFailed);
+            throw e;
+        } finally {
+            long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
+            totalStats.postIngest(ingestTimeInMillis);
+            pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis));
+        }
     }
 
     private Pipeline getPipeline(String pipelineId) {
@@ -121,4 +190,30 @@ public class PipelineExecutionService {
         }
         return pipeline;
     }
+
+    static class StatsHolder {
+
+        private final MeanMetric ingestMetric = new MeanMetric();
+        private final CounterMetric ingestCurrent = new CounterMetric();
+        private final CounterMetric ingestFailed = new CounterMetric();
+
+        void preIngest() {
+            ingestCurrent.inc();
+        }
+
+        void postIngest(long ingestTimeInMillis) {
+            ingestCurrent.dec();
+            ingestMetric.inc(ingestTimeInMillis);
+        }
+
+        void ingestFailed() {
+            ingestFailed.inc();
+        }
+
+        IngestStats.Stats createStats() {
+            return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count());
+        }
+
+    }
+
 }

+ 6 - 3
core/src/main/java/org/elasticsearch/node/service/NodeService.java

@@ -90,6 +90,7 @@ public class NodeService extends AbstractComponent implements Closeable {
         this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder);
         this.settingsFilter = settingsFilter;
         clusterService.add(ingestService.getPipelineStore());
+        clusterService.add(ingestService.getPipelineExecutionService());
     }
 
     // can not use constructor injection or there will be a circular dependency
@@ -165,13 +166,14 @@ public class NodeService extends AbstractComponent implements Closeable {
                 httpServer == null ? null : httpServer.stats(),
                 circuitBreakerService.stats(),
                 scriptService.stats(),
-                discovery.stats()
+                discovery.stats(),
+                ingestService.getPipelineExecutionService().stats()
         );
     }
 
     public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool,
                            boolean fs, boolean transport, boolean http, boolean circuitBreaker,
-                           boolean script, boolean discoveryStats) {
+                           boolean script, boolean discoveryStats, boolean ingest) {
         // for indices stats we want to include previous allocated shards stats as well (it will
         // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
         return new NodeStats(discovery.localNode(), System.currentTimeMillis(),
@@ -185,7 +187,8 @@ public class NodeService extends AbstractComponent implements Closeable {
                 http ? (httpServer == null ? null : httpServer.stats()) : null,
                 circuitBreaker ? circuitBreakerService.stats() : null,
                 script ? scriptService.stats() : null,
-                discoveryStats ? discovery.stats() : null
+                discoveryStats ? discovery.stats() : null,
+                ingest ? ingestService.getPipelineExecutionService().stats() : null
         );
     }
 

+ 2 - 1
core/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/stats/RestNodesStatsAction.java

@@ -81,6 +81,7 @@ public class RestNodesStatsAction extends BaseRestHandler {
             nodesStatsRequest.breaker(metrics.contains("breaker"));
             nodesStatsRequest.script(metrics.contains("script"));
             nodesStatsRequest.discovery(metrics.contains("discovery"));
+            nodesStatsRequest.ingest(metrics.contains("ingest"));
 
             // check for index specific metrics
             if (metrics.contains("indices")) {
@@ -113,6 +114,6 @@ public class RestNodesStatsAction extends BaseRestHandler {
             nodesStatsRequest.indices().includeSegmentFileSizes(true);
         }
 
-        client.admin().cluster().nodesStats(nodesStatsRequest, new RestToXContentListener<NodesStatsResponse>(channel));
+        client.admin().cluster().nodesStats(nodesStatsRequest, new RestToXContentListener<>(channel));
     }
 }

+ 6 - 6
core/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java

@@ -143,11 +143,11 @@ public class DiskUsageTests extends ESTestCase {
         };
         NodeStats[] nodeStats = new NodeStats[] {
                 new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null),
+                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null),
+                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null)
+                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null)
         };
         InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvaiableUsages, newMostAvaiableUsages);
         DiskUsage leastNode_1 = newLeastAvaiableUsages.get("node_1");
@@ -184,11 +184,11 @@ public class DiskUsageTests extends ESTestCase {
         };
         NodeStats[] nodeStats = new NodeStats[] {
                 new NodeStats(new DiscoveryNode("node_1", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null),
+                        null,null,null,null,null,new FsInfo(0, node1FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_2", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null),
+                        null,null,null,null,null, new FsInfo(0, node2FSInfo), null,null,null,null,null, null),
                 new NodeStats(new DiscoveryNode("node_3", DummyTransportAddress.INSTANCE, Version.CURRENT), 0,
-                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null)
+                        null,null,null,null,null, new FsInfo(0, node3FSInfo), null,null,null,null,null, null)
         };
         InternalClusterInfoService.fillDiskUsagePerNode(logger, nodeStats, newLeastAvailableUsages, newMostAvailableUsages);
         DiskUsage leastNode_1 = newLeastAvailableUsages.get("node_1");

+ 40 - 1
core/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java

@@ -25,6 +25,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.ingest.core.CompoundProcessor;
 import org.elasticsearch.ingest.core.IngestDocument;
@@ -38,15 +39,16 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doAnswer;
@@ -341,6 +343,43 @@ public class PipelineExecutionServiceTests extends ESTestCase {
         verify(completionHandler, times(1)).accept(null);
     }
 
+    public void testStats() throws Exception {
+        IngestStats ingestStats = executionService.stats();
+        assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(0));
+        assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(0L));
+        assertThat(ingestStats.getTotalStats().getIngestCurrent(), equalTo(0L));
+        assertThat(ingestStats.getTotalStats().getIngestFailedCount(), equalTo(0L));
+        assertThat(ingestStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L));
+
+        when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, new CompoundProcessor()));
+        when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, new CompoundProcessor()));
+
+        Map<String, PipelineConfiguration> configurationMap = new HashMap<>();
+        configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}")));
+        configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}")));
+        executionService.updatePipelineStats(new IngestMetadata(configurationMap));
+
+        Consumer<Throwable> failureHandler = mock(Consumer.class);
+        Consumer<Boolean> completionHandler = mock(Consumer.class);
+
+        IndexRequest indexRequest = new IndexRequest("_index");
+        indexRequest.setPipeline("_id1");
+        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
+        ingestStats = executionService.stats();
+        assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(2));
+        assertThat(ingestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
+        assertThat(ingestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L));
+        assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(1L));
+
+        indexRequest.setPipeline("_id2");
+        executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
+        ingestStats = executionService.stats();
+        assertThat(ingestStats.getStatsPerPipeline().size(), equalTo(2));
+        assertThat(ingestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L));
+        assertThat(ingestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L));
+        assertThat(ingestStats.getTotalStats().getIngestCount(), equalTo(2L));
+    }
+
     private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {
         return argThat(new IngestDocumentMatcher(index, type, id, source));
     }

+ 23 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -60,6 +60,9 @@ of `indices`, `os`, `process`, `jvm`, `transport`, `http`,
 `discovery`::
 	Statistics about the discovery
 
+`ingest`::
+    Statistics about ingest preprocessing
+
 [source,js]
 --------------------------------------------------
 # return indices and os
@@ -227,3 +230,23 @@ curl -XGET 'http://localhost:9200/_nodes/stats?pretty&groups=_all'
 # Some groups from just the indices stats
 curl -XGET 'http://localhost:9200/_nodes/stats/indices?pretty&groups=foo,bar'
 --------------------------------------------------
+
+[float]
+[[ingest-stats]]
+=== Ingest statistics
+
+The `ingest` flag can be set to retrieve statistics that concern ingest:
+
+`ingest.total.count`::
+    The total number of document ingested during the lifetime of this node
+
+`ingest.total.time_in_millis`::
+    The total time spent on ingest preprocessing documents during the lifetime of this node
+
+`ingest.total.current`::
+    The total number of documents currently being ingested.
+
+`ingest.total.failed`::
+    The total number ingest preprocessing operations failed during the lifetime of this node
+
+On top of these overall ingest statistics, these statistics are also provided on a per pipeline basis.

+ 3 - 0
docs/reference/ingest/ingest-node.asciidoc

@@ -634,6 +634,9 @@ plugin.mandatory: ingest-attachment,ingest-geoip
 
 A node will not start if either of these plugins are not available.
 
+The <<ingest-stats,node stats API>> can be used to fetch ingest usage statistics, globally and on a per
+pipeline basis. Useful to find out which pipelines are used the most or spent the most time on preprocessing.
+
 [[append-procesesor]]
 === Append Processor
 Appends one or more values to an existing array if the field already exists and it is an array.

+ 34 - 0
rest-api-spec/src/main/resources/rest-api-spec/test/ingest/70_bulk.yaml

@@ -68,6 +68,23 @@ setup:
   - is_false: _source.field1
   - is_false: _source.field2
 
+  - do:
+    cluster.state: {}
+    # Get master node id
+  - set: { master_node: master }
+
+  - do:
+      nodes.stats:
+        metric: [ ingest ]
+  - gte: {nodes.$master.ingest.total.count: 1}
+  - gte: {nodes.$master.ingest.total.failed: 0}
+  - gte: {nodes.$master.ingest.total.time_in_millis: 0}
+  - match: {nodes.$master.ingest.total.current: 0}
+  - match: {nodes.$master.ingest.pipelines.pipeline1.count: 1}
+  - match: {nodes.$master.ingest.pipelines.pipeline1.failed: 0}
+  - gte: {nodes.$master.ingest.pipelines.pipeline1.time_in_millis: 0}
+  - match: {nodes.$master.ingest.pipelines.pipeline1.current: 0}
+
 ---
 "Test bulk request with default pipeline":
 
@@ -88,6 +105,23 @@ setup:
           - f1: v2
   - gte: { ingest_took: 0 }
 
+  - do:
+      cluster.state: {}
+  # Get master node id
+  - set: { master_node: master }
+
+  - do:
+      nodes.stats:
+        metric: [ ingest ]
+  - gte: {nodes.$master.ingest.total.count: 1}
+  - gte: {nodes.$master.ingest.total.failed: 0}
+  - gte: {nodes.$master.ingest.total.time_in_millis: 0}
+  - match: {nodes.$master.ingest.total.current: 0}
+  - match: {nodes.$master.ingest.pipelines.pipeline2.count: 1}
+  - match: {nodes.$master.ingest.pipelines.pipeline2.failed: 0}
+  - gte: {nodes.$master.ingest.pipelines.pipeline2.time_in_millis: 0}
+  - match: {nodes.$master.ingest.pipelines.pipeline2.current: 0}
+
   - do:
       get:
         index: test_index

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

@@ -74,7 +74,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
             null, null, null, null, null,
             fsInfo,
             null, null, null,
-            null, null);
+            null, null, null);
     }
 
     @Inject

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -1850,7 +1850,7 @@ public final class InternalTestCluster extends TestCluster {
                 }
 
                 NodeService nodeService = getInstanceFromNode(NodeService.class, nodeAndClient.node);
-                NodeStats stats = nodeService.stats(CommonStatsFlags.ALL, false, false, false, false, false, false, false, false, false, false);
+                NodeStats stats = nodeService.stats(CommonStatsFlags.ALL, false, false, false, false, false, false, false, false, false, false, false);
                 assertThat("Fielddata size must be 0 on node: " + stats.getNode(), stats.getIndices().getFieldData().getMemorySizeInBytes(), equalTo(0L));
                 assertThat("Query cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getQueryCache().getMemorySizeInBytes(), equalTo(0L));
                 assertThat("FixedBitSet cache size must be 0 on node: " + stats.getNode(), stats.getIndices().getSegments().getBitsetMemoryInBytes(), equalTo(0L));