Browse Source

Use long in Centroid count (#99491)

* Use long in Centroid count

Centroids currently use a 32-bit int to count the samples that their mean
represents. This count can overflow when the digest tracks billions of
samples or more.

TDigestState already serializes the count as VLong, so it can now be read
back as VLong (instead of VInt) without compatibility issues.

Fixes #80153

* Update docs/changelog/99491.yaml

* More test fixes

* Bump TransportVersion

* Revert TransportVersion change
Kostas Krikellas 2 years ago
parent
commit
0247cfe442

+ 6 - 0
docs/changelog/99491.yaml

@@ -0,0 +1,6 @@
+pr: 99491
+summary: Use long in Centroid count
+area: Aggregations
+type: bug
+issues:
+ - 80153

+ 9 - 9
libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java

@@ -31,10 +31,10 @@ import java.util.Iterator;
 final class AVLGroupTree extends AbstractCollection<Centroid> {
 final class AVLGroupTree extends AbstractCollection<Centroid> {
     /* For insertions into the tree */
     /* For insertions into the tree */
     private double centroid;
     private double centroid;
-    private int count;
+    private long count;
     private double[] centroids;
     private double[] centroids;
-    private int[] counts;
-    private int[] aggregatedCounts;
+    private long[] counts;
+    private long[] aggregatedCounts;
     private final IntAVLTree tree;
     private final IntAVLTree tree;
 
 
     AVLGroupTree() {
     AVLGroupTree() {
@@ -78,8 +78,8 @@ final class AVLGroupTree extends AbstractCollection<Centroid> {
 
 
         };
         };
         centroids = new double[tree.capacity()];
         centroids = new double[tree.capacity()];
-        counts = new int[tree.capacity()];
-        aggregatedCounts = new int[tree.capacity()];
+        counts = new long[tree.capacity()];
+        aggregatedCounts = new long[tree.capacity()];
     }
     }
 
 
     /**
     /**
@@ -113,14 +113,14 @@ final class AVLGroupTree extends AbstractCollection<Centroid> {
     /**
     /**
      * Return the count for the provided node.
      * Return the count for the provided node.
      */
      */
-    public int count(int node) {
+    public long count(int node) {
         return counts[node];
         return counts[node];
     }
     }
 
 
     /**
     /**
      * Add the provided centroid to the tree.
      * Add the provided centroid to the tree.
      */
      */
-    public void add(double centroid, int count) {
+    public void add(double centroid, long count) {
         this.centroid = centroid;
         this.centroid = centroid;
         this.count = count;
         this.count = count;
         tree.add();
         tree.add();
@@ -135,7 +135,7 @@ final class AVLGroupTree extends AbstractCollection<Centroid> {
     /**
     /**
      * Update values associated with a node, readjusting the tree if necessary.
      * Update values associated with a node, readjusting the tree if necessary.
      */
      */
-    public void update(int node, double centroid, int count) {
+    public void update(int node, double centroid, long count) {
         // have to do full scale update
         // have to do full scale update
         this.centroid = centroid;
         this.centroid = centroid;
         this.count = count;
         this.count = count;
@@ -242,7 +242,7 @@ final class AVLGroupTree extends AbstractCollection<Centroid> {
     /**
     /**
      * Return the total count of points that have been added to the tree.
      * Return the total count of points that have been added to the tree.
      */
      */
-    public int sum() {
+    public long sum() {
         return aggregatedCounts[tree.root()];
         return aggregatedCounts[tree.root()];
     }
     }
 
 

+ 7 - 7
libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java

@@ -68,7 +68,7 @@ public class AVLTreeDigest extends AbstractTDigest {
     }
     }
 
 
     @Override
     @Override
-    public void add(double x, int w) {
+    public void add(double x, long w) {
         checkValue(x);
         checkValue(x);
         needsCompression = true;
         needsCompression = true;
 
 
@@ -84,7 +84,7 @@ public class AVLTreeDigest extends AbstractTDigest {
         }
         }
 
 
         if (start == NIL) { // empty summary
         if (start == NIL) { // empty summary
-            assert summary.size() == 0;
+            assert summary.isEmpty();
             summary.add(x, w);
             summary.add(x, w);
             count = w;
             count = w;
         } else {
         } else {
@@ -127,7 +127,7 @@ public class AVLTreeDigest extends AbstractTDigest {
                 // if the nearest point was not unique, then we may not be modifying the first copy
                 // if the nearest point was not unique, then we may not be modifying the first copy
                 // which means that ordering can change
                 // which means that ordering can change
                 double centroid = summary.mean(closest);
                 double centroid = summary.mean(closest);
-                int count = summary.count(closest);
+                long count = summary.count(closest);
                 centroid = weightedAverage(centroid, count, x, w);
                 centroid = weightedAverage(centroid, count, x, w);
                 count += w;
                 count += w;
                 summary.update(closest, centroid, count);
                 summary.update(closest, centroid, count);
@@ -189,7 +189,7 @@ public class AVLTreeDigest extends AbstractTDigest {
     @Override
     @Override
     public double cdf(double x) {
     public double cdf(double x) {
         AVLGroupTree values = summary;
         AVLGroupTree values = summary;
-        if (values.size() == 0) {
+        if (values.isEmpty()) {
             return Double.NaN;
             return Double.NaN;
         }
         }
         if (values.size() == 1) {
         if (values.size() == 1) {
@@ -272,7 +272,7 @@ public class AVLTreeDigest extends AbstractTDigest {
         }
         }
 
 
         AVLGroupTree values = summary;
         AVLGroupTree values = summary;
-        if (values.size() == 0) {
+        if (values.isEmpty()) {
             // no centroids means no data, no way to get a quantile
             // no centroids means no data, no way to get a quantile
             return Double.NaN;
             return Double.NaN;
         } else if (values.size() == 1) {
         } else if (values.size() == 1) {
@@ -293,7 +293,7 @@ public class AVLTreeDigest extends AbstractTDigest {
         }
         }
 
 
         int currentNode = values.first();
         int currentNode = values.first();
-        int currentWeight = values.count(currentNode);
+        long currentWeight = values.count(currentNode);
 
 
         // Total mass to the left of the center of the current node.
         // Total mass to the left of the center of the current node.
         double weightSoFar = currentWeight / 2.0;
         double weightSoFar = currentWeight / 2.0;
@@ -305,7 +305,7 @@ public class AVLTreeDigest extends AbstractTDigest {
 
 
         for (int i = 0; i < values.size() - 1; i++) {
         for (int i = 0; i < values.size() - 1; i++) {
             int nextNode = values.next(currentNode);
             int nextNode = values.next(currentNode);
-            int nextWeight = values.count(nextNode);
+            long nextWeight = values.count(nextNode);
             // this is the mass between current center and next center
             // this is the mass between current center and next center
             double dw = (currentWeight + nextWeight) / 2.0;
             double dw = (currentWeight + nextWeight) / 2.0;
 
 

+ 6 - 6
libs/tdigest/src/main/java/org/elasticsearch/tdigest/Centroid.java

@@ -30,7 +30,7 @@ public class Centroid implements Comparable<Centroid> {
     private static final AtomicInteger uniqueCount = new AtomicInteger(1);
     private static final AtomicInteger uniqueCount = new AtomicInteger(1);
 
 
     private double centroid = 0;
     private double centroid = 0;
-    private int count = 0;
+    private long count = 0;
 
 
     // The ID is transient because it must be unique within a given JVM. A new
     // The ID is transient because it must be unique within a given JVM. A new
     // ID should be generated from uniqueCount when a Centroid is deserialized.
     // ID should be generated from uniqueCount when a Centroid is deserialized.
@@ -45,22 +45,22 @@ public class Centroid implements Comparable<Centroid> {
         start(x, 1, uniqueCount.getAndIncrement());
         start(x, 1, uniqueCount.getAndIncrement());
     }
     }
 
 
-    public Centroid(double x, int w) {
+    public Centroid(double x, long w) {
         this();
         this();
         start(x, w, uniqueCount.getAndIncrement());
         start(x, w, uniqueCount.getAndIncrement());
     }
     }
 
 
-    public Centroid(double x, int w, int id) {
+    public Centroid(double x, long w, int id) {
         this();
         this();
         start(x, w, id);
         start(x, w, id);
     }
     }
 
 
-    private void start(double x, int w, int id) {
+    private void start(double x, long w, int id) {
         this.id = id;
         this.id = id;
         add(x, w);
         add(x, w);
     }
     }
 
 
-    public void add(double x, int w) {
+    public void add(double x, long w) {
         count += w;
         count += w;
         centroid += w * (x - centroid) / count;
         centroid += w * (x - centroid) / count;
     }
     }
@@ -69,7 +69,7 @@ public class Centroid implements Comparable<Centroid> {
         return centroid;
         return centroid;
     }
     }
 
 
-    public int count() {
+    public long count() {
         return count;
         return count;
     }
     }
 
 

+ 1 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java

@@ -70,7 +70,7 @@ public class HybridDigest extends AbstractTDigest {
     }
     }
 
 
     @Override
     @Override
-    public void add(double x, int w) {
+    public void add(double x, long w) {
         reserve(w);
         reserve(w);
         if (mergingDigest != null) {
         if (mergingDigest != null) {
             mergingDigest.add(x, w);
             mergingDigest.add(x, w);

+ 2 - 2
libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java

@@ -216,7 +216,7 @@ public class MergingDigest extends AbstractTDigest {
     }
     }
 
 
     @Override
     @Override
-    public void add(double x, int w) {
+    public void add(double x, long w) {
         checkValue(x);
         checkValue(x);
         if (tempUsed >= tempWeight.length - lastUsedCell - 1) {
         if (tempUsed >= tempWeight.length - lastUsedCell - 1) {
             mergeNewValues();
             mergeNewValues();
@@ -514,7 +514,7 @@ public class MergingDigest extends AbstractTDigest {
 
 
                     @Override
                     @Override
                     public Centroid next() {
                     public Centroid next() {
-                        Centroid rc = new Centroid(mean[i], (int) weight[i]);
+                        Centroid rc = new Centroid(mean[i], (long) weight[i]);
                         i++;
                         i++;
                         return rc;
                         return rc;
                     }
                     }

+ 1 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java

@@ -39,7 +39,7 @@ public class SortingDigest extends AbstractTDigest {
     private boolean isSorted = true;
     private boolean isSorted = true;
 
 
     @Override
     @Override
-    public void add(double x, int w) {
+    public void add(double x, long w) {
         checkValue(x);
         checkValue(x);
         isSorted = isSorted && (values.isEmpty() || values.get(values.size() - 1) <= x);
         isSorted = isSorted && (values.isEmpty() || values.get(values.size() - 1) <= x);
         for (int i = 0; i < w; i++) {
         for (int i = 0; i < w; i++) {

+ 1 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java

@@ -94,7 +94,7 @@ public abstract class TDigest {
      * @param x The value to add.
      * @param x The value to add.
      * @param w The weight of this point.
      * @param w The weight of this point.
      */
      */
-    public abstract void add(double x, int w);
+    public abstract void add(double x, long w);
 
 
     /**
     /**
      * Add a single sample to this TDigest.
      * Add a single sample to this TDigest.

+ 1 - 1
libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java

@@ -118,7 +118,7 @@ public class MergingDigestTests extends TDigestTests {
                 d.add(x);
                 d.add(x);
             }
             }
         }
         }
-        int last = 0;
+        long last = 0;
         for (Centroid centroid : d.centroids()) {
         for (Centroid centroid : d.centroids()) {
             if (last == 0) {
             if (last == 0) {
                 assertEquals(1, centroid.count());
                 assertEquals(1, centroid.count());

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java

@@ -15,7 +15,7 @@ public final class EmptyTDigestState extends TDigestState {
     }
     }
 
 
     @Override
     @Override
-    public void add(double x, int w) {
+    public void add(double x, long w) {
         throw new UnsupportedOperationException("Immutable Empty TDigest");
         throw new UnsupportedOperationException("Immutable Empty TDigest");
     }
     }
 
 

+ 3 - 3
server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java

@@ -138,7 +138,7 @@ public class TDigestState {
             state.tdigest.reserve(size);
             state.tdigest.reserve(size);
         }
         }
         for (int i = 0; i < n; i++) {
         for (int i = 0; i < n; i++) {
-            state.add(in.readDouble(), in.readVInt());
+            state.add(in.readDouble(), in.readVLong());
         }
         }
         return state;
         return state;
     }
     }
@@ -189,7 +189,7 @@ public class TDigestState {
         h = 31 * h + Integer.hashCode(centroidCount());
         h = 31 * h + Integer.hashCode(centroidCount());
         for (Centroid centroid : centroids()) {
         for (Centroid centroid : centroids()) {
             h = 31 * h + Double.hashCode(centroid.mean());
             h = 31 * h + Double.hashCode(centroid.mean());
-            h = 31 * h + centroid.count();
+            h = 31 * h + (int) centroid.count();
         }
         }
         h = 31 * h + Double.hashCode(getMax());
         h = 31 * h + Double.hashCode(getMax());
         h = 31 * h + Double.hashCode(getMin());
         h = 31 * h + Double.hashCode(getMin());
@@ -205,7 +205,7 @@ public class TDigestState {
         tdigest.add(other.tdigest);
         tdigest.add(other.tdigest);
     }
     }
 
 
-    public void add(double x, int w) {
+    public void add(double x, long w) {
         tdigest.add(x, w);
         tdigest.add(x, w);
     }
     }
 
 

+ 1 - 1
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/AnalyticsTestsUtils.java

@@ -29,7 +29,7 @@ public final class AnalyticsTestsUtils {
         BytesStreamOutput streamOutput = new BytesStreamOutput();
         BytesStreamOutput streamOutput = new BytesStreamOutput();
         histogram.compress();
         histogram.compress();
         for (Centroid centroid : histogram.centroids()) {
         for (Centroid centroid : histogram.centroids()) {
-            streamOutput.writeVInt(centroid.count());
+            streamOutput.writeVLong(centroid.count());
             streamOutput.writeDouble(centroid.mean());
             streamOutput.writeDouble(centroid.mean());
         }
         }
         return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());
         return new BinaryDocValuesField(fieldName, streamOutput.bytes().toBytesRef());

+ 2 - 2
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/aggregations/metrics/HistogramPercentileAggregationTests.java

@@ -185,7 +185,7 @@ public class HistogramPercentileAggregationTests extends ESSingleNodeTestCase {
                 client().bulk(bulkRequest);
                 client().bulk(bulkRequest);
                 bulkRequest = new BulkRequest();
                 bulkRequest = new BulkRequest();
                 List<Double> values = new ArrayList<>();
                 List<Double> values = new ArrayList<>();
-                List<Integer> counts = new ArrayList<>();
+                List<Long> counts = new ArrayList<>();
                 Collection<Centroid> centroids = histogram.centroids();
                 Collection<Centroid> centroids = histogram.centroids();
                 for (Centroid centroid : centroids) {
                 for (Centroid centroid : centroids) {
                     values.add(centroid.mean());
                     values.add(centroid.mean());
@@ -196,7 +196,7 @@ public class HistogramPercentileAggregationTests extends ESSingleNodeTestCase {
                     .startObject("inner")
                     .startObject("inner")
                     .startObject("data")
                     .startObject("data")
                     .field("values", values.toArray(new Double[values.size()]))
                     .field("values", values.toArray(new Double[values.size()]))
-                    .field("counts", counts.toArray(new Integer[counts.size()]))
+                    .field("counts", counts.toArray(new Long[counts.size()]))
                     .endObject()
                     .endObject()
                     .endObject()
                     .endObject()
                     .endObject();
                     .endObject();