Browse Source

Add rate aggregation (#61369)

Adds a new rate aggregation that can calculate a document rate for buckets
of a date_histogram.

Closes #60674
Igor Motov 5 years ago
parent
commit
f107dba741
25 changed files with 1829 additions and 40 deletions
  1. 1 0
      docs/reference/aggregations/metrics.asciidoc
  2. 257 0
      docs/reference/aggregations/metrics/rate-aggregation.asciidoc
  3. 1 0
      docs/reference/rest-api/usage.asciidoc
  4. 133 32
      server/src/main/java/org/elasticsearch/common/Rounding.java
  5. 15 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
  6. 29 0
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/SizedBucketAggregator.java
  7. 29 0
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/SizedBucketAggregatorBuilder.java
  8. 17 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java
  9. 14 3
      server/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceAggregationBuilder.java
  10. 71 0
      server/src/test/java/org/elasticsearch/common/RoundingTests.java
  11. 9 1
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java
  12. 108 0
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/InternalRate.java
  13. 20 0
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/Rate.java
  14. 173 0
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java
  15. 135 0
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregator.java
  16. 74 0
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java
  17. 26 0
      x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorSupplier.java
  18. 1 0
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java
  19. 103 0
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/InternalRateTests.java
  20. 89 0
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilderTests.java
  21. 445 0
      x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java
  22. 2 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java
  23. 36 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/rate.yml
  24. 39 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml
  25. 2 1
      x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

+ 1 - 0
docs/reference/aggregations/metrics.asciidoc

@@ -51,6 +51,7 @@ include::metrics/valuecount-aggregation.asciidoc[]
 
 include::metrics/t-test-aggregation.asciidoc[]
 
+include::metrics/rate-aggregation.asciidoc[]
 
 
 

+ 257 - 0
docs/reference/aggregations/metrics/rate-aggregation.asciidoc

@@ -0,0 +1,257 @@
+[role="xpack"]
+[testenv="basic"]
+[[search-aggregations-metrics-rate-aggregation]]
+=== Rate Aggregation
+
+A `rate` metrics aggregation can be used only inside a `date_histogram` and calculates a rate of documents or a field in each
+`date_histogram` bucket.
+
+==== Syntax
+
+A `rate` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+  "rate": {
+    "unit": "month",
+    "field": "requests"
+  }
+}
+--------------------------------------------------
+// NOTCONSOLE
+
+The following request will group all sales records into monthly bucket and than convert the number of sales transaction in each bucket
+into per annual sales rate.
+
+[source,console]
+--------------------------------------------------
+GET sales/_search
+{
+  "size": 0,
+  "aggs": {
+    "by_date": {
+      "date_histogram": {
+        "field": "date",
+        "calendar_interval": "month"  <1>
+      },
+      "aggs": {
+        "my_rate": {
+          "rate": {
+            "unit": "year"  <2>
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[setup:sales]
+<1> Histogram is grouped by month.
+<2> But the rate is converted into annual rate.
+
+The response will return the annual rate of transaction in each bucket. Since there are 12 months per year, the annual rate will
+be automatically calculated by multiplying monthly rate by 12.
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+  "aggregations" : {
+    "by_date" : {
+      "buckets" : [
+        {
+          "key_as_string" : "2015/01/01 00:00:00",
+          "key" : 1420070400000,
+          "doc_count" : 3,
+          "my_rate" : {
+            "value" : 36.0
+          }
+        },
+        {
+          "key_as_string" : "2015/02/01 00:00:00",
+          "key" : 1422748800000,
+          "doc_count" : 2,
+          "my_rate" : {
+            "value" : 24.0
+          }
+        },
+        {
+          "key_as_string" : "2015/03/01 00:00:00",
+          "key" : 1425168000000,
+          "doc_count" : 2,
+          "my_rate" : {
+            "value" : 24.0
+          }
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+Instead of counting the number of documents, it is also possible to calculate a sum of all values of the fields in the documents in each
+bucket. The following request will group all sales records into monthly bucket and than calculate the total monthly sales and convert them
+into average daily sales.
+
+[source,console]
+--------------------------------------------------
+GET sales/_search
+{
+  "size": 0,
+  "aggs": {
+    "by_date": {
+      "date_histogram": {
+        "field": "date",
+        "calendar_interval": "month"  <1>
+      },
+      "aggs": {
+        "avg_price": {
+          "rate": {
+            "field": "price", <2>
+            "unit": "day"  <3>
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[setup:sales]
+<1> Histogram is grouped by month.
+<2> Calculate sum of all sale prices
+<3> Convert to average daily sales
+
+The response will contain the average daily sale prices for each month.
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+  "aggregations" : {
+    "by_date" : {
+      "buckets" : [
+        {
+          "key_as_string" : "2015/01/01 00:00:00",
+          "key" : 1420070400000,
+          "doc_count" : 3,
+          "avg_price" : {
+            "value" : 17.741935483870968
+          }
+        },
+        {
+          "key_as_string" : "2015/02/01 00:00:00",
+          "key" : 1422748800000,
+          "doc_count" : 2,
+          "avg_price" : {
+            "value" : 2.142857142857143
+          }
+        },
+        {
+          "key_as_string" : "2015/03/01 00:00:00",
+          "key" : 1425168000000,
+          "doc_count" : 2,
+          "avg_price" : {
+            "value" : 12.096774193548388
+          }
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
+
+
+==== Relationship between bucket sizes and rate
+
+The `rate` aggregation supports all rate that can be used <<calendar_intervals,calendar_intervals parameter>> of `date_histogram`
+aggregation. The specified rate should compatible with the `date_histogram` aggregation interval, i.e. it should be possible to
+convert the bucket size into the rate. By default the interval of the `date_histogram` is used.
+
+`"rate": "second"`:: compatible with all intervals
+`"rate": "minute"`:: compatible with all intervals
+`"rate": "hour"`:: compatible with all intervals
+`"rate": "day"`:: compatible with all intervals
+`"rate": "week"`:: compatible with all intervals
+`"rate": "month"`:: compatible with only with `month`, `quarter` and `year` calendar intervals
+`"rate": "quarter"`:: compatible with only with `month`, `quarter` and `year` calendar intervals
+`"rate": "year"`:: compatible with only with `month`, `quarter` and `year` calendar intervals
+
+==== Script
+
+The `rate` aggregation also supports scripting. For example, if we need to adjust out prices before calculating rates, we could use
+a script to recalculate them on-the-fly:
+
+[source,console]
+--------------------------------------------------
+GET sales/_search
+{
+  "size": 0,
+  "aggs": {
+    "by_date": {
+      "date_histogram": {
+        "field": "date",
+        "calendar_interval": "month"
+      },
+      "aggs": {
+        "avg_price": {
+          "rate": {
+            "script": {  <1>
+              "lang": "painless",
+              "source": "doc['price'].value * params.adjustment",
+              "params": {
+                "adjustment": 0.9  <2>
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[setup:sales]
+
+<1> The `field` parameter is replaced with a `script` parameter, which uses the
+script to generate values which percentiles are calculated on.
+<2> Scripting supports parameterized input just like any other script.
+
+[source,console-result]
+--------------------------------------------------
+{
+  ...
+  "aggregations" : {
+    "by_date" : {
+      "buckets" : [
+        {
+          "key_as_string" : "2015/01/01 00:00:00",
+          "key" : 1420070400000,
+          "doc_count" : 3,
+          "avg_price" : {
+            "value" : 495.0
+          }
+        },
+        {
+          "key_as_string" : "2015/02/01 00:00:00",
+          "key" : 1422748800000,
+          "doc_count" : 2,
+          "avg_price" : {
+            "value" : 54.0
+          }
+        },
+        {
+          "key_as_string" : "2015/03/01 00:00:00",
+          "key" : 1425168000000,
+          "doc_count" : 2,
+          "avg_price" : {
+            "value" : 337.5
+          }
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

+ 1 - 0
docs/reference/rest-api/usage.asciidoc

@@ -275,6 +275,7 @@ GET /_xpack/usage
       "normalize_usage" : 0,
       "cumulative_cardinality_usage" : 0,
       "t_test_usage" : 0,
+      "rate_usage" : 0,
       "string_stats_usage" : 0,
       "moving_percentiles_usage" : 0
     }

+ 133 - 32
server/src/main/java/org/elasticsearch/common/Rounding.java

@@ -45,6 +45,7 @@ import java.time.temporal.TemporalQueries;
 import java.time.zone.ZoneOffsetTransition;
 import java.time.zone.ZoneRules;
 import java.util.List;
+import java.util.Locale;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -61,7 +62,13 @@ import java.util.concurrent.TimeUnit;
  */
 public abstract class Rounding implements Writeable {
     public enum DateTimeUnit {
-        WEEK_OF_WEEKYEAR((byte) 1, IsoFields.WEEK_OF_WEEK_BASED_YEAR) {
+        WEEK_OF_WEEKYEAR(
+            (byte) 1,
+            "week",
+            IsoFields.WEEK_OF_WEEK_BASED_YEAR,
+            true,
+            TimeUnit.DAYS.toMillis(7)
+        ) {
             private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(7);
 
             long roundFloor(long utcMillis) {
@@ -73,7 +80,13 @@ public abstract class Rounding implements Writeable {
                 return extraLocalOffsetLookup;
             }
         },
-        YEAR_OF_CENTURY((byte) 2, ChronoField.YEAR_OF_ERA) {
+        YEAR_OF_CENTURY(
+            (byte) 2,
+            "year",
+            ChronoField.YEAR_OF_ERA,
+            false,
+            12
+        ) {
             private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(366);
 
             long roundFloor(long utcMillis) {
@@ -84,7 +97,13 @@ public abstract class Rounding implements Writeable {
                 return extraLocalOffsetLookup;
             }
         },
-        QUARTER_OF_YEAR((byte) 3, IsoFields.QUARTER_OF_YEAR) {
+        QUARTER_OF_YEAR(
+            (byte) 3,
+            "quarter",
+            IsoFields.QUARTER_OF_YEAR,
+            false,
+            3
+        ) {
             private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(92);
 
             long roundFloor(long utcMillis) {
@@ -95,7 +114,13 @@ public abstract class Rounding implements Writeable {
                 return extraLocalOffsetLookup;
             }
         },
-        MONTH_OF_YEAR((byte) 4, ChronoField.MONTH_OF_YEAR) {
+        MONTH_OF_YEAR(
+            (byte) 4,
+            "month",
+            ChronoField.MONTH_OF_YEAR,
+            false,
+            1
+        ) {
             private final long extraLocalOffsetLookup = TimeUnit.DAYS.toMillis(31);
 
             long roundFloor(long utcMillis) {
@@ -106,53 +131,82 @@ public abstract class Rounding implements Writeable {
                 return extraLocalOffsetLookup;
             }
         },
-        DAY_OF_MONTH((byte) 5, ChronoField.DAY_OF_MONTH) {
-            final long unitMillis = ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis();
+        DAY_OF_MONTH(
+            (byte) 5,
+            "day",
+            ChronoField.DAY_OF_MONTH,
+            true,
+            ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis()
+        ) {
             long roundFloor(long utcMillis) {
-                return DateUtils.roundFloor(utcMillis, unitMillis);
+                return DateUtils.roundFloor(utcMillis, this.ratio);
             }
 
             long extraLocalOffsetLookup() {
-                return unitMillis;
+                return ratio;
             }
         },
-        HOUR_OF_DAY((byte) 6, ChronoField.HOUR_OF_DAY) {
-            final long unitMillis = ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis();
+        HOUR_OF_DAY(
+            (byte) 6,
+            "hour",
+            ChronoField.HOUR_OF_DAY,
+            true,
+            ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis()
+        ) {
             long roundFloor(long utcMillis) {
-                return DateUtils.roundFloor(utcMillis, unitMillis);
+                return DateUtils.roundFloor(utcMillis, ratio);
             }
 
             long extraLocalOffsetLookup() {
-                return unitMillis;
+                return ratio;
             }
         },
-        MINUTES_OF_HOUR((byte) 7, ChronoField.MINUTE_OF_HOUR) {
-            final long unitMillis = ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis();
+        MINUTES_OF_HOUR(
+            (byte) 7,
+            "minute",
+            ChronoField.MINUTE_OF_HOUR,
+            true,
+            ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis()
+        ) {
             long roundFloor(long utcMillis) {
-                return DateUtils.roundFloor(utcMillis, unitMillis);
+                return DateUtils.roundFloor(utcMillis, ratio);
             }
 
             long extraLocalOffsetLookup() {
-                return unitMillis;
+                return ratio;
             }
         },
-        SECOND_OF_MINUTE((byte) 8, ChronoField.SECOND_OF_MINUTE) {
-            final long unitMillis = ChronoField.SECOND_OF_MINUTE.getBaseUnit().getDuration().toMillis();
+        SECOND_OF_MINUTE(
+            (byte) 8,
+            "second",
+            ChronoField.SECOND_OF_MINUTE,
+            true,
+            ChronoField.SECOND_OF_MINUTE.getBaseUnit().getDuration().toMillis()
+        ) {
             long roundFloor(long utcMillis) {
-                return DateUtils.roundFloor(utcMillis, unitMillis);
+                return DateUtils.roundFloor(utcMillis, ratio);
             }
 
             long extraLocalOffsetLookup() {
-                return unitMillis;
+                return ratio;
             }
         };
 
         private final byte id;
         private final TemporalField field;
+        private final boolean isMillisBased;
+        private final String shortName;
+        /**
+         * ratio to milliseconds if isMillisBased == true or to month otherwise
+         */
+        protected final long ratio;
 
-        DateTimeUnit(byte id, TemporalField field) {
+        DateTimeUnit(byte id, String shortName, TemporalField field, boolean isMillisBased, long ratio) {
             this.id = id;
+            this.shortName = shortName;
             this.field = field;
+            this.isMillisBased = isMillisBased;
+            this.ratio = ratio;
         }
 
         /**
@@ -168,7 +222,7 @@ public abstract class Rounding implements Writeable {
          * When looking up {@link LocalTimeOffset} go this many milliseconds
          * in the past from the minimum millis since epoch that we plan to
          * look up so that we can see transitions that we might have rounded
-         * down beyond. 
+         * down beyond.
          */
         abstract long extraLocalOffsetLookup();
 
@@ -180,6 +234,14 @@ public abstract class Rounding implements Writeable {
             return field;
         }
 
+        public static DateTimeUnit resolve(String name) {
+            return DateTimeUnit.valueOf(name.toUpperCase(Locale.ROOT));
+        }
+
+        public String shortName() {
+            return shortName;
+        }
+
         public static DateTimeUnit resolve(byte id) {
             switch (id) {
                 case 1: return WEEK_OF_WEEKYEAR;
@@ -220,6 +282,11 @@ public abstract class Rounding implements Writeable {
          * 3, {@code nextRoundValue(6) = 9}.
          */
         long nextRoundingValue(long utcMillis);
+        /**
+         * Given the rounded value, returns the size between this value and the
+         * next rounded value in specified units if possible.
+         */
+        double roundingSize(long utcMillis, DateTimeUnit timeUnit);
     }
     /**
      * Prepare to round many times.
@@ -324,7 +391,6 @@ public abstract class Rounding implements Writeable {
             return this;
         }
 
-
         public Rounding build() {
             Rounding rounding;
             if (unit != null) {
@@ -411,7 +477,7 @@ public abstract class Rounding implements Writeable {
                 /*
                  * Units that round to midnight can round down from two
                  * units worth of millis in the future to find the
-                 * nextRoundingValue. 
+                 * nextRoundingValue.
                  */
                 unitMillis = unit.field.getBaseUnit().getDuration().toMillis();
                 maxLookup += 2 * unitMillis;
@@ -489,7 +555,24 @@ public abstract class Rounding implements Writeable {
             return "Rounding[" + unit + " in " + timeZone + "]";
         }
 
-        private class FixedToMidnightRounding implements Prepared {
+        private abstract class TimeUnitPreparedRounding implements Prepared {
+            @Override
+            public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
+                if (timeUnit.isMillisBased == unit.isMillisBased) {
+                    return (double) unit.ratio / timeUnit.ratio;
+                } else {
+                    if (unit.isMillisBased == false) {
+                        return (double) (nextRoundingValue(utcMillis) - utcMillis) / timeUnit.ratio;
+                    } else {
+                        throw new IllegalArgumentException("Cannot use month-based rate unit [" + timeUnit.shortName +
+                            "] with non-month based calendar interval histogram [" + unit.shortName +
+                            "] only week, day, hour, minute and second are supported for this histogram");
+                    }
+                }
+            }
+        }
+
+        private class FixedToMidnightRounding extends TimeUnitPreparedRounding {
             private final LocalTimeOffset offset;
 
             FixedToMidnightRounding(LocalTimeOffset offset) {
@@ -508,7 +591,7 @@ public abstract class Rounding implements Writeable {
             }
         }
 
-        private class FixedNotToMidnightRounding implements Prepared {
+        private class FixedNotToMidnightRounding extends TimeUnitPreparedRounding {
             private final LocalTimeOffset offset;
             private final long unitMillis;
 
@@ -528,7 +611,7 @@ public abstract class Rounding implements Writeable {
             }
         }
 
-        private class ToMidnightRounding implements Prepared, LocalTimeOffset.Strategy {
+        private class ToMidnightRounding extends TimeUnitPreparedRounding implements LocalTimeOffset.Strategy {
             private final LocalTimeOffset.Lookup lookup;
 
             ToMidnightRounding(LocalTimeOffset.Lookup lookup) {
@@ -609,7 +692,7 @@ public abstract class Rounding implements Writeable {
             }
         }
 
-        private class JavaTimeToMidnightRounding implements Prepared {
+        private class JavaTimeToMidnightRounding extends TimeUnitPreparedRounding {
             @Override
             public long round(long utcMillis) {
                 LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(utcMillis), timeZone);
@@ -718,7 +801,7 @@ public abstract class Rounding implements Writeable {
             }
         }
 
-        private abstract class AbstractNotToMidnightRounding implements Prepared {
+        private abstract class AbstractNotToMidnightRounding extends TimeUnitPreparedRounding {
             protected final long unitMillis;
 
             AbstractNotToMidnightRounding(long unitMillis) {
@@ -835,6 +918,19 @@ public abstract class Rounding implements Writeable {
             }
         }
 
+        private abstract class TimeIntervalPreparedRounding implements Prepared {
+            @Override
+            public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
+                if (timeUnit.isMillisBased) {
+                    return (double) interval / timeUnit.ratio;
+                } else {
+                    throw new IllegalArgumentException("Cannot use month-based rate unit [" + timeUnit.shortName +
+                        "] with fixed interval based histogram, only week, day, hour, minute and second are supported for " +
+                        "this histogram");
+                }
+            }
+        }
+
         /**
          * Rounds to down inside of a time zone with an "effectively fixed"
          * time zone. A time zone can be "effectively fixed" if:
@@ -844,7 +940,7 @@ public abstract class Rounding implements Writeable {
          * <li>It is fixed over the entire range of dates that will be rounded</li>
          * </ul>
          */
-        private class FixedRounding implements Prepared {
+        private class FixedRounding extends TimeIntervalPreparedRounding {
             private final LocalTimeOffset offset;
 
             FixedRounding(LocalTimeOffset offset) {
@@ -868,7 +964,7 @@ public abstract class Rounding implements Writeable {
          * "effectively fixed". See {@link FixedRounding} for a description of
          * "effectively fixed".
          */
-        private class VariableRounding implements Prepared, LocalTimeOffset.Strategy {
+        private class VariableRounding extends TimeIntervalPreparedRounding implements LocalTimeOffset.Strategy {
             private final LocalTimeOffset.Lookup lookup;
 
             VariableRounding(LocalTimeOffset.Lookup lookup) {
@@ -923,7 +1019,7 @@ public abstract class Rounding implements Writeable {
          * of dates with the same {@link Prepared} instance.</li>
          * </ul>
          */
-        private class JavaTimeRounding implements Prepared {
+        private class JavaTimeRounding extends TimeIntervalPreparedRounding {
             @Override
             public long round(long utcMillis) {
                 final Instant utcInstant = Instant.ofEpochMilli(utcMillis);
@@ -1039,6 +1135,11 @@ public abstract class Rounding implements Writeable {
                 public long nextRoundingValue(long utcMillis) {
                     return delegatePrepared.nextRoundingValue(utcMillis - offset) + offset;
                 }
+
+                @Override
+                public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
+                    return delegatePrepared.roundingSize(utcMillis, timeUnit);
+                }
             };
         }
 

+ 15 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

@@ -50,7 +50,7 @@ import java.util.function.BiConsumer;
  *
  * @see Rounding
  */
-class DateHistogramAggregator extends BucketsAggregator {
+class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
 
     private final ValuesSource.Numeric valuesSource;
     private final DocValueFormat formatter;
@@ -182,4 +182,18 @@ class DateHistogramAggregator extends BucketsAggregator {
     public void collectDebugInfo(BiConsumer<String, Object> add) {
         add.accept("total_buckets", bucketOrds.size());
     }
+
+    /**
+     * Returns the size of the bucket in specified units.
+     *
+     * If unitSize is null, returns 1.0
+     */
+    @Override
+    public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
+        if (unitSize != null) {
+            return preparedRounding.roundingSize(bucketOrds.get(bucket), unitSize);
+        } else {
+            return 1.0;
+        }
+    }
 }

+ 29 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/SizedBucketAggregator.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.histogram;
+
+import org.elasticsearch.common.Rounding;
+
+/**
+ * An aggregator capable of reporting bucket sizes in milliseconds. Used by RateAggregator for calendar-based buckets.
+ */
+public interface SizedBucketAggregator {
+    double bucketSize(long bucket, Rounding.DateTimeUnit unit);
+}

+ 29 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/SizedBucketAggregatorBuilder.java

@@ -0,0 +1,29 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.histogram;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An aggregator capable of reporting bucket sizes in milliseconds. Used by RateAggregator for calendar-based buckets.
+ */
+public interface SizedBucketAggregatorBuilder {
+    double calendarDivider(TimeUnit timeUnit);
+}

+ 17 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java

@@ -60,6 +60,11 @@ public abstract class LongKeyedBucketOrds implements Releasable {
      */
    public abstract long find(long owningBucketOrd, long value);
 
+    /**
+     * Returns the value currently associated with the bucket ordinal
+     */
+    public abstract long get(long ordinal);
+
     /**
      * The number of collected buckets.
      */
@@ -98,7 +103,7 @@ public abstract class LongKeyedBucketOrds implements Releasable {
         long value();
 
         /**
-         * An {@linkplain BucketOrdsEnum} that is empty. 
+         * An {@linkplain BucketOrdsEnum} that is empty.
          */
         BucketOrdsEnum EMPTY = new BucketOrdsEnum() {
             @Override
@@ -133,6 +138,12 @@ public abstract class LongKeyedBucketOrds implements Releasable {
             return ords.find(value);
         }
 
+
+        @Override
+        public long get(long ordinal) {
+            return ords.get(ordinal);
+        }
+
         @Override
         public long bucketsInOrd(long owningBucketOrd) {
             assert owningBucketOrd == 0;
@@ -205,6 +216,11 @@ public abstract class LongKeyedBucketOrds implements Releasable {
             return ords.find(owningBucketOrd, value);
         }
 
+        @Override
+        public long get(long ordinal) {
+            return ords.getKey2(ordinal);
+        }
+
         @Override
         public long bucketsInOrd(long owningBucketOrd) {
             // TODO it'd be faster to count the number of buckets in a list of these ords rather than one at a time

+ 14 - 3
server/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceAggregationBuilder.java

@@ -45,6 +45,12 @@ public abstract class ValuesSourceAggregationBuilder<AB extends ValuesSourceAggr
     public static <T> void declareFields(
         AbstractObjectParser<? extends ValuesSourceAggregationBuilder<?>, T> objectParser,
         boolean scriptable, boolean formattable, boolean timezoneAware) {
+        declareFields(objectParser, scriptable, formattable, timezoneAware, true);
+
+    }
+    public static <T> void declareFields(
+        AbstractObjectParser<? extends ValuesSourceAggregationBuilder<?>, T> objectParser,
+        boolean scriptable, boolean formattable, boolean timezoneAware, boolean fieldRequired) {
 
 
         objectParser.declareField(ValuesSourceAggregationBuilder::field, XContentParser::text,
@@ -71,10 +77,15 @@ public abstract class ValuesSourceAggregationBuilder<AB extends ValuesSourceAggr
             objectParser.declareField(ValuesSourceAggregationBuilder::script,
                     (parser, context) -> Script.parse(parser),
                     Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING);
-            String[] fields = new String[]{ParseField.CommonFields.FIELD.getPreferredName(), Script.SCRIPT_PARSE_FIELD.getPreferredName()};
-            objectParser.declareRequiredFieldSet(fields);
+            if (fieldRequired) {
+                String[] fields = new String[]{ParseField.CommonFields.FIELD.getPreferredName(),
+                    Script.SCRIPT_PARSE_FIELD.getPreferredName()};
+                objectParser.declareRequiredFieldSet(fields);
+            }
         } else {
-            objectParser.declareRequiredFieldSet(ParseField.CommonFields.FIELD.getPreferredName());
+            if (fieldRequired) {
+                objectParser.declareRequiredFieldSet(ParseField.CommonFields.FIELD.getPreferredName());
+            }
         }
 
         if (timezoneAware) {

+ 71 - 0
server/src/test/java/org/elasticsearch/common/RoundingTests.java

@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -831,6 +832,76 @@ public class RoundingTests extends ESTestCase {
         assertThat(rounding.round(time("1982-11-10T02:51:22.662Z")), isDate(time("1982-03-23T05:00:00Z"), tz));
     }
 
+    public void testFixedIntervalRoundingSize() {
+        Rounding unitRounding = Rounding.builder(TimeValue.timeValueHours(10)).build();
+        Rounding.Prepared prepared = unitRounding.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.SECOND_OF_MINUTE),
+            closeTo(36000.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MINUTES_OF_HOUR),
+            closeTo(600.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.HOUR_OF_DAY),
+            closeTo(10.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.DAY_OF_MONTH),
+            closeTo(10.0 / 24.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.WEEK_OF_WEEKYEAR),
+            closeTo(10.0 / 168.0, 0.000001));
+        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
+            () -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MONTH_OF_YEAR));
+        assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [month] with fixed interval based histogram, " +
+            "only week, day, hour, minute and second are supported for this histogram"));
+        ex = expectThrows(IllegalArgumentException.class,
+            () -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.QUARTER_OF_YEAR));
+        assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [quarter] with fixed interval based histogram, " +
+            "only week, day, hour, minute and second are supported for this histogram"));
+        ex = expectThrows(IllegalArgumentException.class,
+            () -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.YEAR_OF_CENTURY));
+        assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [year] with fixed interval based histogram, " +
+            "only week, day, hour, minute and second are supported for this histogram"));
+    }
+
+    public void testMillisecondsBasedUnitCalendarRoundingSize() {
+        Rounding unitRounding = Rounding.builder(Rounding.DateTimeUnit.HOUR_OF_DAY).build();
+        Rounding.Prepared prepared = unitRounding.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.SECOND_OF_MINUTE),
+            closeTo(3600.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MINUTES_OF_HOUR), closeTo(60.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.HOUR_OF_DAY), closeTo(1.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.DAY_OF_MONTH),
+            closeTo(1 / 24.0, 0.000001));
+        assertThat(prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.WEEK_OF_WEEKYEAR),
+            closeTo(1 / 168.0, 0.000001));
+        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
+            () -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.MONTH_OF_YEAR));
+        assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [month] with non-month based calendar interval " +
+            "histogram [hour] only week, day, hour, minute and second are supported for this histogram"));
+        ex = expectThrows(IllegalArgumentException.class,
+            () -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.QUARTER_OF_YEAR));
+        assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [quarter] with non-month based calendar interval " +
+            "histogram [hour] only week, day, hour, minute and second are supported for this histogram"));
+        ex = expectThrows(IllegalArgumentException.class,
+            () -> prepared.roundingSize(time("2015-01-01T00:00:00.000Z"), Rounding.DateTimeUnit.YEAR_OF_CENTURY));
+        assertThat(ex.getMessage(), equalTo("Cannot use month-based rate unit [year] with non-month based calendar interval " +
+            "histogram [hour] only week, day, hour, minute and second are supported for this histogram"));
+    }
+
+    public void testNonMillisecondsBasedUnitCalendarRoundingSize() {
+        Rounding unitRounding = Rounding.builder(Rounding.DateTimeUnit.QUARTER_OF_YEAR).build();
+        Rounding.Prepared prepared = unitRounding.prepare(time("2010-01-01T00:00:00.000Z"), time("2020-01-01T00:00:00.000Z"));
+        long firstQuarter = prepared.round(time("2015-01-01T00:00:00.000Z"));
+        // Ratio based
+        assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.MONTH_OF_YEAR), closeTo(3.0, 0.000001));
+        assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.QUARTER_OF_YEAR), closeTo(1.0, 0.000001));
+        assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.YEAR_OF_CENTURY), closeTo(0.25, 0.000001));
+        // Real interval based
+        assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.SECOND_OF_MINUTE), closeTo(7776000.0, 0.000001));
+        assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.MINUTES_OF_HOUR), closeTo(129600.0, 0.000001));
+        assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.HOUR_OF_DAY), closeTo(2160.0, 0.000001));
+        assertThat(prepared.roundingSize(firstQuarter, Rounding.DateTimeUnit.DAY_OF_MONTH), closeTo(90.0, 0.000001));
+        long thirdQuarter = prepared.round(time("2015-07-01T00:00:00.000Z"));
+        assertThat(prepared.roundingSize(thirdQuarter, Rounding.DateTimeUnit.DAY_OF_MONTH), closeTo(92.0, 0.000001));
+        assertThat(prepared.roundingSize(thirdQuarter, Rounding.DateTimeUnit.HOUR_OF_DAY), closeTo(2208.0, 0.000001));
+    }
+
     private void assertInterval(long rounded, long nextRoundingValue, Rounding rounding, int minutes,
                                 ZoneId tz) {
         assertInterval(rounded, dateBetween(rounded, nextRoundingValue), nextRoundingValue, rounding, tz);

+ 9 - 1
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java

@@ -38,6 +38,8 @@ import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
 import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
 import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
 import org.elasticsearch.xpack.analytics.movingPercentiles.MovingPercentilesPipelineAggregationBuilder;
+import org.elasticsearch.xpack.analytics.rate.InternalRate;
+import org.elasticsearch.xpack.analytics.rate.RateAggregationBuilder;
 import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
 import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
 import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics;
@@ -118,7 +120,13 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
                 TTestAggregationBuilder::new,
                 usage.track(AnalyticsStatsAction.Item.T_TEST, checkLicense(TTestAggregationBuilder.PARSER)))
                 .addResultReader(InternalTTest::new)
-                .setAggregatorRegistrar(TTestAggregationBuilder::registerUsage)
+                .setAggregatorRegistrar(TTestAggregationBuilder::registerUsage),
+            new AggregationSpec(
+                RateAggregationBuilder.NAME,
+                RateAggregationBuilder::new,
+                usage.track(AnalyticsStatsAction.Item.RATE, checkLicense(RateAggregationBuilder.PARSER)))
+                .addResultReader(InternalRate::new)
+                .setAggregatorRegistrar(RateAggregationBuilder::registerAggregators)
         );
     }
 

+ 108 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/InternalRate.java

@@ -0,0 +1,108 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
+import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class InternalRate extends InternalNumericMetricsAggregation.SingleValue implements Rate {
+    final double sum;
+    final double divisor;
+
+    public InternalRate(String name, double sum, double divisor, DocValueFormat formatter, Map<String, Object> metadata) {
+        super(name, metadata);
+        this.sum = sum;
+        this.divisor = divisor;
+        this.format = formatter;
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public InternalRate(StreamInput in) throws IOException {
+        super(in);
+        format = in.readNamedWriteable(DocValueFormat.class);
+        sum = in.readDouble();
+        divisor = in.readDouble();
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteable(format);
+        out.writeDouble(sum);
+        out.writeDouble(divisor);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return RateAggregationBuilder.NAME;
+    }
+
+    @Override
+    public double value() {
+        return sum / divisor;
+    }
+
+    @Override
+    public double getValue() {
+        return sum / divisor;
+    }
+
+    // for testing only
+    DocValueFormat format() {
+        return format;
+    }
+
+    @Override
+    public InternalRate reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        // Compute the sum of double values with Kahan summation algorithm which is more
+        // accurate than naive summation.
+        CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+        Double divisor = null;
+        for (InternalAggregation aggregation : aggregations) {
+            double value = ((InternalRate) aggregation).sum;
+            kahanSummation.add(value);
+            if (divisor == null) {
+                divisor = ((InternalRate) aggregation).divisor;
+            }
+        }
+        return new InternalRate(name, kahanSummation.value(), divisor, format, getMetadata());
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        builder.field(CommonFields.VALUE.getPreferredName(), value());
+        if (format != DocValueFormat.RAW) {
+            builder.field(CommonFields.VALUE_AS_STRING.getPreferredName(), format.format(value()).toString());
+        }
+        return builder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), sum, divisor);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null || getClass() != obj.getClass()) return false;
+        if (super.equals(obj) == false) return false;
+
+        InternalRate that = (InternalRate) obj;
+        return Objects.equals(sum, that.sum) && Objects.equals(divisor, that.divisor);
+    }
+}

+ 20 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/Rate.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+
+/**
+ * An aggregation that computes the rate of the values in the current bucket by adding all values in the bucket and dividing
+ * it by the size of the bucket.
+ */
+public interface Rate extends NumericMetricsAggregation.SingleValue {
+
+    /**
+     * The rate.
+     */
+    double getValue();
+}

+ 173 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java

@@ -0,0 +1,173 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, RateAggregationBuilder> {
+    public static final String NAME = "rate";
+    public static final ParseField UNIT_FIELD = new ParseField("unit");
+    public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
+        NAME,
+        RateAggregatorSupplier.class
+    );
+    public static final ObjectParser<RateAggregationBuilder, String> PARSER = ObjectParser.fromBuilder(NAME, RateAggregationBuilder::new);
+
+    static {
+        ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, false, false);
+        PARSER.declareString(RateAggregationBuilder::rateUnit, UNIT_FIELD);
+    }
+
+    Rounding.DateTimeUnit rateUnit;
+
+    public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
+        RateAggregatorFactory.registerAggregators(builder);
+    }
+
+    public RateAggregationBuilder(String name) {
+        super(name);
+    }
+
+    protected RateAggregationBuilder(
+        RateAggregationBuilder clone,
+        AggregatorFactories.Builder factoriesBuilder,
+        Map<String, Object> metadata
+    ) {
+        super(clone, factoriesBuilder, metadata);
+    }
+
+    @Override
+    protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metadata) {
+        return new RateAggregationBuilder(this, factoriesBuilder, metadata);
+    }
+
+    /**
+     * Read from a stream.
+     */
+    public RateAggregationBuilder(StreamInput in) throws IOException {
+        super(in);
+        byte b = in.readByte();
+        if (b > 0) {
+            rateUnit = Rounding.DateTimeUnit.resolve(b);
+        } else {
+            rateUnit = null;
+        }
+    }
+
+    @Override
+    protected ValuesSourceType defaultValueSourceType() {
+        return CoreValuesSourceType.NUMERIC;
+    }
+
+    @Override
+    protected void innerWriteTo(StreamOutput out) throws IOException {
+        if (rateUnit != null) {
+            out.writeByte(rateUnit.getId());
+        } else {
+            out.writeByte((byte) 0);
+        }
+    }
+
+    @Override
+    protected ValuesSourceRegistry.RegistryKey<?> getRegistryKey() {
+        return REGISTRY_KEY;
+    }
+
+    @Override
+    protected RateAggregatorFactory innerBuild(
+        QueryShardContext queryShardContext,
+        ValuesSourceConfig config,
+        AggregatorFactory parent,
+        AggregatorFactories.Builder subFactoriesBuilder
+    ) throws IOException {
+
+        return new RateAggregatorFactory(name, config, rateUnit, queryShardContext, parent, subFactoriesBuilder, metadata);
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        if (rateUnit != null) {
+            builder.field(UNIT_FIELD.getPreferredName(), rateUnit.shortName());
+        }
+        return builder;
+    }
+
+    @Override
+    public String getType() {
+        return NAME;
+    }
+
+    public RateAggregationBuilder rateUnit(String rateUnit) {
+        return rateUnit(parse(rateUnit));
+    }
+
+    public RateAggregationBuilder rateUnit(Rounding.DateTimeUnit rateUnit) {
+        this.rateUnit = rateUnit;
+        return this;
+    }
+
+    static Rounding.DateTimeUnit parse(String rateUnit) {
+        Rounding.DateTimeUnit parsedRate = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(rateUnit);
+        if (parsedRate == null) {
+            throw new IllegalArgumentException("Unsupported unit " + rateUnit);
+        }
+        return parsedRate;
+    }
+
+    @Override
+    protected ValuesSourceConfig resolveConfig(QueryShardContext queryShardContext) {
+        if (field() == null && script() == null) {
+            return new ValuesSourceConfig(
+                CoreValuesSourceType.NUMERIC,
+                null,
+                true,
+                null,
+                null,
+                1.0,
+                null,
+                DocValueFormat.RAW,
+                queryShardContext::nowInMillis
+            );
+        } else {
+            return super.resolveConfig(queryShardContext);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
+        RateAggregationBuilder that = (RateAggregationBuilder) o;
+        return rateUnit == that.rateUnit;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), rateUnit);
+    }
+}

+ 135 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregator.java

@@ -0,0 +1,135 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.ScoreMode;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RateAggregator extends NumericMetricsAggregator.SingleValue {
+
+    private final ValuesSource.Numeric valuesSource;
+    private final DocValueFormat format;
+    private final Rounding.DateTimeUnit rateUnit;
+    private final SizedBucketAggregator sizedBucketAggregator;
+
+    private DoubleArray sums;
+    private DoubleArray compensations;
+
+    public RateAggregator(
+        String name,
+        ValuesSourceConfig valuesSourceConfig,
+        Rounding.DateTimeUnit rateUnit,
+        SearchContext context,
+        Aggregator parent,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, context, parent, metadata);
+        this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
+        this.format = valuesSourceConfig.format();
+        if (valuesSource != null) {
+            sums = context.bigArrays().newDoubleArray(1, true);
+            compensations = context.bigArrays().newDoubleArray(1, true);
+        }
+        this.rateUnit = rateUnit;
+        this.sizedBucketAggregator = findSizedBucketAncestor();
+    }
+
+    private SizedBucketAggregator findSizedBucketAncestor() {
+        SizedBucketAggregator sizedBucketAggregator = null;
+        for (Aggregator ancestor = parent; ancestor != null; ancestor = ancestor.parent()) {
+            if (ancestor instanceof SizedBucketAggregator) {
+                sizedBucketAggregator = (SizedBucketAggregator) ancestor;
+                break;
+            }
+        }
+        if (sizedBucketAggregator == null) {
+            throw new IllegalArgumentException("The rate aggregation can only be used inside a date histogram");
+        }
+        return sizedBucketAggregator;
+    }
+
+    @Override
+    public ScoreMode scoreMode() {
+        return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
+        final BigArrays bigArrays = context.bigArrays();
+        final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
+        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                sums = bigArrays.grow(sums, bucket + 1);
+                compensations = bigArrays.grow(compensations, bucket + 1);
+
+                if (values.advanceExact(doc)) {
+                    final int valuesCount = values.docValueCount();
+                    // Compute the sum of double values with Kahan summation algorithm which is more
+                    // accurate than naive summation.
+                    double sum = sums.get(bucket);
+                    double compensation = compensations.get(bucket);
+                    kahanSummation.reset(sum, compensation);
+
+                    for (int i = 0; i < valuesCount; i++) {
+                        double value = values.nextValue();
+                        kahanSummation.add(value);
+                    }
+
+                    compensations.set(bucket, kahanSummation.delta());
+                    sums.set(bucket, kahanSummation.value());
+                }
+            }
+        };
+    }
+
+    @Override
+    public double metric(long owningBucketOrd) {
+        if (sizedBucketAggregator == null || valuesSource == null || owningBucketOrd >= sums.size()) {
+            return 0.0;
+        }
+        return sums.get(owningBucketOrd) / sizedBucketAggregator.bucketSize(owningBucketOrd, rateUnit);
+    }
+
+    @Override
+    public InternalAggregation buildAggregation(long bucket) {
+        if (valuesSource == null || bucket >= sums.size()) {
+            return buildEmptyAggregation();
+        }
+        return new InternalRate(name, sums.get(bucket), sizedBucketAggregator.bucketSize(bucket, rateUnit), format, metadata());
+    }
+
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return new InternalRate(name, 0.0, 1.0, format, metadata());
+    }
+
+    @Override
+    public void doClose() {
+        Releasables.close(sums, compensations);
+    }
+}

+ 74 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java

@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
+
+    private final Rounding.DateTimeUnit rateUnit;
+
+    RateAggregatorFactory(
+        String name,
+        ValuesSourceConfig config,
+        Rounding.DateTimeUnit rateUnit,
+        QueryShardContext queryShardContext,
+        AggregatorFactory parent,
+        AggregatorFactories.Builder subFactoriesBuilder,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, config, queryShardContext, parent, subFactoriesBuilder, metadata);
+        this.rateUnit = rateUnit;
+    }
+
+    static void registerAggregators(ValuesSourceRegistry.Builder builder) {
+        builder.register(
+            RateAggregationBuilder.REGISTRY_KEY,
+            List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
+            RateAggregator::new,
+            true
+        );
+    }
+
+    @Override
+    protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
+        return new RateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
+            @Override
+            public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) {
+                return LeafBucketCollector.NO_OP_COLLECTOR;
+            }
+        };
+    }
+
+    @Override
+    protected Aggregator doCreateInternal(
+        SearchContext searchContext,
+        Aggregator parent,
+        CardinalityUpperBound bucketCardinality,
+        Map<String, Object> metadata
+    ) throws IOException {
+        return queryShardContext.getValuesSourceRegistry()
+            .getAggregator(RateAggregationBuilder.REGISTRY_KEY, config)
+            .build(name, config, rateUnit, searchContext, parent, metadata);
+    }
+}

+ 26 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorSupplier.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+public interface RateAggregatorSupplier {
+    Aggregator build(
+        String name,
+        ValuesSourceConfig valuesSourceConfig,
+        Rounding.DateTimeUnit rateUnit,
+        SearchContext context,
+        Aggregator parent,
+        Map<String, Object> metadata
+    ) throws IOException;
+}

+ 1 - 0
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java

@@ -45,6 +45,7 @@ public class AnalyticsStatsActionNodeResponseTests extends AbstractWireSerializi
         assertThat(AnalyticsStatsAction.Item.T_TEST.ordinal(), equalTo(i++));
         assertThat(AnalyticsStatsAction.Item.MOVING_PERCENTILES.ordinal(), equalTo(i++));
         assertThat(AnalyticsStatsAction.Item.NORMALIZE.ordinal(), equalTo(i++));
+        assertThat(AnalyticsStatsAction.Item.RATE.ordinal(), equalTo(i++));
         // Please add tests for newly added items here
         assertThat(AnalyticsStatsAction.Item.values().length, equalTo(i));
     }

+ 103 - 0
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/InternalRateTests.java

@@ -0,0 +1,103 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.ParsedAggregation;
+import org.elasticsearch.test.InternalAggregationTestCase;
+import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InternalRateTests extends InternalAggregationTestCase<InternalRate> {
+
+    @Override
+    protected SearchPlugin registerPlugin() {
+        return new AnalyticsPlugin();
+    }
+
+    @Override
+    protected InternalRate createTestInstance(String name, Map<String, Object> metadata) {
+        double sum = randomDouble();
+        double divider = randomDoubleBetween(0.0, 100000.0, false);
+        DocValueFormat formatter = randomNumericDocValueFormat();
+        return new InternalRate(name, sum, divider, formatter, metadata);
+    }
+
+    @Override
+    protected List<InternalRate> randomResultsToReduce(String name, int size) {
+        double divider = randomDoubleBetween(0.0, 100000.0, false);
+        List<InternalRate> inputs = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            // Make sure the sum of all the counts doesn't wrap and type and tail parameters are consistent
+            DocValueFormat formatter = randomNumericDocValueFormat();
+            inputs.add(new InternalRate(name, randomDouble(), divider, formatter, null));
+        }
+        return inputs;
+    }
+
+    @Override
+    protected void assertReduced(InternalRate reduced, List<InternalRate> inputs) {
+        double expected = inputs.stream().mapToDouble(a -> a.sum).sum() / reduced.divisor;
+        assertEquals(expected, reduced.getValue(), 0.00001);
+    }
+
+    @Override
+    protected void assertFromXContent(InternalRate min, ParsedAggregation parsedAggregation) {
+        // There is no ParsedRate yet so we cannot test it here
+    }
+
+    @Override
+    protected InternalRate mutateInstance(InternalRate instance) {
+        String name = instance.getName();
+        double sum = instance.sum;
+        double divider = instance.divisor;
+        DocValueFormat formatter = instance.format();
+        Map<String, Object> metadata = instance.getMetadata();
+        switch (between(0, 3)) {
+            case 0:
+                name += randomAlphaOfLength(5);
+                break;
+            case 1:
+                sum = randomDouble();
+                break;
+            case 2:
+                divider = randomDouble();
+                break;
+            case 3:
+                if (metadata == null) {
+                    metadata = new HashMap<>(1);
+                } else {
+                    metadata = new HashMap<>(instance.getMetadata());
+                }
+                metadata.put(randomAlphaOfLength(15), randomInt());
+                break;
+            default:
+                throw new AssertionError("Illegal randomisation branch");
+        }
+        return new InternalRate(name, sum, divider, formatter, metadata);
+    }
+
+    @Override
+    protected List<NamedXContentRegistry.Entry> getNamedXContents() {
+        List<NamedXContentRegistry.Entry> extendedNamedXContents = new ArrayList<>(super.getNamedXContents());
+        extendedNamedXContents.add(
+            new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(RateAggregationBuilder.NAME), (p, c) -> {
+                assumeTrue("There is no ParsedRate yet", false);
+                return null;
+            })
+        );
+        return extendedNamedXContents;
+    }
+}

+ 89 - 0
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilderTests.java

@@ -0,0 +1,89 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.hasSize;
+
+public class RateAggregationBuilderTests extends AbstractSerializingTestCase<RateAggregationBuilder> {
+    String aggregationName;
+
+    @Before
+    public void setupName() {
+        aggregationName = randomAlphaOfLength(10);
+    }
+
+    @Override
+    protected RateAggregationBuilder doParseInstance(XContentParser parser) throws IOException {
+        assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
+        AggregatorFactories.Builder parsed = AggregatorFactories.parseAggregators(parser);
+        assertThat(parsed.getAggregatorFactories(), hasSize(1));
+        assertThat(parsed.getPipelineAggregatorFactories(), hasSize(0));
+        RateAggregationBuilder agg = (RateAggregationBuilder) parsed.getAggregatorFactories().iterator().next();
+        assertNull(parser.nextToken());
+        assertNotNull(agg);
+        return agg;
+    }
+
+    @Override
+    protected RateAggregationBuilder createTestInstance() {
+        RateAggregationBuilder aggregationBuilder = new RateAggregationBuilder(aggregationName);
+        if (randomBoolean()) {
+            if (randomBoolean()) {
+                aggregationBuilder.field(randomAlphaOfLength(10));
+            } else {
+                aggregationBuilder.script(new Script(randomAlphaOfLength(10)));
+            }
+        }
+        if (randomBoolean()) {
+            aggregationBuilder.rateUnit(randomFrom(Rounding.DateTimeUnit.values()));
+        }
+        return aggregationBuilder;
+    }
+
+    @Override
+    protected Writeable.Reader<RateAggregationBuilder> instanceReader() {
+        return RateAggregationBuilder::new;
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        return new NamedWriteableRegistry(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedWriteables());
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        List<NamedXContentRegistry.Entry> namedXContent = new ArrayList<>();
+        namedXContent.add(
+            new NamedXContentRegistry.Entry(
+                BaseAggregationBuilder.class,
+                new ParseField(RateAggregationBuilder.NAME),
+                (p, n) -> RateAggregationBuilder.PARSER.apply(p, (String) n)
+            )
+        );
+        namedXContent.addAll(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());
+        return new NamedXContentRegistry(namedXContent);
+    }
+}

+ 445 - 0
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java

@@ -0,0 +1,445 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.fielddata.ScriptDocValues;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.script.MockScriptEngine;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.lookup.LeafDocLookup;
+import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class RateAggregatorTests extends AggregatorTestCase {
+
+    /**
+     * Script to return the {@code _value} provided by aggs framework.
+     */
+    public static final String ADD_ONE_SCRIPT = "add_one";
+
+    public static final String TERM_FILTERING = "term_filtering";
+
+    public static final String DATE_FIELD = "t";
+
+    @Override
+    protected ScriptService getMockScriptService() {
+        Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
+
+        scripts.put(ADD_ONE_SCRIPT, vars -> {
+            LeafDocLookup leafDocLookup = (LeafDocLookup) vars.get("doc");
+            String fieldname = (String) vars.get("fieldname");
+            ScriptDocValues<?> scriptDocValues = leafDocLookup.get(fieldname);
+            return ((Number) scriptDocValues.get(0)).doubleValue() + 1.0;
+        });
+
+        scripts.put(TERM_FILTERING, vars -> {
+            LeafDocLookup leafDocLookup = (LeafDocLookup) vars.get("doc");
+            int term = (Integer) vars.get("term");
+            ScriptDocValues<?> termDocValues = leafDocLookup.get("term");
+            int currentTerm = ((Number) termDocValues.get(0)).intValue();
+            if (currentTerm == term) {
+                return ((Number) leafDocLookup.get("field").get(0)).doubleValue();
+            }
+            return null;
+        });
+
+        MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME, scripts, Collections.emptyMap());
+        Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
+
+        return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
+    }
+
+    public void testNoMatchingField() throws IOException {
+        testCase(new MatchAllDocsQuery(), "month", true, "month", "val", iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("wrong_val", 102)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("wrong_val", 103)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("wrong_val", 103)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(dh.getBuckets().get(0).getAggregations().asList(), hasSize(1));
+            assertThat(dh.getBuckets().get(0).getAggregations().asList().get(0), instanceOf(InternalRate.class));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(0.0, 0.000001));
+
+            assertThat(dh.getBuckets().get(1).getAggregations().asList(), hasSize(1));
+            assertThat(dh.getBuckets().get(1).getAggregations().asList().get(0), instanceOf(InternalRate.class));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(0.0, 0.000001));
+        });
+    }
+
+    public void testSortedNumericDocValuesMonthToMonth() throws IOException {
+        testCase(new MatchAllDocsQuery(), "month", true, "month", "val", iw -> {
+            iw.addDocument(
+                doc("2010-03-12T01:07:45", new SortedNumericDocValuesField("val", 1), new SortedNumericDocValuesField("val", 2))
+            );
+            iw.addDocument(doc("2010-04-01T03:43:34", new SortedNumericDocValuesField("val", 3)));
+            iw.addDocument(
+                doc("2010-04-27T03:43:34", new SortedNumericDocValuesField("val", 4), new SortedNumericDocValuesField("val", 5))
+            );
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(12.0, 0.000001));
+        });
+    }
+
+    public void testDocValuesMonthToMonth() throws IOException {
+        testCase(new MatchAllDocsQuery(), "month", true, "month", "val", iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(7.0, 0.000001));
+        });
+    }
+
+    public void testDocValuesMonthToMonthDefaultRate() throws IOException {
+        testCase(new MatchAllDocsQuery(), "month", true, null, "val", iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(7.0, 0.000001));
+        });
+    }
+
+    public void testDocValuesYearToMonth() throws IOException {
+        testCase(new MatchAllDocsQuery(), "year", true, "month", "val", iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(1));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
+        });
+    }
+
+    public void testDocValuesMonthToYear() throws IOException {
+        testCase(new MatchAllDocsQuery(), "month", true, "year", "val", iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(12.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(132.0, 0.000001));
+        });
+    }
+
+    public void testDocValues50DaysToDays() throws IOException {
+        testCase(new MatchAllDocsQuery(), "50d", false, "day", "val", iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(0.02, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(0.22, 0.000001));
+        });
+    }
+
+    public void testIncompatibleCalendarRate() {
+        String interval = randomFrom("second", "minute", "hour", "day", "week", "1s", "1m", "1h", "1d", "1w");
+        String rate = randomFrom("month", "quarter", "year", "1M", "1q", "1y");
+        IllegalArgumentException ex = expectThrows(
+            IllegalArgumentException.class,
+            () -> testCase(new MatchAllDocsQuery(), interval, true, rate, "val", iw -> {
+                iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+                iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+                iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
+            }, dh -> { fail("Shouldn't be here"); })
+        );
+        assertEquals(
+            "Cannot use month-based rate unit ["
+                + RateAggregationBuilder.parse(rate).shortName()
+                + "] with non-month based calendar interval histogram ["
+                + RateAggregationBuilder.parse(interval).shortName()
+                + "] only week, day, hour, minute and second are supported for this histogram",
+            ex.getMessage()
+        );
+    }
+
+    public void testIncompatibleIntervalRate() {
+        String interval = randomFrom("1s", "2m", "4h", "5d", "6w");
+        String rate = randomFrom("month", "quarter", "year", "1M", "1q", "1y");
+        IllegalArgumentException ex = expectThrows(
+            IllegalArgumentException.class,
+            () -> testCase(new MatchAllDocsQuery(), interval, false, rate, "val", iw -> {
+                iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+                iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+                iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 8)));
+            }, dh -> { fail("Shouldn't be here"); })
+        );
+        assertEquals(
+            "Cannot use month-based rate unit ["
+                + RateAggregationBuilder.parse(rate).shortName()
+                + "] with fixed interval based histogram, only week, day, hour, minute and second are supported for this histogram",
+            ex.getMessage()
+        );
+    }
+
+    public void testNoFieldMonthToDay() throws IOException {
+        testCase(new MatchAllDocsQuery(), "month", true, "day", null, iw -> {
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
+        }, dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1 / 31.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(2 / 30.0, 0.000001));
+        });
+    }
+
+    public void testNoWrapping() throws IOException {
+        MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("day");
+        IllegalArgumentException ex = expectThrows(
+            IllegalArgumentException.class,
+            () -> testCase(rateAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+                iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+                iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+                iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
+            }, h -> { fail("Shouldn't be here"); }, dateType, numType)
+        );
+        assertEquals("The rate aggregation can only be used inside a date histogram", ex.getMessage());
+    }
+
+    public void testDoubleWrapping() throws IOException {
+        MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(rateAggregationBuilder);
+        DateHistogramAggregationBuilder topDateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date");
+        topDateHistogramAggregationBuilder.field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("year"))
+            .subAggregation(dateHistogramAggregationBuilder);
+
+        testCase(topDateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(doc("2009-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
+        }, (Consumer<InternalDateHistogram>) tdh -> {
+            assertThat(tdh.getBuckets(), hasSize(2));
+            InternalDateHistogram dh1 = (InternalDateHistogram) tdh.getBuckets().get(0).getAggregations().asList().get(0);
+            assertThat(dh1.getBuckets(), hasSize(1));
+            assertThat(((InternalRate) dh1.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(1.0, 0.000001));
+
+            InternalDateHistogram dh2 = (InternalDateHistogram) tdh.getBuckets().get(1).getAggregations().asList().get(0);
+            assertThat(dh2.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh2.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(2.0, 0.000001));
+            assertThat(((InternalRate) dh2.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(7.0, 0.000001));
+        }, dateType, numType);
+    }
+
+    public void testKeywordSandwich() throws IOException {
+        MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        MappedFieldType keywordType = new KeywordFieldMapper.KeywordFieldType("term");
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+        TermsAggregationBuilder termsAggregationBuilder = new TermsAggregationBuilder("my_term").field("term")
+            .subAggregation(rateAggregationBuilder);
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(termsAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(
+                doc("2010-03-11T01:07:45", new NumericDocValuesField("val", 1), new SortedSetDocValuesField("term", new BytesRef("a")))
+            );
+            iw.addDocument(
+                doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2), new SortedSetDocValuesField("term", new BytesRef("a")))
+            );
+            iw.addDocument(
+                doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3), new SortedSetDocValuesField("term", new BytesRef("a")))
+            );
+            iw.addDocument(
+                doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4), new SortedSetDocValuesField("term", new BytesRef("b")))
+            );
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            StringTerms st1 = (StringTerms) dh.getBuckets().get(0).getAggregations().asList().get(0);
+            assertThat(st1.getBuckets(), hasSize(1));
+            assertThat(((InternalRate) st1.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+
+            StringTerms st2 = (StringTerms) dh.getBuckets().get(1).getAggregations().asList().get(0);
+            assertThat(st2.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) st2.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+            assertThat(((InternalRate) st2.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(4.0, 0.000001));
+        }, dateType, numType, keywordType);
+    }
+
+    public void testScriptMonthToDay() throws IOException {
+        testCase(
+            new MatchAllDocsQuery(),
+            "month",
+            true,
+            "day",
+            new Script(ScriptType.INLINE, MockScriptEngine.NAME, ADD_ONE_SCRIPT, Collections.singletonMap("fieldname", "val")),
+            iw -> {
+                iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 1)));
+                iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+                iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
+            },
+            dh -> {
+                assertThat(dh.getBuckets(), hasSize(2));
+                assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(2 / 31.0, 0.000001));
+                assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(9 / 30.0, 0.000001));
+            }
+        );
+    }
+
+    public void testFilter() throws IOException {
+        MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        MappedFieldType keywordType = new KeywordFieldMapper.KeywordFieldType("term");
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new TermQuery(new Term("term", "a")), iw -> {
+            iw.addDocument(doc("2010-03-11T01:07:45", new NumericDocValuesField("val", 1), new StringField("term", "a", Field.Store.NO)));
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2), new StringField("term", "a", Field.Store.NO)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3), new StringField("term", "a", Field.Store.NO)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4), new StringField("term", "b", Field.Store.NO)));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+        }, dateType, numType, keywordType);
+    }
+
+    public void testFormatter() throws IOException {
+        MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month")
+            .field("val")
+            .format("00.0/M");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(doc("2010-03-11T01:07:45", new NumericDocValuesField("val", 1)));
+            iw.addDocument(doc("2010-03-12T01:07:45", new NumericDocValuesField("val", 2)));
+            iw.addDocument(doc("2010-04-01T03:43:34", new NumericDocValuesField("val", 3)));
+            iw.addDocument(doc("2010-04-27T03:43:34", new NumericDocValuesField("val", 4)));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValueAsString(), equalTo("03.0/M"));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).getValueAsString(), equalTo("07.0/M"));
+        }, dateType, numType);
+    }
+
+    private void testCase(
+        Query query,
+        String interval,
+        boolean isCalendar,
+        String unit,
+        Object field,
+        CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
+        Consumer<InternalDateHistogram> verify
+    ) throws IOException {
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        MappedFieldType numType = new NumberFieldMapper.NumberFieldType("val", NumberFieldMapper.NumberType.INTEGER);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate");
+        if (unit != null) {
+            rateAggregationBuilder.rateUnit(unit);
+        }
+        if (field != null) {
+            if (field instanceof Script) {
+                rateAggregationBuilder.script((Script) field);
+            } else {
+                rateAggregationBuilder.field((String) field);
+            }
+        }
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date");
+        dateHistogramAggregationBuilder.field(DATE_FIELD);
+        if (isCalendar) {
+            dateHistogramAggregationBuilder.calendarInterval(new DateHistogramInterval(interval));
+        } else {
+            dateHistogramAggregationBuilder.fixedInterval(new DateHistogramInterval(interval));
+        }
+        dateHistogramAggregationBuilder.subAggregation(rateAggregationBuilder);
+        testCase(dateHistogramAggregationBuilder, query, buildIndex, verify, dateType, numType);
+    }
+
+    @Override
+    protected List<SearchPlugin> getSearchPlugins() {
+        return Collections.singletonList(new AnalyticsPlugin());
+    }
+
+    private DateFieldMapper.DateFieldType dateFieldType(String name) {
+        return new DateFieldMapper.DateFieldType(
+            name,
+            true,
+            true,
+            DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER,
+            DateFieldMapper.Resolution.MILLISECONDS,
+            Collections.emptyMap()
+        );
+    }
+
+    private Iterable<IndexableField> doc(String date, IndexableField... fields) {
+        List<IndexableField> indexableFields = new ArrayList<>();
+        long instant = dateFieldType(DATE_FIELD).parse(date);
+        indexableFields.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
+        indexableFields.addAll(Arrays.asList(fields));
+        return indexableFields;
+    }
+}

+ 2 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java

@@ -45,7 +45,8 @@ public class AnalyticsStatsAction extends ActionType<AnalyticsStatsAction.Respon
         TOP_METRICS,
         T_TEST,
         MOVING_PERCENTILES,
-        NORMALIZE;
+        NORMALIZE,
+        RATE;
     }
 
     public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {

+ 36 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/rate.yml

@@ -0,0 +1,36 @@
+---
+setup:
+  - do:
+      bulk:
+        index: test
+        refresh: true
+        body:
+          - '{"index": {}}'
+          - '{"timestamp": "2020-02-03T10:00:00Z", "val": 3}'
+          - '{"index": {}}'
+          - '{"timestamp": "2020-02-04T10:00:00Z", "val": 4}'
+          - '{"index": {}}'
+          - '{"timestamp": "2020-02-11T10:00:00Z", "val": 6}'
+          - '{"index": {}}'
+          - '{"timestamp": "2020-02-12T10:00:00Z", "val": 8}'
+---
+"value rate":
+  - do:
+      search:
+        size: 0
+        index: "test"
+        body:
+          aggs:
+            by_date:
+              date_histogram:
+                field: timestamp
+                calendar_interval: week
+              aggs:
+                rate:
+                  rate:
+                    field: val
+                    unit: day
+
+  - length: { aggregations.by_date.buckets: 2 }
+  - match: { aggregations.by_date.buckets.0.rate.value: 1.0 }
+  - match: { aggregations.by_date.buckets.1.rate.value: 2.0 }

+ 39 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml

@@ -28,6 +28,7 @@ setup:
   - set: {analytics.stats.string_stats_usage: string_stats_usage}
   - set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
   - set: { analytics.stats.normalize_usage: normalize_usage }
+  - set: { analytics.stats.rate_usage: rate_usage }
 
   # use boxplot agg
   - do:
@@ -54,6 +55,7 @@ setup:
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
   - match: { analytics.stats.normalize_usage: $normalize_usage }
+  - match: { analytics.stats.rate_usage: $rate_usage }
 
   # use top_metrics agg
   - do:
@@ -83,6 +85,7 @@ setup:
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
   - match: { analytics.stats.normalize_usage: $normalize_usage }
+  - match: { analytics.stats.rate_usage: $rate_usage }
 
   # use cumulative_cardinality agg
   - do:
@@ -116,6 +119,7 @@ setup:
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
   - match: { analytics.stats.normalize_usage: $normalize_usage }
+  - match: { analytics.stats.rate_usage: $rate_usage }
 
   # use t-test agg
   - do:
@@ -143,6 +147,7 @@ setup:
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
   - match: { analytics.stats.normalize_usage: $normalize_usage }
+  - match: { analytics.stats.rate_usage: $rate_usage }
 
   - do:
       search:
@@ -166,6 +171,7 @@ setup:
   - set: {analytics.stats.string_stats_usage: string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
   - match: { analytics.stats.normalize_usage: $normalize_usage }
+  - match: { analytics.stats.rate_usage: $rate_usage }
 
   # use moving_percentile agg
   - do:
@@ -200,6 +206,7 @@ setup:
   - gt: { analytics.stats.moving_percentiles_usage: $moving_percentiles_usage }
   - set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
   - match: { analytics.stats.normalize_usage: $normalize_usage }
+  - match: { analytics.stats.rate_usage: $rate_usage }
 
   # use normalize agg
   - do:
@@ -234,3 +241,35 @@ setup:
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
   - gt: { analytics.stats.normalize_usage: $normalize_usage }
   - set: {analytics.stats.normalize_usage: normalize_usage}
+  - match: { analytics.stats.rate_usage: $rate_usage }
+
+  # use rate agg
+  - do:
+      search:
+        index: "test"
+        body:
+          size: 0
+          aggs:
+            histo:
+              date_histogram:
+                field: "timestamp"
+                calendar_interval: "day"
+              aggs:
+                avg_users:
+                  rate:
+                    field: "s"
+                    unit: "hour"
+
+  - length: { aggregations.histo.buckets: 1 }
+
+  - do: {xpack.usage: {}}
+  - match: { analytics.available: true }
+  - match: { analytics.enabled: true }
+  - match: {analytics.stats.boxplot_usage: $boxplot_usage}
+  - match: {analytics.stats.top_metrics_usage: $top_metrics_usage}
+  - match: {analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage}
+  - match: {analytics.stats.t_test_usage: $t_test_usage}
+  - match: {analytics.stats.string_stats_usage: $string_stats_usage}
+  - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
+  - gt: { analytics.stats.rate_usage: $rate_usage }
+  - set: {analytics.stats.rate_usage: rate_usage}

+ 2 - 1
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

@@ -79,7 +79,8 @@ public final class TransformAggregations {
         "top_hits",
         "top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
         "t_test", // https://github.com/elastic/elasticsearch/issues/54503,
-        "variable_width_histogram" // https://github.com/elastic/elasticsearch/issues/58140
+        "variable_width_histogram", // https://github.com/elastic/elasticsearch/issues/58140
+        "rate" // https://github.com/elastic/elasticsearch/issues/61351
     );
 
     private TransformAggregations() {}