Bladeren bron

Add DatafeedTimingStats.average_search_time_per_bucket_ms and TimingStats.total_bucket_processing_time_ms stats (#44125)

Przemysław Witek 6 jaren geleden
bovenliggende
commit
e2bdd43703
25 gewijzigde bestanden met toevoegingen van 449 en 98 verwijderingen
  1. 35 5
      client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java
  2. 34 3
      client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java
  3. 14 8
      client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java
  4. 32 8
      client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java
  5. 11 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java
  6. 9 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java
  7. 38 6
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStats.java
  8. 2 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java
  9. 28 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java
  10. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java
  11. 6 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java
  12. 5 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java
  13. 57 11
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java
  14. 24 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java
  15. 1 1
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java
  16. 6 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java
  17. 17 5
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java
  18. 34 10
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java
  19. 5 3
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java
  20. 64 15
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java
  21. 2 1
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java
  22. 8 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java
  23. 1 0
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java
  24. 10 6
      x-pack/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml
  25. 5 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml

+ 35 - 5
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStats.java

@@ -36,7 +36,9 @@ public class DatafeedTimingStats implements ToXContentObject {
 
     public static final ParseField JOB_ID = new ParseField("job_id");
     public static final ParseField SEARCH_COUNT = new ParseField("search_count");
+    public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
     public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms");
+    public static final ParseField AVG_SEARCH_TIME_PER_BUCKET_MS = new ParseField("average_search_time_per_bucket_ms");
 
     public static final ParseField TYPE = new ParseField("datafeed_timing_stats");
 
@@ -50,23 +52,37 @@ public class DatafeedTimingStats implements ToXContentObject {
                 args -> {
                     String jobId = (String) args[0];
                     Long searchCount = (Long) args[1];
-                    Double totalSearchTimeMs = (Double) args[2];
-                    return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
+                    Long bucketCount = (Long) args[2];
+                    Double totalSearchTimeMs = (Double) args[3];
+                    Double avgSearchTimePerBucketMs = (Double) args[4];
+                    return new DatafeedTimingStats(
+                        jobId,
+                        getOrDefault(searchCount, 0L),
+                        getOrDefault(bucketCount, 0L),
+                        getOrDefault(totalSearchTimeMs, 0.0),
+                        avgSearchTimePerBucketMs);
                 });
         parser.declareString(constructorArg(), JOB_ID);
         parser.declareLong(optionalConstructorArg(), SEARCH_COUNT);
+        parser.declareLong(optionalConstructorArg(), BUCKET_COUNT);
         parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS);
+        parser.declareDouble(optionalConstructorArg(), AVG_SEARCH_TIME_PER_BUCKET_MS);
         return parser;
     }
 
     private final String jobId;
     private long searchCount;
+    private long bucketCount;
     private double totalSearchTimeMs;
+    private Double avgSearchTimePerBucketMs;
 
-    public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) {
+    public DatafeedTimingStats(
+            String jobId, long searchCount, long bucketCount, double totalSearchTimeMs, @Nullable Double avgSearchTimePerBucketMs) {
         this.jobId = Objects.requireNonNull(jobId);
         this.searchCount = searchCount;
+        this.bucketCount = bucketCount;
         this.totalSearchTimeMs = totalSearchTimeMs;
+        this.avgSearchTimePerBucketMs = avgSearchTimePerBucketMs;
     }
 
     public String getJobId() {
@@ -77,16 +93,28 @@ public class DatafeedTimingStats implements ToXContentObject {
         return searchCount;
     }
 
+    public long getBucketCount() {
+        return bucketCount;
+    }
+
     public double getTotalSearchTimeMs() {
         return totalSearchTimeMs;
     }
 
+    public Double getAvgSearchTimePerBucketMs() {
+        return avgSearchTimePerBucketMs;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
         builder.startObject();
         builder.field(JOB_ID.getPreferredName(), jobId);
         builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
+        builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
         builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs);
+        if (avgSearchTimePerBucketMs != null) {
+            builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), avgSearchTimePerBucketMs);
+        }
         builder.endObject();
         return builder;
     }
@@ -103,12 +131,14 @@ public class DatafeedTimingStats implements ToXContentObject {
         DatafeedTimingStats other = (DatafeedTimingStats) obj;
         return Objects.equals(this.jobId, other.jobId)
             && this.searchCount == other.searchCount
-            && this.totalSearchTimeMs == other.totalSearchTimeMs;
+            && this.bucketCount == other.bucketCount
+            && this.totalSearchTimeMs == other.totalSearchTimeMs
+            && Objects.equals(this.avgSearchTimePerBucketMs, other.avgSearchTimePerBucketMs);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jobId, searchCount, totalSearchTimeMs);
+        return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs, avgSearchTimePerBucketMs);
     }
 
     @Override

+ 34 - 3
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java

@@ -39,6 +39,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
 public class TimingStats implements ToXContentObject {
 
     public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
+    public static final ParseField TOTAL_BUCKET_PROCESSING_TIME_MS = new ParseField("total_bucket_processing_time_ms");
     public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
     public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
     public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
@@ -49,12 +50,28 @@ public class TimingStats implements ToXContentObject {
         new ConstructingObjectParser<>(
             "timing_stats",
             true,
-            args ->
-                new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
+            args -> {
+                String jobId = (String) args[0];
+                Long bucketCount = (Long) args[1];
+                Double totalBucketProcessingTimeMs = (Double) args[2];
+                Double minBucketProcessingTimeMs = (Double) args[3];
+                Double maxBucketProcessingTimeMs = (Double) args[4];
+                Double avgBucketProcessingTimeMs = (Double) args[5];
+                Double exponentialAvgBucketProcessingTimeMs = (Double) args[6];
+                return new TimingStats(
+                    jobId,
+                    getOrDefault(bucketCount, 0L),
+                    getOrDefault(totalBucketProcessingTimeMs, 0.0),
+                    minBucketProcessingTimeMs,
+                    maxBucketProcessingTimeMs,
+                    avgBucketProcessingTimeMs,
+                    exponentialAvgBucketProcessingTimeMs);
+            });
 
     static {
         PARSER.declareString(constructorArg(), Job.ID);
-        PARSER.declareLong(constructorArg(), BUCKET_COUNT);
+        PARSER.declareLong(optionalConstructorArg(), BUCKET_COUNT);
+        PARSER.declareDouble(optionalConstructorArg(), TOTAL_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
@@ -63,6 +80,7 @@ public class TimingStats implements ToXContentObject {
 
     private final String jobId;
     private long bucketCount;
+    private double totalBucketProcessingTimeMs;
     private Double minBucketProcessingTimeMs;
     private Double maxBucketProcessingTimeMs;
     private Double avgBucketProcessingTimeMs;
@@ -71,12 +89,14 @@ public class TimingStats implements ToXContentObject {
     public TimingStats(
             String jobId,
             long bucketCount,
+            double totalBucketProcessingTimeMs,
             @Nullable Double minBucketProcessingTimeMs,
             @Nullable Double maxBucketProcessingTimeMs,
             @Nullable Double avgBucketProcessingTimeMs,
             @Nullable Double exponentialAvgBucketProcessingTimeMs) {
         this.jobId = jobId;
         this.bucketCount = bucketCount;
+        this.totalBucketProcessingTimeMs = totalBucketProcessingTimeMs;
         this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
         this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
         this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
@@ -91,6 +111,10 @@ public class TimingStats implements ToXContentObject {
         return bucketCount;
     }
 
+    public double getTotalBucketProcessingTimeMs() {
+        return totalBucketProcessingTimeMs;
+    }
+
     public Double getMinBucketProcessingTimeMs() {
         return minBucketProcessingTimeMs;
     }
@@ -112,6 +136,7 @@ public class TimingStats implements ToXContentObject {
         builder.startObject();
         builder.field(Job.ID.getPreferredName(), jobId);
         builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
+        builder.field(TOTAL_BUCKET_PROCESSING_TIME_MS.getPreferredName(), totalBucketProcessingTimeMs);
         if (minBucketProcessingTimeMs != null) {
             builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs);
         }
@@ -135,6 +160,7 @@ public class TimingStats implements ToXContentObject {
         TimingStats that = (TimingStats) o;
         return Objects.equals(this.jobId, that.jobId)
             && this.bucketCount == that.bucketCount
+            && this.totalBucketProcessingTimeMs == that.totalBucketProcessingTimeMs
             && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
             && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
             && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
@@ -146,6 +172,7 @@ public class TimingStats implements ToXContentObject {
         return Objects.hash(
             jobId,
             bucketCount,
+            totalBucketProcessingTimeMs,
             minBucketProcessingTimeMs,
             maxBucketProcessingTimeMs,
             avgBucketProcessingTimeMs,
@@ -156,4 +183,8 @@ public class TimingStats implements ToXContentObject {
     public String toString() {
         return Strings.toString(this);
     }
+
+    private static <T> T getOrDefault(@Nullable T value, T defaultValue) {
+        return value != null ? value : defaultValue;
+    }
 }

+ 14 - 8
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedTimingStatsTests.java

@@ -27,13 +27,15 @@ import org.elasticsearch.test.AbstractXContentTestCase;
 import java.io.IOException;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedTimingStats> {
 
     private static final String JOB_ID = "my-job-id";
 
     public static DatafeedTimingStats createRandomInstance() {
-        return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble());
+        return new DatafeedTimingStats(
+            randomAlphaOfLength(10), randomLong(), randomLong(), randomDouble(), randomBoolean() ? null : randomDouble());
     }
 
     @Override
@@ -59,14 +61,16 @@ public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedT
             DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null);
             assertThat(stats.getJobId(), equalTo(JOB_ID));
             assertThat(stats.getSearchCount(), equalTo(0L));
+            assertThat(stats.getBucketCount(), equalTo(0L));
             assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
+            assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue());
         }
     }
 
     public void testEquals() {
-        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
+        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
+        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
+        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0, 20.0);
 
         assertTrue(stats1.equals(stats1));
         assertTrue(stats1.equals(stats2));
@@ -74,9 +78,9 @@ public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedT
     }
 
     public void testHashCode() {
-        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
+        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
+        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0, 20.0);
+        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0, 20.0);
 
         assertEquals(stats1.hashCode(), stats1.hashCode());
         assertEquals(stats1.hashCode(), stats2.hashCode());
@@ -84,9 +88,11 @@ public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedT
     }
 
     public void testConstructorAndGetters() {
-        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456);
+        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456, 78.9);
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getSearchCount(), equalTo(5L));
+        assertThat(stats.getBucketCount(), equalTo(10L));
         assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(78.9));
     }
 }

+ 32 - 8
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java

@@ -18,9 +18,14 @@
  */
 package org.elasticsearch.client.ml.job.process;
 
+import org.elasticsearch.common.xcontent.DeprecationHandler;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.AbstractXContentTestCase;
 
+import java.io.IOException;
+
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 
@@ -32,6 +37,7 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
         return new TimingStats(
             jobId,
             randomLong(),
+            randomDouble(),
             randomBoolean() ? null : randomDouble(),
             randomBoolean() ? null : randomDouble(),
             randomBoolean() ? null : randomDouble(),
@@ -54,10 +60,11 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
     }
 
     public void testConstructor() {
-        TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
 
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getBucketCount(), equalTo(7L));
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61));
         assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
         assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
         assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
@@ -65,20 +72,37 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
     }
 
     public void testConstructor_NullValues() {
-        TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null, null);
+        TimingStats stats = new TimingStats(JOB_ID, 7, 8.61, null, null, null, null);
 
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getBucketCount(), equalTo(7L));
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61));
         assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
         assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
         assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
         assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
     }
 
+    public void testParse_OptionalFieldsAbsent() throws IOException {
+        String json = "{\"job_id\": \"my-job-id\"}";
+        try (XContentParser parser =
+                 XContentFactory.xContent(XContentType.JSON).createParser(
+                     xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
+            TimingStats stats = TimingStats.PARSER.apply(parser, null);
+            assertThat(stats.getJobId(), equalTo(JOB_ID));
+            assertThat(stats.getBucketCount(), equalTo(0L));
+            assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0));
+            assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
+            assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
+            assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
+            assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
+        }
+    }
+
     public void testEquals() {
-        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
-        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
-        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
+        TimingStats stats1 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats2 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats3 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 3.0, 1.23, 7.89);
 
         assertTrue(stats1.equals(stats1));
         assertTrue(stats1.equals(stats2));
@@ -86,9 +110,9 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
     }
 
     public void testHashCode() {
-        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
-        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
-        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
+        TimingStats stats1 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats2 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats3 = new TimingStats(JOB_ID, 7, 8.61, 1.0, 3.0, 1.23, 7.89);
 
         assertEquals(stats1.hashCode(), stats1.hashCode());
         assertEquals(stats1.hashCode(), stats2.hashCode());

+ 11 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedsStatsAction.java

@@ -24,8 +24,10 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
@@ -38,6 +40,9 @@ public class GetDatafeedsStatsAction extends ActionType<GetDatafeedsStatsAction.
 
     public static final String ALL = "_all";
     private static final String STATE = "state";
+    private static final String NODE = "node";
+    private static final String ASSIGNMENT_EXPLANATION = "assignment_explanation";
+    private static final String TIMING_STATS = "timing_stats";
 
     private GetDatafeedsStatsAction() {
         super(NAME, Response::new);
@@ -176,7 +181,7 @@ public class GetDatafeedsStatsAction extends ActionType<GetDatafeedsStatsAction.
                 builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
                 builder.field(STATE, datafeedState.toString());
                 if (node != null) {
-                    builder.startObject("node");
+                    builder.startObject(NODE);
                     builder.field("id", node.getId());
                     builder.field("name", node.getName());
                     builder.field("ephemeral_id", node.getEphemeralId());
@@ -192,10 +197,13 @@ public class GetDatafeedsStatsAction extends ActionType<GetDatafeedsStatsAction.
                     builder.endObject();
                 }
                 if (assignmentExplanation != null) {
-                    builder.field("assignment_explanation", assignmentExplanation);
+                    builder.field(ASSIGNMENT_EXPLANATION, assignmentExplanation);
                 }
                 if (timingStats != null) {
-                    builder.field("timing_stats", timingStats);
+                    builder.field(
+                        TIMING_STATS,
+                        timingStats,
+                        new MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_CALCULATED_FIELDS, "true")));
                 }
                 builder.endObject();
                 return builder;

+ 9 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java

@@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeSta
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -51,6 +52,8 @@ public class GetJobsStatsAction extends ActionType<GetJobsStatsAction.Response>
     private static final String FORECASTS_STATS = "forecasts_stats";
     private static final String STATE = "state";
     private static final String NODE = "node";
+    private static final String ASSIGNMENT_EXPLANATION = "assignment_explanation";
+    private static final String OPEN_TIME = "open_time";
     private static final String TIMING_STATS = "timing_stats";
 
     private GetJobsStatsAction() {
@@ -266,13 +269,16 @@ public class GetJobsStatsAction extends ActionType<GetJobsStatsAction.Response>
                     builder.endObject();
                 }
                 if (assignmentExplanation != null) {
-                    builder.field("assignment_explanation", assignmentExplanation);
+                    builder.field(ASSIGNMENT_EXPLANATION, assignmentExplanation);
                 }
                 if (openTime != null) {
-                    builder.field("open_time", openTime.getStringRep());
+                    builder.field(OPEN_TIME, openTime.getStringRep());
                 }
                 if (timingStats != null) {
-                    builder.field(TIMING_STATS, timingStats);
+                    builder.field(
+                        TIMING_STATS,
+                        timingStats,
+                        new MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_CALCULATED_FIELDS, "true")));
                 }
                 return builder;
             }

+ 38 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStats.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -26,7 +27,9 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
 
     public static final ParseField JOB_ID = new ParseField("job_id");
     public static final ParseField SEARCH_COUNT = new ParseField("search_count");
+    public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
     public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms");
+    public static final ParseField AVG_SEARCH_TIME_PER_BUCKET_MS = new ParseField("average_search_time_per_bucket_ms");
 
     public static final ParseField TYPE = new ParseField("datafeed_timing_stats");
 
@@ -40,11 +43,14 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
                 args -> {
                     String jobId = (String) args[0];
                     Long searchCount = (Long) args[1];
-                    Double totalSearchTimeMs = (Double) args[2];
-                    return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
+                    Long bucketCount = (Long) args[2];
+                    Double totalSearchTimeMs = (Double) args[3];
+                    return new DatafeedTimingStats(
+                        jobId, getOrDefault(searchCount, 0L), getOrDefault(bucketCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
                 });
         parser.declareString(constructorArg(), JOB_ID);
         parser.declareLong(optionalConstructorArg(), SEARCH_COUNT);
+        parser.declareLong(optionalConstructorArg(), BUCKET_COUNT);
         parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS);
         return parser;
     }
@@ -55,26 +61,29 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
 
     private final String jobId;
     private long searchCount;
+    private long bucketCount;
     private double totalSearchTimeMs;
 
-    public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) {
+    public DatafeedTimingStats(String jobId, long searchCount, long bucketCount, double totalSearchTimeMs) {
         this.jobId = Objects.requireNonNull(jobId);
         this.searchCount = searchCount;
+        this.bucketCount = bucketCount;
         this.totalSearchTimeMs = totalSearchTimeMs;
     }
 
     public DatafeedTimingStats(String jobId) {
-        this(jobId, 0, 0);
+        this(jobId, 0, 0, 0.0);
     }
 
     public DatafeedTimingStats(StreamInput in) throws IOException {
         jobId = in.readString();
         searchCount = in.readLong();
+        bucketCount = in.readLong();
         totalSearchTimeMs = in.readDouble();
     }
 
     public DatafeedTimingStats(DatafeedTimingStats other) {
-        this(other.jobId, other.searchCount, other.totalSearchTimeMs);
+        this(other.jobId, other.searchCount, other.bucketCount, other.totalSearchTimeMs);
     }
 
     public String getJobId() {
@@ -85,19 +94,34 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
         return searchCount;
     }
 
+    public long getBucketCount() {
+        return bucketCount;
+    }
+
     public double getTotalSearchTimeMs() {
         return totalSearchTimeMs;
     }
 
+    public Double getAvgSearchTimePerBucketMs() {
+        return bucketCount > 0
+            ? totalSearchTimeMs / bucketCount
+            : null;
+    }
+
     public void incrementTotalSearchTimeMs(double searchTimeMs) {
         this.searchCount++;
         this.totalSearchTimeMs += searchTimeMs;
     }
 
+    public void setBucketCount(long bucketCount) {
+        this.bucketCount = bucketCount;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(jobId);
         out.writeLong(searchCount);
+        out.writeLong(bucketCount);
         out.writeDouble(totalSearchTimeMs);
     }
 
@@ -106,7 +130,14 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
         builder.startObject();
         builder.field(JOB_ID.getPreferredName(), jobId);
         builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
+        builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
         builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs);
+        if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) {
+            Double avgSearchTimePerBucket = getAvgSearchTimePerBucketMs();
+            if (avgSearchTimePerBucket != null) {
+                builder.field(AVG_SEARCH_TIME_PER_BUCKET_MS.getPreferredName(), getAvgSearchTimePerBucketMs());
+            }
+        }
         builder.endObject();
         return builder;
     }
@@ -123,12 +154,13 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
         DatafeedTimingStats other = (DatafeedTimingStats) obj;
         return Objects.equals(this.jobId, other.jobId)
             && this.searchCount == other.searchCount
+            && this.bucketCount == other.bucketCount
             && this.totalSearchTimeMs == other.totalSearchTimeMs;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jobId, searchCount, totalSearchTimeMs);
+        return Objects.hash(jobId, searchCount, bucketCount, totalSearchTimeMs);
     }
 
     @Override

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

@@ -936,6 +936,7 @@ public class ElasticsearchMappings {
 
     /**
      * {@link DatafeedTimingStats} mapping.
+     * Does not include mapping for BUCKET_COUNT as this mapping is added by {@link #addDataCountsMapping} method.
      *
      * @throws IOException On builder write error
      */
@@ -944,6 +945,7 @@ public class ElasticsearchMappings {
             .startObject(DatafeedTimingStats.SEARCH_COUNT.getPreferredName())
                 .field(TYPE, LONG)
             .endObject()
+            // re-used: BUCKET_COUNT
             .startObject(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName())
                 .field(TYPE, DOUBLE)
             .endObject();

+ 28 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -28,6 +29,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
 public class TimingStats implements ToXContentObject, Writeable {
 
     public static final ParseField BUCKET_COUNT = new ParseField("bucket_count");
+    public static final ParseField TOTAL_BUCKET_PROCESSING_TIME_MS = new ParseField("total_bucket_processing_time_ms");
     public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
     public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
     public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
@@ -40,8 +42,21 @@ public class TimingStats implements ToXContentObject, Writeable {
         new ConstructingObjectParser<>(
             TYPE.getPreferredName(),
             true,
-            args ->
-                new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
+            args -> {
+                String jobId = (String) args[0];
+                long bucketCount = (long) args[1];
+                Double minBucketProcessingTimeMs = (Double) args[2];
+                Double maxBucketProcessingTimeMs = (Double) args[3];
+                Double avgBucketProcessingTimeMs = (Double) args[4];
+                Double exponentialAvgBucketProcessingTimeMs = (Double) args[5];
+                return new TimingStats(
+                    jobId,
+                    bucketCount,
+                    minBucketProcessingTimeMs,
+                    maxBucketProcessingTimeMs,
+                    avgBucketProcessingTimeMs,
+                    exponentialAvgBucketProcessingTimeMs);
+            });
 
     static {
         PARSER.declareString(constructorArg(), Job.ID);
@@ -109,6 +124,13 @@ public class TimingStats implements ToXContentObject, Writeable {
         return bucketCount;
     }
 
+    /** Calculates total bucket processing time as a product of the all-time average bucket processing time and the number of buckets. */
+    public double getTotalBucketProcessingTimeMs() {
+        return avgBucketProcessingTimeMs != null
+            ? bucketCount * avgBucketProcessingTimeMs
+            : 0.0;
+    }
+
     public Double getMinBucketProcessingTimeMs() {
         return minBucketProcessingTimeMs;
     }
@@ -126,7 +148,7 @@ public class TimingStats implements ToXContentObject, Writeable {
     }
 
     /**
-     * Updates the statistics (min, max, avg) for the given data point (bucket processing time).
+     * Updates the statistics (min, max, avg, exponential avg) for the given data point (bucket processing time).
      */
     public void updateStats(double bucketProcessingTimeMs) {
         if (bucketProcessingTimeMs < 0.0) {
@@ -175,6 +197,9 @@ public class TimingStats implements ToXContentObject, Writeable {
         builder.startObject();
         builder.field(Job.ID.getPreferredName(), jobId);
         builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
+        if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) {
+            builder.field(TOTAL_BUCKET_PROCESSING_TIME_MS.getPreferredName(), getTotalBucketProcessingTimeMs());
+        }
         if (minBucketProcessingTimeMs != null) {
             builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs);
         }

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java

@@ -187,6 +187,7 @@ public final class ReservedFieldNames {
             TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
 
             DatafeedTimingStats.SEARCH_COUNT.getPreferredName(),
+            DatafeedTimingStats.BUCKET_COUNT.getPreferredName(),
             DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(),
 
             GetResult._ID,

+ 6 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ToXContentParams.java

@@ -24,6 +24,12 @@ public final class ToXContentParams {
      */
     public static final String INCLUDE_TYPE = "include_type";
 
+    /**
+     * When serialising POJOs to X Content this indicates whether the calculated (i.e. not stored) fields
+     * should be included or not
+     */
+    public static final String INCLUDE_CALCULATED_FIELDS = "include_calculated_fields";
+
     private ToXContentParams() {
     }
 }

+ 5 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetDatafeedStatsActionResponseTests.java

@@ -78,7 +78,7 @@ public class GetDatafeedStatsActionResponseTests extends AbstractWireSerializing
                 Set.of(),
                 Version.CURRENT);
 
-        DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 123.456);
+        DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 10, 100.0);
 
         Response.DatafeedStats stats = new Response.DatafeedStats("df-id", DatafeedState.STARTED, node, null, timingStats);
 
@@ -110,9 +110,11 @@ public class GetDatafeedStatsActionResponseTests extends AbstractWireSerializing
         assertThat(nodeAttributes, hasEntry("ml.max_open_jobs", "5"));
 
         Map<String, Object> timingStatsMap = (Map<String, Object>) dfStatsMap.get("timing_stats");
-        assertThat(timingStatsMap.size(), is(equalTo(3)));
+        assertThat(timingStatsMap.size(), is(equalTo(5)));
         assertThat(timingStatsMap, hasEntry("job_id", "my-job-id"));
         assertThat(timingStatsMap, hasEntry("search_count", 5));
-        assertThat(timingStatsMap, hasEntry("total_search_time_ms", 123.456));
+        assertThat(timingStatsMap, hasEntry("bucket_count", 10));
+        assertThat(timingStatsMap, hasEntry("total_search_time_ms", 100.0));
+        assertThat(timingStatsMap, hasEntry("average_search_time_per_bucket_ms", 10.0));
     }
 }

+ 57 - 11
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedTimingStatsTests.java

@@ -14,14 +14,16 @@ import org.elasticsearch.test.AbstractSerializingTestCase;
 
 import java.io.IOException;
 
+import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<DatafeedTimingStats> {
 
     private static final String JOB_ID = "my-job-id";
 
     public static DatafeedTimingStats createRandom() {
-        return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble());
+        return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomLong(), randomDouble());
     }
 
     @Override
@@ -43,10 +45,12 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
     protected DatafeedTimingStats mutateInstance(DatafeedTimingStats instance) throws IOException {
         String jobId = instance.getJobId();
         long searchCount = instance.getSearchCount();
+        long bucketCount = instance.getBucketCount();
         double totalSearchTimeMs = instance.getTotalSearchTimeMs();
         return new DatafeedTimingStats(
             jobId + randomAlphaOfLength(5),
-            searchCount + 1,
+            searchCount + 2,
+            bucketCount + 1,
             totalSearchTimeMs + randomDoubleBetween(1.0, 100.0, true));
     }
 
@@ -58,14 +62,16 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
             DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null);
             assertThat(stats.getJobId(), equalTo(JOB_ID));
             assertThat(stats.getSearchCount(), equalTo(0L));
+            assertThat(stats.getBucketCount(), equalTo(0L));
             assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
+            assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue());
         }
     }
 
     public void testEquals() {
-        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
+        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
+        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
+        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0);
 
         assertTrue(stats1.equals(stats1));
         assertTrue(stats1.equals(stats2));
@@ -73,9 +79,9 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
     }
 
     public void testHashCode() {
-        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
-        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
+        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
+        DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
+        DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 10, 200.0);
 
         assertEquals(stats1.hashCode(), stats1.hashCode());
         assertEquals(stats1.hashCode(), stats2.hashCode());
@@ -83,32 +89,72 @@ public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<Datafe
     }
 
     public void testConstructorsAndGetters() {
-        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456);
+        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456);
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getSearchCount(), equalTo(5L));
+        assertThat(stats.getBucketCount(), equalTo(10L));
         assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), closeTo(12.3456, 1e-9));
 
         stats = new DatafeedTimingStats(JOB_ID);
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getSearchCount(), equalTo(0L));
+        assertThat(stats.getBucketCount(), equalTo(0L));
         assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), nullValue());
     }
 
     public void testCopyConstructor() {
-        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 123.456);
+        DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 10, 123.456);
         DatafeedTimingStats stats2 = new DatafeedTimingStats(stats1);
 
         assertThat(stats2.getJobId(), equalTo(JOB_ID));
         assertThat(stats2.getSearchCount(), equalTo(5L));
+        assertThat(stats2.getBucketCount(), equalTo(10L));
         assertThat(stats2.getTotalSearchTimeMs(), equalTo(123.456));
+        assertThat(stats2.getAvgSearchTimePerBucketMs(), closeTo(12.3456, 1e-9));
     }
 
     public void testIncrementTotalSearchTimeMs() {
-        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 100.0);
+        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
         stats.incrementTotalSearchTimeMs(200.0);
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getSearchCount(), equalTo(6L));
+        assertThat(stats.getBucketCount(), equalTo(10L));
         assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(30.0));
+    }
+
+    public void testSetBucketCount() {
+        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
+        stats.setBucketCount(20);
+        assertThat(stats.getJobId(), equalTo(JOB_ID));
+        assertThat(stats.getSearchCount(), equalTo(5L));
+        assertThat(stats.getBucketCount(), equalTo(20L));
+        assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0));
+    }
+
+    public void testAvgSearchTimePerBucketIsCalculatedProperlyAfterUpdates() {
+        DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 10, 100.0);
+        assertThat(stats.getBucketCount(), equalTo(10L));
+        assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(10.0));
+
+        stats.setBucketCount(20);
+        assertThat(stats.getBucketCount(), equalTo(20L));
+        assertThat(stats.getTotalSearchTimeMs(), equalTo(100.0));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(5.0));
+
+        stats.incrementTotalSearchTimeMs(200.0);
+        assertThat(stats.getBucketCount(), equalTo(20L));
+        assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(15.0));
+
+        stats.setBucketCount(25);
+        assertThat(stats.getBucketCount(), equalTo(25L));
+        assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
+        assertThat(stats.getAvgSearchTimePerBucketMs(), equalTo(12.0));
     }
 
     public void testDocumentId() {

+ 24 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java

@@ -69,6 +69,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
 
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getBucketCount(), equalTo(0L));
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0));
         assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
         assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
         assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
@@ -80,6 +81,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
 
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getBucketCount(), equalTo(7L));
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(8.61));
         assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
         assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
         assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
@@ -92,6 +94,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
 
         assertThat(stats2.getJobId(), equalTo(JOB_ID));
         assertThat(stats2.getBucketCount(), equalTo(7L));
+        assertThat(stats2.getTotalBucketProcessingTimeMs(), equalTo(8.61));
         assertThat(stats2.getMinBucketProcessingTimeMs(), equalTo(1.0));
         assertThat(stats2.getMaxBucketProcessingTimeMs(), equalTo(2.0));
         assertThat(stats2.getAvgBucketProcessingTimeMs(), equalTo(1.23));
@@ -119,6 +122,26 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
         assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0, 3.00029801), 1e-9));
     }
 
+    public void testTotalBucketProcessingTimeIsCalculatedProperlyAfterUpdates() {
+        TimingStats stats = new TimingStats(JOB_ID);
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(0.0));
+
+        stats.updateStats(3);
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(3.0));
+
+        stats.updateStats(2);
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(5.0));
+
+        stats.updateStats(4);
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(9.0));
+
+        stats.updateStats(1);
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(10.0));
+
+        stats.updateStats(5);
+        assertThat(stats.getTotalBucketProcessingTimeMs(), equalTo(15.0));
+    }
+
     public void testDocumentId() {
         assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats"));
     }
@@ -138,6 +161,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
             protected boolean matchesSafely(TimingStats item) {
                 return equalTo(operand.getJobId()).matches(item.getJobId())
                     && equalTo(operand.getBucketCount()).matches(item.getBucketCount())
+                    && closeTo(operand.getTotalBucketProcessingTimeMs(), error).matches(item.getTotalBucketProcessingTimeMs())
                     && closeTo(operand.getMinBucketProcessingTimeMs(), error).matches(item.getMinBucketProcessingTimeMs())
                     && closeTo(operand.getMaxBucketProcessingTimeMs(), error).matches(item.getMaxBucketProcessingTimeMs())
                     && closeTo(operand.getAvgBucketProcessingTimeMs(), error).matches(item.getAvgBucketProcessingTimeMs())

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

@@ -251,7 +251,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
             datafeed,
             job,
             xContentRegistry,
-            // Creating fake {@link TimingStatsReporter} so that search API call is not needed.
+            // Creating fake DatafeedTimingStatsReporter so that search API call is not needed.
             new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister),
             ActionListener.wrap(
                 unused ->

+ 6 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

@@ -61,6 +61,7 @@ class DatafeedJob {
     private final long queryDelayMs;
     private final Client client;
     private final DataExtractorFactory dataExtractorFactory;
+    private final DatafeedTimingStatsReporter timingStatsReporter;
     private final Supplier<Long> currentTimeSupplier;
     private final DelayedDataDetector delayedDataDetector;
 
@@ -74,13 +75,15 @@ class DatafeedJob {
     private volatile boolean isIsolated;
 
     DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs,
-                DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier<Long> currentTimeSupplier,
-                DelayedDataDetector delayedDataDetector, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
+                DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client,
+                Auditor auditor, Supplier<Long> currentTimeSupplier, DelayedDataDetector delayedDataDetector,
+                long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
         this.jobId = jobId;
         this.dataDescription = Objects.requireNonNull(dataDescription);
         this.frequencyMs = frequencyMs;
         this.queryDelayMs = queryDelayMs;
         this.dataExtractorFactory = dataExtractorFactory;
+        this.timingStatsReporter = timingStatsReporter;
         this.client = client;
         this.auditor = auditor;
         this.currentTimeSupplier = currentTimeSupplier;
@@ -350,6 +353,7 @@ class DatafeedJob {
                 try (InputStream in = extractedData.get()) {
                     counts = postData(in, XContentType.JSON);
                     LOGGER.trace("[{}] Processed another {} records", jobId, counts.getProcessedRecordCount());
+                    timingStatsReporter.reportDataCounts(counts);
                 } catch (Exception e) {
                     if (e instanceof InterruptedException) {
                         Thread.currentThread().interrupt();

+ 17 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

@@ -69,10 +69,20 @@ public class DatafeedJobBuilder {
             TimeValue queryDelay = datafeedConfigHolder.get().getQueryDelay();
             DelayedDataDetector delayedDataDetector =
                     DelayedDataDetectorFactory.buildDetector(jobHolder.get(), datafeedConfigHolder.get(), client, xContentRegistry);
-            DatafeedJob datafeedJob = new DatafeedJob(jobHolder.get().getId(), buildDataDescription(jobHolder.get()),
-                    frequency.millis(), queryDelay.millis(),
-                    context.dataExtractorFactory, client, auditor, currentTimeSupplier, delayedDataDetector,
-                    context.latestFinalBucketEndMs, context.latestRecordTimeMs);
+            DatafeedJob datafeedJob =
+                new DatafeedJob(
+                    jobHolder.get().getId(),
+                    buildDataDescription(jobHolder.get()),
+                    frequency.millis(),
+                    queryDelay.millis(),
+                    context.dataExtractorFactory,
+                    context.timingStatsReporter,
+                    client,
+                    auditor,
+                    currentTimeSupplier,
+                    delayedDataDetector,
+                    context.latestFinalBucketEndMs,
+                    context.latestRecordTimeMs);
 
             listener.onResponse(datafeedJob);
         };
@@ -92,12 +102,13 @@ public class DatafeedJobBuilder {
 
         // Create data extractor factory
         Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = timingStats -> {
+            context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister);
             DataExtractorFactory.create(
                 client,
                 datafeedConfigHolder.get(),
                 jobHolder.get(),
                 xContentRegistry,
-                new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
+                context.timingStatsReporter,
                 dataExtractorFactoryHandler);
         };
 
@@ -189,5 +200,6 @@ public class DatafeedJobBuilder {
         volatile long latestFinalBucketEndMs = -1L;
         volatile long latestRecordTimeMs = -1L;
         volatile DataExtractorFactory dataExtractorFactory;
+        volatile DatafeedTimingStatsReporter timingStatsReporter;
     }
 }

+ 34 - 10
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.datafeed;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 
 import java.util.Objects;
@@ -46,32 +47,55 @@ public class DatafeedTimingStatsReporter {
             return;
         }
         currentTimingStats.incrementTotalSearchTimeMs(searchDuration.millis());
+        flushIfDifferSignificantly();
+    }
+
+    /**
+     * Reports the data counts received from the autodetect process.
+     */
+    public void reportDataCounts(DataCounts dataCounts) {
+        if (dataCounts == null) {
+            return;
+        }
+        currentTimingStats.setBucketCount(dataCounts.getBucketCount());
+        flushIfDifferSignificantly();
+    }
+
+    private void flushIfDifferSignificantly() {
         if (differSignificantly(currentTimingStats, persistedTimingStats)) {
-            // TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
-            flush(WriteRequest.RefreshPolicy.IMMEDIATE);
+            flush();
         }
     }
 
-    private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
+    private void flush() {
         persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
-        jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
+        // TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
+        jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
     }
 
     /**
      * Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics.
      */
     public static boolean differSignificantly(DatafeedTimingStats stats1, DatafeedTimingStats stats2) {
-        return differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs());
+        return differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs())
+            || differSignificantly(stats1.getAvgSearchTimePerBucketMs(), stats2.getAvgSearchTimePerBucketMs());
     }
 
     /**
-     * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO.
+     * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO or
+     * the absolute difference |value1 - value2| is greater than MAX_VALID_ABS_DIFFERENCE_MS.
      * This can be interpreted as values { value1, value2 } differing significantly from each other.
+     * This method also returns:
+     *   - {@code true} in case one value is {@code null} while the other is not.
+     *   - {@code false} in case both values are {@code null}.
      */
-    private static boolean differSignificantly(double value1, double value2) {
-        return (value2 / value1 < MIN_VALID_RATIO)
-            || (value1 / value2 < MIN_VALID_RATIO)
-            || Math.abs(value1 - value2) > MAX_VALID_ABS_DIFFERENCE_MS;
+    private static boolean differSignificantly(Double value1, Double value2) {
+        if (value1 != null && value2 != null) {
+            return (value2 / value1 < MIN_VALID_RATIO)
+                || (value1 / value2 < MIN_VALID_RATIO)
+                || Math.abs(value1 - value2) > MAX_VALID_ABS_DIFFERENCE_MS;
+        }
+        return (value1 != null) || (value2 != null);
     }
 
     /**

+ 5 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java

@@ -73,10 +73,11 @@ public class DatafeedJobTests extends ESTestCase {
     private Auditor auditor;
     private DataExtractorFactory dataExtractorFactory;
     private DataExtractor dataExtractor;
+    private DatafeedTimingStatsReporter timingStatsReporter;
     private Client client;
     private DelayedDataDetector delayedDataDetector;
     private DataDescription.Builder dataDescription;
-    ActionFuture<PostDataAction.Response> postDataFuture;
+    private ActionFuture<PostDataAction.Response> postDataFuture;
     private ActionFuture<FlushJobAction.Response> flushJobFuture;
     private ActionFuture<IndexResponse> indexFuture;
     private ArgumentCaptor<FlushJobAction.Request> flushJobRequests;
@@ -93,6 +94,7 @@ public class DatafeedJobTests extends ESTestCase {
         dataExtractorFactory = mock(DataExtractorFactory.class);
         dataExtractor = mock(DataExtractor.class);
         when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor);
+        timingStatsReporter = mock(DatafeedTimingStatsReporter.class);
         client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(client.threadPool()).thenReturn(threadPool);
@@ -455,7 +457,7 @@ public class DatafeedJobTests extends ESTestCase {
     private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
                                             long latestRecordTimeMs) {
         Supplier<Long> currentTimeSupplier = () -> currentTime;
-        return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor,
-                currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs);
+        return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter,
+                client, auditor, currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs);
     }
 }

+ 64 - 15
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporterTests.java

@@ -9,6 +9,7 @@ import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.junit.Before;
 import org.mockito.InOrder;
@@ -18,6 +19,7 @@ import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 public class DatafeedTimingStatsReporterTests extends ESTestCase {
 
@@ -31,59 +33,106 @@ public class DatafeedTimingStatsReporterTests extends ESTestCase {
         jobResultsPersister = mock(JobResultsPersister.class);
     }
 
+    public void testReportSearchDuration_Null() {
+        DatafeedTimingStatsReporter timingStatsReporter =
+            new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
+
+        timingStatsReporter.reportSearchDuration(null);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
+
+        verifyZeroInteractions(jobResultsPersister);
+    }
+
     public void testReportSearchDuration() {
         DatafeedTimingStatsReporter timingStatsReporter =
-            new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10000.0), jobResultsPersister);
-        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10000.0)));
+            new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
 
         timingStatsReporter.reportSearchDuration(ONE_SECOND);
-        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 4, 11000.0)));
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 4, 10, 11000.0)));
 
         timingStatsReporter.reportSearchDuration(ONE_SECOND);
-        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 5, 12000.0)));
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 5, 10, 12000.0)));
 
         timingStatsReporter.reportSearchDuration(ONE_SECOND);
-        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 6, 13000.0)));
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 6, 10, 13000.0)));
 
         timingStatsReporter.reportSearchDuration(ONE_SECOND);
-        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 7, 14000.0)));
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 7, 10, 14000.0)));
 
         InOrder inOrder = inOrder(jobResultsPersister);
         inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
-            new DatafeedTimingStats(JOB_ID, 5, 12000.0), RefreshPolicy.IMMEDIATE);
+            new DatafeedTimingStats(JOB_ID, 5, 10, 12000.0), RefreshPolicy.IMMEDIATE);
+        inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
+            new DatafeedTimingStats(JOB_ID, 7, 10, 14000.0), RefreshPolicy.IMMEDIATE);
+        verifyNoMoreInteractions(jobResultsPersister);
+    }
+
+    public void testReportDataCounts_Null() {
+        DatafeedTimingStatsReporter timingStatsReporter =
+            new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0), jobResultsPersister);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
+
+        timingStatsReporter.reportDataCounts(null);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10, 10000.0)));
+
+        verifyZeroInteractions(jobResultsPersister);
+    }
+
+    public void testReportDataCounts() {
+        DataCounts dataCounts = new DataCounts(JOB_ID);
+        dataCounts.incrementBucketCount(20);
+        DatafeedTimingStatsReporter timingStatsReporter =
+            new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, dataCounts.getBucketCount(), 10000.0), jobResultsPersister);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 20, 10000.0)));
+
+        dataCounts.incrementBucketCount(1);
+        timingStatsReporter.reportDataCounts(dataCounts);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 21, 10000.0)));
+
+        dataCounts.incrementBucketCount(1);
+        timingStatsReporter.reportDataCounts(dataCounts);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 22, 10000.0)));
+
+        dataCounts.incrementBucketCount(1);
+        timingStatsReporter.reportDataCounts(dataCounts);
+        assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0)));
+
+        InOrder inOrder = inOrder(jobResultsPersister);
         inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
-            new DatafeedTimingStats(JOB_ID, 7, 14000.0), RefreshPolicy.IMMEDIATE);
+            new DatafeedTimingStats(JOB_ID, 3, 23, 10000.0), RefreshPolicy.IMMEDIATE);
         verifyNoMoreInteractions(jobResultsPersister);
     }
 
     public void testTimingStatsDifferSignificantly() {
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(
-                new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1000.0)),
+                new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0)),
             is(false));
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(
-                new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1100.0)),
+                new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1100.0)),
             is(false));
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(
-                new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1120.0)),
+                new DatafeedTimingStats(JOB_ID, 5, 10, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 1120.0)),
             is(true));
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(
-                new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11000.0)),
+                new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11000.0)),
             is(false));
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(
-                new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11200.0)),
+                new DatafeedTimingStats(JOB_ID, 5, 10, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 11200.0)),
             is(true));
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(
-                new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110000.0)),
+                new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110000.0)),
             is(false));
         assertThat(
             DatafeedTimingStatsReporter.differSignificantly(
-                new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110001.0)),
+                new DatafeedTimingStats(JOB_ID, 5, 10, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 10, 110001.0)),
             is(true));
     }
 }

+ 2 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java

@@ -246,7 +246,7 @@ public class JobResultsPersisterTests extends ESTestCase {
             .when(client).index(any(), any(ActionListener.class));
 
         JobResultsPersister persister = new JobResultsPersister(client);
-        DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 666.0);
+        DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 66, 666.0);
         persister.persistDatafeedTimingStats(timingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
 
         ArgumentCaptor<IndexRequest> indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
@@ -261,6 +261,7 @@ public class JobResultsPersisterTests extends ESTestCase {
                 Map.of(
                     "job_id", "foo",
                     "search_count", 6,
+                    "bucket_count", 66,
                     "total_search_time_ms", 666.0)));
 
         verify(client, times(1)).threadPool();

+ 8 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java

@@ -903,12 +903,14 @@ public class JobResultsProviderTests extends ESTestCase {
                 Map.of(
                     Job.ID.getPreferredName(), "foo",
                     DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6,
+                    DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66,
                     DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0));
         List<Map<String, Object>> sourceBar =
             Arrays.asList(
                 Map.of(
                     Job.ID.getPreferredName(), "bar",
                     DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 7,
+                    DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 77,
                     DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 777.0));
         SearchResponse responseFoo = createSearchResponse(sourceFoo);
         SearchResponse responseBar = createSearchResponse(sourceBar);
@@ -943,7 +945,10 @@ public class JobResultsProviderTests extends ESTestCase {
             statsByJobId ->
                 assertThat(
                     statsByJobId,
-                    equalTo(Map.of("foo", new DatafeedTimingStats("foo", 6, 666.0), "bar", new DatafeedTimingStats("bar", 7, 777.0)))),
+                    equalTo(
+                        Map.of(
+                            "foo", new DatafeedTimingStats("foo", 6, 66, 666.0),
+                            "bar", new DatafeedTimingStats("bar", 7, 77, 777.0)))),
             e -> { throw new AssertionError(); });
 
         verify(client).threadPool();
@@ -961,6 +966,7 @@ public class JobResultsProviderTests extends ESTestCase {
                 Map.of(
                     Job.ID.getPreferredName(), "foo",
                     DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6,
+                    DatafeedTimingStats.BUCKET_COUNT.getPreferredName(), 66,
                     DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0));
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(
@@ -971,7 +977,7 @@ public class JobResultsProviderTests extends ESTestCase {
         JobResultsProvider provider = createProvider(client);
         provider.datafeedTimingStats(
             "foo",
-            stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 666.0))),
+            stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 66, 666.0))),
             e -> { throw new AssertionError(); });
 
         verify(client).prepareSearch(indexName);

+ 1 - 0
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java

@@ -175,6 +175,7 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
                        + "\"timing_stats\":{"
                         + "\"job_id\":\"_job_id\","
                         + "\"bucket_count\":100,"
+                        + "\"total_bucket_processing_time_ms\":2000.0,"
                         + "\"minimum_bucket_processing_time_ms\":10.0,"
                         + "\"maximum_bucket_processing_time_ms\":30.0,"
                         + "\"average_bucket_processing_time_ms\":20.0,"

+ 10 - 6
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/get_datafeed_stats.yml

@@ -175,15 +175,18 @@ setup:
       ml.start_datafeed:
         datafeed_id: "datafeed-1"
         start: 0
+  - match: { started: true}
 
   - do:
       ml.get_datafeed_stats:
         datafeed_id: datafeed-1
-  - match: { datafeeds.0.datafeed_id: "datafeed-1"}
-  - match: { datafeeds.0.state: "started"}
-  - match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"}
-  - match: { datafeeds.0.timing_stats.search_count: 0}
-  - match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0}
+  - match:  { datafeeds.0.datafeed_id: "datafeed-1"}
+  - match:  { datafeeds.0.state: "started"}
+  - match:  { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"}
+  - match:  { datafeeds.0.timing_stats.search_count: 0}
+  - match:  { datafeeds.0.timing_stats.bucket_count: 0}
+  - match:  { datafeeds.0.timing_stats.total_search_time_ms: 0.0}
+  - is_false: datafeeds.0.timing_stats.average_search_time_per_bucket_ms
 
   - do:
       ml.stop_datafeed:
@@ -196,8 +199,9 @@ setup:
   - match: { datafeeds.0.datafeed_id: "datafeed-1"}
   - match: { datafeeds.0.state: "stopped"}
   - match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"}
-  # TODO: Change "gte 0" to "match 1" once https://github.com/elastic/elasticsearch/issues/44132 is fixed
+  # We don't really know at this point if datafeed managed to perform at least one search, hence the very relaxed assertion
   - gte:   { datafeeds.0.timing_stats.search_count: 0}
+  - gte:   { datafeeds.0.timing_stats.bucket_count: 0}
   - gte:   { datafeeds.0.timing_stats.total_search_time_ms: 0.0}
 
 ---

+ 5 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml

@@ -101,6 +101,7 @@ setup:
   - is_true:  jobs.0.open_time
   - match:  { jobs.0.timing_stats.job_id: job-stats-test }
   - match:  { jobs.0.timing_stats.bucket_count: 1 }  # Records are 1h apart and bucket span is 1h so 1 bucket is produced
+  - gte:    { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
   - gte:    { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
   - gte:    { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
   - gte:    { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
@@ -140,6 +141,7 @@ setup:
   - is_false: jobs.0.open_time
   - match:  { jobs.0.timing_stats.job_id: job-stats-test }
   - match:  { jobs.0.timing_stats.bucket_count: 1 }  # Records are 1h apart and bucket span is 1h so 1 bucket is produced
+  - gte:    { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
   - gte:    { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
   - gte:    { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
   - gte:    { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
@@ -158,6 +160,7 @@ setup:
   - is_true:  jobs.0.open_time
   - match:  { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job }
   - match:  { jobs.0.timing_stats.bucket_count: 0 }
+  - match:  { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
   - is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
   - is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
   - is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
@@ -342,6 +345,7 @@ setup:
   - is_false: jobs.0.open_time
   - match:  { jobs.0.timing_stats.job_id: job-stats-test }
   - match:  { jobs.0.timing_stats.bucket_count: 0 }
+  - match:  { jobs.0.timing_stats.total_bucket_processing_time_ms: 0.0 }
   - is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
   - is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
   - is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
@@ -356,6 +360,7 @@ setup:
   - is_false: jobs.1.open_time
   - match:  { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job }
   - match:  { jobs.1.timing_stats.bucket_count: 0 }
+  - match:  { jobs.1.timing_stats.total_bucket_processing_time_ms: 0.0 }
   - is_false: jobs.1.timing_stats.minimum_bucket_processing_time_ms
   - is_false: jobs.1.timing_stats.maximum_bucket_processing_time_ms
   - is_false: jobs.1.timing_stats.average_bucket_processing_time_ms