Browse Source

Report exponential_avg_bucket_processing_time which gives more weight to recent buckets (#43189)

Przemysław Witek 6 years ago
parent
commit
13596c807a
15 changed files with 301 additions and 106 deletions
  1. 25 4
      client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java
  2. 15 11
      client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java
  3. 8 1
      docs/reference/ml/apis/get-job-stats.asciidoc
  4. 31 2
      docs/reference/ml/apis/jobcounts.asciidoc
  5. 3 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java
  6. 50 8
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java
  7. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java
  8. 0 1
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java
  9. 47 16
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java
  10. 43 10
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java
  11. 3 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java
  12. 3 2
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java
  13. 3 3
      x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java
  14. 3 2
      x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java
  15. 66 44
      x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml

+ 25 - 4
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java

@@ -42,12 +42,15 @@ public class TimingStats implements ToXContentObject {
     public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
     public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
     public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
+    public static final ParseField EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS =
+        new ParseField("exponential_average_bucket_processing_time_ms");
 
     public static final ConstructingObjectParser<TimingStats, Void> PARSER =
         new ConstructingObjectParser<>(
             "timing_stats",
             true,
-            args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
+            args ->
+                new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
 
     static {
         PARSER.declareString(constructorArg(), Job.ID);
@@ -55,6 +58,7 @@ public class TimingStats implements ToXContentObject {
         PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
+        PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS);
     }
 
     private final String jobId;
@@ -62,18 +66,21 @@ public class TimingStats implements ToXContentObject {
     private Double minBucketProcessingTimeMs;
     private Double maxBucketProcessingTimeMs;
     private Double avgBucketProcessingTimeMs;
+    private Double exponentialAvgBucketProcessingTimeMs;
 
     public TimingStats(
             String jobId,
             long bucketCount,
             @Nullable Double minBucketProcessingTimeMs,
             @Nullable Double maxBucketProcessingTimeMs,
-            @Nullable Double avgBucketProcessingTimeMs) {
+            @Nullable Double avgBucketProcessingTimeMs,
+            @Nullable Double exponentialAvgBucketProcessingTimeMs) {
         this.jobId = jobId;
         this.bucketCount = bucketCount;
         this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
         this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
         this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
+        this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
     }
 
     public String getJobId() {
@@ -96,6 +103,10 @@ public class TimingStats implements ToXContentObject {
         return avgBucketProcessingTimeMs;
     }
 
+    public Double getExponentialAvgBucketProcessingTimeMs() {
+        return exponentialAvgBucketProcessingTimeMs;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
         builder.startObject();
@@ -110,6 +121,9 @@ public class TimingStats implements ToXContentObject {
         if (avgBucketProcessingTimeMs != null) {
             builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
         }
+        if (exponentialAvgBucketProcessingTimeMs != null) {
+            builder.field(EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
+        }
         builder.endObject();
         return builder;
     }
@@ -123,12 +137,19 @@ public class TimingStats implements ToXContentObject {
             && this.bucketCount == that.bucketCount
             && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
             && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
-            && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
+            && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
+            && Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
+        return Objects.hash(
+            jobId,
+            bucketCount,
+            minBucketProcessingTimeMs,
+            maxBucketProcessingTimeMs,
+            avgBucketProcessingTimeMs,
+            exponentialAvgBucketProcessingTimeMs);
     }
 
     @Override

+ 15 - 11
client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractXContentTestCase;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 
 public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
 
@@ -33,6 +34,7 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
             randomLong(),
             randomBoolean() ? null : randomDouble(),
             randomBoolean() ? null : randomDouble(),
+            randomBoolean() ? null : randomDouble(),
             randomBoolean() ? null : randomDouble());
     }
 
@@ -52,29 +54,31 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
     }
 
     public void testConstructor() {
-        TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
+        TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
 
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getBucketCount(), equalTo(7L));
         assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
         assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
         assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
+        assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
     }
 
     public void testConstructor_NullValues() {
-        TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null);
+        TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null, null);
 
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getBucketCount(), equalTo(7L));
-        assertNull(stats.getMinBucketProcessingTimeMs());
-        assertNull(stats.getMaxBucketProcessingTimeMs());
-        assertNull(stats.getAvgBucketProcessingTimeMs());
+        assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
+        assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
+        assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
+        assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
     }
 
     public void testEquals() {
-        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
+        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
 
         assertTrue(stats1.equals(stats1));
         assertTrue(stats1.equals(stats2));
@@ -82,9 +86,9 @@ public class TimingStatsTests extends AbstractXContentTestCase<TimingStats> {
     }
 
     public void testHashCode() {
-        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
+        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
 
         assertEquals(stats1.hashCode(), stats1.hashCode());
         assertEquals(stats1.hashCode(), stats2.hashCode());

+ 8 - 1
docs/reference/ml/apis/get-job-stats.asciidoc

@@ -105,7 +105,14 @@ The API returns the following results:
         "log_time": 1491948163000,
         "timestamp": 1455234600000
       },
-      "state": "closed"
+      "state": "closed",
+      "timing_stats": {
+        "job_id": "farequote",
+        "minimum_bucket_processing_time_ms": 0.0,
+        "maximum_bucket_processing_time_ms": 15.0,
+        "average_bucket_processing_time_ms": 8.75,
+        "exponential_average_bucket_processing_time_ms": 6.1435899
+      }
     }
   ]
 }

+ 31 - 2
docs/reference/ml/apis/jobcounts.asciidoc

@@ -19,11 +19,15 @@ progress of a job.
 
 `model_size_stats`::
   (object) An object that provides information about the size and contents of the model.
-  See <<ml-modelsizestats,model size stats objects>>
+  See <<ml-modelsizestats,model size stats objects>>.
 
 `forecasts_stats`::
   (object) An object that provides statistical information about forecasts
-  of this job. See <<ml-forecastsstats, forecasts stats objects>>
+  of this job. See <<ml-forecastsstats, forecasts stats objects>>.
+
+`timing_stats`::
+  (object) An object that provides statistical information about timing aspect
+  of this job. See <<ml-timingstats, timing stats objects>>.
 
 `node`::
   (object) For open jobs only, contains information about the node where the
@@ -209,6 +213,31 @@ The `forecasts_stats` object shows statistics about forecasts. It has the follow
 NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise
 these fields are omitted.
 
+[float]
+[[ml-timingstats]]
+==== Timing Stats Objects
+
+The `timing_stats` object shows timing-related statistics about the job's progress. It has the following properties:
+
+`job_id`::
+  (string) A numerical character string that uniquely identifies the job.
+
+`bucket_count`::
+  (long) The number of buckets processed.
+
+`minimum_bucket_processing_time_ms`::
+  (double) Minimum among all bucket processing times in milliseconds.
+
+`maximum_bucket_processing_time_ms`::
+  (double) Maximum among all bucket processing times in milliseconds.
+
+`average_bucket_processing_time_ms`::
+  (double) Average of all bucket processing times in milliseconds.
+
+`exponential_average_bucket_processing_time_ms`::
+  (double) Exponential moving average of all bucket processing times in milliseconds.
+
+
 [float]
 [[ml-stats-node]]
 ==== Node Objects

+ 3 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

@@ -863,6 +863,9 @@ public class ElasticsearchMappings {
             .endObject()
             .startObject(TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName())
                 .field(TYPE, DOUBLE)
+            .endObject()
+            .startObject(TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName())
+                .field(TYPE, DOUBLE)
             .endObject();
     }
 

+ 50 - 8
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStats.java

@@ -31,6 +31,8 @@ public class TimingStats implements ToXContentObject, Writeable {
     public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms");
     public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms");
     public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms");
+    public static final ParseField EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS =
+        new ParseField("exponential_average_bucket_processing_time_ms");
 
     public static final ParseField TYPE = new ParseField("timing_stats");
 
@@ -38,7 +40,8 @@ public class TimingStats implements ToXContentObject, Writeable {
         new ConstructingObjectParser<>(
             TYPE.getPreferredName(),
             true,
-            args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4]));
+            args ->
+                new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4], (Double) args[5]));
 
     static {
         PARSER.declareString(constructorArg(), Job.ID);
@@ -46,6 +49,7 @@ public class TimingStats implements ToXContentObject, Writeable {
         PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS);
         PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS);
+        PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS);
     }
 
     public static String documentId(String jobId) {
@@ -57,26 +61,35 @@ public class TimingStats implements ToXContentObject, Writeable {
     private Double minBucketProcessingTimeMs;
     private Double maxBucketProcessingTimeMs;
     private Double avgBucketProcessingTimeMs;
+    private Double exponentialAvgBucketProcessingTimeMs;
 
     public TimingStats(
             String jobId,
             long bucketCount,
             @Nullable Double minBucketProcessingTimeMs,
             @Nullable Double maxBucketProcessingTimeMs,
-            @Nullable Double avgBucketProcessingTimeMs) {
+            @Nullable Double avgBucketProcessingTimeMs,
+            @Nullable Double exponentialAvgBucketProcessingTimeMs) {
         this.jobId = jobId;
         this.bucketCount = bucketCount;
         this.minBucketProcessingTimeMs = minBucketProcessingTimeMs;
         this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs;
         this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs;
+        this.exponentialAvgBucketProcessingTimeMs = exponentialAvgBucketProcessingTimeMs;
     }
 
     public TimingStats(String jobId) {
-        this(jobId, 0, null, null, null);
+        this(jobId, 0, null, null, null, null);
     }
 
     public TimingStats(TimingStats lhs) {
-        this(lhs.jobId, lhs.bucketCount, lhs.minBucketProcessingTimeMs, lhs.maxBucketProcessingTimeMs, lhs.avgBucketProcessingTimeMs);
+        this(
+            lhs.jobId,
+            lhs.bucketCount,
+            lhs.minBucketProcessingTimeMs,
+            lhs.maxBucketProcessingTimeMs,
+            lhs.avgBucketProcessingTimeMs,
+            lhs.exponentialAvgBucketProcessingTimeMs);
     }
 
     public TimingStats(StreamInput in) throws IOException {
@@ -85,6 +98,7 @@ public class TimingStats implements ToXContentObject, Writeable {
         this.minBucketProcessingTimeMs = in.readOptionalDouble();
         this.maxBucketProcessingTimeMs = in.readOptionalDouble();
         this.avgBucketProcessingTimeMs = in.readOptionalDouble();
+        this.exponentialAvgBucketProcessingTimeMs = in.readOptionalDouble();
     }
 
     public String getJobId() {
@@ -107,12 +121,16 @@ public class TimingStats implements ToXContentObject, Writeable {
         return avgBucketProcessingTimeMs;
     }
 
+    public Double getExponentialAvgBucketProcessingTimeMs() {
+        return exponentialAvgBucketProcessingTimeMs;
+    }
+
     /**
      * Updates the statistics (min, max, avg) for the given data point (bucket processing time).
      */
     public void updateStats(double bucketProcessingTimeMs) {
         if (bucketProcessingTimeMs < 0.0) {
-            throw new IllegalArgumentException("bucketProcessingTimeMs must be positive, was: " + bucketProcessingTimeMs);
+            throw new IllegalArgumentException("bucketProcessingTimeMs must be non-negative, was: " + bucketProcessingTimeMs);
         }
         if (minBucketProcessingTimeMs == null || bucketProcessingTimeMs < minBucketProcessingTimeMs) {
             minBucketProcessingTimeMs = bucketProcessingTimeMs;
@@ -127,9 +145,21 @@ public class TimingStats implements ToXContentObject, Writeable {
             // bucket processing times.
             avgBucketProcessingTimeMs = (bucketCount * avgBucketProcessingTimeMs + bucketProcessingTimeMs) / (bucketCount + 1);
         }
+        if (exponentialAvgBucketProcessingTimeMs == null) {
+            exponentialAvgBucketProcessingTimeMs = bucketProcessingTimeMs;
+        } else {
+            // Calculate the exponential moving average (see https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) of
+            // bucket processing times.
+            exponentialAvgBucketProcessingTimeMs = (1 - ALPHA) * exponentialAvgBucketProcessingTimeMs + ALPHA * bucketProcessingTimeMs;
+        }
         bucketCount++;
     }
 
+    /**
+     * Constant smoothing factor used for calculating exponential moving average. Represents the degree of weighting decrease.
+     */
+    private static double ALPHA = 0.01;
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(jobId);
@@ -137,6 +167,7 @@ public class TimingStats implements ToXContentObject, Writeable {
         out.writeOptionalDouble(minBucketProcessingTimeMs);
         out.writeOptionalDouble(maxBucketProcessingTimeMs);
         out.writeOptionalDouble(avgBucketProcessingTimeMs);
+        out.writeOptionalDouble(exponentialAvgBucketProcessingTimeMs);
     }
 
     @Override
@@ -153,6 +184,9 @@ public class TimingStats implements ToXContentObject, Writeable {
         if (avgBucketProcessingTimeMs != null) {
             builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs);
         }
+        if (exponentialAvgBucketProcessingTimeMs != null) {
+            builder.field(EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), exponentialAvgBucketProcessingTimeMs);
+        }
         builder.endObject();
         return builder;
     }
@@ -166,12 +200,19 @@ public class TimingStats implements ToXContentObject, Writeable {
             && this.bucketCount == that.bucketCount
             && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs)
             && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs)
-            && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs);
+            && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs)
+            && Objects.equals(this.exponentialAvgBucketProcessingTimeMs, that.exponentialAvgBucketProcessingTimeMs);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs);
+        return Objects.hash(
+            jobId,
+            bucketCount,
+            minBucketProcessingTimeMs,
+            maxBucketProcessingTimeMs,
+            avgBucketProcessingTimeMs,
+            exponentialAvgBucketProcessingTimeMs);
     }
 
     @Override
@@ -185,7 +226,8 @@ public class TimingStats implements ToXContentObject, Writeable {
     public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) {
         return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs)
             || differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs)
-            || differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs);
+            || differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs)
+            || differSignificantly(stats1.exponentialAvgBucketProcessingTimeMs, stats2.exponentialAvgBucketProcessingTimeMs);
     }
 
     /**

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java

@@ -179,6 +179,7 @@ public final class ReservedFieldNames {
             TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
             TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
             TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
+            TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
 
             GetResult._ID,
             GetResult._INDEX,

+ 0 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java

@@ -77,7 +77,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
         // These are not reserved because they're data types, not field names
         overridden.add(Result.TYPE.getPreferredName());
         overridden.add(DataCounts.TYPE.getPreferredName());
-        overridden.add(TimingStats.TYPE.getPreferredName());
         overridden.add(CategoryDefinition.TYPE.getPreferredName());
         overridden.add(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
         overridden.add(ModelSnapshot.TYPE.getPreferredName());

+ 47 - 16
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java

@@ -8,7 +8,10 @@ package org.elasticsearch.xpack.core.ml.job.process.autodetect.state;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.hamcrest.CustomTypeSafeMatcher;
+import org.hamcrest.Matcher;
 
+import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
@@ -23,6 +26,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
             randomLong(),
             randomBoolean() ? null : randomDouble(),
             randomBoolean() ? null : randomDouble(),
+            randomBoolean() ? null : randomDouble(),
             randomBoolean() ? null : randomDouble());
     }
 
@@ -42,9 +46,9 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
     }
 
     public void testEquals() {
-        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
+        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
 
         assertTrue(stats1.equals(stats1));
         assertTrue(stats1.equals(stats2));
@@ -52,9 +56,9 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
     }
 
     public void testHashCode() {
-        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
-        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23);
+        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
+        TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23, 7.89);
 
         assertEquals(stats1.hashCode(), stats1.hashCode());
         assertEquals(stats1.hashCode(), stats2.hashCode());
@@ -69,20 +73,22 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
         assertThat(stats.getMinBucketProcessingTimeMs(), nullValue());
         assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue());
         assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue());
+        assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), nullValue());
     }
 
     public void testConstructor() {
-        TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
+        TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
 
         assertThat(stats.getJobId(), equalTo(JOB_ID));
         assertThat(stats.getBucketCount(), equalTo(7L));
         assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0));
         assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0));
         assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23));
+        assertThat(stats.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
     }
 
     public void testCopyConstructor() {
-        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23);
+        TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23, 7.89);
         TimingStats stats2 = new TimingStats(stats1);
 
         assertThat(stats2.getJobId(), equalTo(JOB_ID));
@@ -90,6 +96,7 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
         assertThat(stats2.getMinBucketProcessingTimeMs(), equalTo(1.0));
         assertThat(stats2.getMaxBucketProcessingTimeMs(), equalTo(2.0));
         assertThat(stats2.getAvgBucketProcessingTimeMs(), equalTo(1.23));
+        assertThat(stats2.getExponentialAvgBucketProcessingTimeMs(), equalTo(7.89));
         assertEquals(stats1, stats2);
         assertEquals(stats1.hashCode(), stats2.hashCode());
     }
@@ -98,19 +105,19 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
         TimingStats stats = new TimingStats(JOB_ID);
 
         stats.updateStats(3);
-        assertThat(stats, equalTo(new TimingStats(JOB_ID, 1, 3.0, 3.0, 3.0)));
+        assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 1, 3.0, 3.0, 3.0, 3.0), 1e-9));
 
         stats.updateStats(2);
-        assertThat(stats, equalTo(new TimingStats(JOB_ID, 2, 2.0, 3.0, 2.5)));
+        assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 2, 2.0, 3.0, 2.5, 2.99), 1e-9));
 
         stats.updateStats(4);
-        assertThat(stats, equalTo(new TimingStats(JOB_ID, 3, 2.0, 4.0, 3.0)));
+        assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 3, 2.0, 4.0, 3.0, 3.0001), 1e-9));
 
         stats.updateStats(1);
-        assertThat(stats, equalTo(new TimingStats(JOB_ID, 4, 1.0, 4.0, 2.5)));
+        assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 4, 1.0, 4.0, 2.5, 2.980099), 1e-9));
 
         stats.updateStats(5);
-        assertThat(stats, equalTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0)));
+        assertThat(stats, areCloseTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0, 3.00029801), 1e-9));
     }
 
     public void testDocumentId() {
@@ -120,15 +127,15 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
     public void testTimingStatsDifferSignificantly() {
         assertThat(
             TimingStats.differSignificantly(
-                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0)),
+                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0)),
             is(false));
         assertThat(
             TimingStats.differSignificantly(
-                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0)),
+                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0, 10.0)),
             is(false));
         assertThat(
             TimingStats.differSignificantly(
-                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0)),
+                new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0, 10.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0, 10.0)),
             is(true));
     }
 
@@ -143,4 +150,28 @@ public class TimingStatsTests extends AbstractSerializingTestCase<TimingStats> {
         assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true));
         assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true));
     }
+
+    /**
+     * Creates a matcher of {@link TimingStats}s that matches when an examined stats are equal
+     * to the specified <code>operand</code>, within a range of +/- <code>error</code>.
+     *
+     * @param operand
+     *     the expected value of matching stats
+     * @param error
+     *     the delta (+/-) within which matches will be allowed
+     */
+    private static Matcher<TimingStats> areCloseTo(TimingStats operand, double error) {
+        return new CustomTypeSafeMatcher<>("TimingStats close to " + operand) {
+            @Override
+            protected boolean matchesSafely(TimingStats item) {
+                return equalTo(operand.getJobId()).matches(item.getJobId())
+                    && equalTo(operand.getBucketCount()).matches(item.getBucketCount())
+                    && closeTo(operand.getMinBucketProcessingTimeMs(), error).matches(item.getMinBucketProcessingTimeMs())
+                    && closeTo(operand.getMaxBucketProcessingTimeMs(), error).matches(item.getMaxBucketProcessingTimeMs())
+                    && closeTo(operand.getAvgBucketProcessingTimeMs(), error).matches(item.getAvgBucketProcessingTimeMs())
+                    && closeTo(operand.getExponentialAvgBucketProcessingTimeMs(), error)
+                        .matches(item.getExponentialAvgBucketProcessingTimeMs());
+            }
+        };
+    }
 }

+ 43 - 10
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java

@@ -63,6 +63,8 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -164,6 +166,31 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         assertEquals(quantiles, persistedQuantiles.get());
     }
 
+    public void testProcessResults_TimingStats() throws Exception {
+        ResultsBuilder resultBuilder = new ResultsBuilder()
+                .addBucket(createBucket(true, 100))
+                .addBucket(createBucket(true, 1000))
+                .addBucket(createBucket(true, 100))
+                .addBucket(createBucket(true, 1000))
+                .addBucket(createBucket(true, 100))
+                .addBucket(createBucket(true, 1000))
+                .addBucket(createBucket(true, 100))
+                .addBucket(createBucket(true, 1000))
+                .addBucket(createBucket(true, 100))
+                .addBucket(createBucket(true, 1000));
+
+        resultProcessor.process(resultBuilder.buildTestProcess());
+        resultProcessor.awaitCompletion();
+
+        TimingStats timingStats = resultProcessor.timingStats();
+        assertThat(timingStats.getJobId(), equalTo(JOB_ID));
+        assertThat(timingStats.getBucketCount(), equalTo(10L));
+        assertThat(timingStats.getMinBucketProcessingTimeMs(), equalTo(100.0));
+        assertThat(timingStats.getMaxBucketProcessingTimeMs(), equalTo(1000.0));
+        assertThat(timingStats.getAvgBucketProcessingTimeMs(), equalTo(550.0));
+        assertThat(timingStats.getExponentialAvgBucketProcessingTimeMs(), closeTo(143.244, 1e-3));
+    }
+
     public void testParseQuantiles_GivenRenormalizationIsEnabled() throws Exception {
         when(renormalizer.isEnabled()).thenReturn(true);
 
@@ -284,18 +311,24 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         client().execute(PutJobAction.INSTANCE, request).actionGet();
     }
 
-    private Bucket createBucket(boolean isInterim) {
+    private static Bucket createBucket(boolean isInterim) {
         Bucket bucket = new BucketTests().createTestInstance(JOB_ID);
         bucket.setInterim(isInterim);
         return bucket;
     }
 
-    private Date randomDate() {
+    private static Bucket createBucket(boolean isInterim, long processingTimeMs) {
+        Bucket bucket = createBucket(isInterim);
+        bucket.setProcessingTimeMs(processingTimeMs);
+        return bucket;
+    }
+
+    private static Date randomDate() {
         // between 1970 and 2065
         return new Date(randomLongBetween(0, 3000000000000L));
     }
 
-    private List<AnomalyRecord> createRecords(boolean isInterim) {
+    private static List<AnomalyRecord> createRecords(boolean isInterim) {
         List<AnomalyRecord> records = new ArrayList<>();
 
         int count = randomIntBetween(0, 100);
@@ -310,7 +343,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         return records;
     }
 
-    private List<Influencer> createInfluencers(boolean isInterim) {
+    private static List<Influencer> createInfluencers(boolean isInterim) {
         List<Influencer> influencers = new ArrayList<>();
 
         int count = randomIntBetween(0, 100);
@@ -323,15 +356,15 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         return influencers;
     }
 
-    private CategoryDefinition createCategoryDefinition() {
+    private static CategoryDefinition createCategoryDefinition() {
         return new CategoryDefinitionTests().createTestInstance(JOB_ID);
     }
 
-    private ModelPlot createModelPlot() {
+    private static ModelPlot createModelPlot() {
         return new ModelPlotTests().createTestInstance(JOB_ID);
     }
 
-    private ModelSizeStats createModelSizeStats() {
+    private static ModelSizeStats createModelSizeStats() {
         ModelSizeStats.Builder builder = new ModelSizeStats.Builder(JOB_ID);
         builder.setTimestamp(randomDate());
         builder.setLogTime(randomDate());
@@ -344,15 +377,15 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {
         return builder.build();
     }
 
-    private ModelSnapshot createModelSnapshot() {
+    private static ModelSnapshot createModelSnapshot() {
         return new ModelSnapshot.Builder(JOB_ID).setSnapshotId(randomAlphaOfLength(12)).build();
     }
 
-    private Quantiles createQuantiles() {
+    private static Quantiles createQuantiles() {
         return new Quantiles(JOB_ID, randomDate(), randomAlphaOfLength(100));
     }
 
-    private FlushAcknowledgement createFlushAcknowledgement() {
+    private static FlushAcknowledgement createFlushAcknowledgement() {
         return new FlushAcknowledgement(randomAlphaOfLength(5), randomDate());
     }
 

+ 3 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java

@@ -203,7 +203,7 @@ public class JobResultsPersisterTests extends ESTestCase {
         Client client = mockClient(bulkRequestCaptor);
 
         JobResultsPersister persister = new JobResultsPersister(client);
-        TimingStats timingStats = new TimingStats("foo", 7, 1.0, 2.0, 1.23);
+        TimingStats timingStats = new TimingStats("foo", 7, 1.0, 2.0, 1.23, 7.89);
         persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest();
 
         verify(client, times(1)).bulk(bulkRequestCaptor.capture());
@@ -220,7 +220,8 @@ public class JobResultsPersisterTests extends ESTestCase {
                     "bucket_count", 7,
                     "minimum_bucket_processing_time_ms", 1.0,
                     "maximum_bucket_processing_time_ms", 2.0,
-                    "average_bucket_processing_time_ms", 1.23)));
+                    "average_bucket_processing_time_ms", 1.23,
+                    "exponential_average_bucket_processing_time_ms", 7.89)));
 
         verify(client, times(1)).threadPool();
         verifyNoMoreInteractions(client);

+ 3 - 2
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java

@@ -838,7 +838,8 @@ public class JobResultsProviderTests extends ESTestCase {
                     TimingStats.BUCKET_COUNT.getPreferredName(), 7,
                     TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1.0,
                     TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1000.0,
-                    TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0));
+                    TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0,
+                    TimingStats.EXPONENTIAL_AVERAGE_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 777.0));
         SearchResponse response = createSearchResponse(source);
         Client client = getMockedClient(
             queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")),
@@ -848,7 +849,7 @@ public class JobResultsProviderTests extends ESTestCase {
         JobResultsProvider provider = createProvider(client);
         provider.timingStats(
             "foo",
-            stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0))),
+            stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0, 777.0))),
             e -> { throw new AssertionError(); });
 
         verify(client).prepareSearch(indexName);

+ 3 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java

@@ -15,13 +15,13 @@ public class AutodetectParamsTests extends ESTestCase {
     private static final String JOB_ID = "my-job";
 
     public void testBuilder_WithTimingStats() {
-        TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0);
+        TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0);
         AutodetectParams params = new AutodetectParams.Builder(JOB_ID).setTimingStats(timingStats).build();
         assertThat(params.timingStats(), equalTo(timingStats));
 
         timingStats.updateStats(2000.0);
-        assertThat(timingStats, equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75)));
-        assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0)));
+        assertThat(timingStats, equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75, 1010.0)));
+        assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0, 1000.0)));
     }
 
     public void testBuilder_WithoutTimingStats() {

+ 3 - 2
x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java

@@ -103,7 +103,7 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
 
         final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7);
         final ForecastStats forecastStats = new ForecastStats();
-        final TimingStats timingStats = new TimingStats("_job_id", 100, 10.0, 30.0, 20.0);
+        final TimingStats timingStats = new TimingStats("_job_id", 100, 10.0, 30.0, 20.0, 25.0);
         final JobStats jobStats = new JobStats(
             "_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, "_explanation", time, timingStats);
         final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);
@@ -177,7 +177,8 @@ public class JobStatsMonitoringDocTests extends BaseMonitoringDocTestCase<JobSta
                         + "\"bucket_count\":100,"
                         + "\"minimum_bucket_processing_time_ms\":10.0,"
                         + "\"maximum_bucket_processing_time_ms\":30.0,"
-                        + "\"average_bucket_processing_time_ms\":20.0"
+                        + "\"average_bucket_processing_time_ms\":20.0,"
+                        + "\"exponential_average_bucket_processing_time_ms\":25.0"
                        + "}"
                      + "}"
                     + "}", xContent.utf8ToString());

+ 66 - 44
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml

@@ -78,6 +78,7 @@ setup:
         body: >
           {"airline":"AAL","responsetime":"132.2046","time":"1403481600"}
           {"airline":"JZA","responsetime":"990.4628","time":"1403481600"}
+          {"airline":"JZA","responsetime":"244.1276","time":"1403485200"}
 
   - do:
       ml.flush_job:
@@ -88,18 +89,22 @@ setup:
   - do:
       ml.get_job_stats:
         job_id: job-stats-test
-  - match: { jobs.0.job_id : job-stats-test }
-  - match: { jobs.0.data_counts.processed_record_count: 2 }
-  - match: { jobs.0.data_counts.processed_field_count: 4 }
-  - match: { jobs.0.data_counts.input_field_count: 4 }
-  - match: { jobs.0.model_size_stats.model_bytes: 0 }
-  - match: { jobs.0.state: opened }
-  - is_true: jobs.0.node.name
-  - is_true: jobs.0.node.transport_address
-  - match: { jobs.0.node.attributes.ml\.max_open_jobs: "20"}
-  - is_true: jobs.0.open_time
-  - match: { jobs.0.timing_stats.job_id: job-stats-test }
-  - gte:   { jobs.0.timing_stats.bucket_count: 0 }
+  - match:  { jobs.0.job_id : job-stats-test }
+  - match:  { jobs.0.data_counts.processed_record_count: 3 }
+  - match:  { jobs.0.data_counts.processed_field_count: 6 }
+  - match:  { jobs.0.data_counts.input_field_count: 6 }
+  - gte:    { jobs.0.model_size_stats.model_bytes: 0 }
+  - match:  { jobs.0.state: opened }
+  - is_true:  jobs.0.node.name
+  - is_true:  jobs.0.node.transport_address
+  - match:  { jobs.0.node.attributes.ml\.max_open_jobs: "20"}
+  - is_true:  jobs.0.open_time
+  - match:  { jobs.0.timing_stats.job_id: job-stats-test }
+  - match:  { jobs.0.timing_stats.bucket_count: 1 }  # Records are 1h apart and bucket span is 1h so 1 bucket is produced
+  - gte:    { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
+  - gte:    { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
+  - gte:    { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
+  - gte:    { jobs.0.timing_stats.exponential_average_bucket_processing_time_ms: 0.0 }
 
 ---
 "Test get job stats for closed job":
@@ -110,6 +115,7 @@ setup:
         body: >
           {"airline":"AAL","responsetime":"132.2046","time":"1403481600"}
           {"airline":"JZA","responsetime":"990.4628","time":"1403481600"}
+          {"airline":"JZA","responsetime":"244.1276","time":"1403485200"}
 
   - do:
       ml.flush_job:
@@ -124,16 +130,20 @@ setup:
   - do:
       ml.get_job_stats:
         job_id: job-stats-test
-  - match: { jobs.0.job_id : job-stats-test }
-  - match: { jobs.0.data_counts.processed_record_count: 2 }
-  - match: { jobs.0.data_counts.processed_field_count: 4}
-  - match: { jobs.0.data_counts.input_field_count: 4 }
-  - gt: { jobs.0.model_size_stats.model_bytes: 0 }
-  - match: { jobs.0.state: closed }
+  - match:  { jobs.0.job_id : job-stats-test }
+  - match:  { jobs.0.data_counts.processed_record_count: 3 }
+  - match:  { jobs.0.data_counts.processed_field_count: 6 }
+  - match:  { jobs.0.data_counts.input_field_count: 6 }
+  - gt:     { jobs.0.model_size_stats.model_bytes: 0 }
+  - match:  { jobs.0.state: closed }
   - is_false: jobs.0.node
   - is_false: jobs.0.open_time
-  - match: { jobs.0.timing_stats.job_id: job-stats-test }
-  - gte:   { jobs.0.timing_stats.bucket_count: 0 }
+  - match:  { jobs.0.timing_stats.job_id: job-stats-test }
+  - match:  { jobs.0.timing_stats.bucket_count: 1 }  # Records are 1h apart and bucket span is 1h so 1 bucket is produced
+  - gte:    { jobs.0.timing_stats.minimum_bucket_processing_time_ms: 0.0 }
+  - gte:    { jobs.0.timing_stats.maximum_bucket_processing_time_ms: 0.0 }
+  - gte:    { jobs.0.timing_stats.average_bucket_processing_time_ms: 0.0 }
+  - gte:    { jobs.0.timing_stats.exponential_average_bucket_processing_time_ms: 0.0 }
 
 ---
 "Test get job stats of datafeed job that has not received any data":
@@ -141,13 +151,17 @@ setup:
   - do:
       ml.get_job_stats:
         job_id: jobs-get-stats-datafeed-job
-  - match: { jobs.0.job_id : jobs-get-stats-datafeed-job }
-  - match: { jobs.0.data_counts.processed_record_count: 0 }
-  - match: { jobs.0.model_size_stats.model_bytes : 0 }
-  - match: { jobs.0.state: opened }
-  - is_true: jobs.0.open_time
-  - match: { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job }
-  - match: { jobs.0.timing_stats.bucket_count: 0 }
+  - match:  { jobs.0.job_id : jobs-get-stats-datafeed-job }
+  - match:  { jobs.0.data_counts.processed_record_count: 0 }
+  - match:  { jobs.0.model_size_stats.model_bytes : 0 }
+  - match:  { jobs.0.state: opened }
+  - is_true:  jobs.0.open_time
+  - match:  { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job }
+  - match:  { jobs.0.timing_stats.bucket_count: 0 }
+  - is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
+  - is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
+  - is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
+  - is_false: jobs.0.timing_stats.exponential_average_bucket_processing_time_ms
 
 ---
 "Test get all job stats with _all":
@@ -317,24 +331,32 @@ setup:
 
   - do:
       ml.get_job_stats: {}
-  - match: { count: 2 }
-  - match: { jobs.0.job_id : job-stats-test }
-  - match: { jobs.0.data_counts.processed_record_count: 0 }
-  - match: { jobs.0.data_counts.processed_field_count: 0 }
-  - match: { jobs.0.data_counts.input_field_count: 0 }
-  - match: { jobs.0.model_size_stats.model_bytes: 0 }
-  - match: { jobs.0.state: closed }
+  - match:  { count: 2 }
+  - match:  { jobs.0.job_id : job-stats-test }
+  - match:  { jobs.0.data_counts.processed_record_count: 0 }
+  - match:  { jobs.0.data_counts.processed_field_count: 0 }
+  - match:  { jobs.0.data_counts.input_field_count: 0 }
+  - match:  { jobs.0.model_size_stats.model_bytes: 0 }
+  - match:  { jobs.0.state: closed }
   - is_false: jobs.0.node
   - is_false: jobs.0.open_time
-  - match: { jobs.0.timing_stats.job_id: job-stats-test }
-  - gte:   { jobs.0.timing_stats.bucket_count: 0 }
-  - match: { jobs.1.job_id : jobs-get-stats-datafeed-job }
-  - match: { jobs.1.data_counts.processed_record_count: 0 }
-  - match: { jobs.1.data_counts.processed_field_count: 0 }
-  - match: { jobs.1.data_counts.input_field_count: 0 }
-  - match: { jobs.1.model_size_stats.model_bytes: 0 }
-  - match: { jobs.1.state: closed }
+  - match:  { jobs.0.timing_stats.job_id: job-stats-test }
+  - match:  { jobs.0.timing_stats.bucket_count: 0 }
+  - is_false: jobs.0.timing_stats.minimum_bucket_processing_time_ms
+  - is_false: jobs.0.timing_stats.maximum_bucket_processing_time_ms
+  - is_false: jobs.0.timing_stats.average_bucket_processing_time_ms
+  - is_false: jobs.0.timing_stats.exponential_average_bucket_processing_time_ms
+  - match:  { jobs.1.job_id : jobs-get-stats-datafeed-job }
+  - match:  { jobs.1.data_counts.processed_record_count: 0 }
+  - match:  { jobs.1.data_counts.processed_field_count: 0 }
+  - match:  { jobs.1.data_counts.input_field_count: 0 }
+  - match:  { jobs.1.model_size_stats.model_bytes: 0 }
+  - match:  { jobs.1.state: closed }
   - is_false: jobs.1.node
   - is_false: jobs.1.open_time
-  - match: { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job }
-  - gte:   { jobs.1.timing_stats.bucket_count: 0 }
+  - match:  { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job }
+  - match:  { jobs.1.timing_stats.bucket_count: 0 }
+  - is_false: jobs.1.timing_stats.minimum_bucket_processing_time_ms
+  - is_false: jobs.1.timing_stats.maximum_bucket_processing_time_ms
+  - is_false: jobs.1.timing_stats.average_bucket_processing_time_ms
+  - is_false: jobs.1.timing_stats.exponential_average_bucket_processing_time_ms