浏览代码

[TSDB] Add Kahan support to downsampling summation (#87554)

This PR adds support for Kahan summation when downsampling time series indices.

Relates to #74660
Christos Soulios 3 年之前
父节点
当前提交
aebb7bc3cb

+ 5 - 0
docs/changelog/87554.yaml

@@ -0,0 +1,5 @@
+pr: 87554
+summary: "[TSDB] Add Kahan support to downsampling summation"
+area: "Rollup"
+type: enhancement
+issues: []

+ 4 - 0
server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java

@@ -35,6 +35,10 @@ public class CompensatedSum {
         this.delta = delta;
     }
 
+    public CompensatedSum() {
+        this(0, 0);
+    }
+
     /**
      * The value of the sum.
      */

+ 5 - 5
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducer.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.rollup.v2;
 
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -145,7 +146,7 @@ abstract class MetricFieldProducer {
      * Metric implementation that computes the sum of all values of a field
      */
     static class Sum extends Metric {
-        private double sum = 0;
+        private final CompensatedSum kahanSummation = new CompensatedSum();
 
         Sum() {
             super("sum");
@@ -153,18 +154,17 @@ abstract class MetricFieldProducer {
 
         @Override
         void collect(double value) {
-            // TODO: switch to Kahan summation ?
-            this.sum += value;
+            kahanSummation.add(value);
         }
 
         @Override
         Number get() {
-            return sum;
+            return kahanSummation.value();
         }
 
         @Override
         void reset() {
-            sum = 0;
+            kahanSummation.reset(0, 0);
         }
     }
 

+ 41 - 0
x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/MetricFieldProducerTests.java

@@ -56,6 +56,47 @@ public class MetricFieldProducerTests extends AggregatorTestCase {
         assertEquals(0d, metric.get());
     }
 
+    /**
+     * Checks the accuracy of the Sum metric for ordinary, non-finite and overflowing inputs.
+     * Test cases are adapted from SumAggregatorTests#testSummationAccuracy
+     */
+    public void testSummationAccuracy() {
+        MetricFieldProducer.Metric metric = new MetricFieldProducer.Sum();
+        // Summing up a normal array and expect an accurate value
+        double[] values = new double[] { 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7 };
+        for (double value : values) {
+            metric.collect(value);
+        }
+        assertEquals(15.3, metric.get().doubleValue(), Double.MIN_NORMAL);
+
+        // Summing up an array which contains NaN and infinities and expect a result same as naive summation
+        metric.reset();
+        int n = randomIntBetween(5, 10);
+        double sum = 0;
+        for (int i = 0; i < n; i++) {
+            double d = frequently()
+                ? randomFrom(Double.NaN, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY)
+                : randomDoubleBetween(Double.MIN_VALUE, Double.MAX_VALUE, true);
+            sum += d;
+            metric.collect(d);
+        }
+        assertEquals(sum, metric.get().doubleValue(), 1e-10);
+
+        // Summing up some big double values and expect infinity result
+        metric.reset();
+        n = randomIntBetween(5, 10);
+        for (int i = 0; i < n; i++) {
+            metric.collect(Double.MAX_VALUE);
+        }
+        assertEquals(Double.POSITIVE_INFINITY, metric.get().doubleValue(), 0d);
+
+        metric.reset();
+        for (int i = 0; i < n; i++) {
+            metric.collect(-Double.MAX_VALUE);
+        }
+        assertEquals(Double.NEGATIVE_INFINITY, metric.get().doubleValue(), 0d);
+    }
+
     public void testValueCountMetric() {
         MetricFieldProducer.Metric metric = new MetricFieldProducer.ValueCount();
         assertEquals(0L, metric.get());