Include doc size info in ingest stats (#107240)

Add ingested_in_bytes and produced_in_bytes stats to pipeline ingest stats.
These track how many bytes are ingested and produced by a given pipeline.
For efficiency, these stats are recorded only for the first pipeline to process a
document. Thus, if a pipeline runs as a final pipeline after a default pipeline,
as a pipeline processor, or after a reroute request, the document will not
contribute to that pipeline's stats. If a given pipeline has 0 bytes recorded for
both of these stats, because it was never the first pipeline to run for any document,
these stats will not appear in the pipeline's entry in ingest stats.
Larisa Motova, 1 year ago
commit a01baa3d79
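
To make the behaviour above concrete, here is a minimal, hypothetical sketch (not part of this change) of how a client might read the new optional fields once the ingest stats JSON has been parsed into plain maps. The class, record, and helper names are invented for illustration.

import java.util.Map;

/**
 * Minimal sketch of reading the optional byte stats from one parsed pipeline
 * entry of the ingest stats response. The Map stands in for whatever JSON
 * parsing the caller already does; all names here are illustrative only.
 */
public class PipelineByteStatsReader {

    record PipelineByteStats(long ingestedInBytes, long producedInBytes) {}

    /**
     * Returns the byte stats for one pipeline entry, or null when the fields
     * are absent, i.e. the pipeline was never the first to process a document.
     */
    static PipelineByteStats read(Map<String, Object> pipelineEntry) {
        Object ingested = pipelineEntry.get("ingested_in_bytes");
        Object produced = pipelineEntry.get("produced_in_bytes");
        if (ingested == null || produced == null) {
            return null; // both fields are omitted when both counters are zero
        }
        return new PipelineByteStats(((Number) ingested).longValue(), ((Number) produced).longValue());
    }

    public static void main(String[] args) {
        // A pipeline that was first to see a document reports both fields...
        Map<String, Object> firstPipeline = Map.of("count", 1, "failed", 0, "ingested_in_bytes", 25, "produced_in_bytes", 58);
        // ...while a final pipeline that only ran second reports neither.
        Map<String, Object> finalPipeline = Map.of("count", 1, "failed", 0);
        System.out.println(read(firstPipeline)); // PipelineByteStats[ingestedInBytes=25, producedInBytes=58]
        System.out.println(read(finalPipeline)); // null
    }
}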

+ 6 - 0
docs/changelog/107240.yaml

@@ -0,0 +1,6 @@
+pr: 107240
+summary: Include doc size info in ingest stats
+area: Ingest Node
+type: enhancement
+issues:
+ - 106386

+ 16 - 0
docs/reference/cluster/cluster-info.asciidoc

@@ -207,6 +207,22 @@ pipeline.
 (integer)
 Total number of failed operations for the ingest pipeline.
 
+`ingested_in_bytes`::
+(Optional, integer)
+Total number of bytes of all documents ingested by the pipeline.
+This field is only present on pipelines which are the first to process a document.
+Thus, it is not present on pipelines which only run as a final pipeline after a default pipeline, after a reroute
+processor, or as a pipeline processor.
+
+`produced_in_bytes`::
+(Optional, integer)
+Total number of bytes of all documents produced by the pipeline.
+This field is only present on pipelines which are the first to process a document.
+Thus, it is not present on pipelines which only run as a final pipeline after a default pipeline, after a reroute
+processor, or as a pipeline processor.
+In situations where there are subsequent pipelines, the value represents the size of the document after all pipelines
+have run.
+
 `processors`::
 (array of objects)
 Contains information for the ingest processors for the ingest pipeline.

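The note above that `produced_in_bytes` reflects the document after all subsequent pipelines have run is easy to miss. Below is a toy sketch of that accounting, with made-up sizes and counters, and nothing taken from the Elasticsearch codebase.

/**
 * Toy model (not Elasticsearch code) of how the byte counters are attributed
 * when several pipelines run on one document. All sizes are invented.
 */
public class ByteAccountingSketch {

    static long ingestedInBytes = 0;
    static long producedInBytes = 0;

    public static void main(String[] args) {
        long sizeBeforeAnyPipeline = 120;    // document as received; attributed to the first pipeline
        long sizeAfterDefaultPipeline = 150; // intermediate size; never recorded anywhere
        long sizeAfterFinalPipeline = 180;   // size once the whole pipeline chain is done

        // Only the first pipeline's counters move; the final pipeline records nothing.
        ingestedInBytes += sizeBeforeAnyPipeline;
        producedInBytes += sizeAfterFinalPipeline;

        System.out.println("ingested_in_bytes=" + ingestedInBytes + ", produced_in_bytes=" + producedInBytes);
        // -> ingested_in_bytes=120, produced_in_bytes=180
    }
}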
+ 16 - 0
docs/reference/cluster/nodes-stats.asciidoc

@@ -2643,6 +2643,22 @@ pipeline.
 (integer)
 Total number of failed operations for the ingest pipeline.
 
+`ingested_in_bytes`::
+(Optional, integer)
+Total number of bytes of all documents ingested by the pipeline.
+This field is only present on pipelines which are the first to process a document.
+Thus, it is not present on pipelines which only run as a final pipeline after a default pipeline, after a reroute
+processor, or as a pipeline processor.
+
+`produced_in_bytes`::
+(Optional, integer)
+Total number of bytes of all documents produced by the pipeline.
+This field is only present on pipelines which are the first to process a document.
+Thus, it is not present on pipelines which only run as a final pipeline after a default pipeline, after a reroute
+processor, or as a pipeline processor.
+In situations where there are subsequent pipelines, the value represents the size of the document after all pipelines
+have run.
+
 `processors`::
 (array of objects)
 Contains statistics for the ingest processors for the ingest pipeline.

+ 261 - 0
modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/15_info_ingest.yml

@@ -16,6 +16,31 @@ teardown:
         index: "ingest_info_index"
         ignore_unavailable: true
 
+  - do:
+      indices.delete:
+        index: "index-1"
+        ignore_unavailable: true
+
+  - do:
+      indices.delete:
+        index: "index-2"
+        ignore_unavailable: true
+
+  - do:
+      indices.delete:
+        index: "an-index"
+        ignore_unavailable: true
+
+  - do:
+      ingest.delete_pipeline:
+        id: "pipeline-1"
+        ignore: 404
+
+  - do:
+      ingest.delete_pipeline:
+        id: "pipeline-2"
+        ignore: 404
+
 ---
 "Cluster ingest information":
   - do:
@@ -65,6 +90,8 @@ teardown:
   - gte: { ingest.pipelines.ingest_info_pipeline.time_in_millis: 0 }
   - match: { ingest.pipelines.ingest_info_pipeline.current: 0 }
   - match: { ingest.pipelines.ingest_info_pipeline.failed: 0 }
+  - gt: { ingest.pipelines.ingest_info_pipeline.ingested_in_bytes: 0 }
+  - gt: { ingest.pipelines.ingest_info_pipeline.produced_in_bytes: 0 }
 
   # Processors section
   - is_true: ingest.pipelines.ingest_info_pipeline.processors.0.set
@@ -74,3 +101,237 @@ teardown:
   - gte: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.time_in_millis: 0 }
   - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.current: 0 }
   - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.failed: 0 }
+
+---
+"Test bytes_produced not increased when pipeline fails":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-1"
+        body:  >
+          {
+            "processors": [
+              {
+                "pipeline": {
+                  "name": "fake-pipeline"
+                }
+              }
+            ]
+          }
+  - do:
+      bulk:
+        refresh: true
+        index: an-index
+        body:
+          - '{"create": {"pipeline" : "pipeline-1"}}'
+          - '{"some-field": "some-value"}'
+
+  - do:
+      cluster.info:
+        target: [ ingest ]
+  - match: { ingest.pipelines.pipeline-1.failed: 1 }
+  - gt: { ingest.pipelines.pipeline-1.ingested_in_bytes: 0 }
+  - match: { ingest.pipelines.pipeline-1.produced_in_bytes: 0 }
+
+---
+"Test drop processor":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-1"
+        body:  >
+          {
+            "processors": [
+              {
+                "drop" : {}
+              }
+            ]
+          }
+  - do:
+      bulk:
+        refresh: true
+        index: an-index
+        body:
+          - '{"create": {"pipeline" : "pipeline-1"}}'
+          - '{"some-field": "some-value"}'
+
+  - do:
+      cluster.info:
+        target: [ ingest ]
+  - gt: { ingest.pipelines.pipeline-1.ingested_in_bytes: 0 }
+  - match: { ingest.pipelines.pipeline-1.produced_in_bytes: 0 }
+
+---
+"Test that pipeline processor has byte stats recorded in first pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-1"
+        body:  >
+          {
+            "processors": [
+              {
+                "pipeline": {
+                  "name": "pipeline-2"
+                }
+              }
+            ]
+          }
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-2"
+        body:  >
+          {
+            "processors": [
+              {
+                "set" : {
+                  "field": "added-in-second-pipeline",
+                  "value": "foo bar baz"
+                }
+              }
+            ]
+          }
+  - do:
+      indices.create:
+        index: an-index
+        body:
+          settings:
+            index:
+              default_pipeline: "pipeline-1"
+  - do:
+      bulk:
+        refresh: true
+        index: index-foo
+        body:
+          - '{"index": { "_index": "an-index", "_id": 1 }}'
+          - '{"some-field": 1 }'
+  - do:
+      get:
+        id: 1
+        index: an-index
+  - match: { _source.added-in-second-pipeline: "foo bar baz" }
+
+  - do:
+      cluster.info:
+        target: [ ingest ]
+  - gt: { ingest.pipelines.pipeline-1.ingested_in_bytes: 0 }
+  - set: { ingest.pipelines.pipeline-1.ingested_in_bytes: ingest_bytes }
+  - gt: { ingest.pipelines.pipeline-1.produced_in_bytes: $ingest_bytes }
+  - match: { ingest.pipelines.pipeline-2.ingested_in_bytes: null }
+  - match: { ingest.pipelines.pipeline-2.produced_in_bytes: null }
+
+---
+"Test that final pipeline has byte stats recorded in first pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-1"
+        body:  >
+          {
+            "processors": []
+          }
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-2"
+        body:  >
+          {
+            "processors": [
+              {
+                "set" : {
+                  "field": "added-in-second-pipeline",
+                  "value": "foo bar baz"
+                }
+              }
+            ]
+          }
+  - do:
+      indices.create:
+        index: an-index
+        body:
+          settings:
+            index:
+              default_pipeline: "pipeline-1"
+              final_pipeline: "pipeline-2"
+  - do:
+      bulk:
+        refresh: true
+        index: index-foo
+        body:
+          - '{"index": { "_index": "an-index", "_id": 1 }}'
+          - '{"some-field": 1 }'
+  - do:
+      get:
+        id: 1
+        index: an-index
+  - match: { _source.added-in-second-pipeline: "foo bar baz" }
+
+  - do:
+      cluster.info:
+        target: [ ingest ]
+  - gt: { ingest.pipelines.pipeline-1.ingested_in_bytes: 0 }
+  - set: { ingest.pipelines.pipeline-1.ingested_in_bytes: ingest_bytes }
+  - gt: { ingest.pipelines.pipeline-1.produced_in_bytes: $ingest_bytes }
+  - match: { ingest.pipelines.pipeline-2.ingested_in_bytes: null }
+  - match: { ingest.pipelines.pipeline-2.produced_in_bytes: null }
+
+
+---
+"Test that reroute processor has byte stats recorded in first pipeline":
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-1"
+        body:  >
+          {
+            "processors": [
+              {
+                "reroute": {
+                  "destination": "index-2"
+                }
+              }
+            ]
+          }
+  - do:
+      ingest.put_pipeline:
+        id: "pipeline-2"
+        body:  >
+          {
+            "processors": [
+              {
+                "set" : {
+                  "field": "added-in-second-pipeline",
+                  "value": "foo bar baz"
+                }
+              }
+            ]
+          }
+  - do:
+      indices.create:
+        index: index-1
+        body:
+          settings:
+            index:
+              default_pipeline: "pipeline-1"
+  - do:
+      indices.create:
+        index: index-2
+        body:
+          settings:
+            index:
+              default_pipeline: "pipeline-2"
+  - do:
+      bulk:
+        refresh: true
+        index: index-1
+        body:
+          - '{"index": { "_index": "index-1", "_id": 1 }}'
+          - '{"some-field": 1 }'
+  - do:
+      get:
+        id: 1
+        index: index-2
+  - match: { _source.added-in-second-pipeline: "foo bar baz" }
+
+  - do:
+      cluster.info:
+        target: [ ingest ]
+  - gt: { ingest.pipelines.pipeline-1.ingested_in_bytes: 0 }
+  - set: { ingest.pipelines.pipeline-1.ingested_in_bytes: ingest_bytes }
+  - gt: { ingest.pipelines.pipeline-1.produced_in_bytes: $ingest_bytes }
+  - match: { ingest.pipelines.pipeline-2.ingested_in_bytes: null }
+  - match: { ingest.pipelines.pipeline-2.produced_in_bytes: null }

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -168,6 +168,7 @@ public class TransportVersions {
     public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO = def(8_659_00_0);
     public static final TransportVersion ML_INFERENCE_COHERE_COMPLETION_ADDED = def(8_660_00_0);
     public static final TransportVersion ESQL_REMOVE_ES_SOURCE_OPTIONS = def(8_661_00_0);
+    public static final TransportVersion NODE_STATS_INGEST_BYTES = def(8_662_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 57 - 0
server/src/main/java/org/elasticsearch/ingest/IngestPipelineMetric.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.ingest;
+
+import org.elasticsearch.common.metrics.CounterMetric;
+
+/**
+ * <p>Metrics to measure ingest actions, specific to pipelines.
+ */
+public class IngestPipelineMetric extends IngestMetric {
+
+    /**
+     * The amount of bytes ingested by a pipeline.
+     */
+    private final CounterMetric bytesIngested = new CounterMetric();
+
+    /**
+     * The amount of bytes produced by a pipeline.
+     */
+    private final CounterMetric bytesProduced = new CounterMetric();
+
+    void add(IngestPipelineMetric metrics) {
+        super.add(metrics);
+        bytesIngested.inc(metrics.bytesIngested.count());
+        bytesProduced.inc(metrics.bytesProduced.count());
+    }
+
+    /**
+     * Call this prior to the ingest action.
+     * @param bytesIngested The number of bytes ingested by the pipeline.
+     */
+    void preIngestBytes(long bytesIngested) {
+        this.bytesIngested.inc(bytesIngested);
+    }
+
+    /**
+     * Call this after performing the ingest action.
+     * @param bytesProduced The number of bytes resulting from running a request in the pipeline.
+     */
+    void postIngestBytes(long bytesProduced) {
+        this.bytesProduced.inc(bytesProduced);
+    }
+
+    /**
+     * Creates a serializable representation for these metrics.
+     */
+    IngestStats.ByteStats createByteStats() {
+        return new IngestStats.ByteStats(this.bytesIngested.count(), this.bytesProduced.count());
+    }
+
+}
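
For orientation, here is a condensed sketch of how these package-private hooks are meant to be exercised; the real call sites are in the IngestService changes below. The wrapper class is invented, and the sketch assumes it is compiled inside the org.elasticsearch.ingest package because the recording methods are package-private.

package org.elasticsearch.ingest;

/**
 * Condensed sketch of the metric lifecycle: bytes are recorded before and after
 * the pipeline chain runs, then snapshotted into the serializable ByteStats record.
 */
class IngestPipelineMetricSketch {

    static IngestStats.ByteStats recordOneDocument(IngestPipelineMetric metric, long sizeBefore, long sizeAfter, boolean dropped) {
        metric.preIngestBytes(sizeBefore);     // document size as it enters the first pipeline
        if (dropped == false) {
            metric.postIngestBytes(sizeAfter); // size after all pipelines ran; skipped for dropped docs
        }
        return metric.createByteStats();       // snapshot consumed by IngestStats.PipelineStat
    }
}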

+ 11 - 0
server/src/main/java/org/elasticsearch/ingest/IngestService.java

@@ -731,6 +731,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         }
 
                         PipelineIterator pipelines = getAndResetPipelines(indexRequest);
+                        Pipeline firstPipeline = pipelines.peekFirst();
                         if (pipelines.hasNext() == false) {
                             i++;
                             continue;
@@ -739,6 +740,9 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                         // start the stopwatch and acquire a ref to indicate that we're working on this document
                         final long startTimeInNanos = System.nanoTime();
                         totalMetrics.preIngest();
+                        if (firstPipeline != null) {
+                            firstPipeline.getMetrics().preIngestBytes(indexRequest.ramBytesUsed());
+                        }
                         final int slot = i;
                         final Releasable ref = refs.acquire();
                         final DocumentSizeObserver documentSizeObserver = documentParsingProvider.newDocumentSizeObserver();
@@ -754,6 +758,9 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                                     if (result.success) {
                                         if (result.shouldKeep == false) {
                                             onDropped.accept(slot);
+                                        } else {
+                                            assert firstPipeline != null;
+                                            firstPipeline.getMetrics().postIngestBytes(indexRequest.ramBytesUsed());
                                         }
                                     } else {
                                         // We were given a failure result in the onResponse method, so we must store the failure
@@ -860,6 +867,10 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
         public PipelineSlot next() {
             return pipelineSlotIterator.next();
         }
+
+        public Pipeline peekFirst() {
+            return getPipeline(defaultPipeline != null ? defaultPipeline : finalPipeline);
+        }
     }
 
     private void executePipelines(

+ 58 - 18
server/src/main/java/org/elasticsearch/ingest/IngestStats.java

@@ -8,6 +8,7 @@
 
 package org.elasticsearch.ingest;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -34,16 +35,9 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
         Writeable,
         ChunkedToXContent {
 
-    private static final Comparator<PipelineStat> PIPELINE_STAT_COMPARATOR = (p1, p2) -> {
-        final Stats p2Stats = p2.stats;
-        final Stats p1Stats = p1.stats;
-        final int ingestTimeCompare = Long.compare(p2Stats.ingestTimeInMillis, p1Stats.ingestTimeInMillis);
-        if (ingestTimeCompare == 0) {
-            return Long.compare(p2Stats.ingestCount, p1Stats.ingestCount);
-        } else {
-            return ingestTimeCompare;
-        }
-    };
+    private static final Comparator<PipelineStat> PIPELINE_STAT_COMPARATOR = Comparator.comparingLong(
+        (PipelineStat p) -> p.stats.ingestTimeInMillis
+    ).thenComparingLong((PipelineStat p) -> p.stats.ingestCount).thenComparingLong((PipelineStat p) -> p.byteStats.bytesProduced);
 
     public static final IngestStats IDENTITY = new IngestStats(Stats.IDENTITY, List.of(), Map.of());
 
@@ -69,7 +63,10 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
         for (var i = 0; i < size; i++) {
             var pipelineId = in.readString();
             var pipelineStat = new Stats(in);
-            pipelineStats.add(new PipelineStat(pipelineId, pipelineStat));
+            var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.NODE_STATS_INGEST_BYTES)
+                ? new ByteStats(in)
+                : new ByteStats(0, 0);
+            pipelineStats.add(new PipelineStat(pipelineId, pipelineStat, byteStat));
             int processorsSize = in.readVInt();
             var processorStatsPerPipeline = new ArrayList<ProcessorStat>(processorsSize);
             for (var j = 0; j < processorsSize; j++) {
@@ -91,6 +88,9 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
         for (PipelineStat pipelineStat : pipelineStats) {
             out.writeString(pipelineStat.pipelineId());
             pipelineStat.stats().writeTo(out);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_STATS_INGEST_BYTES)) {
+                pipelineStat.byteStats().writeTo(out);
+            }
             List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.pipelineId());
             if (processorStatsForPipeline == null) {
                 out.writeVInt(0);
@@ -124,6 +124,7 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
                     Iterators.single((builder, params) -> {
                         builder.startObject(pipelineStat.pipelineId());
                         pipelineStat.stats().toXContent(builder, params);
+                        pipelineStat.byteStats().toXContent(builder, params);
                         builder.startArray("processors");
                         return builder;
                     }),
@@ -223,8 +224,10 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
             return this;
         }
 
-        Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
-            this.pipelineStats.add(new PipelineStat(pipelineId, pipelineMetric.createStats()));
+        Builder addPipelineMetrics(String pipelineId, IngestPipelineMetric ingestPipelineMetrics) {
+            this.pipelineStats.add(
+                new PipelineStat(pipelineId, ingestPipelineMetrics.createStats(), ingestPipelineMetrics.createByteStats())
+            );
             return this;
         }
 
@@ -242,19 +245,56 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
     /**
      * Container for pipeline stats.
      */
-    public record PipelineStat(String pipelineId, Stats stats) {
+    public record PipelineStat(String pipelineId, Stats stats, ByteStats byteStats) {
         static List<PipelineStat> merge(List<PipelineStat> first, List<PipelineStat> second) {
-            var totalsPerPipeline = new HashMap<String, Stats>();
+            var totalsPerPipeline = new HashMap<String, PipelineStat>();
 
-            first.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps.stats, Stats::merge));
-            second.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps.stats, Stats::merge));
+            first.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps, PipelineStat::merge));
+            second.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps, PipelineStat::merge));
 
             return totalsPerPipeline.entrySet()
                 .stream()
-                .map(v -> new PipelineStat(v.getKey(), v.getValue()))
+                .map(v -> new PipelineStat(v.getKey(), v.getValue().stats, v.getValue().byteStats))
                 .sorted(PIPELINE_STAT_COMPARATOR)
                 .toList();
         }
+
+        private static PipelineStat merge(PipelineStat first, PipelineStat second) {
+            assert first.pipelineId.equals(second.pipelineId) : "Can only merge stats from the same pipeline";
+            return new PipelineStat(
+                first.pipelineId,
+                Stats.merge(first.stats, second.stats),
+                ByteStats.merge(first.byteStats, second.byteStats)
+            );
+        }
+    }
+
+    /**
+     * Container for ingested byte stats
+     */
+    public record ByteStats(long bytesIngested, long bytesProduced) implements Writeable, ToXContentFragment {
+        public ByteStats(StreamInput in) throws IOException {
+            this(in.readVLong(), in.readVLong());
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(bytesIngested);
+            out.writeVLong(bytesProduced);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            if (bytesIngested > 0 || bytesProduced > 0) {
+                builder.field("ingested_in_bytes", bytesIngested);
+                builder.field("produced_in_bytes", bytesProduced);
+            }
+            return builder;
+        }
+
+        static ByteStats merge(ByteStats first, ByteStats second) {
+            return new ByteStats((first.bytesIngested + second.bytesIngested), first.bytesProduced + second.bytesProduced);
+        }
     }
 
     /**

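As a usage note for the new merge path, the sketch below sums byte counters for the same pipeline id across two nodes. It assumes compilation inside the org.elasticsearch.ingest package, since PipelineStat.merge and ByteStats.merge are package-private, and all values are made up.

package org.elasticsearch.ingest;

import java.util.List;

/**
 * Sketch of how per-node pipeline stats now merge, byte counters included.
 */
class ByteStatsMergeSketch {
    public static void main(String[] args) {
        var nodeA = new IngestStats.PipelineStat("my-pipeline", new IngestStats.Stats(10, 0, 0, 5), new IngestStats.ByteStats(100, 120));
        var nodeB = new IngestStats.PipelineStat("my-pipeline", new IngestStats.Stats(4, 0, 0, 2), new IngestStats.ByteStats(40, 55));

        // Entries with the same pipeline id collapse into one, with Stats and ByteStats
        // summed field by field: bytesIngested 140, bytesProduced 175.
        List<IngestStats.PipelineStat> merged = IngestStats.PipelineStat.merge(List.of(nodeA), List.of(nodeB));
        System.out.println(merged);
    }
}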
+ 3 - 3
server/src/main/java/org/elasticsearch/ingest/Pipeline.java

@@ -38,7 +38,7 @@ public final class Pipeline {
     @Nullable
     private final Map<String, Object> metadata;
     private final CompoundProcessor compoundProcessor;
-    private final IngestMetric metrics;
+    private final IngestPipelineMetric metrics;
     private final LongSupplier relativeTimeProvider;
     @Nullable
     private final Boolean deprecated;
@@ -79,7 +79,7 @@ public final class Pipeline {
         this.metadata = metadata;
         this.compoundProcessor = compoundProcessor;
         this.version = version;
-        this.metrics = new IngestMetric();
+        this.metrics = new IngestPipelineMetric();
         this.relativeTimeProvider = relativeTimeProvider;
         this.deprecated = deprecated;
     }
@@ -199,7 +199,7 @@ public final class Pipeline {
     /**
      * The metrics associated with this pipeline.
      */
-    public IngestMetric getMetrics() {
+    public IngestPipelineMetric getMetrics() {
         return metrics;
     }
 

+ 2 - 1
server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

@@ -971,7 +971,8 @@ public class NodeStatsTests extends ESTestCase {
                             randomLongBetween(0, maxStatValue),
                             randomLongBetween(0, maxStatValue),
                             randomLongBetween(0, maxStatValue)
-                        )
+                        ),
+                        new IngestStats.ByteStats(randomLongBetween(0, maxStatValue), randomLongBetween(0, maxStatValue))
                     )
                 );
 

+ 73 - 18
server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

@@ -1957,9 +1957,9 @@ public class IngestServiceTests extends ESTestCase {
             // total
             assertStats(ingestStats.totalStats(), 0, 0, 0);
             // pipeline
-            assertPipelineStats(ingestStats.pipelineStats(), "_id1", 0, 0, 0);
-            assertPipelineStats(ingestStats.pipelineStats(), "_id2", 0, 0, 0);
-            assertPipelineStats(ingestStats.pipelineStats(), "_id3", 0, 0, 0);
+            assertPipelineStats(ingestStats.pipelineStats(), "_id1", 0, 0, 0, 0, 0);
+            assertPipelineStats(ingestStats.pipelineStats(), "_id2", 0, 0, 0, 0, 0);
+            assertPipelineStats(ingestStats.pipelineStats(), "_id3", 0, 0, 0, 0, 0);
             // processor
             assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0);
             assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0);
@@ -1970,6 +1970,7 @@ public class IngestServiceTests extends ESTestCase {
         final IndexRequest indexRequest = new IndexRequest("_index");
         indexRequest.setPipeline("_id1").setFinalPipeline("_id2");
         indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
+        var startSize = indexRequest.ramBytesUsed();
         ingestService.executeBulkRequest(
             1,
             List.of(indexRequest),
@@ -1988,9 +1989,9 @@ public class IngestServiceTests extends ESTestCase {
             // total
             assertStats(ingestStats.totalStats(), 1, 0, 0);
             // pipeline
-            assertPipelineStats(ingestStats.pipelineStats(), "_id1", 1, 0, 0);
-            assertPipelineStats(ingestStats.pipelineStats(), "_id2", 1, 0, 0);
-            assertPipelineStats(ingestStats.pipelineStats(), "_id3", 1, 0, 0);
+            assertPipelineStats(ingestStats.pipelineStats(), "_id1", 1, 0, 0, startSize, indexRequest.ramBytesUsed());
+            assertPipelineStats(ingestStats.pipelineStats(), "_id2", 1, 0, 0, 0, 0);
+            assertPipelineStats(ingestStats.pipelineStats(), "_id3", 1, 0, 0, 0, 0);
             // processor
             assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0);
             assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0);
@@ -2022,6 +2023,7 @@ public class IngestServiceTests extends ESTestCase {
         Map<String, Processor.Factory> map = Maps.newMapWithExpectedSize(2);
         map.put("mock", (factories, tag, description, config) -> processor);
         map.put("failure-mock", (factories, tag, description, config) -> processorFailure);
+        map.put("drop", new DropProcessor.Factory());
         IngestService ingestService = createWithProcessors(map);
 
         final IngestStats initialStats = ingestService.stats();
@@ -2050,6 +2052,7 @@ public class IngestServiceTests extends ESTestCase {
         final IndexRequest indexRequest = new IndexRequest("_index");
         indexRequest.setPipeline("_id1").setFinalPipeline("_none");
         indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
+        var startSize1 = indexRequest.ramBytesUsed();
         ingestService.executeBulkRequest(
             1,
             List.of(indexRequest),
@@ -2061,6 +2064,7 @@ public class IngestServiceTests extends ESTestCase {
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
         final IngestStats afterFirstRequestStats = ingestService.stats();
+        var endSize1 = indexRequest.ramBytesUsed();
         assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2));
 
         afterFirstRequestStats.processorStats().get("_id1").forEach(p -> assertEquals(p.name(), "mock:mockTag"));
@@ -2069,13 +2073,14 @@ public class IngestServiceTests extends ESTestCase {
         // total
         assertStats(afterFirstRequestStats.totalStats(), 1, 0, 0);
         // pipeline
-        assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id1", 1, 0, 0);
-        assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id2", 0, 0, 0);
+        assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id1", 1, 0, 0, startSize1, endSize1);
+        assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id2", 0, 0, 0, 0, 0);
         // processor
         assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0);
         assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0);
 
         indexRequest.setPipeline("_id2");
+        var startSize2 = indexRequest.ramBytesUsed();
         ingestService.executeBulkRequest(
             1,
             List.of(indexRequest),
@@ -2087,12 +2092,13 @@ public class IngestServiceTests extends ESTestCase {
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
         final IngestStats afterSecondRequestStats = ingestService.stats();
+        var endSize2 = indexRequest.ramBytesUsed();
         assertThat(afterSecondRequestStats.pipelineStats().size(), equalTo(2));
         // total
         assertStats(afterSecondRequestStats.totalStats(), 2, 0, 0);
         // pipeline
-        assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id1", 1, 0, 0);
-        assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id2", 1, 0, 0);
+        assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id1", 1, 0, 0, startSize1, endSize1);
+        assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
         // processor
         assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0);
         assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0);
@@ -2107,6 +2113,7 @@ public class IngestServiceTests extends ESTestCase {
         clusterState = executePut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         indexRequest.setPipeline("_id1");
+        startSize1 += indexRequest.ramBytesUsed();
         ingestService.executeBulkRequest(
             1,
             List.of(indexRequest),
@@ -2118,12 +2125,13 @@ public class IngestServiceTests extends ESTestCase {
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
         final IngestStats afterThirdRequestStats = ingestService.stats();
+        endSize1 += indexRequest.ramBytesUsed();
         assertThat(afterThirdRequestStats.pipelineStats().size(), equalTo(2));
         // total
         assertStats(afterThirdRequestStats.totalStats(), 3, 0, 0);
         // pipeline
-        assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id1", 2, 0, 0);
-        assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id2", 1, 0, 0);
+        assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id1", 2, 0, 0, startSize1, endSize1);
+        assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
         // The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is
         // due to the parallel array's used to identify which metrics to carry forward. Without unique ids or semantic equals for each
         // processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases,
@@ -2139,6 +2147,7 @@ public class IngestServiceTests extends ESTestCase {
         clusterState = executePut(putRequest, clusterState);
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
         indexRequest.setPipeline("_id1");
+        startSize1 += indexRequest.ramBytesUsed();
         ingestService.executeBulkRequest(
             1,
             List.of(indexRequest),
@@ -2150,16 +2159,47 @@ public class IngestServiceTests extends ESTestCase {
             EsExecutors.DIRECT_EXECUTOR_SERVICE
         );
         final IngestStats afterForthRequestStats = ingestService.stats();
+        endSize1 += indexRequest.ramBytesUsed();
         assertThat(afterForthRequestStats.pipelineStats().size(), equalTo(2));
         // total
         assertStats(afterForthRequestStats.totalStats(), 4, 0, 0);
         // pipeline
-        assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id1", 3, 0, 0);
-        assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id2", 1, 0, 0);
+        assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id1", 3, 0, 0, startSize1, endSize1);
+        assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
         // processor
         assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); // not carried forward since type changed
         assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); // carried forward and added from old stats
         assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0);
+
+        // test with drop processor
+        putRequest = new PutPipelineRequest("_id3", new BytesArray("{\"processors\": [{\"drop\" : {}}]}"), XContentType.JSON);
+        previousClusterState = clusterState;
+        clusterState = executePut(putRequest, clusterState);
+        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
+        indexRequest.setPipeline("_id3");
+        long startSize3 = indexRequest.ramBytesUsed();
+        ingestService.executeBulkRequest(
+            1,
+            List.of(indexRequest),
+            indexReq -> {},
+            (s) -> false,
+            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
+            failureHandler,
+            completionHandler,
+            EsExecutors.DIRECT_EXECUTOR_SERVICE
+        );
+        final IngestStats afterFifthRequestStats = ingestService.stats();
+        assertThat(afterFifthRequestStats.pipelineStats().size(), equalTo(3));
+        // total
+        assertStats(afterFifthRequestStats.totalStats(), 5, 0, 0);
+        // pipeline
+        assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id1", 3, 0, 0, startSize1, endSize1);
+        assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2);
+        assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id3", 1, 0, 0, startSize3, 0);
+        // processor
+        assertProcessorStats(0, afterFifthRequestStats, "_id1", 1, 1, 0);
+        assertProcessorStats(1, afterFifthRequestStats, "_id1", 2, 0, 0);
+        assertProcessorStats(0, afterFifthRequestStats, "_id2", 1, 0, 0);
     }
 
     public void testStatName() {
@@ -2979,8 +3019,18 @@ public class IngestServiceTests extends ESTestCase {
         assertStats(stats.processorStats().get(pipelineId).get(processor).stats(), count, failed, time);
     }
 
-    private void assertPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String pipelineId, long count, long failed, long time) {
-        assertStats(getPipelineStats(pipelineStats, pipelineId), count, failed, time);
+    private void assertPipelineStats(
+        List<IngestStats.PipelineStat> pipelineStats,
+        String pipelineId,
+        long count,
+        long failed,
+        long time,
+        long ingested,
+        long produced
+    ) {
+        var pipeline = getPipeline(pipelineStats, pipelineId);
+        assertStats(pipeline.stats(), count, failed, time);
+        assertByteStats(pipeline.byteStats(), ingested, produced);
     }
 
     private void assertStats(IngestStats.Stats stats, long count, long failed, long time) {
@@ -2990,8 +3040,13 @@ public class IngestServiceTests extends ESTestCase {
         assertThat(stats.ingestTimeInMillis(), greaterThanOrEqualTo(time));
     }
 
-    private IngestStats.Stats getPipelineStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
-        return pipelineStats.stream().filter(p1 -> p1.pipelineId().equals(id)).findFirst().map(p2 -> p2.stats()).orElse(null);
+    private void assertByteStats(IngestStats.ByteStats byteStats, long ingested, long produced) {
+        assertThat(byteStats.bytesIngested(), equalTo(ingested));
+        assertThat(byteStats.bytesProduced(), equalTo(produced));
+    }
+
+    private IngestStats.PipelineStat getPipeline(List<IngestStats.PipelineStat> pipelineStats, String id) {
+        return pipelineStats.stream().filter(p1 -> p1.pipelineId().equals(id)).findFirst().orElse(null);
     }
 
     private static List<IngestService.PipelineClusterStateUpdateTask> oneTask(DeletePipelineRequest request) {

+ 53 - 9
server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

@@ -62,11 +62,23 @@ public class IngestStatsTests extends ESTestCase {
         assertThat(
             IngestStats.PipelineStat.merge(first, second),
             containsInAnyOrder(
-                new IngestStats.PipelineStat("pipeline-1", merge(first.get(0).stats(), first.get(1).stats(), second.get(1).stats())),
-                new IngestStats.PipelineStat("pipeline-2", merge(first.get(2).stats(), second.get(0).stats())),
-                new IngestStats.PipelineStat("pipeline-3", merge(first.get(3).stats(), second.get(3).stats())),
-                new IngestStats.PipelineStat("pipeline-4", second.get(2).stats()),
-                new IngestStats.PipelineStat("pipeline-5", first.get(4).stats())
+                new IngestStats.PipelineStat(
+                    "pipeline-1",
+                    merge(first.get(0).stats(), first.get(1).stats(), second.get(1).stats()),
+                    merge(first.get(0).byteStats(), first.get(1).byteStats(), second.get(1).byteStats())
+                ),
+                new IngestStats.PipelineStat(
+                    "pipeline-2",
+                    merge(first.get(2).stats(), second.get(0).stats()),
+                    IngestStats.ByteStats.merge(first.get(2).byteStats(), second.get(0).byteStats())
+                ),
+                new IngestStats.PipelineStat(
+                    "pipeline-3",
+                    merge(first.get(3).stats(), second.get(3).stats()),
+                    IngestStats.ByteStats.merge(first.get(3).byteStats(), second.get(3).byteStats())
+                ),
+                new IngestStats.PipelineStat("pipeline-4", second.get(2).stats(), second.get(2).byteStats()),
+                new IngestStats.PipelineStat("pipeline-5", first.get(4).stats(), first.get(4).byteStats())
             )
         );
     }
@@ -178,10 +190,26 @@ public class IngestStatsTests extends ESTestCase {
         return Arrays.stream(stats).reduce(IngestStats.Stats.IDENTITY, IngestStats.Stats::merge);
     }
 
+    private static IngestStats.ByteStats merge(IngestStats.ByteStats... stats) {
+        return Arrays.stream(stats).reduce(new IngestStats.ByteStats(0, 0), IngestStats.ByteStats::merge);
+    }
+
     private static List<IngestStats.PipelineStat> createPipelineStats() {
-        IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(3, 3, 3, 3));
-        IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(47, 97, 197, 297));
-        IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(0, 0, 0, 0));
+        IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat(
+            "pipeline1",
+            new IngestStats.Stats(3, 3, 3, 3),
+            new IngestStats.ByteStats(123, 456)
+        );
+        IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat(
+            "pipeline2",
+            new IngestStats.Stats(47, 97, 197, 297),
+            new IngestStats.ByteStats(1234567, 34567890)
+        );
+        IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat(
+            "pipeline3",
+            new IngestStats.Stats(0, 0, 0, 0),
+            new IngestStats.ByteStats(0, 0)
+        );
         return List.of(pipeline1Stats, pipeline2Stats, pipeline3Stats);
     }
 
@@ -224,6 +252,10 @@ public class IngestStatsTests extends ESTestCase {
                 getPipelineStats(ingestStats.pipelineStats(), serializedPipelineStat.pipelineId()),
                 serializedPipelineStat.stats()
             );
+            assertEquals(
+                getPipelineByteStats(ingestStats.pipelineStats(), serializedPipelineStat.pipelineId()),
+                serializedPipelineStat.byteStats()
+            );
             List<IngestStats.ProcessorStat> serializedProcessorStats = serializedStats.processorStats()
                 .get(serializedPipelineStat.pipelineId());
             List<IngestStats.ProcessorStat> processorStat = ingestStats.processorStats().get(serializedPipelineStat.pipelineId());
@@ -249,12 +281,20 @@ public class IngestStatsTests extends ESTestCase {
             .orElse(null);
     }
 
+    private static IngestStats.ByteStats getPipelineByteStats(List<IngestStats.PipelineStat> pipelineStats, String id) {
+        return pipelineStats.stream()
+            .filter(p1 -> p1.pipelineId().equals(id))
+            .findFirst()
+            .map(IngestStats.PipelineStat::byteStats)
+            .orElse(null);
+    }
+
     private static IngestStats.ProcessorStat randomProcessorStat(String name, String type) {
         return new IngestStats.ProcessorStat(name, type, randomStats());
     }
 
     private static IngestStats.PipelineStat randomPipelineStat(String id) {
-        return new IngestStats.PipelineStat(id, randomStats());
+        return new IngestStats.PipelineStat(id, randomStats(), randomByteStats());
     }
 
     private static IngestStats.Stats randomStats() {
@@ -264,4 +304,8 @@ public class IngestStatsTests extends ESTestCase {
     private static IngestStats.Stats zeroStats() {
         return new IngestStats.Stats(0, 0, 0, 0);
     }
+
+    private static IngestStats.ByteStats randomByteStats() {
+        return new IngestStats.ByteStats(randomLong(), randomLong());
+    }
 }

+ 183 - 8
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsActionResponseTests.java

@@ -56,7 +56,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
         List<String> pipelineIds = Stream.generate(() -> randomAlphaOfLength(10)).limit(randomIntBetween(0, 10)).toList();
         return new IngestStats(
             new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()),
-            pipelineIds.stream().map(id -> new IngestStats.PipelineStat(id, randomStats())).collect(Collectors.toList()),
+            pipelineIds.stream().map(id -> new IngestStats.PipelineStat(id, randomStats(), randomByteStats())).collect(Collectors.toList()),
             pipelineIds.stream().collect(Collectors.toMap(Function.identity(), (v) -> randomProcessorStats()))
         );
     }
@@ -65,6 +65,10 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
         return new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
     }
 
+    private IngestStats.ByteStats randomByteStats() {
+        return new IngestStats.ByteStats(randomNonNegativeLong(), randomNonNegativeLong());
+    }
+
     private List<IngestStats.ProcessorStat> randomProcessorStats() {
         return Stream.generate(() -> randomAlphaOfLength(10))
             .limit(randomIntBetween(0, 10))
@@ -89,7 +93,21 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                             stats -> new Response.TrainedModelStats(
                                 stats.getModelId(),
                                 null,
-                                stats.getIngestStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
                                 stats.getPipelineCount(),
                                 stats.getInferenceStats(),
                                 null
@@ -110,7 +128,21 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                             stats -> new Response.TrainedModelStats(
                                 stats.getModelId(),
                                 stats.getModelSizeStats(),
-                                stats.getIngestStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
                                 stats.getPipelineCount(),
                                 stats.getInferenceStats(),
                                 stats.getDeploymentStats() == null
@@ -168,7 +200,21 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                             stats -> new Response.TrainedModelStats(
                                 stats.getModelId(),
                                 stats.getModelSizeStats(),
-                                stats.getIngestStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
                                 stats.getPipelineCount(),
                                 stats.getInferenceStats(),
                                 stats.getDeploymentStats() == null
@@ -226,7 +272,21 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                             stats -> new Response.TrainedModelStats(
                                 stats.getModelId(),
                                 stats.getModelSizeStats(),
-                                stats.getIngestStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
                                 stats.getPipelineCount(),
                                 stats.getInferenceStats(),
                                 stats.getDeploymentStats() == null
@@ -284,7 +344,21 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                             stats -> new Response.TrainedModelStats(
                                 stats.getModelId(),
                                 stats.getModelSizeStats(),
-                                stats.getIngestStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
                                 stats.getPipelineCount(),
                                 stats.getInferenceStats(),
                                 stats.getDeploymentStats() == null
@@ -343,7 +417,21 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                             stats -> new Response.TrainedModelStats(
                                 stats.getModelId(),
                                 stats.getModelSizeStats(),
-                                stats.getIngestStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
                                 stats.getPipelineCount(),
                                 stats.getInferenceStats(),
                                 stats.getDeploymentStats() == null
@@ -402,7 +490,21 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                             stats -> new Response.TrainedModelStats(
                                 stats.getModelId(),
                                 stats.getModelSizeStats(),
-                                stats.getIngestStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
                                 stats.getPipelineCount(),
                                 stats.getInferenceStats(),
                                 stats.getDeploymentStats() == null
@@ -450,6 +552,79 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer
                     RESULTS_FIELD
                 )
             );
+        } else if (version.before(TransportVersions.NODE_STATS_INGEST_BYTES)) {
+            // added ByteStats to IngestStats.PipelineStat
+            return new Response(
+                new QueryPage<>(
+                    instance.getResources()
+                        .results()
+                        .stream()
+                        .map(
+                            stats -> new Response.TrainedModelStats(
+                                stats.getModelId(),
+                                stats.getModelSizeStats(),
+                                new IngestStats(
+                                    stats.getIngestStats().totalStats(),
+                                    stats.getIngestStats()
+                                        .pipelineStats()
+                                        .stream()
+                                        .map(
+                                            pipelineStat -> new IngestStats.PipelineStat(
+                                                pipelineStat.pipelineId(),
+                                                pipelineStat.stats(),
+                                                new IngestStats.ByteStats(0, 0)
+                                            )
+                                        )
+                                        .toList(),
+                                    stats.getIngestStats().processorStats()
+                                ),
+                                stats.getPipelineCount(),
+                                stats.getInferenceStats(),
+                                stats.getDeploymentStats() == null
+                                    ? null
+                                    : new AssignmentStats(
+                                        stats.getDeploymentStats().getDeploymentId(),
+                                        stats.getDeploymentStats().getModelId(),
+                                        stats.getDeploymentStats().getThreadsPerAllocation(),
+                                        stats.getDeploymentStats().getNumberOfAllocations(),
+                                        stats.getDeploymentStats().getQueueCapacity(),
+                                        stats.getDeploymentStats().getCacheSize(),
+                                        stats.getDeploymentStats().getStartTime(),
+                                        stats.getDeploymentStats()
+                                            .getNodeStats()
+                                            .stream()
+                                            .map(
+                                                nodeStats -> new AssignmentStats.NodeStats(
+                                                    nodeStats.getNode(),
+                                                    nodeStats.getInferenceCount().orElse(null),
+                                                    nodeStats.getAvgInferenceTime().orElse(null),
+                                                    nodeStats.getAvgInferenceTimeExcludingCacheHit().orElse(null),
+                                                    nodeStats.getLastAccess(),
+                                                    nodeStats.getPendingCount(),
+                                                    nodeStats.getErrorCount(),
+                                                    nodeStats.getCacheHitCount().orElse(null),
+                                                    nodeStats.getRejectedExecutionCount(),
+                                                    nodeStats.getTimeoutCount(),
+                                                    nodeStats.getRoutingState(),
+                                                    nodeStats.getStartTime(),
+                                                    nodeStats.getThreadsPerAllocation(),
+                                                    nodeStats.getNumberOfAllocations(),
+                                                    nodeStats.getPeakThroughput(),
+                                                    nodeStats.getThroughputLastPeriod(),
+                                                    nodeStats.getAvgInferenceTimeLastPeriod(),
+                                                    nodeStats.getCacheHitCountLastPeriod().orElse(null)
+                                                )
+                                            )
+                                            .toList(),
+                                        stats.getDeploymentStats().getPriority()
+                                    )
+                            )
+                        )
+                        .toList(),
+                    instance.getResources().count(),
+                    RESULTS_FIELD
+                )
+            );
         }
         return instance;
     }

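The branch above is the BWC down-conversion for the serialization test: when the target wire version predates NODE_STATS_INGEST_BYTES, every pipeline stat is rewritten with ByteStats(0, 0), because an older node never produces byte counters. The following is a minimal, self-contained sketch of that idea only; the class name ZeroByteStatsSketch and the local record types are hypothetical stand-ins, not the real org.elasticsearch.ingest.IngestStats classes.

    import java.util.List;

    /**
     * Sketch of the BWC rewrite above: for wire versions without byte stats,
     * the byte counters on every pipeline stat are replaced with zeros before
     * comparing round-tripped instances. Local records only, for illustration.
     */
    public class ZeroByteStatsSketch {

        // Simplified stand-ins for the shapes used in the diff.
        record ByteStats(long bytesIngested, long bytesProduced) {}
        record PipelineStat(String pipelineId, long ingestCount, ByteStats byteStats) {}

        static List<PipelineStat> zeroByteStats(List<PipelineStat> pipelineStats) {
            return pipelineStats.stream()
                .map(p -> new PipelineStat(p.pipelineId(), p.ingestCount(), new ByteStats(0, 0)))
                .toList();
        }

        public static void main(String[] args) {
            var stats = List.of(new PipelineStat("pipeline1", 10, new ByteStats(789, 123)));
            System.out.println(zeroByteStats(stats)); // byte stats come back as (0, 0)
        }
    }
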
+ 27 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsAction.java

@@ -442,15 +442,15 @@ public class TransportGetTrainedModelsStatsAction extends TransportAction<
 
     private static IngestStats mergeStats(List<IngestStats> ingestStatsList) {
 
-        Map<String, IngestStatsAccumulator> pipelineStatsAcc = Maps.newLinkedHashMapWithExpectedSize(ingestStatsList.size());
+        Map<String, PipelineStatsAccumulator> pipelineStatsAcc = Maps.newLinkedHashMapWithExpectedSize(ingestStatsList.size());
         Map<String, Map<String, IngestStatsAccumulator>> processorStatsAcc = Maps.newLinkedHashMapWithExpectedSize(ingestStatsList.size());
         IngestStatsAccumulator totalStats = new IngestStatsAccumulator();
         ingestStatsList.forEach(ingestStats -> {
 
             ingestStats.pipelineStats()
                 .forEach(
-                    pipelineStat -> pipelineStatsAcc.computeIfAbsent(pipelineStat.pipelineId(), p -> new IngestStatsAccumulator())
-                        .inc(pipelineStat.stats())
+                    pipelineStat -> pipelineStatsAcc.computeIfAbsent(pipelineStat.pipelineId(), p -> new PipelineStatsAccumulator())
+                        .inc(pipelineStat)
                 );
 
             ingestStats.processorStats().forEach((pipelineId, processorStat) -> {
@@ -468,7 +468,9 @@ public class TransportGetTrainedModelsStatsAction extends TransportAction<
 
         List<IngestStats.PipelineStat> pipelineStatList = new ArrayList<>(pipelineStatsAcc.size());
         pipelineStatsAcc.forEach(
-            (pipelineId, accumulator) -> pipelineStatList.add(new IngestStats.PipelineStat(pipelineId, accumulator.build()))
+            (pipelineId, accumulator) -> pipelineStatList.add(
+                new IngestStats.PipelineStat(pipelineId, accumulator.buildStats(), accumulator.buildByteStats())
+            )
         );
 
         Map<String, List<IngestStats.ProcessorStat>> processorStatList = Maps.newLinkedHashMapWithExpectedSize(processorStatsAcc.size());
@@ -509,4 +511,25 @@ public class TransportGetTrainedModelsStatsAction extends TransportAction<
         }
     }
 
+    private static class PipelineStatsAccumulator {
+        IngestStatsAccumulator ingestStatsAccumulator = new IngestStatsAccumulator();
+        CounterMetric ingestBytesConsumed = new CounterMetric();
+        CounterMetric ingestBytesProduced = new CounterMetric();
+
+        void inc(IngestStats.PipelineStat s) {
+            ingestStatsAccumulator.inc(s.stats());
+            ingestBytesConsumed.inc(s.byteStats().bytesIngested());
+            ingestBytesProduced.inc(s.byteStats().bytesProduced());
+        }
+
+        IngestStats.Stats buildStats() {
+            return ingestStatsAccumulator.build();
+        }
+
+        IngestStats.ByteStats buildByteStats() {
+            return new IngestStats.ByteStats(ingestBytesConsumed.count(), ingestBytesProduced.count());
+        }
+
+    }
+
 }

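The new PipelineStatsAccumulator above sums byte counters per pipeline across node responses, alongside the existing operation counters. Below is a standalone sketch of just that accumulation step; MergeByteStatsSketch and its local records are hypothetical stand-ins for the real IngestStats.PipelineStat / ByteStats and CounterMetric classes. The inputs mirror the pipeline1 values in the test that follows, where the expected merge is 789 + 5678 = 6467 bytes ingested and 0 + 123456 = 123456 bytes produced.

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /**
     * Sketch of the per-pipeline accumulation performed in mergeStats: byte
     * counters reported for the same pipeline by different nodes are summed.
     * Local records only; not the real Elasticsearch classes.
     */
    public class MergeByteStatsSketch {

        record ByteStats(long bytesIngested, long bytesProduced) {}
        record PipelineStat(String pipelineId, ByteStats byteStats) {}

        static Map<String, ByteStats> mergeByteStats(List<PipelineStat> statsFromAllNodes) {
            Map<String, ByteStats> merged = new LinkedHashMap<>();
            for (PipelineStat stat : statsFromAllNodes) {
                merged.merge(
                    stat.pipelineId(),
                    stat.byteStats(),
                    (a, b) -> new ByteStats(a.bytesIngested() + b.bytesIngested(), a.bytesProduced() + b.bytesProduced())
                );
            }
            return merged;
        }

        public static void main(String[] args) {
            // Same pipeline1 inputs as the two node responses in the test below.
            var merged = mergeByteStats(
                List.of(
                    new PipelineStat("pipeline1", new ByteStats(789, 0)),
                    new PipelineStat("pipeline1", new ByteStats(5678, 123456))
                )
            );
            System.out.println(merged.get("pipeline1")); // ByteStats[bytesIngested=6467, bytesProduced=123456]
        }
    }
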
+ 11 - 9
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java

@@ -145,9 +145,9 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
             buildNodeStats(
                 new IngestStats.Stats(2, 2, 3, 4),
                 Arrays.asList(
-                    new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(0, 0, 3, 1)),
-                    new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(1, 1, 0, 1)),
-                    new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(2, 1, 1, 1))
+                    new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(0, 0, 3, 1), new IngestStats.ByteStats(789, 0)),
+                    new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(1, 1, 0, 1), new IngestStats.ByteStats(123, 123)),
+                    new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(2, 1, 1, 1), new IngestStats.ByteStats(1234, 5678))
                 ),
                 Arrays.asList(
                     Arrays.asList(
@@ -169,9 +169,9 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
             buildNodeStats(
                 new IngestStats.Stats(15, 5, 3, 4),
                 Arrays.asList(
-                    new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 3, 1)),
-                    new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(1, 1, 0, 1)),
-                    new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(2, 1, 1, 1))
+                    new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 3, 1), new IngestStats.ByteStats(5678, 123456)),
+                    new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(1, 1, 0, 1), new IngestStats.ByteStats(111, 222)),
+                    new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(2, 1, 1, 1), new IngestStats.ByteStats(555, 777))
                 ),
                 Arrays.asList(
                     Arrays.asList(
@@ -206,7 +206,9 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
 
         IngestStats expectedStatsModel1 = new IngestStats(
             new IngestStats.Stats(10, 1, 6, 2),
-            Collections.singletonList(new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 6, 2))),
+            Collections.singletonList(
+                new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 6, 2), new IngestStats.ByteStats(6467, 123456))
+            ),
             Collections.singletonMap(
                 "pipeline1",
                 Arrays.asList(
@@ -219,8 +221,8 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase {
         IngestStats expectedStatsModel2 = new IngestStats(
             new IngestStats.Stats(12, 3, 6, 4),
             Arrays.asList(
-                new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 6, 2)),
-                new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(2, 2, 0, 2))
+                new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 6, 2), new IngestStats.ByteStats(6467, 123456)),
+                new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(2, 2, 0, 2), new IngestStats.ByteStats(234, 345))
             ),
             new HashMap<>() {
                 {