Browse Source

Add support for histogram fields to rate aggregation (#63289)

The rate aggregation now supports histogram fields. At the moment only sum
is supported. 

Closes #62939
Igor Motov 5 years ago
parent
commit
34bff3f776

+ 2 - 1
docs/reference/aggregations/metrics/rate-aggregation.asciidoc

@@ -4,7 +4,8 @@
 === Rate Aggregation
 
 A `rate` metrics aggregation can be used only inside a `date_histogram` and calculates a rate of documents or a field in each
-`date_histogram` bucket.
+`date_histogram` bucket. The field values can be generated by a provided script or extracted from specific numeric or
+<<histogram,histogram fields>> in the documents.
 
 ==== Syntax
 

+ 9 - 47
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregator.java → x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java

@@ -5,39 +5,33 @@
  */
 package org.elasticsearch.xpack.analytics.rate;
 
-import org.apache.lucene.index.LeafReaderContext;
+import java.io.IOException;
+import java.util.Map;
+
 import org.apache.lucene.search.ScoreMode;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.DoubleArray;
-import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.histogram.SizedBucketAggregator;
-import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.internal.SearchContext;
 
-import java.io.IOException;
-import java.util.Map;
-
-public class RateAggregator extends NumericMetricsAggregator.SingleValue {
+public abstract class AbstractRateAggregator extends NumericMetricsAggregator.SingleValue {
 
-    private final ValuesSource.Numeric valuesSource;
+    protected final ValuesSource valuesSource;
     private final DocValueFormat format;
     private final Rounding.DateTimeUnit rateUnit;
     private final SizedBucketAggregator sizedBucketAggregator;
 
-    private DoubleArray sums;
-    private DoubleArray compensations;
+    protected DoubleArray sums;
+    protected DoubleArray compensations;
 
-    public RateAggregator(
+    public AbstractRateAggregator(
         String name,
         ValuesSourceConfig valuesSourceConfig,
         Rounding.DateTimeUnit rateUnit,
@@ -46,7 +40,7 @@ public class RateAggregator extends NumericMetricsAggregator.SingleValue {
         Map<String, Object> metadata
     ) throws IOException {
         super(name, context, parent, metadata);
-        this.valuesSource = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
+        this.valuesSource = valuesSourceConfig.getValuesSource();
         this.format = valuesSourceConfig.format();
         if (valuesSource != null) {
             sums = context.bigArrays().newDoubleArray(1, true);
@@ -75,38 +69,6 @@ public class RateAggregator extends NumericMetricsAggregator.SingleValue {
         return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
     }
 
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
-        final BigArrays bigArrays = context.bigArrays();
-        final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
-        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
-
-        return new LeafBucketCollectorBase(sub, values) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                sums = bigArrays.grow(sums, bucket + 1);
-                compensations = bigArrays.grow(compensations, bucket + 1);
-
-                if (values.advanceExact(doc)) {
-                    final int valuesCount = values.docValueCount();
-                    // Compute the sum of double values with Kahan summation algorithm which is more
-                    // accurate than naive summation.
-                    double sum = sums.get(bucket);
-                    double compensation = compensations.get(bucket);
-                    kahanSummation.reset(sum, compensation);
-
-                    for (int i = 0; i < valuesCount; i++) {
-                        double value = values.nextValue();
-                        kahanSummation.add(value);
-                    }
-
-                    compensations.set(bucket, kahanSummation.delta());
-                    sums.set(bucket, kahanSummation.value());
-                }
-            }
-        };
-    }
-
     @Override
     public double metric(long owningBucketOrd) {
         if (sizedBucketAggregator == null || valuesSource == null || owningBucketOrd >= sums.size()) {

+ 62 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/HistogramRateAggregator.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.index.fielddata.HistogramValue;
+import org.elasticsearch.index.fielddata.HistogramValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.xpack.analytics.aggregations.support.HistogramValuesSource;
+
+public class HistogramRateAggregator extends AbstractRateAggregator {
+    public HistogramRateAggregator(
+        String name,
+        ValuesSourceConfig valuesSourceConfig,
+        Rounding.DateTimeUnit rateUnit,
+        SearchContext context,
+        Aggregator parent,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
+        final BigArrays bigArrays = context.bigArrays();
+        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+        final HistogramValues values = ((HistogramValuesSource.Histogram) valuesSource).getHistogramValues(ctx);
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                sums = bigArrays.grow(sums, bucket + 1);
+                compensations = bigArrays.grow(compensations, bucket + 1);
+
+                if (values.advanceExact(doc)) {
+                    final HistogramValue sketch = values.histogram();
+                    while (sketch.next()) {
+                        double sum = sums.get(bucket);
+                        double compensation = compensations.get(bucket);
+                        kahanSummation.reset(sum, compensation);
+                        kahanSummation.add(sketch.value());
+                        compensations.set(bucket, kahanSummation.delta());
+                        sums.set(bucket, kahanSummation.value());
+                    }
+                }
+            }
+        };
+    }
+}

+ 66 - 0
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java

@@ -0,0 +1,66 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.rate;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.internal.SearchContext;
+
+public class NumericRateAggregator extends AbstractRateAggregator {
+    public NumericRateAggregator(
+        String name,
+        ValuesSourceConfig valuesSourceConfig,
+        Rounding.DateTimeUnit rateUnit,
+        SearchContext context,
+        Aggregator parent,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
+    }
+
+    @Override
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
+        final BigArrays bigArrays = context.bigArrays();
+        final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
+        final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
+        return new LeafBucketCollectorBase(sub, values) {
+            @Override
+            public void collect(int doc, long bucket) throws IOException {
+                sums = bigArrays.grow(sums, bucket + 1);
+                compensations = bigArrays.grow(compensations, bucket + 1);
+
+                if (values.advanceExact(doc)) {
+                    final int valuesCount = values.docValueCount();
+                    // Compute the sum of double values with Kahan summation algorithm which is more
+                    // accurate than naive summation.
+                    double sum = sums.get(bucket);
+                    double compensation = compensations.get(bucket);
+                    kahanSummation.reset(sum, compensation);
+
+                    for (int i = 0; i < valuesCount; i++) {
+                        double value = values.nextValue();
+                        kahanSummation.add(value);
+                    }
+
+                    compensations.set(bucket, kahanSummation.delta());
+                    sums.set(bucket, kahanSummation.value());
+                }
+            }
+        };
+    }
+}

+ 1 - 1
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java

@@ -28,7 +28,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
-public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, RateAggregationBuilder> {
+public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, RateAggregationBuilder> {
     public static final String NAME = "rate";
     public static final ParseField UNIT_FIELD = new ParseField("unit");
     public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(

+ 14 - 7
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java

@@ -6,6 +6,10 @@
 
 package org.elasticsearch.xpack.analytics.rate;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.lucene.index.LeafReaderContext;
 import org.elasticsearch.common.Rounding;
 import org.elasticsearch.index.query.QueryShardContext;
@@ -19,10 +23,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFacto
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
 import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
+import org.elasticsearch.xpack.analytics.aggregations.support.AnalyticsValuesSourceType;
 
 class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
 
@@ -44,15 +45,21 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
     static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         builder.register(
             RateAggregationBuilder.REGISTRY_KEY,
-            List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
-            RateAggregator::new,
+            Collections.singletonList(CoreValuesSourceType.NUMERIC),
+            NumericRateAggregator::new,
+            true
+        );
+        builder.register(
+            RateAggregationBuilder.REGISTRY_KEY,
+            Collections.singletonList(AnalyticsValuesSourceType.HISTOGRAM),
+            HistogramRateAggregator::new,
             true
         );
     }
 
     @Override
     protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
-        return new RateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
+        return new AbstractRateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
             @Override
             public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) {
                 return LeafBucketCollector.NO_OP_COLLECTOR;

+ 78 - 15
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorTests.java

@@ -6,6 +6,22 @@
 
 package org.elasticsearch.xpack.analytics.rate;
 
+import static org.elasticsearch.xpack.analytics.AnalyticsTestsUtils.histogramFieldDocValues;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.SortedNumericDocValuesField;
@@ -40,21 +56,7 @@ import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.lookup.LeafDocLookup;
 import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static org.hamcrest.Matchers.closeTo;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
+import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
 
 public class RateAggregatorTests extends AggregatorTestCase {
 
@@ -386,6 +388,67 @@ public class RateAggregatorTests extends AggregatorTestCase {
         }, dateType, numType);
     }
 
+    public void testHistogramFieldMonthToMonth() throws IOException {
+        MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap());
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 })));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3, 4 })));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValue(), closeTo(3.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).getValue(), closeTo(7.0, 0.000001));
+        }, dateType, histType);
+    }
+
+    public void testHistogramFieldMonthToYear() throws IOException {
+        MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap());
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("year"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new MatchAllDocsQuery(), iw -> {
+            iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 })));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3, 4 })));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(1));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).getValue(), closeTo(10.0 / 12, 0.000001));
+        }, dateType, histType);
+    }
+
+    public void testFilterWithHistogramField() throws IOException {
+        MappedFieldType histType = new HistogramFieldMapper.HistogramFieldType("val", Collections.emptyMap());
+        MappedFieldType dateType = dateFieldType(DATE_FIELD);
+        MappedFieldType keywordType = new KeywordFieldMapper.KeywordFieldType("term");
+        RateAggregationBuilder rateAggregationBuilder = new RateAggregationBuilder("my_rate").rateUnit("month").field("val");
+
+        DateHistogramAggregationBuilder dateHistogramAggregationBuilder = new DateHistogramAggregationBuilder("my_date").field(DATE_FIELD)
+            .calendarInterval(new DateHistogramInterval("month"))
+            .subAggregation(rateAggregationBuilder);
+
+        testCase(dateHistogramAggregationBuilder, new TermQuery(new Term("term", "a")), iw -> {
+            iw.addDocument(doc("2010-03-01T00:00:00", histogramFieldDocValues("val", new double[] { 1, 2 }),
+                new StringField("term", "a", Field.Store.NO)));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 3 }),
+                new StringField("term", "a", Field.Store.NO)));
+            iw.addDocument(doc("2010-04-01T00:00:00", histogramFieldDocValues("val", new double[] { 4 }),
+                new StringField("term", "b", Field.Store.NO)));
+        }, (Consumer<InternalDateHistogram>) dh -> {
+            assertThat(dh.getBuckets(), hasSize(2));
+            assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+            assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(3.0, 0.000001));
+        }, dateType, histType, keywordType);
+    }
+
     private void testCase(
         Query query,
         String interval,