Browse Source

Add CircuitBreaker to TDigest, Step 2: Add CB to array wrappers (#113105) (#113608)

Part of https://github.com/elastic/elasticsearch/issues/99815

## Steps
1. Migrate TDigest classes to use a custom Array implementation. Temporarily use a simple array wrapper (https://github.com/elastic/elasticsearch/pull/112810)
2. Implement CircuitBreaking in the `MemoryTrackingTDigestArrays` class. Add `Releasable` and ensure it's always closed within TDigest (This PR)
3. Pass the CircuitBreaker as a parameter to TDigestState from wherever it's being used
4. Account remaining TDigest classes size ("SHALLOW_SIZE")

Every step should be safely mergeable to main:
- The first and second steps should have no impact.
- The third and fourth ones will start increasing the CB count partially.

## Remarks
To simplify testing the CircuitBreaker, added a helper method + `@After` to ESTestCase.

Right now CBs are usually tested through MockBigArrays. E.g:
https://github.com/elastic/elasticsearch/blob/f7a0196b454b17f7928728a26084000238c4efaa/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java#L1263-L1265
So I guess there was no need for this yet. But I may have missed something somewhere.

Also, I'm separating this PR from the "step 3" as integrating this (CB) in the current usages may require some refactor of external code, which may be somewhat more _dangerous_
Iván Cea Fontenla 1 year ago
parent
commit
b8dcdd6303
34 changed files with 1147 additions and 483 deletions
  1. 6 3
      benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/SortBench.java
  2. 7 4
      benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/TDigestBench.java
  3. 2 0
      libs/tdigest/build.gradle
  4. 2 0
      libs/tdigest/src/main/java/module-info.java
  5. 7 1
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java
  6. 24 17
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java
  7. 7 0
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java
  8. 7 1
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/IntAVLTree.java
  9. 6 7
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java
  10. 6 0
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java
  11. 2 1
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java
  12. 3 1
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestByteArray.java
  13. 3 1
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestDoubleArray.java
  14. 3 1
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestIntArray.java
  15. 3 1
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestLongArray.java
  16. 0 258
      libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/WrapperTDigestArrays.java
  17. 6 9
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLGroupTreeTests.java
  18. 1 3
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLTreeDigestTests.java
  19. 3 6
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/AlternativeMergeTests.java
  20. 1 3
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java
  21. 1 3
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java
  22. 1 3
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java
  23. 5 8
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/ComparisonTests.java
  24. 6 7
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/IntAVLTreeTests.java
  25. 5 8
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/MedianTests.java
  26. 6 8
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/SortTests.java
  27. 109 0
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTestCase.java
  28. 1 9
      libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTests.java
  29. 1 3
      server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java
  30. 401 0
      server/src/main/java/org/elasticsearch/search/aggregations/metrics/MemoryTrackingTDigestArrays.java
  31. 12 6
      server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java
  32. 360 0
      server/src/test/java/org/elasticsearch/search/aggregations/metrics/MemoryTrackingTDigestArraysTests.java
  33. 124 111
      server/src/test/java/org/elasticsearch/search/aggregations/metrics/TDigestStateTests.java
  34. 16 0
      test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

+ 6 - 3
benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/SortBench.java

@@ -21,10 +21,12 @@
 
 package org.elasticsearch.benchmark.tdigest;
 
+import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays;
 import org.elasticsearch.tdigest.Sort;
+import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
 import org.elasticsearch.tdigest.arrays.TDigestIntArray;
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -51,7 +53,8 @@ import java.util.concurrent.TimeUnit;
 @State(Scope.Thread)
 public class SortBench {
     private final int size = 100000;
-    private final TDigestDoubleArray values = WrapperTDigestArrays.INSTANCE.newDoubleArray(size);
+    private final TDigestArrays arrays = new MemoryTrackingTDigestArrays(new NoopCircuitBreaker("default-wrapper-tdigest-arrays"));
+    private final TDigestDoubleArray values = arrays.newDoubleArray(size);
 
     @Param({ "0", "1", "-1" })
     public int sortDirection;
@@ -72,7 +75,7 @@ public class SortBench {
 
     @Benchmark
     public void stableSort() {
-        TDigestIntArray order = WrapperTDigestArrays.INSTANCE.newIntArray(size);
+        TDigestIntArray order = arrays.newIntArray(size);
         for (int i = 0; i < size; i++) {
             order.set(i, i);
         }

+ 7 - 4
benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/TDigestBench.java

@@ -21,9 +21,11 @@
 
 package org.elasticsearch.benchmark.tdigest;
 
+import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays;
 import org.elasticsearch.tdigest.MergingDigest;
 import org.elasticsearch.tdigest.TDigest;
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
+import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -56,24 +58,25 @@ import java.util.function.Supplier;
 @Threads(1)
 @State(Scope.Thread)
 public class TDigestBench {
+    private static final TDigestArrays arrays = new MemoryTrackingTDigestArrays(new NoopCircuitBreaker("default-wrapper-tdigest-arrays"));
 
     public enum TDigestFactory {
         MERGE {
             @Override
             TDigest create(double compression) {
-                return new MergingDigest(WrapperTDigestArrays.INSTANCE, compression, (int) (10 * compression));
+                return new MergingDigest(arrays, compression, (int) (10 * compression));
             }
         },
         AVL_TREE {
             @Override
             TDigest create(double compression) {
-                return TDigest.createAvlTreeDigest(WrapperTDigestArrays.INSTANCE, compression);
+                return TDigest.createAvlTreeDigest(arrays, compression);
             }
         },
         HYBRID {
             @Override
             TDigest create(double compression) {
-                return TDigest.createHybridDigest(WrapperTDigestArrays.INSTANCE, compression);
+                return TDigest.createHybridDigest(arrays, compression);
             }
         };
 

+ 2 - 0
libs/tdigest/build.gradle

@@ -22,6 +22,8 @@ apply plugin: 'elasticsearch.build'
 apply plugin: 'elasticsearch.publish'
 
 dependencies {
+  api project(':libs:elasticsearch-core')
+
   testImplementation(project(":test:framework")) {
     exclude group: 'org.elasticsearch', module: 'elasticsearch-tdigest'
   }

+ 2 - 0
libs/tdigest/src/main/java/module-info.java

@@ -18,6 +18,8 @@
  */
 
 module org.elasticsearch.tdigest {
+    requires org.elasticsearch.base;
+
     exports org.elasticsearch.tdigest;
     exports org.elasticsearch.tdigest.arrays;
 }

+ 7 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java

@@ -21,6 +21,8 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
 import org.elasticsearch.tdigest.arrays.TDigestLongArray;
@@ -31,7 +33,7 @@ import java.util.Iterator;
 /**
  * A tree of t-digest centroids.
  */
-final class AVLGroupTree extends AbstractCollection<Centroid> {
+final class AVLGroupTree extends AbstractCollection<Centroid> implements Releasable {
     /* For insertions into the tree */
     private double centroid;
     private long count;
@@ -267,4 +269,8 @@ final class AVLGroupTree extends AbstractCollection<Centroid> {
         }
     }
 
+    @Override
+    public void close() {
+        Releasables.close(centroids, counts, aggregatedCounts, tree);
+    }
 }

+ 24 - 17
libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java

@@ -21,6 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 
 import java.util.Collection;
@@ -153,26 +154,27 @@ public class AVLTreeDigest extends AbstractTDigest {
         }
         needsCompression = false;
 
-        AVLGroupTree centroids = summary;
-        this.summary = new AVLGroupTree(arrays);
+        try (AVLGroupTree centroids = summary) {
+            this.summary = new AVLGroupTree(arrays);
 
-        final int[] nodes = new int[centroids.size()];
-        nodes[0] = centroids.first();
-        for (int i = 1; i < nodes.length; ++i) {
-            nodes[i] = centroids.next(nodes[i - 1]);
-            assert nodes[i] != IntAVLTree.NIL;
-        }
-        assert centroids.next(nodes[nodes.length - 1]) == IntAVLTree.NIL;
+            final int[] nodes = new int[centroids.size()];
+            nodes[0] = centroids.first();
+            for (int i = 1; i < nodes.length; ++i) {
+                nodes[i] = centroids.next(nodes[i - 1]);
+                assert nodes[i] != IntAVLTree.NIL;
+            }
+            assert centroids.next(nodes[nodes.length - 1]) == IntAVLTree.NIL;
 
-        for (int i = centroids.size() - 1; i > 0; --i) {
-            final int other = gen.nextInt(i + 1);
-            final int tmp = nodes[other];
-            nodes[other] = nodes[i];
-            nodes[i] = tmp;
-        }
+            for (int i = centroids.size() - 1; i > 0; --i) {
+                final int other = gen.nextInt(i + 1);
+                final int tmp = nodes[other];
+                nodes[other] = nodes[i];
+                nodes[i] = tmp;
+            }
 
-        for (int node : nodes) {
-            add(centroids.mean(node), centroids.count(node));
+            for (int node : nodes) {
+                add(centroids.mean(node), centroids.count(node));
+            }
         }
     }
 
@@ -356,4 +358,9 @@ public class AVLTreeDigest extends AbstractTDigest {
         compress();
         return 64 + summary.size() * 13;
     }
+
+    @Override
+    public void close() {
+        Releasables.close(summary);
+    }
 }

+ 7 - 0
libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 
 import java.util.Collection;
@@ -110,6 +111,7 @@ public class HybridDigest extends AbstractTDigest {
             }
             mergingDigest.reserve(size);
             // Release the allocated SortingDigest.
+            sortingDigest.close();
             sortingDigest = null;
         } else {
             sortingDigest.reserve(size);
@@ -196,4 +198,9 @@ public class HybridDigest extends AbstractTDigest {
         }
         return sortingDigest.byteSize();
     }
+
+    @Override
+    public void close() {
+        Releasables.close(sortingDigest, mergingDigest);
+    }
 }

+ 7 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/IntAVLTree.java

@@ -21,6 +21,8 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestByteArray;
 import org.elasticsearch.tdigest.arrays.TDigestIntArray;
@@ -33,7 +35,7 @@ import java.util.Arrays;
  * want to add data to the nodes, typically by using arrays and node
  * identifiers as indices.
  */
-abstract class IntAVLTree {
+abstract class IntAVLTree implements Releasable {
     /**
      * We use <code>0</code> instead of <code>-1</code> so that left(NIL) works without
      * condition.
@@ -586,4 +588,8 @@ abstract class IntAVLTree {
 
     }
 
+    @Override
+    public void close() {
+        Releasables.close(parent, left, right, depth);
+    }
 }

+ 6 - 7
libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java

@@ -21,6 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
 import org.elasticsearch.tdigest.arrays.TDigestIntArray;
@@ -66,8 +67,6 @@ import java.util.Iterator;
  * what the AVLTreeDigest uses and no dynamic allocation is required at all.
  */
 public class MergingDigest extends AbstractTDigest {
-    private final TDigestArrays arrays;
-
     private int mergeCount = 0;
 
     private final double publicCompression;
@@ -138,8 +137,6 @@ public class MergingDigest extends AbstractTDigest {
      * @param size        Size of main buffer
      */
     public MergingDigest(TDigestArrays arrays, double compression, int bufferSize, int size) {
-        this.arrays = arrays;
-
         // ensure compression >= 10
         // default size = 2 * ceil(compression)
         // default bufferSize = 5 * size
@@ -274,9 +271,6 @@ public class MergingDigest extends AbstractTDigest {
         incomingWeight.set(incomingCount, weight, 0, lastUsedCell);
         incomingCount += lastUsedCell;
 
-        if (incomingOrder == null) {
-            incomingOrder = arrays.newIntArray(incomingCount);
-        }
         Sort.stableSort(incomingOrder, incomingMean, incomingCount);
 
         totalWeight += unmergedWeight;
@@ -581,4 +575,9 @@ public class MergingDigest extends AbstractTDigest {
             + "-"
             + (useTwoLevelCompression ? "twoLevel" : "oneLevel");
     }
+
+    @Override
+    public void close() {
+        Releasables.close(weight, mean, tempWeight, tempMean, order);
+    }
 }

+ 6 - 0
libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
 
@@ -137,4 +138,9 @@ public class SortingDigest extends AbstractTDigest {
     public int byteSize() {
         return values.size() * 8;
     }
+
+    @Override
+    public void close() {
+        Releasables.close(values);
+    }
 }

+ 2 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java

@@ -21,6 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 
 import java.util.Collection;
@@ -37,7 +38,7 @@ import java.util.Locale;
  * - test coverage roughly at 90%
  * - easy to adapt for use with map-reduce
  */
-public abstract class TDigest {
+public abstract class TDigest implements Releasable {
     protected ScaleFunction scale = ScaleFunction.K_2;
     double min = Double.POSITIVE_INFINITY;
     double max = Double.NEGATIVE_INFINITY;

+ 3 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestByteArray.java

@@ -21,10 +21,12 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.elasticsearch.core.Releasable;
+
 /**
  * Minimal interface for ByteArray-like classes used within TDigest.
  */
-public interface TDigestByteArray {
+public interface TDigestByteArray extends Releasable {
     int size();
 
     byte get(int index);

+ 3 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestDoubleArray.java

@@ -21,10 +21,12 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.elasticsearch.core.Releasable;
+
 /**
  * Minimal interface for DoubleArray-like classes used within TDigest.
  */
-public interface TDigestDoubleArray {
+public interface TDigestDoubleArray extends Releasable {
     int size();
 
     double get(int index);

+ 3 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestIntArray.java

@@ -21,10 +21,12 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.elasticsearch.core.Releasable;
+
 /**
  * Minimal interface for IntArray-like classes used within TDigest.
  */
-public interface TDigestIntArray {
+public interface TDigestIntArray extends Releasable {
     int size();
 
     int get(int index);

+ 3 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestLongArray.java

@@ -21,10 +21,12 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.elasticsearch.core.Releasable;
+
 /**
  * Minimal interface for LongArray-like classes used within TDigest.
  */
-public interface TDigestLongArray {
+public interface TDigestLongArray extends Releasable {
     int size();
 
     long get(int index);

+ 0 - 258
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/WrapperTDigestArrays.java

@@ -1,258 +0,0 @@
-/*
- * Licensed to Elasticsearch B.V. under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch B.V. licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- * This project is based on a modification of https://github.com/tdunning/t-digest which is licensed under the Apache 2.0 License.
- */
-
-package org.elasticsearch.tdigest.arrays;
-
-import java.util.Arrays;
-
-/**
- * Temporal TDigestArrays with raw arrays.
- *
- * <p>
- *     Delete after the right implementation for BigArrays is made.
- * </p>
- */
-public class WrapperTDigestArrays implements TDigestArrays {
-
-    public static final WrapperTDigestArrays INSTANCE = new WrapperTDigestArrays();
-
-    private WrapperTDigestArrays() {}
-
-    @Override
-    public WrapperTDigestDoubleArray newDoubleArray(int initialCapacity) {
-        return new WrapperTDigestDoubleArray(initialCapacity);
-    }
-
-    @Override
-    public WrapperTDigestIntArray newIntArray(int initialSize) {
-        return new WrapperTDigestIntArray(initialSize);
-    }
-
-    @Override
-    public TDigestLongArray newLongArray(int initialSize) {
-        return new WrapperTDigestLongArray(initialSize);
-    }
-
-    @Override
-    public TDigestByteArray newByteArray(int initialSize) {
-        return new WrapperTDigestByteArray(initialSize);
-    }
-
-    public WrapperTDigestDoubleArray newDoubleArray(double[] array) {
-        return new WrapperTDigestDoubleArray(array);
-    }
-
-    public WrapperTDigestIntArray newIntArray(int[] array) {
-        return new WrapperTDigestIntArray(array);
-    }
-
-    public static class WrapperTDigestDoubleArray implements TDigestDoubleArray {
-        private double[] array;
-        private int size;
-
-        public WrapperTDigestDoubleArray(int initialSize) {
-            this(new double[initialSize]);
-        }
-
-        public WrapperTDigestDoubleArray(double[] array) {
-            this.array = array;
-            this.size = array.length;
-        }
-
-        @Override
-        public int size() {
-            return size;
-        }
-
-        @Override
-        public double get(int index) {
-            assert index >= 0 && index < size;
-            return array[index];
-        }
-
-        @Override
-        public void set(int index, double value) {
-            assert index >= 0 && index < size;
-            array[index] = value;
-        }
-
-        @Override
-        public void add(double value) {
-            ensureCapacity(size + 1);
-            array[size++] = value;
-        }
-
-        @Override
-        public void sort() {
-            Arrays.sort(array, 0, size);
-        }
-
-        @Override
-        public void ensureCapacity(int requiredCapacity) {
-            if (requiredCapacity > array.length) {
-                int newSize = array.length + (array.length >> 1);
-                if (newSize < requiredCapacity) {
-                    newSize = requiredCapacity;
-                }
-                double[] newArray = new double[newSize];
-                System.arraycopy(array, 0, newArray, 0, size);
-                array = newArray;
-            }
-        }
-
-        @Override
-        public void resize(int newSize) {
-            if (newSize > array.length) {
-                array = Arrays.copyOf(array, newSize);
-            }
-            if (newSize > size) {
-                Arrays.fill(array, size, newSize, 0);
-            }
-            size = newSize;
-        }
-    }
-
-    public static class WrapperTDigestIntArray implements TDigestIntArray {
-        private int[] array;
-        private int size;
-
-        public WrapperTDigestIntArray(int initialSize) {
-            this(new int[initialSize]);
-        }
-
-        public WrapperTDigestIntArray(int[] array) {
-            this.array = array;
-            this.size = array.length;
-        }
-
-        @Override
-        public int size() {
-            return size;
-        }
-
-        @Override
-        public int get(int index) {
-            assert index >= 0 && index < size;
-            return array[index];
-        }
-
-        @Override
-        public void set(int index, int value) {
-            assert index >= 0 && index < size;
-            array[index] = value;
-        }
-
-        @Override
-        public void resize(int newSize) {
-            if (newSize > array.length) {
-                array = Arrays.copyOf(array, newSize);
-            }
-            if (newSize > size) {
-                Arrays.fill(array, size, newSize, 0);
-            }
-            size = newSize;
-        }
-    }
-
-    public static class WrapperTDigestLongArray implements TDigestLongArray {
-        private long[] array;
-        private int size;
-
-        public WrapperTDigestLongArray(int initialSize) {
-            this(new long[initialSize]);
-        }
-
-        public WrapperTDigestLongArray(long[] array) {
-            this.array = array;
-            this.size = array.length;
-        }
-
-        @Override
-        public int size() {
-            return size;
-        }
-
-        @Override
-        public long get(int index) {
-            assert index >= 0 && index < size;
-            return array[index];
-        }
-
-        @Override
-        public void set(int index, long value) {
-            assert index >= 0 && index < size;
-            array[index] = value;
-        }
-
-        @Override
-        public void resize(int newSize) {
-            if (newSize > array.length) {
-                array = Arrays.copyOf(array, newSize);
-            }
-            if (newSize > size) {
-                Arrays.fill(array, size, newSize, 0);
-            }
-            size = newSize;
-        }
-    }
-
-    public static class WrapperTDigestByteArray implements TDigestByteArray {
-        private byte[] array;
-        private int size;
-
-        public WrapperTDigestByteArray(int initialSize) {
-            this(new byte[initialSize]);
-        }
-
-        public WrapperTDigestByteArray(byte[] array) {
-            this.array = array;
-            this.size = array.length;
-        }
-
-        @Override
-        public int size() {
-            return size;
-        }
-
-        @Override
-        public byte get(int index) {
-            assert index >= 0 && index < size;
-            return array[index];
-        }
-
-        @Override
-        public void set(int index, byte value) {
-            assert index >= 0 && index < size;
-            array[index] = value;
-        }
-
-        @Override
-        public void resize(int newSize) {
-            if (newSize > array.length) {
-                array = Arrays.copyOf(array, newSize);
-            }
-            if (newSize > size) {
-                Arrays.fill(array, size, newSize, (byte) 0);
-            }
-            size = newSize;
-        }
-    }
-}

+ 6 - 9
libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLGroupTreeTests.java

@@ -21,13 +21,10 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-import org.elasticsearch.test.ESTestCase;
-
-public class AVLGroupTreeTests extends ESTestCase {
+public class AVLGroupTreeTests extends TDigestTestCase {
 
     public void testSimpleAdds() {
-        AVLGroupTree x = new AVLGroupTree(WrapperTDigestArrays.INSTANCE);
+        AVLGroupTree x = new AVLGroupTree(arrays());
         assertEquals(IntAVLTree.NIL, x.floor(34));
         assertEquals(IntAVLTree.NIL, x.first());
         assertEquals(IntAVLTree.NIL, x.last());
@@ -46,7 +43,7 @@ public class AVLGroupTreeTests extends ESTestCase {
     }
 
     public void testBalancing() {
-        AVLGroupTree x = new AVLGroupTree(WrapperTDigestArrays.INSTANCE);
+        AVLGroupTree x = new AVLGroupTree(arrays());
         for (int i = 0; i < 101; i++) {
             x.add(new Centroid(i));
         }
@@ -60,7 +57,7 @@ public class AVLGroupTreeTests extends ESTestCase {
 
     public void testFloor() {
         // mostly tested in other tests
-        AVLGroupTree x = new AVLGroupTree(WrapperTDigestArrays.INSTANCE);
+        AVLGroupTree x = new AVLGroupTree(arrays());
         for (int i = 0; i < 101; i++) {
             x.add(new Centroid(i / 2));
         }
@@ -73,7 +70,7 @@ public class AVLGroupTreeTests extends ESTestCase {
     }
 
     public void testHeadSum() {
-        AVLGroupTree x = new AVLGroupTree(WrapperTDigestArrays.INSTANCE);
+        AVLGroupTree x = new AVLGroupTree(arrays());
         for (int i = 0; i < 1000; ++i) {
             x.add(randomDouble(), randomIntBetween(1, 10));
         }
@@ -88,7 +85,7 @@ public class AVLGroupTreeTests extends ESTestCase {
     }
 
     public void testFloorSum() {
-        AVLGroupTree x = new AVLGroupTree(WrapperTDigestArrays.INSTANCE);
+        AVLGroupTree x = new AVLGroupTree(arrays());
         int total = 0;
         for (int i = 0; i < 1000; ++i) {
             int count = randomIntBetween(1, 10);

+ 1 - 3
libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLTreeDigestTests.java

@@ -21,13 +21,11 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-
 public class AVLTreeDigestTests extends TDigestTests {
 
     protected DigestFactory factory(final double compression) {
         return () -> {
-            AVLTreeDigest digest = new AVLTreeDigest(WrapperTDigestArrays.INSTANCE, compression);
+            AVLTreeDigest digest = new AVLTreeDigest(arrays(), compression);
             digest.setRandomSeed(randomLong());
             return digest;
         };

+ 3 - 6
libs/tdigest/src/test/java/org/elasticsearch/tdigest/AlternativeMergeTests.java

@@ -21,15 +21,12 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-import org.elasticsearch.test.ESTestCase;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
-public class AlternativeMergeTests extends ESTestCase {
+public class AlternativeMergeTests extends TDigestTestCase {
     /**
      * Computes size using the alternative scaling limit for both an idealized merge and for
      * a MergingDigest.
@@ -37,8 +34,8 @@ public class AlternativeMergeTests extends ESTestCase {
     public void testMerges() {
         for (int n : new int[] { 100, 1000, 10000, 100000 }) {
             for (double compression : new double[] { 50, 100, 200, 400 }) {
-                MergingDigest mergingDigest = new MergingDigest(WrapperTDigestArrays.INSTANCE, compression);
-                AVLTreeDigest treeDigest = new AVLTreeDigest(WrapperTDigestArrays.INSTANCE, compression);
+                MergingDigest mergingDigest = new MergingDigest(arrays(), compression);
+                AVLTreeDigest treeDigest = new AVLTreeDigest(arrays(), compression);
                 List<Double> data = new ArrayList<>();
                 Random gen = random();
                 for (int i = 0; i < n; i++) {

+ 1 - 3
libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java

@@ -21,9 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.test.ESTestCase;
-
-public abstract class BigCountTests extends ESTestCase {
+public abstract class BigCountTests extends TDigestTestCase {
 
     public void testBigMerge() {
         TDigest digest = createDigest();

+ 1 - 3
libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java

@@ -21,11 +21,9 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-
 public class BigCountTestsMergingDigestTests extends BigCountTests {
     @Override
     public TDigest createDigest() {
-        return new MergingDigest(WrapperTDigestArrays.INSTANCE, 100);
+        return new MergingDigest(arrays(), 100);
     }
 }

+ 1 - 3
libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java

@@ -21,11 +21,9 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-
 public class BigCountTestsTreeDigestTests extends BigCountTests {
     @Override
     public TDigest createDigest() {
-        return new AVLTreeDigest(WrapperTDigestArrays.INSTANCE, 100);
+        return new AVLTreeDigest(arrays(), 100);
     }
 }

+ 5 - 8
libs/tdigest/src/test/java/org/elasticsearch/tdigest/ComparisonTests.java

@@ -21,13 +21,10 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-import org.elasticsearch.test.ESTestCase;
-
 import java.util.Arrays;
 import java.util.function.Supplier;
 
-public class ComparisonTests extends ESTestCase {
+public class ComparisonTests extends TDigestTestCase {
 
     private static final int SAMPLE_COUNT = 1_000_000;
 
@@ -40,10 +37,10 @@ public class ComparisonTests extends ESTestCase {
 
     private void loadData(Supplier<Double> sampleGenerator) {
         final int COMPRESSION = 100;
-        avlTreeDigest = TDigest.createAvlTreeDigest(WrapperTDigestArrays.INSTANCE, COMPRESSION);
-        mergingDigest = TDigest.createMergingDigest(WrapperTDigestArrays.INSTANCE, COMPRESSION);
-        sortingDigest = TDigest.createSortingDigest(WrapperTDigestArrays.INSTANCE);
-        hybridDigest = TDigest.createHybridDigest(WrapperTDigestArrays.INSTANCE, COMPRESSION);
+        avlTreeDigest = TDigest.createAvlTreeDigest(arrays(), COMPRESSION);
+        mergingDigest = TDigest.createMergingDigest(arrays(), COMPRESSION);
+        sortingDigest = TDigest.createSortingDigest(arrays());
+        hybridDigest = TDigest.createHybridDigest(arrays(), COMPRESSION);
         samples = new double[SAMPLE_COUNT];
 
         for (int i = 0; i < SAMPLE_COUNT; i++) {

+ 6 - 7
libs/tdigest/src/test/java/org/elasticsearch/tdigest/IntAVLTreeTests.java

@@ -21,8 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.tdigest.arrays.TDigestArrays;
 
 import java.util.Arrays;
 import java.util.Iterator;
@@ -30,7 +29,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 
-public class IntAVLTreeTests extends ESTestCase {
+public class IntAVLTreeTests extends TDigestTestCase {
 
     static class IntegerBag extends IntAVLTree {
 
@@ -38,8 +37,8 @@ public class IntAVLTreeTests extends ESTestCase {
         int[] values;
         int[] counts;
 
-        IntegerBag() {
-            super(WrapperTDigestArrays.INSTANCE);
+        IntegerBag(TDigestArrays arrays) {
+            super(arrays);
             values = new int[capacity()];
             counts = new int[capacity()];
         }
@@ -89,7 +88,7 @@ public class IntAVLTreeTests extends ESTestCase {
     public void testDualAdd() {
         Random r = random();
         TreeMap<Integer, Integer> map = new TreeMap<>();
-        IntegerBag bag = new IntegerBag();
+        IntegerBag bag = new IntegerBag(arrays());
         for (int i = 0; i < 100000; ++i) {
             final int v = r.nextInt(100000);
             if (map.containsKey(v)) {
@@ -112,7 +111,7 @@ public class IntAVLTreeTests extends ESTestCase {
     public void testDualAddRemove() {
         Random r = random();
         TreeMap<Integer, Integer> map = new TreeMap<>();
-        IntegerBag bag = new IntegerBag();
+        IntegerBag bag = new IntegerBag(arrays());
         for (int i = 0; i < 100000; ++i) {
             final int v = r.nextInt(1000);
             if (r.nextBoolean()) {

+ 5 - 8
libs/tdigest/src/test/java/org/elasticsearch/tdigest/MedianTests.java

@@ -21,14 +21,11 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-import org.elasticsearch.test.ESTestCase;
-
-public class MedianTests extends ESTestCase {
+public class MedianTests extends TDigestTestCase {
 
     public void testAVL() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new AVLTreeDigest(WrapperTDigestArrays.INSTANCE, 100);
+        TDigest digest = new AVLTreeDigest(arrays(), 100);
         for (double value : data) {
             digest.add(value);
         }
@@ -39,7 +36,7 @@ public class MedianTests extends ESTestCase {
 
     public void testMergingDigest() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new MergingDigest(WrapperTDigestArrays.INSTANCE, 100);
+        TDigest digest = new MergingDigest(arrays(), 100);
         for (double value : data) {
             digest.add(value);
         }
@@ -50,7 +47,7 @@ public class MedianTests extends ESTestCase {
 
     public void testSortingDigest() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new SortingDigest(WrapperTDigestArrays.INSTANCE);
+        TDigest digest = new SortingDigest(arrays());
         for (double value : data) {
             digest.add(value);
         }
@@ -61,7 +58,7 @@ public class MedianTests extends ESTestCase {
 
     public void testHybridDigest() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new HybridDigest(WrapperTDigestArrays.INSTANCE, 100);
+        TDigest digest = new HybridDigest(arrays(), 100);
         for (double value : data) {
             digest.add(value);
         }

+ 6 - 8
libs/tdigest/src/test/java/org/elasticsearch/tdigest/SortTests.java

@@ -22,22 +22,20 @@
 package org.elasticsearch.tdigest;
 
 import org.elasticsearch.tdigest.arrays.TDigestIntArray;
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-import org.elasticsearch.test.ESTestCase;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-public class SortTests extends ESTestCase {
+public class SortTests extends TDigestTestCase {
     public void testReverse() {
-        TDigestIntArray x = WrapperTDigestArrays.INSTANCE.newIntArray(0);
+        TDigestIntArray x = arrays().newIntArray(0);
 
         // don't crash with no input
         Sort.reverse(x, 0, x.size());
 
         // reverse stuff!
-        x = WrapperTDigestArrays.INSTANCE.newIntArray(new int[] { 1, 2, 3, 4, 5 });
+        x = arrays().newIntArray(new int[] { 1, 2, 3, 4, 5 });
         Sort.reverse(x, 0, x.size());
         for (int i = 0; i < 5; i++) {
             assertEquals(5 - i, x.get(i));
@@ -59,7 +57,7 @@ public class SortTests extends ESTestCase {
         assertEquals(4, x.get(3));
         assertEquals(1, x.get(4));
 
-        x = WrapperTDigestArrays.INSTANCE.newIntArray(new int[] { 1, 2, 3, 4, 5, 6 });
+        x = arrays().newIntArray(new int[] { 1, 2, 3, 4, 5, 6 });
         Sort.reverse(x, 0, x.size());
         for (int i = 0; i < 6; i++) {
             assertEquals(6 - i, x.get(i));
@@ -229,8 +227,8 @@ public class SortTests extends ESTestCase {
     }
 
     private void sort(int[] order, double[] values, int n) {
-        var wrappedOrder = WrapperTDigestArrays.INSTANCE.newIntArray(order);
-        var wrappedValues = WrapperTDigestArrays.INSTANCE.newDoubleArray(values);
+        var wrappedOrder = arrays().newIntArray(order);
+        var wrappedValues = arrays().newDoubleArray(values);
 
         Sort.stableSort(wrappedOrder, wrappedValues, n);
     }

+ 109 - 0
libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTestCase.java

@@ -0,0 +1,109 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * This project is based on a modification of https://github.com/tdunning/t-digest which is licensed under the Apache 2.0 License.
+ */
+
+package org.elasticsearch.tdigest;
+
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays;
+import org.elasticsearch.tdigest.arrays.TDigestArrays;
+import org.elasticsearch.tdigest.arrays.TDigestByteArray;
+import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
+import org.elasticsearch.tdigest.arrays.TDigestIntArray;
+import org.elasticsearch.tdigest.arrays.TDigestLongArray;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.After;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for TDigest tests that require {@link TDigestArrays} instances.
+ * <p>
+ *     This class provides arrays that will be automatically closed after the test.
+ *     It will also test that all memory have been freed, as the arrays use a counting CircuitBreaker.
+ * </p>
+ */
+public abstract class TDigestTestCase extends ESTestCase {
+    private final Collection<Releasable> trackedArrays = ConcurrentHashMap.newKeySet();
+
+    /**
+     * Create a new TDigestArrays instance with a limited breaker. This method may be called multiple times.
+     *
+     * <p>
+     *     The arrays created by this method will be automatically released after the test.
+     * </p>
+     */
+    protected DelegatingTDigestArrays arrays() {
+        return new DelegatingTDigestArrays();
+    }
+
+    /**
+     * Release all arrays before {@link ESTestCase} checks for unreleased bytes.
+     */
+    @After
+    public void releaseArrays() {
+        Releasables.close(trackedArrays);
+        trackedArrays.clear();
+    }
+
+    private <T extends Releasable> T register(T releasable) {
+        trackedArrays.add(releasable);
+        return releasable;
+    }
+
+    protected final class DelegatingTDigestArrays implements TDigestArrays {
+        private final MemoryTrackingTDigestArrays delegate;
+
+        DelegatingTDigestArrays() {
+            this.delegate = new MemoryTrackingTDigestArrays(newLimitedBreaker(ByteSizeValue.ofMb(100)));
+        }
+
+        public TDigestDoubleArray newDoubleArray(double[] data) {
+            return register(delegate.newDoubleArray(data));
+        }
+
+        @Override
+        public TDigestDoubleArray newDoubleArray(int size) {
+            return register(delegate.newDoubleArray(size));
+        }
+
+        public TDigestIntArray newIntArray(int[] data) {
+            return register(delegate.newIntArray(data));
+        }
+
+        @Override
+        public TDigestIntArray newIntArray(int size) {
+            return register(delegate.newIntArray(size));
+        }
+
+        @Override
+        public TDigestLongArray newLongArray(int size) {
+            return register(delegate.newLongArray(size));
+        }
+
+        @Override
+        public TDigestByteArray newByteArray(int initialSize) {
+            return register(delegate.newByteArray(initialSize));
+        }
+    }
+}

+ 1 - 9
libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTests.java

@@ -21,10 +21,6 @@
 
 package org.elasticsearch.tdigest;
 
-import org.elasticsearch.tdigest.arrays.TDigestArrays;
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-import org.elasticsearch.test.ESTestCase;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -34,7 +30,7 @@ import java.util.Random;
 /**
  * Base test case for TDigests, just extend this class and implement the abstract methods.
  */
-public abstract class TDigestTests extends ESTestCase {
+public abstract class TDigestTests extends TDigestTestCase {
 
     public interface DigestFactory {
         TDigest create();
@@ -544,8 +540,4 @@ public abstract class TDigestTests extends ESTestCase {
             lastQuantile = q;
         }
     }
-
-    protected static TDigestArrays arrays() {
-        return WrapperTDigestArrays.INSTANCE;
-    }
 }

+ 1 - 3
server/src/main/java/org/elasticsearch/search/aggregations/metrics/EmptyTDigestState.java

@@ -9,12 +9,10 @@
 
 package org.elasticsearch.search.aggregations.metrics;
 
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
-
 public final class EmptyTDigestState extends TDigestState {
     public EmptyTDigestState() {
         // Use the sorting implementation to minimize memory allocation.
-        super(WrapperTDigestArrays.INSTANCE, Type.SORTING, 1.0D);
+        super(MemoryTrackingTDigestArrays.INSTANCE, Type.SORTING, 1.0D);
     }
 
     @Override

+ 401 - 0
server/src/main/java/org/elasticsearch/search/aggregations/metrics/MemoryTrackingTDigestArrays.java

@@ -0,0 +1,401 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.search.aggregations.metrics;
+
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.tdigest.arrays.TDigestArrays;
+import org.elasticsearch.tdigest.arrays.TDigestByteArray;
+import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
+import org.elasticsearch.tdigest.arrays.TDigestIntArray;
+import org.elasticsearch.tdigest.arrays.TDigestLongArray;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * TDigestArrays with raw arrays and circuit breaking.
+ */
+public class MemoryTrackingTDigestArrays implements TDigestArrays {
+
+    /**
+     * Default no-op CB instance of the wrapper.
+     *
+     * @deprecated This instance shouldn't be used, and will be removed after all usages are replaced.
+     */
+    @Deprecated
+    public static final MemoryTrackingTDigestArrays INSTANCE = new MemoryTrackingTDigestArrays(
+        new NoopCircuitBreaker("default-wrapper-tdigest-arrays")
+    );
+
+    private final CircuitBreaker breaker;
+
+    public MemoryTrackingTDigestArrays(CircuitBreaker breaker) {
+        this.breaker = breaker;
+    }
+
+    @Override
+    public MemoryTrackingTDigestDoubleArray newDoubleArray(int initialSize) {
+        breaker.addEstimateBytesAndMaybeBreak(
+            MemoryTrackingTDigestDoubleArray.estimatedRamBytesUsed(initialSize),
+            "tdigest-new-double-array"
+        );
+        return new MemoryTrackingTDigestDoubleArray(breaker, initialSize);
+    }
+
+    @Override
+    public MemoryTrackingTDigestIntArray newIntArray(int initialSize) {
+        breaker.addEstimateBytesAndMaybeBreak(MemoryTrackingTDigestIntArray.estimatedRamBytesUsed(initialSize), "tdigest-new-int-array");
+        return new MemoryTrackingTDigestIntArray(breaker, initialSize);
+    }
+
+    @Override
+    public TDigestLongArray newLongArray(int initialSize) {
+        breaker.addEstimateBytesAndMaybeBreak(MemoryTrackingTDigestLongArray.estimatedRamBytesUsed(initialSize), "tdigest-new-long-array");
+        return new MemoryTrackingTDigestLongArray(breaker, initialSize);
+    }
+
+    @Override
+    public TDigestByteArray newByteArray(int initialSize) {
+        breaker.addEstimateBytesAndMaybeBreak(MemoryTrackingTDigestByteArray.estimatedRamBytesUsed(initialSize), "tdigest-new-byte-array");
+        return new MemoryTrackingTDigestByteArray(breaker, initialSize);
+    }
+
+    public MemoryTrackingTDigestDoubleArray newDoubleArray(double[] array) {
+        breaker.addEstimateBytesAndMaybeBreak(
+            MemoryTrackingTDigestDoubleArray.estimatedRamBytesUsed(array.length),
+            "tdigest-new-double-array"
+        );
+        return new MemoryTrackingTDigestDoubleArray(breaker, array);
+    }
+
+    public MemoryTrackingTDigestIntArray newIntArray(int[] array) {
+        breaker.addEstimateBytesAndMaybeBreak(MemoryTrackingTDigestIntArray.estimatedRamBytesUsed(array.length), "tdigest-new-int-array");
+        return new MemoryTrackingTDigestIntArray(breaker, array);
+    }
+
+    private static long estimatedArraySize(long arrayLength, long bytesPerElement) {
+        return RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + arrayLength * bytesPerElement);
+    }
+
+    private abstract static class AbstractMemoryTrackingArray implements Releasable, Accountable {
+        protected final CircuitBreaker breaker;
+        private final AtomicBoolean closed = new AtomicBoolean(false);
+
+        AbstractMemoryTrackingArray(CircuitBreaker breaker) {
+            this.breaker = breaker;
+        }
+
+        @Override
+        public final void close() {
+            if (closed.compareAndSet(false, true)) {
+                breaker.addWithoutBreaking(-ramBytesUsed());
+            }
+        }
+    }
+
+    public static class MemoryTrackingTDigestDoubleArray extends AbstractMemoryTrackingArray implements TDigestDoubleArray {
+        static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(MemoryTrackingTDigestDoubleArray.class);
+
+        private double[] array;
+        private int size;
+
+        public MemoryTrackingTDigestDoubleArray(CircuitBreaker breaker, int initialSize) {
+            this(breaker, new double[initialSize]);
+        }
+
+        public MemoryTrackingTDigestDoubleArray(CircuitBreaker breaker, double[] array) {
+            super(breaker);
+            this.array = array;
+            this.size = array.length;
+        }
+
+        public static long estimatedRamBytesUsed(int size) {
+            return SHALLOW_SIZE + estimatedArraySize(size, Double.BYTES);
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return estimatedRamBytesUsed(array.length);
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+
+        @Override
+        public double get(int index) {
+            assert index >= 0 && index < size;
+            return array[index];
+        }
+
+        @Override
+        public void set(int index, double value) {
+            assert index >= 0 && index < size;
+            array[index] = value;
+        }
+
+        @Override
+        public void add(double value) {
+            ensureCapacity(size + 1);
+            array[size++] = value;
+        }
+
+        @Override
+        public void sort() {
+            Arrays.sort(array, 0, size);
+        }
+
+        @Override
+        public void resize(int newSize) {
+            ensureCapacity(newSize);
+
+            if (newSize > size) {
+                Arrays.fill(array, size, newSize, 0);
+            }
+
+            size = newSize;
+        }
+
+        @Override
+        public void ensureCapacity(int requiredCapacity) {
+            if (requiredCapacity > array.length) {
+                double[] oldArray = array;
+                // Used for used bytes assertion
+                long oldRamBytesUsed = ramBytesUsed();
+                long oldArraySize = RamUsageEstimator.sizeOf(oldArray);
+
+                int newSize = ArrayUtil.oversize(requiredCapacity, Double.BYTES);
+                long newArraySize = estimatedArraySize(newSize, Double.BYTES);
+                breaker.addEstimateBytesAndMaybeBreak(newArraySize, "tdigest-new-capacity-double-array");
+                array = Arrays.copyOf(array, newSize);
+                breaker.addWithoutBreaking(-RamUsageEstimator.sizeOf(oldArray));
+
+                assert ramBytesUsed() - oldRamBytesUsed == newArraySize - oldArraySize
+                    : "ramBytesUsed() should be aligned with manual array calculations";
+            }
+        }
+    }
+
+    public static class MemoryTrackingTDigestIntArray extends AbstractMemoryTrackingArray implements TDigestIntArray {
+        static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(MemoryTrackingTDigestIntArray.class);
+
+        private int[] array;
+        private int size;
+
+        public MemoryTrackingTDigestIntArray(CircuitBreaker breaker, int initialSize) {
+            this(breaker, new int[initialSize]);
+        }
+
+        public MemoryTrackingTDigestIntArray(CircuitBreaker breaker, int[] array) {
+            super(breaker);
+            this.array = array;
+            this.size = array.length;
+        }
+
+        public static long estimatedRamBytesUsed(int size) {
+            return SHALLOW_SIZE + estimatedArraySize(size, Integer.BYTES);
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return estimatedRamBytesUsed(array.length);
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+
+        @Override
+        public int get(int index) {
+            assert index >= 0 && index < size;
+            return array[index];
+        }
+
+        @Override
+        public void set(int index, int value) {
+            assert index >= 0 && index < size;
+            array[index] = value;
+        }
+
+        @Override
+        public void resize(int newSize) {
+            ensureCapacity(newSize);
+            if (newSize > size) {
+                Arrays.fill(array, size, newSize, 0);
+            }
+            size = newSize;
+        }
+
+        private void ensureCapacity(int requiredCapacity) {
+            if (requiredCapacity > array.length) {
+                int[] oldArray = array;
+                // Used for used bytes assertion
+                long oldRamBytesUsed = ramBytesUsed();
+                long oldArraySize = RamUsageEstimator.sizeOf(oldArray);
+
+                int newSize = ArrayUtil.oversize(requiredCapacity, Integer.BYTES);
+                long newArraySize = estimatedArraySize(newSize, Integer.BYTES);
+                breaker.addEstimateBytesAndMaybeBreak(newArraySize, "tdigest-new-capacity-int-array");
+                array = Arrays.copyOf(array, newSize);
+                breaker.addWithoutBreaking(-RamUsageEstimator.sizeOf(oldArray));
+
+                assert ramBytesUsed() - oldRamBytesUsed == newArraySize - oldArraySize
+                    : "ramBytesUsed() should be aligned with manual array calculations";
+            }
+        }
+    }
+
+    public static class MemoryTrackingTDigestLongArray extends AbstractMemoryTrackingArray implements TDigestLongArray {
+        static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(MemoryTrackingTDigestLongArray.class);
+
+        private long[] array;
+        private int size;
+
+        public MemoryTrackingTDigestLongArray(CircuitBreaker breaker, int initialSize) {
+            this(breaker, new long[initialSize]);
+        }
+
+        public MemoryTrackingTDigestLongArray(CircuitBreaker breaker, long[] array) {
+            super(breaker);
+            this.array = array;
+            this.size = array.length;
+        }
+
+        public static long estimatedRamBytesUsed(int size) {
+            return SHALLOW_SIZE + estimatedArraySize(size, Long.BYTES);
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return estimatedRamBytesUsed(array.length);
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+
+        @Override
+        public long get(int index) {
+            assert index >= 0 && index < size;
+            return array[index];
+        }
+
+        @Override
+        public void set(int index, long value) {
+            assert index >= 0 && index < size;
+            array[index] = value;
+        }
+
+        @Override
+        public void resize(int newSize) {
+            ensureCapacity(newSize);
+            if (newSize > size) {
+                Arrays.fill(array, size, newSize, 0);
+            }
+            size = newSize;
+        }
+
+        private void ensureCapacity(int requiredCapacity) {
+            if (requiredCapacity > array.length) {
+                long[] oldArray = array;
+                // Used for used bytes assertion
+                long oldRamBytesUsed = ramBytesUsed();
+                long oldArraySize = RamUsageEstimator.sizeOf(oldArray);
+
+                int newSize = ArrayUtil.oversize(requiredCapacity, Long.BYTES);
+                long newArraySize = estimatedArraySize(newSize, Long.BYTES);
+                breaker.addEstimateBytesAndMaybeBreak(newArraySize, "tdigest-new-capacity-long-array");
+                array = Arrays.copyOf(array, newSize);
+                breaker.addWithoutBreaking(-RamUsageEstimator.sizeOf(oldArray));
+
+                assert ramBytesUsed() - oldRamBytesUsed == newArraySize - oldArraySize
+                    : "ramBytesUsed() should be aligned with manual array calculations";
+            }
+        }
+    }
+
+    public static class MemoryTrackingTDigestByteArray extends AbstractMemoryTrackingArray implements TDigestByteArray {
+        static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(MemoryTrackingTDigestByteArray.class);
+
+        private byte[] array;
+        private int size;
+
+        public MemoryTrackingTDigestByteArray(CircuitBreaker breaker, int initialSize) {
+            this(breaker, new byte[initialSize]);
+        }
+
+        public MemoryTrackingTDigestByteArray(CircuitBreaker breaker, byte[] array) {
+            super(breaker);
+            this.array = array;
+            this.size = array.length;
+        }
+
+        public static long estimatedRamBytesUsed(int size) {
+            return SHALLOW_SIZE + estimatedArraySize(size, Byte.BYTES);
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return estimatedRamBytesUsed(array.length);
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+
+        @Override
+        public byte get(int index) {
+            assert index >= 0 && index < size;
+            return array[index];
+        }
+
+        @Override
+        public void set(int index, byte value) {
+            assert index >= 0 && index < size;
+            array[index] = value;
+        }
+
+        @Override
+        public void resize(int newSize) {
+            ensureCapacity(newSize);
+            if (newSize > size) {
+                Arrays.fill(array, size, newSize, (byte) 0);
+            }
+            size = newSize;
+        }
+
+        private void ensureCapacity(int requiredCapacity) {
+            if (requiredCapacity > array.length) {
+                byte[] oldArray = array;
+                // Used for used bytes assertion
+                long oldRamBytesUsed = ramBytesUsed();
+                long oldArraySize = RamUsageEstimator.sizeOf(oldArray);
+
+                int newSize = ArrayUtil.oversize(requiredCapacity, Byte.BYTES);
+                long newArraySize = estimatedArraySize(newSize, Byte.BYTES);
+                breaker.addEstimateBytesAndMaybeBreak(newArraySize, "tdigest-new-capacity-byte-array");
+                array = Arrays.copyOf(array, newSize);
+                breaker.addWithoutBreaking(-RamUsageEstimator.sizeOf(oldArray));
+
+                assert ramBytesUsed() - oldRamBytesUsed == newArraySize - oldArraySize
+                    : "ramBytesUsed() should be aligned with manual array calculations";
+            }
+        }
+    }
+}

+ 12 - 6
server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java

@@ -11,10 +11,11 @@ package org.elasticsearch.search.aggregations.metrics;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.Centroid;
 import org.elasticsearch.tdigest.TDigest;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -25,7 +26,7 @@ import java.util.Iterator;
  * through factory method params, providing one optimized for performance (e.g. MergingDigest or HybridDigest) by default, or optionally one
  * that produces highly accurate results regardless of input size but its construction over the sample population takes 2x-10x longer.
  */
-public class TDigestState {
+public class TDigestState implements Releasable {
 
     private final double compression;
 
@@ -54,7 +55,7 @@ public class TDigestState {
      */
     @Deprecated
     public static TDigestState create(double compression) {
-        return create(WrapperTDigestArrays.INSTANCE, compression);
+        return create(MemoryTrackingTDigestArrays.INSTANCE, compression);
     }
 
     /**
@@ -81,7 +82,7 @@ public class TDigestState {
      */
     @Deprecated
     public static TDigestState create(double compression, TDigestExecutionHint executionHint) {
-        return create(WrapperTDigestArrays.INSTANCE, compression, executionHint);
+        return create(MemoryTrackingTDigestArrays.INSTANCE, compression, executionHint);
     }
 
     /**
@@ -106,7 +107,7 @@ public class TDigestState {
      * @return a TDigestState object
      */
     public static TDigestState createUsingParamsFrom(TDigestState state) {
-        return new TDigestState(WrapperTDigestArrays.INSTANCE, state.type, state.compression);
+        return new TDigestState(MemoryTrackingTDigestArrays.INSTANCE, state.type, state.compression);
     }
 
     protected TDigestState(TDigestArrays arrays, Type type, double compression) {
@@ -143,7 +144,7 @@ public class TDigestState {
      */
     @Deprecated
     public static TDigestState read(StreamInput in) throws IOException {
-        return read(WrapperTDigestArrays.INSTANCE, in);
+        return read(MemoryTrackingTDigestArrays.INSTANCE, in);
     }
 
     public static TDigestState read(TDigestArrays arrays, StreamInput in) throws IOException {
@@ -267,4 +268,9 @@ public class TDigestState {
     public final double getMax() {
         return tdigest.getMax();
     }
+
+    @Override
+    public void close() {
+        Releasables.close(tdigest);
+    }
 }

+ 360 - 0
server/src/test/java/org/elasticsearch/search/aggregations/metrics/MemoryTrackingTDigestArraysTests.java

@@ -0,0 +1,360 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.search.aggregations.metrics;
+
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.tdigest.arrays.TDigestArrays;
+import org.elasticsearch.tdigest.arrays.TDigestByteArray;
+import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
+import org.elasticsearch.tdigest.arrays.TDigestIntArray;
+import org.elasticsearch.tdigest.arrays.TDigestLongArray;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+public class MemoryTrackingTDigestArraysTests extends ESTestCase {
+    // Int arrays
+
+    public void testIntEmpty() {
+        try (TDigestIntArray array = intArray(0)) {
+            assertThat(array.size(), equalTo(0));
+        }
+    }
+
+    public void testIntGetAndSet() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestIntArray array = intArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            int value = randomInt();
+            for (int i = 9; i < initialSize; i++) {
+                array.set(i, value);
+            }
+
+            for (int i = 0; i < initialSize; i++) {
+                if (i < 9) {
+                    assertThat(array.get(i), equalTo(0));
+                } else {
+                    assertThat(array.get(i), equalTo(value));
+                }
+            }
+        }
+    }
+
+    public void testIntResize() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestIntArray array = intArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            // Fill with a non-zero value
+            int value = randomBoolean() ? randomIntBetween(Integer.MIN_VALUE, -1) : randomIntBetween(1, Integer.MAX_VALUE);
+            for (int i = 0; i < initialSize; i++) {
+                array.set(i, value);
+            }
+
+            // Resize to a size-1
+            array.resize(initialSize - 1);
+            assertThat(array.size(), equalTo(initialSize - 1));
+
+            for (int i = 0; i < initialSize - 1; i++) {
+                assertThat(array.get(i), equalTo(value));
+            }
+
+            // Resize to the original size + 1
+            array.resize(initialSize + 1);
+            assertThat(array.size(), equalTo(initialSize + 1));
+
+            // Ensure all new elements are 0
+            for (int i = 0; i < initialSize - 1; i++) {
+                if (i < initialSize) {
+                    assertThat(array.get(i), equalTo(value));
+                } else {
+                    assertThat(array.get(i), equalTo(0));
+                }
+            }
+        }
+    }
+
+    public void testIntBulkSet() {
+        int initialSize = randomIntBetween(10, 1000);
+        int sourceArraySize = randomIntBetween(0, initialSize);
+
+        try (TDigestIntArray array = intArray(initialSize); TDigestIntArray source = intArray(sourceArraySize)) {
+            assertThat(array.size(), equalTo(initialSize));
+            assertThat(source.size(), equalTo(sourceArraySize));
+
+            int value = randomInt();
+            for (int i = 0; i < sourceArraySize; i++) {
+                source.set(i, value);
+            }
+
+            int initialOffset = randomIntBetween(0, initialSize - sourceArraySize);
+            int sourceOffset = randomIntBetween(0, sourceArraySize - 1);
+            int elementsToCopy = randomIntBetween(1, sourceArraySize - sourceOffset);
+
+            array.set(initialOffset, source, sourceOffset, elementsToCopy);
+
+            for (int i = 0; i < initialSize; i++) {
+                if (i < initialOffset || i >= initialOffset + elementsToCopy) {
+                    assertThat(array.get(i), equalTo(0));
+                } else {
+                    assertThat(array.get(i), equalTo(value));
+                }
+            }
+        }
+    }
+
+    // Long arrays
+
+    public void testLongEmpty() {
+        try (TDigestIntArray array = intArray(0)) {
+            assertThat(array.size(), equalTo(0));
+        }
+    }
+
+    public void testLongGetAndSet() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestLongArray array = longArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            long value = randomLong();
+            for (int i = 9; i < initialSize; i++) {
+                array.set(i, value);
+            }
+
+            for (int i = 0; i < initialSize; i++) {
+                if (i < 9) {
+                    assertThat(array.get(i), equalTo(0L));
+                } else {
+                    assertThat(array.get(i), equalTo(value));
+                }
+            }
+        }
+    }
+
+    public void testLongResize() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestLongArray array = longArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            // Fill with a non-zero value
+            long value = randomBoolean() ? randomLongBetween(Long.MIN_VALUE, -1) : randomLongBetween(1, Long.MAX_VALUE);
+            for (int i = 0; i < initialSize; i++) {
+                array.set(i, value);
+            }
+
+            // Resize to a size-1
+            array.resize(initialSize - 1);
+            assertThat(array.size(), equalTo(initialSize - 1));
+
+            for (int i = 0; i < initialSize - 1; i++) {
+                assertThat(array.get(i), equalTo(value));
+            }
+
+            // Resize to the original size + 1
+            array.resize(initialSize + 1);
+            assertThat(array.size(), equalTo(initialSize + 1));
+
+            // Ensure all new elements are 0
+            for (int i = 0; i < initialSize - 1; i++) {
+                if (i < initialSize) {
+                    assertThat(array.get(i), equalTo(value));
+                } else {
+                    assertThat(array.get(i), equalTo(0L));
+                }
+            }
+        }
+    }
+
+    // Byte arrays
+
+    public void testByteEmpty() {
+        try (TDigestByteArray array = byteArray(0)) {
+            assertThat(array.size(), equalTo(0));
+        }
+    }
+
+    public void testByteGetAndSet() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestByteArray array = byteArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            byte value = randomByte();
+            for (int i = 9; i < initialSize; i++) {
+                array.set(i, value);
+            }
+
+            for (int i = 0; i < initialSize; i++) {
+                if (i < 9) {
+                    assertThat(array.get(i), equalTo((byte) 0));
+                } else {
+                    assertThat(array.get(i), equalTo(value));
+                }
+            }
+        }
+    }
+
+    public void testByteResize() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestByteArray array = byteArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            // Fill with a non-zero value
+            byte value = randomBoolean() ? randomByteBetween(Byte.MIN_VALUE, (byte) -1) : randomByteBetween((byte) 1, Byte.MAX_VALUE);
+            for (int i = 0; i < initialSize; i++) {
+                array.set(i, value);
+            }
+
+            // Resize to a size-1
+            array.resize(initialSize - 1);
+            assertThat(array.size(), equalTo(initialSize - 1));
+
+            for (int i = 0; i < initialSize - 1; i++) {
+                assertThat(array.get(i), equalTo(value));
+            }
+
+            // Resize to the original size + 1
+            array.resize(initialSize + 1);
+            assertThat(array.size(), equalTo(initialSize + 1));
+
+            // Ensure all new elements are 0
+            for (int i = 0; i < initialSize - 1; i++) {
+                if (i < initialSize) {
+                    assertThat(array.get(i), equalTo(value));
+                } else {
+                    assertThat(array.get(i), equalTo((byte) 0));
+                }
+            }
+        }
+    }
+
+    // Double arrays
+
+    public void testDoubleEmpty() {
+        try (TDigestDoubleArray array = doubleArray(0)) {
+            assertThat(array.size(), equalTo(0));
+        }
+    }
+
+    public void testDoubleGetAndSet() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestDoubleArray array = doubleArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            double value = randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true);
+            for (int i = 9; i < initialSize; i++) {
+                array.set(i, value);
+            }
+
+            for (int i = 0; i < initialSize; i++) {
+                if (i < 9) {
+                    assertThat(array.get(i), equalTo(0.0));
+                } else {
+                    assertThat(array.get(i), equalTo(value));
+                }
+            }
+        }
+    }
+
+    public void testDoubleAdd() {
+        int initialSize = randomIntBetween(10, 1000);
+        try (TDigestDoubleArray array = doubleArray(initialSize)) {
+            assertThat(array.size(), equalTo(initialSize));
+
+            int newValueCount = randomIntBetween(1, 100);
+            if (randomBoolean()) {
+                array.ensureCapacity(initialSize + newValueCount);
+            }
+            double value = randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true);
+            for (int i = 0; i < newValueCount; i++) {
+                array.add(value);
+            }
+
+            for (int i = 0; i < newValueCount; i++) {
+                if (i < initialSize) {
+                    assertThat(array.get(i), equalTo(0.0));
+                } else {
+                    assertThat(array.get(i), equalTo(value));
+                }
+            }
+        }
+    }
+
+    public void testDoubleBulkSet() {
+        int initialSize = randomIntBetween(10, 1000);
+        int sourceArraySize = randomIntBetween(0, initialSize);
+
+        try (TDigestDoubleArray array = doubleArray(initialSize); TDigestDoubleArray source = doubleArray(sourceArraySize)) {
+            assertThat(array.size(), equalTo(initialSize));
+            assertThat(source.size(), equalTo(sourceArraySize));
+
+            double value = randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true);
+            for (int i = 0; i < sourceArraySize; i++) {
+                source.set(i, value);
+            }
+
+            int initialOffset = randomIntBetween(0, initialSize - sourceArraySize);
+            int sourceOffset = randomIntBetween(0, sourceArraySize - 1);
+            int elementsToCopy = randomIntBetween(1, sourceArraySize - sourceOffset);
+
+            array.set(initialOffset, source, sourceOffset, elementsToCopy);
+
+            for (int i = 0; i < initialSize; i++) {
+                if (i < initialOffset || i >= initialOffset + elementsToCopy) {
+                    assertThat(array.get(i), equalTo(0.0));
+                } else {
+                    assertThat(array.get(i), equalTo(value));
+                }
+            }
+        }
+    }
+
+    public void testDoubleSort() {
+        try (TDigestDoubleArray array = doubleArray(0)) {
+            int elementsToAdd = randomIntBetween(0, 100);
+            array.ensureCapacity(elementsToAdd);
+            for (int i = 0; i < elementsToAdd; i++) {
+                array.add(randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true));
+            }
+
+            array.sort();
+
+            double previous = -Double.MAX_VALUE;
+            for (int i = 0; i < array.size(); i++) {
+                double current = array.get(i);
+                assertThat(current, greaterThanOrEqualTo(previous));
+                previous = current;
+            }
+        }
+    }
+
+    // Helpers
+
+    private TDigestIntArray intArray(int initialSize) {
+        return arrays().newIntArray(initialSize);
+    }
+
+    private TDigestLongArray longArray(int initialSize) {
+        return arrays().newLongArray(initialSize);
+    }
+
+    private TDigestByteArray byteArray(int initialSize) {
+        return arrays().newByteArray(initialSize);
+    }
+
+    private TDigestDoubleArray doubleArray(int initialSize) {
+        return arrays().newDoubleArray(initialSize);
+    }
+
+    private TDigestArrays arrays() {
+        return new MemoryTrackingTDigestArrays(newLimitedBreaker(ByteSizeValue.ofMb(100)));
+    }
+}

+ 124 - 111
server/src/test/java/org/elasticsearch/search/aggregations/metrics/TDigestStateTests.java

@@ -16,8 +16,9 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
-import org.elasticsearch.tdigest.arrays.WrapperTDigestArrays;
 import org.elasticsearch.test.ESTestCase;
 import org.hamcrest.Matchers;
 
@@ -33,140 +34,150 @@ public class TDigestStateTests extends ESTestCase {
     public void testMoreThan4BValues() {
         // Regression test for #19528
         // See https://github.com/tdunning/t-digest/pull/70/files#diff-4487072cee29b939694825647928f742R439
-        TDigestState digest = TDigestState.create(arrays(), 100);
-        for (int i = 0; i < 1000; ++i) {
-            digest.add(randomDouble());
-        }
-        final int count = 1 << 29;
-        for (int i = 0; i < 10; ++i) {
-            digest.add(randomDouble(), count);
-        }
-        assertEquals(1000 + 10L * (1 << 29), digest.size());
-        assertTrue(digest.size() > 2L * Integer.MAX_VALUE);
-        final double[] quantiles = new double[] { 0, 0.1, 0.5, 0.9, 1, randomDouble() };
-        Arrays.sort(quantiles);
-        double prev = Double.NEGATIVE_INFINITY;
-        for (double q : quantiles) {
-            final double v = digest.quantile(q);
-            logger.trace("q=" + q + ", v=" + v);
-            assertThat(v, Matchers.either(Matchers.closeTo(prev, 0.0000001D)).or(Matchers.greaterThan(prev)));
-            assertTrue("Unexpectedly low value: " + v, v >= 0.0);
-            assertTrue("Unexpectedly high value: " + v, v <= 1.0);
-            prev = v;
+        try (TDigestState digest = TDigestState.create(arrays(), 100)) {
+            for (int i = 0; i < 1000; ++i) {
+                digest.add(randomDouble());
+            }
+            final int count = 1 << 29;
+            for (int i = 0; i < 10; ++i) {
+                digest.add(randomDouble(), count);
+            }
+            assertEquals(1000 + 10L * (1 << 29), digest.size());
+            assertTrue(digest.size() > 2L * Integer.MAX_VALUE);
+            final double[] quantiles = new double[] { 0, 0.1, 0.5, 0.9, 1, randomDouble() };
+            Arrays.sort(quantiles);
+            double prev = Double.NEGATIVE_INFINITY;
+            for (double q : quantiles) {
+                final double v = digest.quantile(q);
+                logger.trace("q=" + q + ", v=" + v);
+                assertThat(v, Matchers.either(Matchers.closeTo(prev, 0.0000001D)).or(Matchers.greaterThan(prev)));
+                assertTrue("Unexpectedly low value: " + v, v >= 0.0);
+                assertTrue("Unexpectedly high value: " + v, v <= 1.0);
+                prev = v;
+            }
         }
     }
 
     public void testEqualsHashCode() {
-        final TDigestState empty1 = new EmptyTDigestState();
-        final TDigestState empty2 = new EmptyTDigestState();
-        final TDigestState a = TDigestState.create(arrays(), 200);
-        final TDigestState b = TDigestState.create(arrays(), 100);
-        final TDigestState c = TDigestState.create(arrays(), 100);
+        try (
+            TDigestState empty1 = new EmptyTDigestState();
+            TDigestState empty2 = new EmptyTDigestState();
+            TDigestState a = TDigestState.create(arrays(), 200);
+            TDigestState b = TDigestState.create(arrays(), 100);
+            TDigestState c = TDigestState.create(arrays(), 100);
+        ) {
 
-        assertEquals(empty1, empty2);
-        assertEquals(empty1.hashCode(), empty2.hashCode());
+            assertEquals(empty1, empty2);
+            assertEquals(empty1.hashCode(), empty2.hashCode());
 
-        assertNotEquals(a, b);
-        assertNotEquals(a.hashCode(), b.hashCode());
+            assertNotEquals(a, b);
+            assertNotEquals(a.hashCode(), b.hashCode());
 
-        assertNotEquals(a, c);
-        assertNotEquals(a.hashCode(), c.hashCode());
+            assertNotEquals(a, c);
+            assertNotEquals(a.hashCode(), c.hashCode());
 
-        assertEquals(b, c);
-        assertEquals(b.hashCode(), c.hashCode());
+            assertEquals(b, c);
+            assertEquals(b.hashCode(), c.hashCode());
 
-        for (int i = 0; i < 100; i++) {
-            double value = randomDouble();
-            a.add(value);
-            b.add(value);
-            c.add(value);
-        }
+            for (int i = 0; i < 100; i++) {
+                double value = randomDouble();
+                a.add(value);
+                b.add(value);
+                c.add(value);
+            }
 
-        assertNotEquals(a, b);
-        assertNotEquals(a.hashCode(), b.hashCode());
+            assertNotEquals(a, b);
+            assertNotEquals(a.hashCode(), b.hashCode());
 
-        assertNotEquals(a, c);
-        assertNotEquals(a.hashCode(), c.hashCode());
+            assertNotEquals(a, c);
+            assertNotEquals(a.hashCode(), c.hashCode());
 
-        assertEquals(b, c);
-        assertEquals(b.hashCode(), c.hashCode());
+            assertEquals(b, c);
+            assertEquals(b.hashCode(), c.hashCode());
 
-        b.add(randomDouble());
-        c.add(randomDouble());
+            b.add(randomDouble());
+            c.add(randomDouble());
 
-        assertNotEquals(b, c);
-        assertNotEquals(b.hashCode(), c.hashCode());
+            assertNotEquals(b, c);
+            assertNotEquals(b.hashCode(), c.hashCode());
+        }
     }
 
     public void testHash() {
         final HashMap<String, TDigestState> map = new HashMap<>();
         final Set<TDigestState> set = new HashSet<>();
-        final TDigestState empty1 = new EmptyTDigestState();
-        final TDigestState empty2 = new EmptyTDigestState();
-        final TDigestState a = TDigestState.create(arrays(), 200);
-        final TDigestState b = TDigestState.create(arrays(), 100);
-        final TDigestState c = TDigestState.create(arrays(), 100);
-
-        a.add(randomDouble());
-        b.add(randomDouble());
-        c.add(randomDouble());
-        expectThrows(UnsupportedOperationException.class, () -> empty1.add(randomDouble()));
-        expectThrows(UnsupportedOperationException.class, () -> empty2.add(a));
-
-        map.put("empty1", empty1);
-        map.put("empty2", empty2);
-        map.put("a", a);
-        map.put("b", b);
-        map.put("c", c);
-        set.add(empty1);
-        set.add(empty2);
-        set.add(a);
-        set.add(b);
-        set.add(c);
-
-        assertEquals(5, map.size());
-        assertEquals(4, set.size());
-
-        assertEquals(empty1, map.get("empty1"));
-        assertEquals(empty2, map.get("empty2"));
-        assertEquals(a, map.get("a"));
-        assertEquals(b, map.get("b"));
-        assertEquals(c, map.get("c"));
-
-        assertTrue(set.stream().anyMatch(digest -> digest.equals(a)));
-        assertTrue(set.stream().anyMatch(digest -> digest.equals(b)));
-        assertTrue(set.stream().anyMatch(digest -> digest.equals(c)));
-        assertTrue(set.stream().anyMatch(digest -> digest.equals(empty1)));
-        assertTrue(set.stream().anyMatch(digest -> digest.equals(empty2)));
+        try (
+            TDigestState empty1 = new EmptyTDigestState();
+            TDigestState empty2 = new EmptyTDigestState();
+            TDigestState a = TDigestState.create(arrays(), 200);
+            TDigestState b = TDigestState.create(arrays(), 100);
+            TDigestState c = TDigestState.create(arrays(), 100);
+        ) {
+
+            a.add(randomDouble());
+            b.add(randomDouble());
+            c.add(randomDouble());
+            expectThrows(UnsupportedOperationException.class, () -> empty1.add(randomDouble()));
+            expectThrows(UnsupportedOperationException.class, () -> empty2.add(a));
+
+            map.put("empty1", empty1);
+            map.put("empty2", empty2);
+            map.put("a", a);
+            map.put("b", b);
+            map.put("c", c);
+            set.add(empty1);
+            set.add(empty2);
+            set.add(a);
+            set.add(b);
+            set.add(c);
+
+            assertEquals(5, map.size());
+            assertEquals(4, set.size());
+
+            assertEquals(empty1, map.get("empty1"));
+            assertEquals(empty2, map.get("empty2"));
+            assertEquals(a, map.get("a"));
+            assertEquals(b, map.get("b"));
+            assertEquals(c, map.get("c"));
+
+            assertTrue(set.stream().anyMatch(digest -> digest.equals(a)));
+            assertTrue(set.stream().anyMatch(digest -> digest.equals(b)));
+            assertTrue(set.stream().anyMatch(digest -> digest.equals(c)));
+            assertTrue(set.stream().anyMatch(digest -> digest.equals(empty1)));
+            assertTrue(set.stream().anyMatch(digest -> digest.equals(empty2)));
+        }
     }
 
     public void testFactoryMethods() {
-        TDigestState fast = TDigestState.create(arrays(), 100);
-        TDigestState anotherFast = TDigestState.create(arrays(), 100);
-        TDigestState accurate = TDigestState.createOptimizedForAccuracy(arrays(), 100);
-        TDigestState anotherAccurate = TDigestState.createUsingParamsFrom(accurate);
-
-        for (int i = 0; i < 100; i++) {
-            fast.add(i);
-            anotherFast.add(i);
-            accurate.add(i);
-            anotherAccurate.add(i);
-        }
+        try (
+            TDigestState fast = TDigestState.create(arrays(), 100);
+            TDigestState anotherFast = TDigestState.create(arrays(), 100);
+            TDigestState accurate = TDigestState.createOptimizedForAccuracy(arrays(), 100);
+            TDigestState anotherAccurate = TDigestState.createUsingParamsFrom(accurate);
+        ) {
 
-        for (double p : new double[] { 0.1, 1, 10, 25, 50, 75, 90, 99, 99.9 }) {
-            double q = p / 100;
-            assertEquals(fast.quantile(q), accurate.quantile(q), 0.5);
-            assertEquals(fast.quantile(q), anotherFast.quantile(q), 1e-5);
-            assertEquals(accurate.quantile(q), anotherAccurate.quantile(q), 1e-5);
+            for (int i = 0; i < 100; i++) {
+                fast.add(i);
+                anotherFast.add(i);
+                accurate.add(i);
+                anotherAccurate.add(i);
+            }
+
+            for (double p : new double[] { 0.1, 1, 10, 25, 50, 75, 90, 99, 99.9 }) {
+                double q = p / 100;
+                assertEquals(fast.quantile(q), accurate.quantile(q), 0.5);
+                assertEquals(fast.quantile(q), anotherFast.quantile(q), 1e-5);
+                assertEquals(accurate.quantile(q), anotherAccurate.quantile(q), 1e-5);
+            }
+
+            assertEquals(fast, anotherFast);
+            assertEquals(accurate, anotherAccurate);
+            assertNotEquals(fast, accurate);
+            assertNotEquals(anotherFast, anotherAccurate);
         }
-
-        assertEquals(fast, anotherFast);
-        assertEquals(accurate, anotherAccurate);
-        assertNotEquals(fast, accurate);
-        assertNotEquals(anotherFast, anotherAccurate);
     }
 
-    private static TDigestState writeToAndReadFrom(TDigestState state, TransportVersion version) throws IOException {
+    private TDigestState writeToAndReadFrom(TDigestState state, TransportVersion version) throws IOException {
         BytesRef serializedAggs = serialize(state, version);
         try (
             StreamInput in = new NamedWriteableAwareStreamInput(
@@ -203,9 +214,11 @@ public class TDigestStateTests extends ESTestCase {
         TDigestState serializedBackwardsCompatible = writeToAndReadFrom(state, TransportVersions.V_8_8_1);
         assertNotEquals(serializedBackwardsCompatible, state);
         assertEquals(serializedBackwardsCompatible, backwardsCompatible);
+
+        Releasables.close(state, backwardsCompatible, serialized, serializedBackwardsCompatible);
     }
 
-    private static TDigestArrays arrays() {
-        return WrapperTDigestArrays.INSTANCE;
+    private TDigestArrays arrays() {
+        return new MemoryTrackingTDigestArrays(newLimitedBreaker(ByteSizeValue.ofMb(100)));
     }
 }

+ 16 - 0
test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

@@ -57,6 +57,7 @@ import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
@@ -577,6 +578,21 @@ public abstract class ESTestCase extends LuceneTestCase {
         }
     }
 
+    private final List<CircuitBreaker> breakers = Collections.synchronizedList(new ArrayList<>());
+
+    protected final CircuitBreaker newLimitedBreaker(ByteSizeValue max) {
+        CircuitBreaker breaker = new MockBigArrays.LimitedBreaker("<es-test-case>", max);
+        breakers.add(breaker);
+        return breaker;
+    }
+
+    @After
+    public final void allBreakersMemoryReleased() {
+        for (CircuitBreaker breaker : breakers) {
+            assertThat(breaker.getUsed(), equalTo(0L));
+        }
+    }
+
     /**
      * Whether or not we check after each test whether it has left warnings behind. That happens if any deprecated feature or syntax
      * was used by the test and the test didn't assert on it using {@link #assertWarnings(String...)}.