Browse Source

[Transform][Rollup] add processing stats to record the time sp… (#53770)

add 2 additional stats: processing time and processing total which capture the
time spent for processing results and how often it ran. The 2 new stats
correspond to the existing indexing and search stats. Together with indexing
and search this now allows the user to see the full picture, all 3 stages.
Hendrik Muhs 5 years ago
parent
commit
da9273ab7b
27 changed files with 583 additions and 201 deletions
  1. 30 3
      client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java
  2. 7 3
      client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobResponse.java
  3. 92 27
      client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformIndexerStats.java
  4. 2 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java
  5. 5 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/rollup/GetRollupJobResponseTests.java
  6. 90 28
      client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformIndexerStatsTests.java
  7. 26 10
      client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformIndexerStatsTests.java
  8. 38 32
      client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformStatsTests.java
  9. 10 4
      docs/reference/rollup/apis/get-job.asciidoc
  10. 4 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java
  11. 38 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerJobStats.java
  12. 12 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupIndexerJobStats.java
  13. 56 30
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStats.java
  14. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java
  15. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java
  16. 2 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupIndexerJobStatsTests.java
  17. 91 18
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStatsTests.java
  18. 29 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java
  19. 6 6
      x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java
  20. 6 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml
  21. 4 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml
  22. 2 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml
  23. 0 5
      x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java
  24. 11 7
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformInfoTransportAction.java
  25. 7 0
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java
  26. 9 7
      x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformInfoTransportActionTests.java
  27. 3 3
      x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml

+ 30 - 3
client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java

@@ -31,8 +31,10 @@ public abstract class IndexerJobStats {
     public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
     public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
     public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
+    public static ParseField PROCESSING_TIME_IN_MS = new ParseField("processing_time_in_ms");
     public static ParseField INDEX_TOTAL = new ParseField("index_total");
     public static ParseField SEARCH_TOTAL = new ParseField("search_total");
+    public static ParseField PROCESSING_TOTAL = new ParseField("processing_total");
     public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
     public static ParseField INDEX_FAILURES = new ParseField("index_failures");
 
@@ -44,11 +46,14 @@ public abstract class IndexerJobStats {
     protected final long indexTotal;
     protected final long searchTime;
     protected final long searchTotal;
+    protected final long processingTime;
+    protected final long processingTotal;
     protected final long indexFailures;
     protected final long searchFailures;
 
     public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations,
-                           long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
+                           long indexTime, long searchTime, long processingTime, long indexTotal, long searchTotal, long processingTotal,
+                           long indexFailures, long searchFailures) {
         this.numPages = numPages;
         this.numInputDocuments = numInputDocuments;
         this.numOuputDocuments = numOutputDocuments;
@@ -57,6 +62,8 @@ public abstract class IndexerJobStats {
         this.indexTotal = indexTotal;
         this.searchTime = searchTime;
         this.searchTotal = searchTotal;
+        this.processingTime = processingTime;
+        this.processingTotal = processingTotal;
         this.indexFailures = indexFailures;
         this.searchFailures = searchFailures;
     }
@@ -117,6 +124,13 @@ public abstract class IndexerJobStats {
         return searchTime;
     }
 
+    /**
+     * Returns the time spent processing (cumulative) in milliseconds
+     */
+    public long getProcessingTime() {
+        return processingTime;
+    }
+
     /**
      * Returns the total number of indexing requests that have been processed
      * (Note: this is not the number of _documents_ that have been indexed)
@@ -132,6 +146,14 @@ public abstract class IndexerJobStats {
         return searchTotal;
     }
 
+    /**
+     * Returns the total number of processing runs that have been made
+     */
+    public long getProcessingTotal() {
+        return processingTotal;
+    }
+
+
     @Override
     public boolean equals(Object other) {
         if (this == other) {
@@ -149,16 +171,19 @@ public abstract class IndexerJobStats {
                 && Objects.equals(this.numInvocations, that.numInvocations)
                 && Objects.equals(this.indexTime, that.indexTime)
                 && Objects.equals(this.searchTime, that.searchTime)
+                && Objects.equals(this.processingTime, that.processingTime)
                 && Objects.equals(this.indexFailures, that.indexFailures)
                 && Objects.equals(this.searchFailures, that.searchFailures)
                 && Objects.equals(this.searchTotal, that.searchTotal)
+                && Objects.equals(this.processingTotal, that.processingTotal)
                 && Objects.equals(this.indexTotal, that.indexTotal);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
-                indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
+                indexTime, searchTime, processingTime, indexFailures, searchFailures, searchTotal,
+                indexTotal, processingTotal);
     }
 
     @Override
@@ -172,6 +197,8 @@ public abstract class IndexerJobStats {
                 + ", index_time_in_ms=" + indexTime
                 + ", index_total=" + indexTotal
                 + ", search_time_in_ms=" + searchTime
-                + ", search_total=" + searchTotal+ "}";
+                + ", search_total=" + searchTotal
+                + ", processing_time_in_ms=" + processingTime
+                + ", processing_total=" + processingTotal + "}";
     }
 }

+ 7 - 3
client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobResponse.java

@@ -177,16 +177,18 @@ public class GetRollupJobResponse {
     public static class RollupIndexerJobStats extends IndexerJobStats {
 
         RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
-                              long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
+                              long indexTime, long indexTotal, long searchTime, long searchTotal, long processingTime,
+                              long processingTotal, long indexFailures, long searchFailures) {
             super(numPages, numInputDocuments, numOuputDocuments, numInvocations,
-                    indexTime, searchTime, indexTotal, searchTotal, indexFailures, searchFailures);
+                    indexTime, searchTime, processingTime, indexTotal, searchTotal, processingTotal, indexFailures, searchFailures);
         }
 
         private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
                 STATS.getPreferredName(),
                 true,
                 args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
-                    (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
+                    (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
+                    (long) args[10], (long) args[11]));
         static {
             PARSER.declareLong(constructorArg(), NUM_PAGES);
             PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
@@ -196,6 +198,8 @@ public class GetRollupJobResponse {
             PARSER.declareLong(constructorArg(), INDEX_TOTAL);
             PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
             PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
+            PARSER.declareLong(constructorArg(), PROCESSING_TIME_IN_MS);
+            PARSER.declareLong(constructorArg(), PROCESSING_TOTAL);
             PARSER.declareLong(constructorArg(), INDEX_FAILURES);
             PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
         }

+ 92 - 27
client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformIndexerStats.java

@@ -27,7 +27,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import java.io.IOException;
 import java.util.Objects;
 
-import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
 
 public class TransformIndexerStats extends IndexerJobStats {
@@ -39,21 +38,38 @@ public class TransformIndexerStats extends IndexerJobStats {
     public static final ConstructingObjectParser<TransformIndexerStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
         NAME,
         true,
-        args -> new TransformIndexerStats((long) args[0], (long) args[1], (long) args[2],
-            (long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
-            (Double) args[10], (Double) args[11], (Double) args[12]));
+        args -> new TransformIndexerStats(
+            unboxSafe(args[0], 0L),
+            unboxSafe(args[1], 0L),
+            unboxSafe(args[2], 0L),
+            unboxSafe(args[3], 0L),
+            unboxSafe(args[4], 0L),
+            unboxSafe(args[5], 0L),
+            unboxSafe(args[6], 0L),
+            unboxSafe(args[7], 0L),
+            unboxSafe(args[8], 0L),
+            unboxSafe(args[9], 0L),
+            unboxSafe(args[10], 0L),
+            unboxSafe(args[11], 0L),
+            unboxSafe(args[12], 0.0),
+            unboxSafe(args[13], 0.0),
+            unboxSafe(args[14], 0.0)
+        )
+    );
 
     static {
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
-        LENIENT_PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
-        LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
-        LENIENT_PARSER.declareLong(constructorArg(), INDEX_TOTAL);
-        LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
-        LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
-        LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_PAGES);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INPUT_DOCUMENTS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_OUTPUT_DOCUMENTS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INVOCATIONS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TIME_IN_MS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TIME_IN_MS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TIME_IN_MS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TOTAL);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TOTAL);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TOTAL);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_FAILURES);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_FAILURES);
         LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
         LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED);
         LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED);
@@ -67,16 +83,40 @@ public class TransformIndexerStats extends IndexerJobStats {
     private final double expAvgDocumentsIndexed;
     private final double expAvgDocumentsProcessed;
 
-    public TransformIndexerStats(long numPages, long numInputDocuments, long numOuputDocuments,
-                                 long numInvocations, long indexTime, long searchTime,
-                                 long indexTotal, long searchTotal, long indexFailures, long searchFailures,
-                                 Double expAvgCheckpointDurationMs, Double expAvgDocumentsIndexed,
-                                 Double expAvgDocumentsProcessed) {
-        super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
-                indexTotal, searchTotal, indexFailures, searchFailures);
-        this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
-        this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
-        this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
+    public TransformIndexerStats(
+        long numPages,
+        long numInputDocuments,
+        long numOuputDocuments,
+        long numInvocations,
+        long indexTime,
+        long searchTime,
+        long processingTime,
+        long indexTotal,
+        long searchTotal,
+        long processingTotal,
+        long indexFailures,
+        long searchFailures,
+        double expAvgCheckpointDurationMs,
+        double expAvgDocumentsIndexed,
+        double expAvgDocumentsProcessed
+    ) {
+        super(
+            numPages,
+            numInputDocuments,
+            numOuputDocuments,
+            numInvocations,
+            indexTime,
+            searchTime,
+            processingTime,
+            indexTotal,
+            searchTotal,
+            processingTotal,
+            indexFailures,
+            searchFailures
+        );
+        this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs;
+        this.expAvgDocumentsIndexed = expAvgDocumentsIndexed;
+        this.expAvgDocumentsProcessed = expAvgDocumentsProcessed;
     }
 
     public double getExpAvgCheckpointDurationMs() {
@@ -109,10 +149,12 @@ public class TransformIndexerStats extends IndexerJobStats {
             && Objects.equals(this.numInvocations, that.numInvocations)
             && Objects.equals(this.indexTime, that.indexTime)
             && Objects.equals(this.searchTime, that.searchTime)
+            && Objects.equals(this.processingTime, that.processingTime)
             && Objects.equals(this.indexFailures, that.indexFailures)
             && Objects.equals(this.searchFailures, that.searchFailures)
             && Objects.equals(this.indexTotal, that.indexTotal)
             && Objects.equals(this.searchTotal, that.searchTotal)
+            && Objects.equals(this.processingTotal, that.processingTotal)
             && Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
             && Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
             && Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
@@ -120,8 +162,31 @@ public class TransformIndexerStats extends IndexerJobStats {
 
     @Override
     public int hashCode() {
-        return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
-            indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
-            expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
+        return Objects.hash(
+            numPages,
+            numInputDocuments,
+            numOuputDocuments,
+            numInvocations,
+            indexTime,
+            searchTime,
+            processingTime,
+            indexFailures,
+            searchFailures,
+            indexTotal,
+            searchTotal,
+            processingTotal,
+            expAvgCheckpointDurationMs,
+            expAvgDocumentsIndexed,
+            expAvgDocumentsProcessed
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T unboxSafe(Object l, T default_value) {
+        if (l == null) {
+            return default_value;
+        } else {
+            return (T) l;
+        }
     }
 }

+ 2 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/TransformIT.java

@@ -431,6 +431,8 @@ public class TransformIT extends ESRestHighLevelClientTestCase {
         assertEquals(TransformStats.State.STOPPED, stats.getState());
 
         TransformIndexerStats zeroIndexerStats = new TransformIndexerStats(
+            0L,
+            0L,
             0L,
             0L,
             0L,

+ 5 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/rollup/GetRollupJobResponseTests.java

@@ -64,8 +64,9 @@ public class GetRollupJobResponseTests extends ESTestCase {
 
     private RollupIndexerJobStats randomStats() {
         return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
-            randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
-            randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
+            randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+            randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+            randomNonNegativeLong());
     }
 
     private RollupJobStatus randomStatus() {
@@ -120,6 +121,8 @@ public class GetRollupJobResponseTests extends ESTestCase {
         builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
         builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
         builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
+        builder.field(IndexerJobStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
+        builder.field(IndexerJobStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
         builder.endObject();
     }
 

+ 90 - 28
client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformIndexerStatsTests.java

@@ -31,41 +31,103 @@ public class TransformIndexerStatsTests extends ESTestCase {
 
     public void testFromXContent() throws IOException {
         xContentTester(
-                this::createParser,
-                TransformIndexerStatsTests::randomStats,
-                TransformIndexerStatsTests::toXContent,
-                TransformIndexerStats::fromXContent)
-                .supportsUnknownFields(true)
-                .test();
+            this::createParser,
+            TransformIndexerStatsTests::randomStats,
+            TransformIndexerStatsTests::toXContent,
+            TransformIndexerStats::fromXContent
+        ).supportsUnknownFields(true).test();
     }
 
     public static TransformIndexerStats randomStats() {
-        return new TransformIndexerStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
-                randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
-                randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
-            randomBoolean() ? null : randomDouble(),
-            randomBoolean() ? null : randomDouble(),
-            randomBoolean() ? null : randomDouble());
+        return new TransformIndexerStats(
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomDouble(),
+            randomDouble(),
+            randomDouble()
+        );
     }
 
     public static void toXContent(TransformIndexerStats stats, XContentBuilder builder) throws IOException {
         builder.startObject();
-        builder.field(IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
-        builder.field(IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
-        builder.field(IndexerJobStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
-        builder.field(IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
-        builder.field(IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
-        builder.field(IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
-        builder.field(IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
-        builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
-        builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
-        builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
-        builder.field(TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
-            stats.getExpAvgCheckpointDurationMs());
-        builder.field(TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
-            stats.getExpAvgDocumentsIndexed());
-        builder.field(TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
-            stats.getExpAvgDocumentsProcessed());
+        if (randomBoolean()) {
+            builder.field(IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
+            builder.field(IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
+            builder.field(IndexerJobStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
+            builder.field(IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
+            builder.field(IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
+            builder.field(IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
+            builder.field(IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
+            builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
+            builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
+            builder.field(IndexerJobStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
+            builder.field(IndexerJobStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
+            builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
+            builder.field(
+                TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
+                stats.getExpAvgCheckpointDurationMs()
+            );
+            builder.field(TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), stats.getExpAvgDocumentsIndexed());
+            builder.field(
+                TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
+                stats.getExpAvgDocumentsProcessed()
+            );
+        } else {
+            // a toXContent version which leaves out field with value 0 (simulating the case that an older version misses a field)
+            xContentFieldIfNotZero(builder, IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
+            xContentFieldIfNotZero(builder, IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
+            xContentFieldIfNotZero(builder, IndexerJobStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
+            xContentFieldIfNotZero(builder, IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
+            xContentFieldIfNotZero(builder, IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
+            xContentFieldIfNotZero(builder, IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
+            xContentFieldIfNotZero(builder, IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
+            xContentFieldIfNotZero(builder, IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
+            xContentFieldIfNotZero(builder, IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
+            xContentFieldIfNotZero(builder, IndexerJobStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
+            xContentFieldIfNotZero(builder, IndexerJobStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
+            xContentFieldIfNotZero(builder, IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
+            xContentFieldIfNotZero(
+                builder,
+                TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
+                stats.getExpAvgCheckpointDurationMs()
+            );
+            xContentFieldIfNotZero(
+                builder,
+                TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
+                stats.getExpAvgDocumentsIndexed()
+            );
+            xContentFieldIfNotZero(
+                builder,
+                TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
+                stats.getExpAvgDocumentsProcessed()
+            );
+        }
         builder.endObject();
     }
+
+    private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, long value) throws IOException {
+        if (value > 0) {
+            builder.field(name, value);
+        }
+
+        return builder;
+    }
+
+    private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, double value) throws IOException {
+        if (value > 0.0) {
+            builder.field(name, value);
+        }
+
+        return builder;
+    }
 }

+ 26 - 10
client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformIndexerStatsTests.java

@@ -30,17 +30,31 @@ import static org.elasticsearch.client.transform.transforms.hlrc.TransformStatsT
 
 public class TransformIndexerStatsTests extends AbstractResponseTestCase<
     org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats,
-        TransformIndexerStats> {
+    TransformIndexerStats> {
+
+    public static org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats randomStats() {
+        return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomDouble(),
+            randomDouble(),
+            randomDouble()
+        );
+    }
 
     @Override
     protected org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats createServerTestInstance(XContentType xContentType) {
-        return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(randomLongBetween(10L, 10000L),
-            randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomBoolean() ? null : randomDouble(),
-            randomBoolean() ? null : randomDouble(),
-            randomBoolean() ? null : randomDouble());
+        return randomStats();
     }
 
     @Override
@@ -49,8 +63,10 @@ public class TransformIndexerStatsTests extends AbstractResponseTestCase<
     }
 
     @Override
-    protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
-                                   TransformIndexerStats clientInstance) {
+    protected void assertInstances(
+        org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
+        TransformIndexerStats clientInstance
+    ) {
         assertTransformIndexerStats(serverTestInstance, clientInstance);
     }
 }

+ 38 - 32
client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformStatsTests.java

@@ -36,40 +36,36 @@ import java.util.Map;
 
 import static org.hamcrest.Matchers.equalTo;
 
-public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsearch.xpack.core.transform.transforms.TransformStats,
+public class TransformStatsTests extends AbstractResponseTestCase<
+    org.elasticsearch.xpack.core.transform.transforms.TransformStats,
     org.elasticsearch.client.transform.transforms.TransformStats> {
 
     public static org.elasticsearch.xpack.core.transform.transforms.NodeAttributes randomNodeAttributes() {
         int numberOfAttributes = randomIntBetween(1, 10);
         Map<String, String> attributes = new HashMap<>(numberOfAttributes);
-        for(int i = 0; i < numberOfAttributes; i++) {
+        for (int i = 0; i < numberOfAttributes; i++) {
             String val = randomAlphaOfLength(10);
-            attributes.put("key-"+i, val);
+            attributes.put("key-" + i, val);
         }
-        return new org.elasticsearch.xpack.core.transform.transforms.NodeAttributes(randomAlphaOfLength(10),
+        return new org.elasticsearch.xpack.core.transform.transforms.NodeAttributes(
             randomAlphaOfLength(10),
             randomAlphaOfLength(10),
             randomAlphaOfLength(10),
-            attributes);
+            randomAlphaOfLength(10),
+            attributes
+        );
     }
 
-    public static org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats randomStats() {
-        return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(randomLongBetween(10L, 10000L),
-            randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomBoolean() ? null : randomDouble(),
-            randomBoolean() ? null : randomDouble(),
-            randomBoolean() ? null : randomDouble());
-    }
     @Override
     protected org.elasticsearch.xpack.core.transform.transforms.TransformStats createServerTestInstance(XContentType xContentType) {
-        return new org.elasticsearch.xpack.core.transform.transforms.TransformStats(randomAlphaOfLength(10),
+        return new org.elasticsearch.xpack.core.transform.transforms.TransformStats(
+            randomAlphaOfLength(10),
             randomFrom(org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.values()),
             randomBoolean() ? null : randomAlphaOfLength(100),
             randomBoolean() ? null : randomNodeAttributes(),
-            randomStats(),
-            TransformCheckpointingInfoTests.randomTransformCheckpointingInfo());
+            TransformIndexerStatsTests.randomStats(),
+            TransformCheckpointingInfoTests.randomTransformCheckpointingInfo()
+        );
     }
 
     @Override
@@ -78,8 +74,10 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
     }
 
     @Override
-    protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformStats serverTestInstance,
-                                   TransformStats clientInstance) {
+    protected void assertInstances(
+        org.elasticsearch.xpack.core.transform.transforms.TransformStats serverTestInstance,
+        TransformStats clientInstance
+    ) {
         assertThat(serverTestInstance.getId(), equalTo(clientInstance.getId()));
         assertThat(serverTestInstance.getState().value(), equalTo(clientInstance.getState().value()));
         assertTransformIndexerStats(serverTestInstance.getIndexerStats(), clientInstance.getIndexerStats());
@@ -88,8 +86,10 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
         assertThat(serverTestInstance.getReason(), equalTo(clientInstance.getReason()));
     }
 
-    private void assertNodeAttributes(org.elasticsearch.xpack.core.transform.transforms.NodeAttributes serverTestInstance,
-                                      NodeAttributes clientInstance) {
+    private void assertNodeAttributes(
+        org.elasticsearch.xpack.core.transform.transforms.NodeAttributes serverTestInstance,
+        NodeAttributes clientInstance
+    ) {
         if (serverTestInstance == null || clientInstance == null) {
             assertNull(serverTestInstance);
             assertNull(clientInstance);
@@ -102,8 +102,10 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
         assertThat(serverTestInstance.getTransportAddress(), equalTo(clientInstance.getTransportAddress()));
     }
 
-    public static void assertTransformProgress(org.elasticsearch.xpack.core.transform.transforms.TransformProgress serverTestInstance,
-                                         TransformProgress clientInstance) {
+    public static void assertTransformProgress(
+        org.elasticsearch.xpack.core.transform.transforms.TransformProgress serverTestInstance,
+        TransformProgress clientInstance
+    ) {
         if (serverTestInstance == null || clientInstance == null) {
             assertNull(serverTestInstance);
             assertNull(clientInstance);
@@ -115,16 +117,18 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
         assertThat(serverTestInstance.getDocumentsIndexed(), equalTo(clientInstance.getDocumentsIndexed()));
     }
 
-    public static void assertPosition(org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition serverTestInstance,
-                                TransformIndexerPosition clientInstance) {
+    public static void assertPosition(
+        org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition serverTestInstance,
+        TransformIndexerPosition clientInstance
+    ) {
         assertThat(serverTestInstance.getIndexerPosition(), equalTo(clientInstance.getIndexerPosition()));
         assertThat(serverTestInstance.getBucketsPosition(), equalTo(clientInstance.getBucketsPosition()));
     }
 
-
     public static void assertTransformCheckpointStats(
-            org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats serverTestInstance,
-            TransformCheckpointStats clientInstance) {
+        org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats serverTestInstance,
+        TransformCheckpointStats clientInstance
+    ) {
         assertTransformProgress(serverTestInstance.getCheckpointProgress(), clientInstance.getCheckpointProgress());
         assertThat(serverTestInstance.getCheckpoint(), equalTo(clientInstance.getCheckpoint()));
         assertPosition(serverTestInstance.getPosition(), clientInstance.getPosition());
@@ -133,8 +137,9 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
     }
 
     public static void assertTransformCheckpointInfo(
-            org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
-            TransformCheckpointingInfo clientInstance) {
+        org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance,
+        TransformCheckpointingInfo clientInstance
+    ) {
         assertTransformCheckpointStats(serverTestInstance.getNext(), clientInstance.getNext());
         assertTransformCheckpointStats(serverTestInstance.getLast(), clientInstance.getLast());
         assertThat(serverTestInstance.getChangesLastDetectedAt(), equalTo(clientInstance.getChangesLastDetectedAt()));
@@ -142,8 +147,9 @@ public class TransformStatsTests extends AbstractResponseTestCase<org.elasticsea
     }
 
     public static void assertTransformIndexerStats(
-            org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
-            TransformIndexerStats clientInstance) {
+        org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats serverTestInstance,
+        TransformIndexerStats clientInstance
+    ) {
         assertThat(serverTestInstance.getExpAvgCheckpointDurationMs(), equalTo(clientInstance.getExpAvgCheckpointDurationMs()));
         assertThat(serverTestInstance.getExpAvgDocumentsProcessed(), equalTo(clientInstance.getExpAvgDocumentsProcessed()));
         assertThat(serverTestInstance.getExpAvgDocumentsIndexed(), equalTo(clientInstance.getExpAvgDocumentsIndexed()));

+ 10 - 4
docs/reference/rollup/apis/get-job.asciidoc

@@ -142,14 +142,16 @@ The API yields the following response:
             "index_total": 0,
             "search_failures": 0,
             "search_time_in_ms": 0,
-            "search_total": 0
+            "search_total": 0,
+            "processing_time_in_ms": 0,
+            "processing_total": 0
           }
         }
     ]
 }
 ----
 
-The `jobs` array contains a single job (`id: sensor`) since we requested a single job in the endpoint's URL. 
+The `jobs` array contains a single job (`id: sensor`) since we requested a single job in the endpoint's URL.
 If we add another job, we can see how multi-job responses are handled:
 
 [source,console]
@@ -245,7 +247,9 @@ Which will yield the following response:
             "index_total": 0,
             "search_failures": 0,
             "search_time_in_ms": 0,
-            "search_total": 0
+            "search_total": 0,
+            "processing_time_in_ms": 0,
+            "processing_total": 0
           }
         },
         {
@@ -299,7 +303,9 @@ Which will yield the following response:
             "index_total": 0,
             "search_failures": 0,
             "search_time_in_ms": 0,
-            "search_total": 0
+            "search_total": 0,
+            "processing_time_in_ms": 0,
+            "processing_total": 0
           }
         }
     ]

+ 4 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -347,7 +347,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
 
             // allowPartialSearchResults is set to false, so we should never see shard failures here
             assert (searchResponse.getShardFailures().length == 0);
-
+            stats.markStartProcessing();
             stats.incrementNumPages(1);
             IterationResult<JobPosition> iterationResult = doProcess(searchResponse);
 
@@ -355,11 +355,11 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
                 logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down.");
 
                 position.set(iterationResult.getPosition());
+                stats.markEndProcessing();
                 // execute finishing tasks
                 onFinish(ActionListener.wrap(
                         r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
                         e -> doSaveState(finishAndSetState(), position.get(), () -> {})));
-
                 return;
             }
 
@@ -369,7 +369,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
             if (docs.isEmpty() == false) {
                 final BulkRequest bulkRequest = new BulkRequest();
                 docs.forEach(bulkRequest::add);
-
+                stats.markEndProcessing();
                 stats.markStartIndexing();
                 doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
                     // TODO we should check items in the response and move after accordingly to
@@ -390,6 +390,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
                     onBulkResponse(bulkResponse, newPosition);
                 }, this::finishWithIndexingFailure));
             } else {
+                stats.markEndProcessing();
                 // no documents need to be indexed, continue with search
                 try {
                     JobPosition newPosition = iterationResult.getPosition();

+ 38 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerJobStats.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.core.indexing;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -31,26 +32,31 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
     protected long searchTime = 0;
     protected long indexTotal = 0;
     protected long searchTotal = 0;
+    protected long processingTime = 0;
+    protected long processingTotal = 0;
     protected long indexFailures = 0;
     protected long searchFailures = 0;
 
     private long startIndexTime;
     private long startSearchTime;
+    private long startProcessingTime;
 
     public IndexerJobStats() {
     }
 
     public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
-                           long indexTime, long searchTime, long indexTotal, long searchTotal,
-                           long indexFailures, long searchFailures) {
+                           long indexTime, long searchTime, long processingTime, long indexTotal, long searchTotal,
+                           long processingTotal, long indexFailures, long searchFailures) {
         this.numPages = numPages;
         this.numInputDocuments = numInputDocuments;
         this.numOuputDocuments = numOuputDocuments;
         this.numInvocations = numInvocations;
         this.indexTime = indexTime;
         this.searchTime = searchTime;
+        this.processingTime = processingTime;
         this.indexTotal = indexTotal;
         this.searchTotal = searchTotal;
+        this.processingTotal = processingTotal;
         this.indexFailures = indexFailures;
         this.searchFailures = searchFailures;
     }
@@ -66,6 +72,11 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
         this.searchTotal = in.readVLong();
         this.indexFailures = in.readVLong();
         this.searchFailures = in.readVLong();
+
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo V_7_7_0
+            this.processingTime = in.readVLong();
+            this.processingTotal = in.readVLong();
+        }
     }
 
     public long getNumPages() {
@@ -100,6 +111,10 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
         return searchTime;
     }
 
+    public long getProcessingTime() {
+        return processingTime;
+    }
+
     public long getIndexTotal() {
         return indexTotal;
     }
@@ -108,6 +123,10 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
         return searchTotal;
     }
 
+    public long getProcessingTotal() {
+        return processingTotal;
+    }
+
     public void incrementNumPages(long n) {
         assert(n >= 0);
         numPages += n;
@@ -154,6 +173,15 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
         searchTotal += 1;
     }
 
+    public void markStartProcessing() {
+        this.startProcessingTime = System.nanoTime();
+    }
+
+    public void markEndProcessing() {
+        processingTime += ((System.nanoTime() - startProcessingTime) / 1000000);
+        processingTotal += 1;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeVLong(numPages);
@@ -166,6 +194,10 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
         out.writeVLong(searchTotal);
         out.writeVLong(indexFailures);
         out.writeVLong(searchFailures);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeVLong(processingTime);
+            out.writeVLong(processingTotal);
+        }
     }
 
     @Override
@@ -186,15 +218,17 @@ public abstract class IndexerJobStats implements ToXContentObject, Writeable {
             && Objects.equals(this.numInvocations, that.numInvocations)
             && Objects.equals(this.indexTime, that.indexTime)
             && Objects.equals(this.searchTime, that.searchTime)
+            && Objects.equals(this.processingTime, that.processingTime)
             && Objects.equals(this.indexFailures, that.indexFailures)
             && Objects.equals(this.searchFailures, that.searchFailures)
             && Objects.equals(this.indexTotal, that.indexTotal)
-            && Objects.equals(this.searchTotal, that.searchTotal);
+            && Objects.equals(this.searchTotal, that.searchTotal)
+            && Objects.equals(this.processingTotal, that.processingTotal);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
-            indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal);
+            indexTime, searchTime, processingTime, indexFailures, searchFailures, indexTotal, searchTotal, processingTotal);
     }
 }

+ 12 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupIndexerJobStats.java

@@ -27,15 +27,18 @@ public class RollupIndexerJobStats extends IndexerJobStats {
     private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
     private static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
     private static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
+    private static ParseField PROCESSING_TIME_IN_MS = new ParseField("processing_time_in_ms");
     private static ParseField INDEX_TOTAL = new ParseField("index_total");
     private static ParseField SEARCH_TOTAL = new ParseField("search_total");
+    private static ParseField PROCESSING_TOTAL = new ParseField("processing_total");
     private static ParseField SEARCH_FAILURES = new ParseField("search_failures");
     private static ParseField INDEX_FAILURES = new ParseField("index_failures");
 
     public static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER =
         new ConstructingObjectParser<>(NAME.getPreferredName(),
             args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
-                (long) args[4], (long) args[5], (long) args[6], (long) args[7],  (long) args[8], (long) args[9]));
+                (long) args[4], (long) args[5], (long) args[6], (long) args[7],  (long) args[8], (long) args[9],
+                (long) args[10], (long) args[11]));
 
     static {
         PARSER.declareLong(constructorArg(), NUM_PAGES);
@@ -44,8 +47,10 @@ public class RollupIndexerJobStats extends IndexerJobStats {
         PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
         PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
         PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
+        PARSER.declareLong(constructorArg(), PROCESSING_TIME_IN_MS);
         PARSER.declareLong(constructorArg(), INDEX_TOTAL);
         PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
+        PARSER.declareLong(constructorArg(), PROCESSING_TOTAL);
         PARSER.declareLong(constructorArg(), INDEX_FAILURES);
         PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
     }
@@ -55,10 +60,10 @@ public class RollupIndexerJobStats extends IndexerJobStats {
     }
 
     public RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
-                                 long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures,
-                                 long searchFailures) {
-        super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
-            indexTotal, searchTotal, indexFailures, searchFailures);
+                                 long indexTime, long searchTime, long processingTime, long indexTotal, long searchTotal,
+                                 long processingTotal, long indexFailures, long searchFailures) {
+        super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime, processingTime,
+            indexTotal, searchTotal, processingTotal, indexFailures, searchFailures);
     }
 
     public RollupIndexerJobStats(StreamInput in) throws IOException {
@@ -78,6 +83,8 @@ public class RollupIndexerJobStats extends IndexerJobStats {
         builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
         builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
         builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
+        builder.field(PROCESSING_TIME_IN_MS.getPreferredName(), processingTime);
+        builder.field(PROCESSING_TOTAL.getPreferredName(), processingTotal);
         builder.endObject();
         return builder;
     }

+ 56 - 30
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStats.java

@@ -19,7 +19,6 @@ import org.elasticsearch.xpack.core.indexing.IndexerJobStats;
 import java.io.IOException;
 import java.util.Objects;
 
-import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
 import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
 
 public class TransformIndexerStats extends IndexerJobStats {
@@ -33,8 +32,10 @@ public class TransformIndexerStats extends IndexerJobStats {
     public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
     public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
     public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
+    public static ParseField PROCESSING_TIME_IN_MS = new ParseField("processing_time_in_ms");
     public static ParseField INDEX_TOTAL = new ParseField("index_total");
     public static ParseField SEARCH_TOTAL = new ParseField("search_total");
+    public static ParseField PROCESSING_TOTAL = new ParseField("processing_total");
     public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
     public static ParseField INDEX_FAILURES = new ParseField("index_failures");
     public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms");
@@ -50,33 +51,37 @@ public class TransformIndexerStats extends IndexerJobStats {
         NAME,
         true,
         args -> new TransformIndexerStats(
-            (long) args[0],
-            (long) args[1],
-            (long) args[2],
-            (long) args[3],
-            (long) args[4],
-            (long) args[5],
-            (long) args[6],
-            (long) args[7],
-            (long) args[8],
-            (long) args[9],
-            (Double) args[10],
-            (Double) args[11],
-            (Double) args[12]
+            unboxSafe(args[0], 0L),
+            unboxSafe(args[1], 0L),
+            unboxSafe(args[2], 0L),
+            unboxSafe(args[3], 0L),
+            unboxSafe(args[4], 0L),
+            unboxSafe(args[5], 0L),
+            unboxSafe(args[6], 0L),
+            unboxSafe(args[7], 0L),
+            unboxSafe(args[8], 0L),
+            unboxSafe(args[9], 0L),
+            unboxSafe(args[10], 0L),
+            unboxSafe(args[11], 0L),
+            unboxSafe(args[12], 0.0),
+            unboxSafe(args[13], 0.0),
+            unboxSafe(args[14], 0.0)
         )
     );
 
     static {
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
-        LENIENT_PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
-        LENIENT_PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
-        LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
-        LENIENT_PARSER.declareLong(constructorArg(), INDEX_TOTAL);
-        LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
-        LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
-        LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_PAGES);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INPUT_DOCUMENTS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_OUTPUT_DOCUMENTS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), NUM_INVOCATIONS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TIME_IN_MS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TIME_IN_MS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TIME_IN_MS);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_TOTAL);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_TOTAL);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), PROCESSING_TOTAL);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), INDEX_FAILURES);
+        LENIENT_PARSER.declareLong(optionalConstructorArg(), SEARCH_FAILURES);
         LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
         LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED);
         LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED);
@@ -100,13 +105,15 @@ public class TransformIndexerStats extends IndexerJobStats {
         long numInvocations,
         long indexTime,
         long searchTime,
+        long processingTime,
         long indexTotal,
         long searchTotal,
+        long processingTotal,
         long indexFailures,
         long searchFailures,
-        Double expAvgCheckpointDurationMs,
-        Double expAvgDocumentsIndexed,
-        Double expAvgDocumentsProcessed
+        double expAvgCheckpointDurationMs,
+        double expAvgDocumentsIndexed,
+        double expAvgDocumentsProcessed
     ) {
         super(
             numPages,
@@ -115,14 +122,16 @@ public class TransformIndexerStats extends IndexerJobStats {
             numInvocations,
             indexTime,
             searchTime,
+            processingTime,
             indexTotal,
             searchTotal,
+            processingTotal,
             indexFailures,
             searchFailures
         );
-        this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
-        this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
-        this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
+        this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs;
+        this.expAvgDocumentsIndexed = expAvgDocumentsIndexed;
+        this.expAvgDocumentsProcessed = expAvgDocumentsProcessed;
     }
 
     public TransformIndexerStats(TransformIndexerStats other) {
@@ -133,8 +142,10 @@ public class TransformIndexerStats extends IndexerJobStats {
             other.numInvocations,
             other.indexTime,
             other.searchTime,
+            other.processingTime,
             other.indexTotal,
             other.searchTotal,
+            other.processingTotal,
             other.indexFailures,
             other.searchFailures,
             other.expAvgCheckpointDurationMs,
@@ -181,6 +192,8 @@ public class TransformIndexerStats extends IndexerJobStats {
         builder.field(SEARCH_TIME_IN_MS.getPreferredName(), searchTime);
         builder.field(SEARCH_TOTAL.getPreferredName(), searchTotal);
         builder.field(SEARCH_FAILURES.getPreferredName(), searchFailures);
+        builder.field(PROCESSING_TIME_IN_MS.getPreferredName(), processingTime);
+        builder.field(PROCESSING_TOTAL.getPreferredName(), processingTotal);
         builder.field(EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(), this.expAvgCheckpointDurationMs);
         builder.field(EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(), this.expAvgDocumentsIndexed);
         builder.field(EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), this.expAvgDocumentsProcessed);
@@ -238,10 +251,12 @@ public class TransformIndexerStats extends IndexerJobStats {
             && Objects.equals(this.numInvocations, that.numInvocations)
             && Objects.equals(this.indexTime, that.indexTime)
             && Objects.equals(this.searchTime, that.searchTime)
+            && Objects.equals(this.processingTime, that.processingTime)
             && Objects.equals(this.indexFailures, that.indexFailures)
             && Objects.equals(this.searchFailures, that.searchFailures)
             && Objects.equals(this.indexTotal, that.indexTotal)
             && Objects.equals(this.searchTotal, that.searchTotal)
+            && Objects.equals(this.processingTotal, that.processingTotal)
             && Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
             && Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
             && Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
@@ -256,10 +271,12 @@ public class TransformIndexerStats extends IndexerJobStats {
             numInvocations,
             indexTime,
             searchTime,
+            processingTime,
             indexFailures,
             searchFailures,
             indexTotal,
             searchTotal,
+            processingTotal,
             expAvgCheckpointDurationMs,
             expAvgDocumentsIndexed,
             expAvgDocumentsProcessed
@@ -278,4 +295,13 @@ public class TransformIndexerStats extends IndexerJobStats {
             throw new RuntimeException(e);
         }
     }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T unboxSafe(Object l, T default_value) {
+        if (l == null) {
+            return default_value;
+        } else {
+            return (T) l;
+        }
+    }
 }

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java

@@ -24,7 +24,7 @@ public final class TransformInternalIndexConstants {
     // internal index
 
     // version is not a rollover pattern, however padded because sort is string based
-    public static final String INDEX_VERSION = "004";
+    public static final String INDEX_VERSION = "005";
     public static final String INDEX_PATTERN = ".transform-internal-";
     public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
     public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java

@@ -43,7 +43,8 @@ public class JobWrapperSerializingTests extends AbstractSerializingTestCase<GetR
         return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.randomRollupJobConfig(random()),
                 new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
                     randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
-                    randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()),
+                    randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+                    randomNonNegativeLong(), randomNonNegativeLong()),
                 new RollupJobStatus(state, Collections.emptyMap()));
     }
 }

+ 2 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupIndexerJobStatsTests.java

@@ -29,7 +29,8 @@ public class RollupIndexerJobStatsTests extends AbstractSerializingTestCase<Roll
     public static RollupIndexerJobStats randomStats() {
         return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(),
             randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
-            randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
+            randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+            randomNonNegativeLong(), randomNonNegativeLong());
     }
 
     @Override

+ 91 - 18
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStatsTests.java

@@ -7,14 +7,43 @@
 package org.elasticsearch.xpack.core.transform.transforms;
 
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 
+import java.io.IOException;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
 
 public class TransformIndexerStatsTests extends AbstractSerializingTestCase<TransformIndexerStats> {
 
+    public static TransformIndexerStats randomStats() {
+        return new TransformIndexerStats(
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomDouble(),
+            randomDouble(),
+            randomDouble()
+        );
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return true;
+    }
+
     @Override
     protected TransformIndexerStats createTestInstance() {
         return randomStats();
@@ -30,24 +59,6 @@ public class TransformIndexerStatsTests extends AbstractSerializingTestCase<Tran
         return TransformIndexerStats.fromXContent(parser);
     }
 
-    public static TransformIndexerStats randomStats() {
-        return new TransformIndexerStats(
-            randomLongBetween(10L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L),
-            randomBoolean() ? randomDouble() : null,
-            randomBoolean() ? randomDouble() : null,
-            randomBoolean() ? randomDouble() : null
-        );
-    }
-
     public void testExpAvgIncrement() {
         TransformIndexerStats stats = new TransformIndexerStats();
 
@@ -67,4 +78,66 @@ public class TransformIndexerStatsTests extends AbstractSerializingTestCase<Tran
         assertThat(stats.getExpAvgDocumentsIndexed(), closeTo(20.54545454, 0.0000001));
         assertThat(stats.getExpAvgDocumentsProcessed(), closeTo(59.0909090, 0.0000001));
     }
+
+    public void testXContentLeniencyForMissingFields() throws IOException {
+        // this is essentially the same test as done in the super class, but with the difference of a custom toXContent method that leaves
+        // out fields if the value is 0, this allow us to test successful parsing if fields are not available, e.g. on older versions
+        xContentTester(this::createParser, this::createTestInstance, TransformIndexerStatsTests::toXContentIfNotZero, this::doParseInstance)
+            .numberOfTestRuns(NUMBER_OF_TEST_RUNS)
+            .supportsUnknownFields(supportsUnknownFields())
+            .shuffleFieldsExceptions(getShuffleFieldsExceptions())
+            .randomFieldsExcludeFilter(getRandomFieldsExcludeFilter())
+            .assertEqualsConsumer(this::assertEqualInstances)
+            .assertToXContentEquivalence(assertToXContentEquivalence())
+            .test();
+    }
+
+    public static void toXContentIfNotZero(TransformIndexerStats stats, XContentBuilder builder) throws IOException {
+        // a toXContent version which leaves out field with value 0
+        builder.startObject();
+        xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
+        xContentFieldIfNotZero(builder, TransformIndexerStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
+        xContentFieldIfNotZero(
+            builder,
+            TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
+            stats.getExpAvgCheckpointDurationMs()
+        );
+        xContentFieldIfNotZero(
+            builder,
+            TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
+            stats.getExpAvgDocumentsIndexed()
+        );
+        xContentFieldIfNotZero(
+            builder,
+            TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
+            stats.getExpAvgDocumentsProcessed()
+        );
+        builder.endObject();
+    }
+
+    private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, long value) throws IOException {
+        if (value > 0) {
+            builder.field(name, value);
+        }
+
+        return builder;
+    }
+
+    private static XContentBuilder xContentFieldIfNotZero(XContentBuilder builder, String name, double value) throws IOException {
+        if (value > 0.0) {
+            builder.field(name, value);
+        }
+
+        return builder;
+    }
 }

+ 29 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java

@@ -69,7 +69,7 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
                 STARTED,
                 randomBoolean() ? null : randomAlphaOfLength(100),
                 randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
-                new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0.0, 0.0, 0.0),
+                new TransformIndexerStats(1, 2, 3, 4, 5, 6, 0, 8, 9, 0, 11, 12, 0.0, 0.0, 0.0),
                 new TransformCheckpointingInfo(
                     new TransformCheckpointStats(0, null, null, 10, 100),
                     new TransformCheckpointStats(0, null, null, 100, 1000),
@@ -89,4 +89,32 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
             }
         }
     }
+
+    public void testBwcWith76() throws IOException {
+        for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
+            TransformStats stats = new TransformStats(
+                "bwc-id",
+                STARTED,
+                randomBoolean() ? null : randomAlphaOfLength(100),
+                randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
+                new TransformIndexerStats(1, 2, 3, 4, 5, 6, 0, 8, 9, 0, 11, 12, 13.0, 14.0, 15.0),
+                new TransformCheckpointingInfo(
+                    new TransformCheckpointStats(0, null, null, 10, 100),
+                    new TransformCheckpointStats(0, null, null, 100, 1000),
+                    // changesLastDetectedAt aren't serialized back
+                    100,
+                    null
+                )
+            );
+            try (BytesStreamOutput output = new BytesStreamOutput()) {
+                output.setVersion(Version.V_7_6_0);
+                stats.writeTo(output);
+                try (StreamInput in = output.bytes().streamInput()) {
+                    in.setVersion(Version.V_7_6_0);
+                    TransformStats statsFromOld = new TransformStats(in);
+                    assertThat(statsFromOld, equalTo(stats));
+                }
+            }
+        }
+    }
 }

+ 6 - 6
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java

@@ -67,7 +67,7 @@ import static org.mockito.Mockito.when;
 public class IndexerUtilsTests extends AggregatorTestCase {
     public void testMissingFields() throws IOException {
         String indexName = randomAlphaOfLengthBetween(1, 10);
-        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         String timestampField = "the_histo";
         String valueField = "the_avg";
@@ -131,7 +131,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
 
     public void testCorrectFields() throws IOException {
         String indexName = randomAlphaOfLengthBetween(1, 10);
-        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         String timestampField = "the_histo";
         String valueField = "the_avg";
@@ -199,7 +199,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
 
     public void testNumericTerms() throws IOException {
         String indexName = randomAlphaOfLengthBetween(1, 10);
-        RollupIndexerJobStats stats= new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        RollupIndexerJobStats stats= new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         String valueField = "the_avg";
 
@@ -255,7 +255,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
 
     public void testEmptyCounts() throws IOException {
         String indexName = randomAlphaOfLengthBetween(1, 10);
-        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         String timestampField = "ts";
         String valueField = "the_avg";
@@ -444,7 +444,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
 
     public void testMissingBuckets() throws IOException {
         String indexName = randomAlphaOfLengthBetween(1, 10);
-        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         String metricField = "metric_field";
         String valueField = "value_field";
@@ -517,7 +517,7 @@ public class IndexerUtilsTests extends AggregatorTestCase {
 
     public void testTimezone() throws IOException {
         String indexName = randomAlphaOfLengthBetween(1, 10);
-        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+        RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
 
         String timestampField = "the_histo";
         String valueField = "the_avg";

+ 6 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml

@@ -75,6 +75,8 @@ setup:
             index_total: 0
             search_time_in_ms: 0
             search_total: 0
+            processing_time_in_ms: 0
+            processing_total: 0
           status:
             job_state: "stopped"
 
@@ -127,6 +129,8 @@ setup:
             index_total: 0
             search_time_in_ms: 0
             search_total: 0
+            processing_time_in_ms: 0
+            processing_total: 0
           status:
             job_state: "stopped"
 
@@ -179,6 +183,8 @@ setup:
             index_total: 0
             search_time_in_ms: 0
             search_total: 0
+            processing_time_in_ms: 0
+            processing_total: 0
           status:
             job_state: "stopped"
 

+ 4 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml

@@ -76,6 +76,8 @@ setup:
             index_total: 0
             search_time_in_ms: 0
             search_total: 0
+            processing_time_in_ms: 0
+            processing_total: 0
           status:
             job_state: "stopped"
 ---
@@ -217,6 +219,8 @@ setup:
             index_total: 0
             search_time_in_ms: 0
             search_total: 0
+            processing_time_in_ms: 0
+            processing_total: 0
           status:
             job_state: "stopped"
 

+ 2 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml

@@ -76,6 +76,8 @@ setup:
             index_total: 0
             search_time_in_ms: 0
             search_total: 0
+            processing_time_in_ms: 0
+            processing_total: 0
           status:
             job_state: "stopped"
 

+ 0 - 5
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java

@@ -100,11 +100,6 @@ public class TransformUsageIT extends TransformRestTestCase {
             assertEquals(2, XContentMapValues.extractValue("transform.transforms.stopped", statsMap));
             assertEquals(1, XContentMapValues.extractValue("transform.transforms.started", statsMap));
             for (String statName : PROVIDED_STATS) {
-                if (statName.equals(TransformIndexerStats.INDEX_TIME_IN_MS.getPreferredName())
-                    || statName.equals(TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName())) {
-                    continue;
-                }
-
                 // the trigger count can be higher if the scheduler kicked before usage has been called, therefore check for gte
                 if (statName.equals(TransformIndexerStats.NUM_INVOCATIONS.getPreferredName())) {
                     assertThat(

+ 11 - 7
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformInfoTransportAction.java

@@ -51,8 +51,10 @@ public class TransformInfoTransportAction extends XPackInfoFeatureTransportActio
         TransformIndexerStats.NUM_INVOCATIONS.getPreferredName(),
         TransformIndexerStats.INDEX_TIME_IN_MS.getPreferredName(),
         TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName(),
+        TransformIndexerStats.PROCESSING_TIME_IN_MS.getPreferredName(),
         TransformIndexerStats.INDEX_TOTAL.getPreferredName(),
         TransformIndexerStats.SEARCH_TOTAL.getPreferredName(),
+        TransformIndexerStats.PROCESSING_TOTAL.getPreferredName(),
         TransformIndexerStats.INDEX_FAILURES.getPreferredName(),
         TransformIndexerStats.SEARCH_FAILURES.getPreferredName(),
         TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
@@ -105,13 +107,15 @@ public class TransformInfoTransportAction extends XPackInfoFeatureTransportActio
             statisticsList.get(3).longValue(),  // numInvocations
             statisticsList.get(4).longValue(),  // indexTime
             statisticsList.get(5).longValue(),  // searchTime
-            statisticsList.get(6).longValue(),  // indexTotal
-            statisticsList.get(7).longValue(),  // searchTotal
-            statisticsList.get(8).longValue(),  // indexFailures
-            statisticsList.get(9).longValue(), // searchFailures
-            statisticsList.get(10), // exponential_avg_checkpoint_duration_ms
-            statisticsList.get(11), // exponential_avg_documents_indexed
-            statisticsList.get(12)  // exponential_avg_documents_processed
+            statisticsList.get(6).longValue(),  // processingTime
+            statisticsList.get(7).longValue(),  // indexTotal
+            statisticsList.get(8).longValue(),  // searchTotal
+            statisticsList.get(9).longValue(),  // processingTotal
+            statisticsList.get(10).longValue(),  // indexFailures
+            statisticsList.get(11).longValue(), // searchFailures
+            statisticsList.get(12), // exponential_avg_checkpoint_duration_ms
+            statisticsList.get(13), // exponential_avg_documents_indexed
+            statisticsList.get(14)  // exponential_avg_documents_processed
         );
     }
 

+ 7 - 0
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

@@ -58,6 +58,7 @@ public final class TransformInternalIndex {
      * version 3 (7.5): rename to .transform-internal-xxx
      * version 4 (7.6): state::should_stop_at_checkpoint
      *                  checkpoint::checkpoint
+     * version 5 (7.7): stats::processing_time_in_ms, stats::processing_total
      */
 
     // constants for mappings
@@ -238,11 +239,17 @@ public final class TransformInternalIndex {
                      .startObject(TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName())
                         .field(TYPE, LONG)
                     .endObject()
+                    .startObject(TransformIndexerStats.PROCESSING_TIME_IN_MS.getPreferredName())
+                        .field(TYPE, LONG)
+                     .endObject()
                      .startObject(TransformIndexerStats.INDEX_TOTAL.getPreferredName())
                         .field(TYPE, LONG)
                     .endObject()
                      .startObject(TransformIndexerStats.SEARCH_TOTAL.getPreferredName())
                         .field(TYPE, LONG)
+                    .endObject()
+                    .startObject(TransformIndexerStats.PROCESSING_TOTAL.getPreferredName())
+                        .field(TYPE, LONG)
                     .endObject()
                      .startObject(TransformIndexerStats.SEARCH_FAILURES.getPreferredName())
                         .field(TYPE, LONG)

+ 9 - 7
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformInfoTransportActionTests.java

@@ -98,13 +98,15 @@ public class TransformInfoTransportActionTests extends ESTestCase {
             4,  // numInvocations
             5,  // indexTime
             6,  // searchTime
-            7,  // indexTotal
-            8,  // searchTotal
-            9,  // indexFailures
-            10, // searchFailures
-            11.0,  // exponential_avg_checkpoint_duration_ms
-            12.0,  // exponential_avg_documents_indexed
-            13.0   // exponential_avg_documents_processed
+            7,  // processingTime
+            8,  // indexTotal
+            9,  // searchTotal
+            10, // processingTotal
+            11, // indexFailures
+            12, // searchFailures
+            13.0,  // exponential_avg_checkpoint_duration_ms
+            14.0,  // exponential_avg_documents_indexed
+            15.0   // exponential_avg_documents_processed
         );
 
         int currentStat = 1;

+ 3 - 3
x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_transform_jobs_crud.yml

@@ -279,9 +279,9 @@ setup:
 
   - do:
       indices.get_mapping:
-        index: .transform-internal-004
-  - match: { \.transform-internal-004.mappings.dynamic: "false" }
-  - match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }
+        index: .transform-internal-005
+  - match: { \.transform-internal-005.mappings.dynamic: "false" }
+  - match: { \.transform-internal-005.mappings.properties.id.type: "keyword" }
   - do:
       indices.get_mapping:
         index: .transform-notifications-000002