[8.x] Add CircuitBreaker to TDigest, Step 4: Take into account shallow classes size (#114028)

* Add CircuitBreaker to TDigest, Step 4: Take into account shallow classes size (#113613)

* Removed muted tests from merge conflict

* Added missing empty line in muted tests
Iván Cea Fontenla · 1 year ago
commit 298984048b
39 changed files with 1840 additions and 632 deletions
  1. benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/TDigestBench.java (+1 -2)
  2. docs/changelog/113613.yaml (+7 -0)
  3. libs/tdigest/build.gradle (+1 -0)
  4. libs/tdigest/licenses/lucene-core-LICENSE.txt (+475 -0)
  5. libs/tdigest/licenses/lucene-core-NOTICE.txt (+192 -0)
  6. libs/tdigest/src/main/java/module-info.java (+1 -0)
  7. libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java (+95 -38)
  8. libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java (+28 -5)
  9. libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java (+29 -5)
  10. libs/tdigest/src/main/java/org/elasticsearch/tdigest/IntAVLTree.java (+105 -22)
  11. libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java (+59 -10)
  12. libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java (+28 -2)
  13. libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java (+10 -9)
  14. libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestArrays.java (+2 -0)
  15. libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestByteArray.java (+2 -1)
  16. libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestDoubleArray.java (+2 -1)
  17. libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestIntArray.java (+2 -1)
  18. libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestLongArray.java (+2 -1)
  19. libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLGroupTreeTests.java (+58 -54)
  20. libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLTreeDigestTests.java (+1 -1)
  21. libs/tdigest/src/test/java/org/elasticsearch/tdigest/AlternativeMergeTests.java (+42 -39)
  22. libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java (+8 -5)
  23. libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java (+1 -1)
  24. libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java (+1 -1)
  25. libs/tdigest/src/test/java/org/elasticsearch/tdigest/ComparisonTests.java (+14 -0)
  26. libs/tdigest/src/test/java/org/elasticsearch/tdigest/HybridDigestTests.java (+1 -1)
  27. libs/tdigest/src/test/java/org/elasticsearch/tdigest/IntAVLTreeTests.java (+40 -37)
  28. libs/tdigest/src/test/java/org/elasticsearch/tdigest/MedianTests.java (+28 -24)
  29. libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java (+95 -89)
  30. libs/tdigest/src/test/java/org/elasticsearch/tdigest/SortTests.java (+6 -4)
  31. libs/tdigest/src/test/java/org/elasticsearch/tdigest/SortingDigestTests.java (+1 -1)
  32. libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestReleasingTests.java (+89 -0)
  33. libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTestCase.java (+7 -2)
  34. libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTests.java (+277 -256)
  35. server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java (+0 -1)
  36. server/src/main/java/org/elasticsearch/search/aggregations/metrics/MemoryTrackingTDigestArrays.java (+12 -3)
  37. server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java (+58 -11)
  38. server/src/test/java/org/elasticsearch/search/aggregations/metrics/TDigestStateReleasingTests.java (+59 -0)
  39. x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/QuantileStates.java (+1 -5)
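Every file below applies the same pattern: a class computes its fixed per-instance ("shallow") size once via Lucene's RamUsageEstimator, reserves that amount against the circuit breaker in a static factory before construction, and releases it exactly once on close. This is also why the build gains a lucene-core dependency and the module now requires org.apache.lucene.core. A minimal sketch of the pattern (the class name is illustrative; RamUsageEstimator.shallowSizeOfInstance and TDigestArrays.adjustBreaker are the real hooks used in the diffs):

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tdigest.arrays.TDigestArrays;

// Sketch only: the classes in this commit (AVLGroupTree, AVLTreeDigest, ...) follow this shape.
final class BreakerAccountedExample implements Releasable {
    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BreakerAccountedExample.class);

    private final TDigestArrays arrays;
    private boolean closed = false;

    static BreakerAccountedExample create(TDigestArrays arrays) {
        arrays.adjustBreaker(SHALLOW_SIZE);      // reserve before allocating anything
        try {
            return new BreakerAccountedExample(arrays);
        } catch (Exception e) {
            arrays.adjustBreaker(-SHALLOW_SIZE); // roll back if construction fails
            throw e;
        }
    }

    private BreakerAccountedExample(TDigestArrays arrays) {
        this.arrays = arrays;
    }

    @Override
    public void close() {
        if (closed == false) {                   // idempotent: never release twice
            closed = true;
            arrays.adjustBreaker(-SHALLOW_SIZE);
        }
    }
}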

+ 1 - 2
benchmarks/src/main/java/org/elasticsearch/benchmark/tdigest/TDigestBench.java

@@ -23,7 +23,6 @@ package org.elasticsearch.benchmark.tdigest;
 
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays;
-import org.elasticsearch.tdigest.MergingDigest;
 import org.elasticsearch.tdigest.TDigest;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -64,7 +63,7 @@ public class TDigestBench {
         MERGE {
             @Override
             TDigest create(double compression) {
-                return new MergingDigest(arrays, compression, (int) (10 * compression));
+                return TDigest.createMergingDigest(arrays, compression);
             }
         },
         AVL_TREE {
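
For callers, the visible change is that digests are now obtained through static factories such as TDigest.createMergingDigest, so the shallow-size accounting happens in one place. A hedged usage sketch reusing the benchmark's imports (the breaker name and values are illustrative, and it assumes MemoryTrackingTDigestArrays wraps a CircuitBreaker, as the imports suggest):

TDigestArrays arrays = new MemoryTrackingTDigestArrays(new NoopCircuitBreaker("bench"));
try (TDigest digest = TDigest.createMergingDigest(arrays, 100.0)) { // compression 100
    digest.add(42.0);
    double median = digest.quantile(0.5);
}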

+ 7 - 0
docs/changelog/113613.yaml

@@ -0,0 +1,7 @@
+pr: 113613
+summary: "Add `CircuitBreaker` to TDigest, Step 4: Take into account shallow classes\
+  \ size"
+area: ES|QL
+type: enhancement
+issues:
+ - 113916

+ 1 - 0
libs/tdigest/build.gradle

@@ -23,6 +23,7 @@ apply plugin: 'elasticsearch.publish'
 
 dependencies {
   api project(':libs:elasticsearch-core')
+  api "org.apache.lucene:lucene-core:${versions.lucene}"
 
   testImplementation(project(":test:framework")) {
     exclude group: 'org.elasticsearch', module: 'elasticsearch-tdigest'

+ 475 - 0
libs/tdigest/licenses/lucene-core-LICENSE.txt

@@ -0,0 +1,475 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+
+Some code in core/src/java/org/apache/lucene/util/UnicodeUtil.java was
+derived from unicode conversion examples available at
+http://www.unicode.org/Public/PROGRAMS/CVTUTF.  Here is the copyright
+from those sources:
+
+/*
+ * Copyright 2001-2004 Unicode, Inc.
+ * 
+ * Disclaimer
+ * 
+ * This source code is provided as is by Unicode, Inc. No claims are
+ * made as to fitness for any particular purpose. No warranties of any
+ * kind are expressed or implied. The recipient agrees to determine
+ * applicability of information provided. If this file has been
+ * purchased on magnetic or optical media from Unicode, Inc., the
+ * sole remedy for any claim will be exchange of defective media
+ * within 90 days of receipt.
+ * 
+ * Limitations on Rights to Redistribute This Code
+ * 
+ * Unicode, Inc. hereby grants the right to freely use the information
+ * supplied in this file in the creation of products supporting the
+ * Unicode Standard, and to make copies of this file in any form
+ * for internal or external distribution as long as this notice
+ * remains attached.
+ */
+
+
+Some code in core/src/java/org/apache/lucene/util/ArrayUtil.java was
+derived from Python 2.4.2 sources available at
+http://www.python.org. Full license is here:
+
+  http://www.python.org/download/releases/2.4.2/license/
+
+Some code in core/src/java/org/apache/lucene/util/UnicodeUtil.java was
+derived from Python 3.1.2 sources available at
+http://www.python.org. Full license is here:
+
+  http://www.python.org/download/releases/3.1.2/license/
+
+Some code in core/src/java/org/apache/lucene/util/automaton was
+derived from Brics automaton sources available at
+www.brics.dk/automaton/. Here is the copyright from those sources:
+
+/*
+ * Copyright (c) 2001-2009 Anders Moeller
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+ 
+The levenshtein automata tables in core/src/java/org/apache/lucene/util/automaton 
+were automatically generated with the moman/finenight FSA package.
+Here is the copyright for those sources:
+
+# Copyright (c) 2010, Jean-Philippe Barrette-LaPierre, <jpb@rrette.com>
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation
+# files (the "Software"), to deal in the Software without
+# restriction, including without limitation the rights to use,
+# copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following
+# conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+Some code in core/src/java/org/apache/lucene/util/UnicodeUtil.java was
+derived from ICU (http://www.icu-project.org)
+The full license is available here: 
+  http://source.icu-project.org/repos/icu/icu/trunk/license.html
+
+/*
+ * Copyright (C) 1999-2010, International Business Machines
+ * Corporation and others.  All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy 
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights 
+ * to use, copy, modify, merge, publish, distribute, and/or sell copies of the 
+ * Software, and to permit persons to whom the Software is furnished to do so, 
+ * provided that the above copyright notice(s) and this permission notice appear 
+ * in all copies of the Software and that both the above copyright notice(s) and
+ * this permission notice appear in supporting documentation.
+ * 
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT OF THIRD PARTY RIGHTS. 
+ * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR HOLDERS INCLUDED IN THIS NOTICE BE 
+ * LIABLE FOR ANY CLAIM, OR ANY SPECIAL INDIRECT OR CONSEQUENTIAL DAMAGES, OR 
+ * ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER 
+ * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
+ * OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ *
+ * Except as contained in this notice, the name of a copyright holder shall not 
+ * be used in advertising or otherwise to promote the sale, use or other 
+ * dealings in this Software without prior written authorization of the 
+ * copyright holder.
+ */
+ 
+The following license applies to the Snowball stemmers:
+
+Copyright (c) 2001, Dr Martin Porter
+Copyright (c) 2002, Richard Boulton
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice,
+    * this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above copyright
+    * notice, this list of conditions and the following disclaimer in the
+    * documentation and/or other materials provided with the distribution.
+    * Neither the name of the copyright holders nor the names of its contributors
+    * may be used to endorse or promote products derived from this software
+    * without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+The following license applies to the KStemmer:
+
+Copyright © 2003,
+Center for Intelligent Information Retrieval,
+University of Massachusetts, Amherst.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+this list of conditions and the following disclaimer in the documentation
+and/or other materials provided with the distribution.
+
+3. The names "Center for Intelligent Information Retrieval" and
+"University of Massachusetts" must not be used to endorse or promote products
+derived from this software without prior written permission. To obtain
+permission, contact info@ciir.cs.umass.edu.
+
+THIS SOFTWARE IS PROVIDED BY UNIVERSITY OF MASSACHUSETTS AND OTHER CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
+LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGE.
+
+The following license applies to the Morfologik project:
+
+Copyright (c) 2006 Dawid Weiss
+Copyright (c) 2007-2011 Dawid Weiss, Marcin Miłkowski
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, 
+are permitted provided that the following conditions are met:
+
+    * Redistributions of source code must retain the above copyright notice, 
+    this list of conditions and the following disclaimer.
+    
+    * Redistributions in binary form must reproduce the above copyright notice, 
+    this list of conditions and the following disclaimer in the documentation 
+    and/or other materials provided with the distribution.
+    
+    * Neither the name of Morfologik nor the names of its contributors 
+    may be used to endorse or promote products derived from this software 
+    without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR 
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+---
+
+The dictionary comes from Morfologik project. Morfologik uses data from 
+Polish ispell/myspell dictionary hosted at http://www.sjp.pl/slownik/en/ and 
+is licenced on the terms of (inter alia) LGPL and Creative Commons 
+ShareAlike. The part-of-speech tags were added in Morfologik project and
+are not found in the data from sjp.pl. The tagset is similar to IPI PAN
+tagset.
+
+---
+
+The following license applies to the Morfeusz project,
+used by org.apache.lucene.analysis.morfologik.
+
+BSD-licensed dictionary of Polish (SGJP)
+http://sgjp.pl/morfeusz/
+
+Copyright © 2011 Zygmunt Saloni, Włodzimierz Gruszczyński, 
+             Marcin Woliński, Robert Wołosz
+
+All rights reserved.
+
+Redistribution and  use in  source and binary  forms, with  or without
+modification, are permitted provided that the following conditions are
+met:
+
+1. Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+   notice, this list of conditions and the following disclaimer in the
+   documentation and/or other materials provided with the
+   distribution.
+
+THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDERS “AS IS” AND ANY EXPRESS
+OR  IMPLIED WARRANTIES,  INCLUDING, BUT  NOT LIMITED  TO,  THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED.  IN NO EVENT  SHALL COPYRIGHT  HOLDERS OR  CONTRIBUTORS BE
+LIABLE FOR  ANY DIRECT,  INDIRECT, INCIDENTAL, SPECIAL,  EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES  (INCLUDING, BUT NOT LIMITED  TO, PROCUREMENT OF
+SUBSTITUTE  GOODS OR  SERVICES;  LOSS  OF USE,  DATA,  OR PROFITS;  OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED  AND ON ANY THEORY OF LIABILITY,
+WHETHER IN  CONTRACT, STRICT LIABILITY, OR  TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
+IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

+ 192 - 0
libs/tdigest/licenses/lucene-core-NOTICE.txt

@@ -0,0 +1,192 @@
+Apache Lucene
+Copyright 2014 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+Includes software from other Apache Software Foundation projects,
+including, but not limited to:
+ - Apache Ant
+ - Apache Jakarta Regexp
+ - Apache Commons
+ - Apache Xerces
+
+ICU4J, (under analysis/icu) is licensed under an MIT styles license
+and Copyright (c) 1995-2008 International Business Machines Corporation and others
+
+Some data files (under analysis/icu/src/data) are derived from Unicode data such
+as the Unicode Character Database. See http://unicode.org/copyright.html for more
+details.
+
+Brics Automaton (under core/src/java/org/apache/lucene/util/automaton) is 
+BSD-licensed, created by Anders Møller. See http://www.brics.dk/automaton/
+
+The levenshtein automata tables (under core/src/java/org/apache/lucene/util/automaton) were
+automatically generated with the moman/finenight FSA library, created by
+Jean-Philippe Barrette-LaPierre. This library is available under an MIT license,
+see http://sites.google.com/site/rrettesite/moman and 
+http://bitbucket.org/jpbarrette/moman/overview/
+
+The class org.apache.lucene.util.WeakIdentityMap was derived from
+the Apache CXF project and is Apache License 2.0.
+
+The Google Code Prettify is Apache License 2.0.
+See http://code.google.com/p/google-code-prettify/
+
+JUnit (junit-4.10) is licensed under the Common Public License v. 1.0
+See http://junit.sourceforge.net/cpl-v10.html
+
+This product includes code (JaspellTernarySearchTrie) from Java Spelling Checkin
+g Package (jaspell): http://jaspell.sourceforge.net/
+License: The BSD License (http://www.opensource.org/licenses/bsd-license.php)
+
+The snowball stemmers in
+  analysis/common/src/java/net/sf/snowball
+were developed by Martin Porter and Richard Boulton.
+The snowball stopword lists in
+  analysis/common/src/resources/org/apache/lucene/analysis/snowball
+were developed by Martin Porter and Richard Boulton.
+The full snowball package is available from
+  http://snowball.tartarus.org/
+
+The KStem stemmer in
+  analysis/common/src/org/apache/lucene/analysis/en
+was developed by Bob Krovetz and Sergio Guzman-Lara (CIIR-UMass Amherst)
+under the BSD-license.
+
+The Arabic,Persian,Romanian,Bulgarian, Hindi and Bengali analyzers (common) come with a default
+stopword list that is BSD-licensed created by Jacques Savoy.  These files reside in:
+analysis/common/src/resources/org/apache/lucene/analysis/ar/stopwords.txt,
+analysis/common/src/resources/org/apache/lucene/analysis/fa/stopwords.txt,
+analysis/common/src/resources/org/apache/lucene/analysis/ro/stopwords.txt,
+analysis/common/src/resources/org/apache/lucene/analysis/bg/stopwords.txt,
+analysis/common/src/resources/org/apache/lucene/analysis/hi/stopwords.txt,
+analysis/common/src/resources/org/apache/lucene/analysis/bn/stopwords.txt
+See http://members.unine.ch/jacques.savoy/clef/index.html.
+
+The German,Spanish,Finnish,French,Hungarian,Italian,Portuguese,Russian and Swedish light stemmers
+(common) are based on BSD-licensed reference implementations created by Jacques Savoy and
+Ljiljana Dolamic. These files reside in:
+analysis/common/src/java/org/apache/lucene/analysis/de/GermanLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/de/GermanMinimalStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/es/SpanishLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/fi/FinnishLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/fr/FrenchMinimalStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/hu/HungarianLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/it/ItalianLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/pt/PortugueseLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/ru/RussianLightStemmer.java
+analysis/common/src/java/org/apache/lucene/analysis/sv/SwedishLightStemmer.java
+
+The Stempel analyzer (stempel) includes BSD-licensed software developed 
+by the Egothor project http://egothor.sf.net/, created by Leo Galambos, Martin Kvapil,
+and Edmond Nolan.
+
+The Polish analyzer (stempel) comes with a default
+stopword list that is BSD-licensed created by the Carrot2 project. The file resides
+in stempel/src/resources/org/apache/lucene/analysis/pl/stopwords.txt.
+See http://project.carrot2.org/license.html.
+
+The SmartChineseAnalyzer source code (smartcn) was
+provided by Xiaoping Gao and copyright 2009 by www.imdict.net.
+
+WordBreakTestUnicode_*.java (under modules/analysis/common/src/test/) 
+is derived from Unicode data such as the Unicode Character Database. 
+See http://unicode.org/copyright.html for more details.
+
+The Morfologik analyzer (morfologik) includes BSD-licensed software
+developed by Dawid Weiss and Marcin Miłkowski (http://morfologik.blogspot.com/).
+
+Morfologik uses data from Polish ispell/myspell dictionary
+(http://www.sjp.pl/slownik/en/) licenced on the terms of (inter alia)
+LGPL and Creative Commons ShareAlike.
+
+Morfologic includes data from BSD-licensed dictionary of Polish (SGJP)
+(http://sgjp.pl/morfeusz/)
+
+Servlet-api.jar and javax.servlet-*.jar are under the CDDL license, the original
+source code for this can be found at http://www.eclipse.org/jetty/downloads.php
+
+===========================================================================
+Kuromoji Japanese Morphological Analyzer - Apache Lucene Integration
+===========================================================================
+
+This software includes a binary and/or source version of data from
+
+  mecab-ipadic-2.7.0-20070801
+
+which can be obtained from
+
+  http://atilika.com/releases/mecab-ipadic/mecab-ipadic-2.7.0-20070801.tar.gz
+
+or
+
+  http://jaist.dl.sourceforge.net/project/mecab/mecab-ipadic/2.7.0-20070801/mecab-ipadic-2.7.0-20070801.tar.gz
+
+===========================================================================
+mecab-ipadic-2.7.0-20070801 Notice
+===========================================================================
+
+Nara Institute of Science and Technology (NAIST),
+the copyright holders, disclaims all warranties with regard to this
+software, including all implied warranties of merchantability and
+fitness, in no event shall NAIST be liable for
+any special, indirect or consequential damages or any damages
+whatsoever resulting from loss of use, data or profits, whether in an
+action of contract, negligence or other tortuous action, arising out
+of or in connection with the use or performance of this software.
+
+A large portion of the dictionary entries
+originate from ICOT Free Software.  The following conditions for ICOT
+Free Software applies to the current dictionary as well.
+
+Each User may also freely distribute the Program, whether in its
+original form or modified, to any third party or parties, PROVIDED
+that the provisions of Section 3 ("NO WARRANTY") will ALWAYS appear
+on, or be attached to, the Program, which is distributed substantially
+in the same form as set out herein and that such intended
+distribution, if actually made, will neither violate or otherwise
+contravene any of the laws and regulations of the countries having
+jurisdiction over the User or the intended distribution itself.
+
+NO WARRANTY
+
+The program was produced on an experimental basis in the course of the
+research and development conducted during the project and is provided
+to users as so produced on an experimental basis.  Accordingly, the
+program is provided without any warranty whatsoever, whether express,
+implied, statutory or otherwise.  The term "warranty" used herein
+includes, but is not limited to, any warranty of the quality,
+performance, merchantability and fitness for a particular purpose of
+the program and the nonexistence of any infringement or violation of
+any right of any third party.
+
+Each user of the program will agree and understand, and be deemed to
+have agreed and understood, that there is no warranty whatsoever for
+the program and, accordingly, the entire risk arising from or
+otherwise connected with the program is assumed by the user.
+
+Therefore, neither ICOT, the copyright holder, or any other
+organization that participated in or was otherwise related to the
+development of the program and their respective officials, directors,
+officers and other employees shall be held liable for any and all
+damages, including, without limitation, general, special, incidental
+and consequential damages, arising out of or otherwise in connection
+with the use or inability to use the program or any product, material
+or result produced or otherwise obtained by using the program,
+regardless of whether they have been advised of, or otherwise had
+knowledge of, the possibility of such damages at any time during the
+project or thereafter.  Each user will be deemed to have agreed to the
+foregoing by his or her commencement of use of the program.  The term
+"use" as used herein includes, but is not limited to, the use,
+modification, copying and distribution of the program and the
+production of secondary products from the program.
+
+In the case where the program, whether in its original form or
+modified, was distributed or delivered to or received by a user from
+any person, organization or entity other than ICOT, unless it makes or
+grants independently of ICOT any specific warranty to the user in
+writing, such person, organization or entity, will also be exempted
+from and not be held liable to the user for any such damages as noted
+above as far as the program is concerned.

+ 1 - 0
libs/tdigest/src/main/java/module-info.java

@@ -19,6 +19,7 @@
 
 module org.elasticsearch.tdigest {
     requires org.elasticsearch.base;
+    requires org.apache.lucene.core;
 
     exports org.elasticsearch.tdigest;
     exports org.elasticsearch.tdigest.arrays;

+ 95 - 38
libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLGroupTree.java

@@ -21,6 +21,8 @@
 
 package org.elasticsearch.tdigest;
 
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
@@ -33,7 +35,12 @@ import java.util.Iterator;
 /**
  * A tree of t-digest centroids.
  */
-final class AVLGroupTree extends AbstractCollection<Centroid> implements Releasable {
+final class AVLGroupTree extends AbstractCollection<Centroid> implements Releasable, Accountable {
+    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(AVLGroupTree.class);
+
+    private final TDigestArrays arrays;
+    private boolean closed = false;
+
     /* For insertions into the tree */
     private double centroid;
     private long count;
@@ -42,49 +49,95 @@ final class AVLGroupTree extends AbstractCollection<Centroid> implements Releasa
     private final TDigestLongArray aggregatedCounts;
     private final IntAVLTree tree;
 
-    AVLGroupTree(TDigestArrays arrays) {
-        tree = new IntAVLTree(arrays) {
+    static AVLGroupTree create(TDigestArrays arrays) {
+        arrays.adjustBreaker(SHALLOW_SIZE);
+        try {
+            return new AVLGroupTree(arrays);
+        } catch (Exception e) {
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            throw e;
+        }
+    }
 
-            @Override
-            protected void resize(int newCapacity) {
-                super.resize(newCapacity);
-                centroids.resize(newCapacity);
-                counts.resize(newCapacity);
-                aggregatedCounts.resize(newCapacity);
-            }
+    private AVLGroupTree(TDigestArrays arrays) {
+        this.arrays = arrays;
+
+        IntAVLTree tree = null;
+        TDigestDoubleArray centroids = null;
+        TDigestLongArray counts = null;
+        TDigestLongArray aggregatedCounts = null;
+
+        try {
+            this.tree = tree = createIntAvlTree(arrays);
+            this.centroids = centroids = arrays.newDoubleArray(tree.capacity());
+            this.counts = counts = arrays.newLongArray(tree.capacity());
+            this.aggregatedCounts = aggregatedCounts = arrays.newLongArray(tree.capacity());
+
+            tree = null;
+            centroids = null;
+            counts = null;
+            aggregatedCounts = null;
+        } finally {
+            Releasables.close(tree, centroids, counts, aggregatedCounts);
+        }
+    }
 
-            @Override
-            protected void merge(int node) {
-                // two nodes are never considered equal
-                throw new UnsupportedOperationException();
-            }
+    private IntAVLTree createIntAvlTree(TDigestArrays arrays) {
+        arrays.adjustBreaker(IntAVLTree.SHALLOW_SIZE);
+        try {
+            return new InternalIntAvlTree(arrays);
+        } catch (Exception e) {
+            arrays.adjustBreaker(-IntAVLTree.SHALLOW_SIZE);
+            throw e;
+        }
+    }
 
-            @Override
-            protected void copy(int node) {
-                centroids.set(node, centroid);
-                counts.set(node, count);
-            }
+    private class InternalIntAvlTree extends IntAVLTree {
+        private InternalIntAvlTree(TDigestArrays arrays) {
+            super(arrays);
+        }
 
-            @Override
-            protected int compare(int node) {
-                if (centroid < centroids.get(node)) {
-                    return -1;
-                } else {
-                    // upon equality, the newly added node is considered greater
-                    return 1;
-                }
-            }
+        @Override
+        protected void resize(int newCapacity) {
+            super.resize(newCapacity);
+            centroids.resize(newCapacity);
+            counts.resize(newCapacity);
+            aggregatedCounts.resize(newCapacity);
+        }
 
-            @Override
-            protected void fixAggregates(int node) {
-                super.fixAggregates(node);
-                aggregatedCounts.set(node, counts.get(node) + aggregatedCounts.get(left(node)) + aggregatedCounts.get(right(node)));
+        @Override
+        protected void merge(int node) {
+            // two nodes are never considered equal
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected void copy(int node) {
+            centroids.set(node, centroid);
+            counts.set(node, count);
+        }
+
+        @Override
+        protected int compare(int node) {
+            if (centroid < centroids.get(node)) {
+                return -1;
+            } else {
+                // upon equality, the newly added node is considered greater
+                return 1;
             }
+        }
+
+        @Override
+        protected void fixAggregates(int node) {
+            super.fixAggregates(node);
+            aggregatedCounts.set(node, counts.get(node) + aggregatedCounts.get(left(node)) + aggregatedCounts.get(right(node)));
+        }
 
-        };
-        centroids = arrays.newDoubleArray(tree.capacity());
-        counts = arrays.newLongArray(tree.capacity());
-        aggregatedCounts = arrays.newLongArray(tree.capacity());
+    }
+
+    @Override
+    public long ramBytesUsed() {
+        return SHALLOW_SIZE + centroids.ramBytesUsed() + counts.ramBytesUsed() + aggregatedCounts.ramBytesUsed() + tree.ramBytesUsed();
     }
 
     /**
@@ -271,6 +324,10 @@ final class AVLGroupTree extends AbstractCollection<Centroid> implements Releasa
 
     @Override
     public void close() {
-        Releasables.close(centroids, counts, aggregatedCounts, tree);
+        if (closed == false) {
+            closed = true;
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            Releasables.close(centroids, counts, aggregatedCounts, tree);
+        }
     }
 }
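
One subtlety in the constructor above: each resource is assigned to both its field and a local, and the locals are nulled only once every allocation has succeeded, so the finally block releases exactly what was allocated before a failure (for example, the breaker tripping mid-construction). The idiom in isolation, with illustrative names:

import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.function.Supplier;

final class ExceptionSafeInit implements Releasable {
    private final Releasable first;
    private final Releasable second;

    ExceptionSafeInit(Supplier<Releasable> alloc) {
        Releasable first = null;
        Releasable second = null;
        try {
            this.first = first = alloc.get();
            this.second = second = alloc.get(); // may throw, e.g. a CircuitBreakingException
            first = null;                       // success: ownership moves to the fields
            second = null;
        } finally {
            Releasables.close(first, second);   // null-safe; closes only what is still locally owned
        }
    }

    @Override
    public void close() {
        Releasables.close(first, second);
    }
}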

+ 28 - 5
libs/tdigest/src/main/java/org/elasticsearch/tdigest/AVLTreeDigest.java

@@ -21,6 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 
@@ -32,7 +33,10 @@ import java.util.Random;
 import static org.elasticsearch.tdigest.IntAVLTree.NIL;
 
 public class AVLTreeDigest extends AbstractTDigest {
+    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(AVLTreeDigest.class);
+
     private final TDigestArrays arrays;
+    private boolean closed = false;
 
     final Random gen = new Random();
     private final double compression;
@@ -43,6 +47,16 @@ public class AVLTreeDigest extends AbstractTDigest {
     // Indicates if a sample has been added after the last compression.
     private boolean needsCompression;
 
+    static AVLTreeDigest create(TDigestArrays arrays, double compression) {
+        arrays.adjustBreaker(SHALLOW_SIZE);
+        try {
+            return new AVLTreeDigest(arrays, compression);
+        } catch (Exception e) {
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            throw e;
+        }
+    }
+
     /**
      * A histogram structure that will record a sketch of a distribution.
      *
@@ -51,15 +65,20 @@ public class AVLTreeDigest extends AbstractTDigest {
      *                    quantiles.  Conversely, you should expect to track about 5 N centroids for this
      *                    accuracy.
      */
-    AVLTreeDigest(TDigestArrays arrays, double compression) {
+    private AVLTreeDigest(TDigestArrays arrays, double compression) {
         this.arrays = arrays;
         this.compression = compression;
-        summary = new AVLGroupTree(arrays);
+        summary = AVLGroupTree.create(arrays);
+    }
+
+    @Override
+    public long ramBytesUsed() {
+        return SHALLOW_SIZE + summary.ramBytesUsed();
     }
 
     /**
      * Sets the seed for the RNG.
-     * In cases where a predicatable tree should be created, this function may be used to make the
+     * In cases where a predictable tree should be created, this function may be used to make the
      * randomness in this AVLTree become more deterministic.
      *
      * @param seed The random seed to use for RNG purposes
@@ -155,7 +174,7 @@ public class AVLTreeDigest extends AbstractTDigest {
         needsCompression = false;
 
         try (AVLGroupTree centroids = summary) {
-            this.summary = new AVLGroupTree(arrays);
+            this.summary = AVLGroupTree.create(arrays);
 
             final int[] nodes = new int[centroids.size()];
             nodes[0] = centroids.first();
@@ -361,6 +380,10 @@ public class AVLTreeDigest extends AbstractTDigest {
 
     @Override
     public void close() {
-        Releasables.close(summary);
+        if (closed == false) {
+            closed = true;
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            Releasables.close(summary);
+        }
     }
 }

+ 29 - 5
libs/tdigest/src/main/java/org/elasticsearch/tdigest/HybridDigest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 
@@ -34,8 +35,10 @@ import java.util.Collection;
  * bounded memory allocation and acceptable speed and accuracy for larger ones.
  */
 public class HybridDigest extends AbstractTDigest {
+    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(HybridDigest.class);
 
     private final TDigestArrays arrays;
+    private boolean closed = false;
 
     // See MergingDigest's compression param.
     private final double compression;
@@ -49,6 +52,16 @@ public class HybridDigest extends AbstractTDigest {
     // This gets initialized when the implementation switches to MergingDigest.
     private MergingDigest mergingDigest;
 
+    static HybridDigest create(TDigestArrays arrays, double compression) {
+        arrays.adjustBreaker(SHALLOW_SIZE);
+        try {
+            return new HybridDigest(arrays, compression);
+        } catch (Exception e) {
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            throw e;
+        }
+    }
+
     /**
      * Creates a hybrid digest that uses a {@link SortingDigest} for up to {@param maxSortingSize} samples,
      * then switches to a {@link MergingDigest}.
@@ -56,11 +69,11 @@ public class HybridDigest extends AbstractTDigest {
      * @param compression The compression factor for the MergingDigest
      * @param maxSortingSize The sample size limit for switching from a {@link SortingDigest} to a {@link MergingDigest} implementation
      */
-    HybridDigest(TDigestArrays arrays, double compression, long maxSortingSize) {
+    private HybridDigest(TDigestArrays arrays, double compression, long maxSortingSize) {
         this.arrays = arrays;
         this.compression = compression;
         this.maxSortingSize = maxSortingSize;
-        this.sortingDigest = new SortingDigest(arrays);
+        this.sortingDigest = TDigest.createSortingDigest(arrays);
     }
 
     /**
@@ -69,13 +82,20 @@ public class HybridDigest extends AbstractTDigest {
      *
      * @param compression The compression factor for the MergingDigest
      */
-    HybridDigest(TDigestArrays arrays, double compression) {
+    private HybridDigest(TDigestArrays arrays, double compression) {
         // The default maxSortingSize is calculated so that the SortingDigest will have comparable size with the MergingDigest
         // at the point where implementations switch, e.g. for default compression 100 SortingDigest allocates ~16kB and MergingDigest
         // allocates ~15kB.
         this(arrays, compression, Math.round(compression) * 20);
     }
 
+    @Override
+    public long ramBytesUsed() {
+        return SHALLOW_SIZE + (sortingDigest != null ? sortingDigest.ramBytesUsed() : 0) + (mergingDigest != null
+            ? mergingDigest.ramBytesUsed()
+            : 0);
+    }
+
     @Override
     public void add(double x, long w) {
         reserve(w);
@@ -105,7 +125,7 @@ public class HybridDigest extends AbstractTDigest {
         // Check if we need to switch implementations.
         assert sortingDigest != null;
         if (sortingDigest.size() + size >= maxSortingSize) {
-            mergingDigest = new MergingDigest(arrays, compression);
+            mergingDigest = TDigest.createMergingDigest(arrays, compression);
             for (int i = 0; i < sortingDigest.values.size(); i++) {
                 mergingDigest.add(sortingDigest.values.get(i));
             }
@@ -201,6 +221,10 @@ public class HybridDigest extends AbstractTDigest {
 
     @Override
     public void close() {
-        Releasables.close(sortingDigest, mergingDigest);
+        if (closed == false) {
+            closed = true;
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            Releasables.close(sortingDigest, mergingDigest);
+        }
     }
 }

+ 105 - 22
libs/tdigest/src/main/java/org/elasticsearch/tdigest/IntAVLTree.java

@@ -21,21 +21,22 @@
 
 package org.elasticsearch.tdigest;
 
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestByteArray;
 import org.elasticsearch.tdigest.arrays.TDigestIntArray;
 
-import java.util.Arrays;
-
 /**
  * An AVL-tree structure stored in parallel arrays.
  * This class only stores the tree structure, so you need to extend it if you
  * want to add data to the nodes, typically by using arrays and node
  * identifiers as indices.
  */
-abstract class IntAVLTree implements Releasable {
+abstract class IntAVLTree implements Releasable, Accountable {
+    static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IntAVLTree.class);
     /**
      * We use <code>0</code> instead of <code>-1</code> so that left(NIL) works without
      * condition.
@@ -47,6 +48,9 @@ abstract class IntAVLTree implements Releasable {
         return size + (size >>> 3);
     }
 
+    private final TDigestArrays arrays;
+    private boolean closed = false;
+
     private final NodeAllocator nodeAllocator;
     private int root;
     private final TDigestIntArray parent;
@@ -55,18 +59,42 @@ abstract class IntAVLTree implements Releasable {
     private final TDigestByteArray depth;
 
     IntAVLTree(TDigestArrays arrays, int initialCapacity) {
-        nodeAllocator = new NodeAllocator();
+        this.arrays = arrays;
         root = NIL;
-        parent = arrays.newIntArray(initialCapacity);
-        left = arrays.newIntArray(initialCapacity);
-        right = arrays.newIntArray(initialCapacity);
-        depth = arrays.newByteArray(initialCapacity);
+
+        NodeAllocator nodeAllocator = null;
+        TDigestIntArray parent = null;
+        TDigestIntArray left = null;
+        TDigestIntArray right = null;
+        TDigestByteArray depth = null;
+
+        try {
+            this.nodeAllocator = nodeAllocator = NodeAllocator.create(arrays);
+            this.parent = parent = arrays.newIntArray(initialCapacity);
+            this.left = left = arrays.newIntArray(initialCapacity);
+            this.right = right = arrays.newIntArray(initialCapacity);
+            this.depth = depth = arrays.newByteArray(initialCapacity);
+
+            nodeAllocator = null;
+            parent = null;
+            left = null;
+            right = null;
+            depth = null;
+        } finally {
+            Releasables.close(nodeAllocator, parent, left, right, depth);
+        }
     }
 
     IntAVLTree(TDigestArrays arrays) {
         this(arrays, 16);
     }
 
+    @Override
+    public long ramBytesUsed() {
+        return SHALLOW_SIZE + nodeAllocator.ramBytesUsed() + parent.ramBytesUsed() + left.ramBytesUsed() + right.ramBytesUsed() + depth
+            .ramBytesUsed();
+    }
+
     /**
      * Return the current root of the tree.
      */
@@ -531,42 +559,85 @@ abstract class IntAVLTree implements Releasable {
     /**
      * A stack of int values.
      */
-    private static class IntStack {
+    private static class IntStack implements Releasable, Accountable {
+        private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(IntStack.class);
+
+        private final TDigestArrays arrays;
+        private boolean closed = false;
 
-        private int[] stack;
+        private final TDigestIntArray stack;
         private int size;
 
-        IntStack() {
-            stack = new int[0];
+        IntStack(TDigestArrays arrays) {
+            this.arrays = arrays;
+            stack = arrays.newIntArray(0);
             size = 0;
         }
 
+        @Override
+        public long ramBytesUsed() {
+            return SHALLOW_SIZE + stack.ramBytesUsed();
+        }
+
         int size() {
             return size;
         }
 
         int pop() {
-            return stack[--size];
+            int value = stack.get(--size);
+            stack.resize(size);
+            return value;
         }
 
         void push(int v) {
-            if (size >= stack.length) {
-                final int newLength = oversize(size + 1);
-                stack = Arrays.copyOf(stack, newLength);
-            }
-            stack[size++] = v;
+            stack.resize(++size);
+            stack.set(size - 1, v);
         }
 
+        @Override
+        public void close() {
+            if (closed == false) {
+                closed = true;
+                arrays.adjustBreaker(-SHALLOW_SIZE);
+                stack.close();
+            }
+        }
     }
 
-    private static class NodeAllocator {
+    private static class NodeAllocator implements Releasable, Accountable {
+        private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(NodeAllocator.class);
+
+        private final TDigestArrays arrays;
+        private boolean closed = false;
 
         private int nextNode;
         private final IntStack releasedNodes;
 
-        NodeAllocator() {
+        static NodeAllocator create(TDigestArrays arrays) {
+            arrays.adjustBreaker(SHALLOW_SIZE);
+            try {
+                return new NodeAllocator(arrays);
+            } catch (Exception e) {
+                arrays.adjustBreaker(-SHALLOW_SIZE);
+                throw e;
+            }
+        }
+
+        private NodeAllocator(TDigestArrays arrays) {
+            this.arrays = arrays;
             nextNode = NIL + 1;
-            releasedNodes = new IntStack();
+            arrays.adjustBreaker(IntStack.SHALLOW_SIZE);
+            try {
+                releasedNodes = new IntStack(arrays);
+            } catch (Exception e) {
+                arrays.adjustBreaker(-IntStack.SHALLOW_SIZE);
+                throw e;
+            }
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return SHALLOW_SIZE + releasedNodes.ramBytesUsed();
         }
 
         int newNode() {
@@ -586,10 +657,22 @@ abstract class IntAVLTree implements Releasable {
             return nextNode - releasedNodes.size() - 1;
         }
 
+        @Override
+        public void close() {
+            if (closed == false) {
+                closed = true;
+                arrays.adjustBreaker(-SHALLOW_SIZE);
+                releasedNodes.close();
+            }
+        }
     }
 
     @Override
     public void close() {
-        Releasables.close(parent, left, right, depth);
+        if (closed == false) {
+            closed = true;
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            Releasables.close(nodeAllocator, parent, left, right, depth);
+        }
     }
 }
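
The constructor rewrite above uses an assign-then-null idiom so a partially constructed tree never leaks tracked memory: each allocation is stored in both the field and a local, the locals are nulled once every allocation succeeded, and the finally block closes whatever is still non-null (Releasables.close ignores nulls). The same idiom in isolation, with placeholder names:

    // Sketch: exception-safe allocation of several breaker-tracked arrays.
    TDigestIntArray first = null;
    TDigestIntArray second = null;
    try {
        this.first = first = arrays.newIntArray(capacity);   // may trip the breaker
        this.second = second = arrays.newIntArray(capacity); // may trip the breaker
        first = null;  // success: disarm the cleanup below
        second = null;
    } finally {
        Releasables.close(first, second); // on failure, closes only what was already allocated
    }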

+ 59 - 10
libs/tdigest/src/main/java/org/elasticsearch/tdigest/MergingDigest.java

@@ -21,6 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
@@ -67,6 +68,11 @@ import java.util.Iterator;
  * what the AVLTreeDigest uses and no dynamic allocation is required at all.
  */
 public class MergingDigest extends AbstractTDigest {
+    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(MergingDigest.class);
+
+    private final TDigestArrays arrays;
+    private boolean closed = false;
+
     private int mergeCount = 0;
 
     private final double publicCompression;
@@ -107,6 +113,26 @@ public class MergingDigest extends AbstractTDigest {
     // weight limits.
     public static boolean useWeightLimit = true;
 
+    static MergingDigest create(TDigestArrays arrays, double compression) {
+        arrays.adjustBreaker(SHALLOW_SIZE);
+        try {
+            return new MergingDigest(arrays, compression);
+        } catch (Exception e) {
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            throw e;
+        }
+    }
+
+    static MergingDigest create(TDigestArrays arrays, double compression, int bufferSize, int size) {
+        arrays.adjustBreaker(SHALLOW_SIZE);
+        try {
+            return new MergingDigest(arrays, compression, bufferSize, size);
+        } catch (Exception e) {
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            throw e;
+        }
+    }
+
     /**
      * Allocates a buffer merging t-digest.  This is the normally used constructor that
      * allocates default sized internal arrays.  Other versions are available, but should
@@ -114,7 +140,7 @@ public class MergingDigest extends AbstractTDigest {
      *
      * @param compression The compression factor
      */
-    public MergingDigest(TDigestArrays arrays, double compression) {
+    private MergingDigest(TDigestArrays arrays, double compression) {
         this(arrays, compression, -1);
     }
 
@@ -124,7 +150,7 @@ public class MergingDigest extends AbstractTDigest {
      * @param compression Compression factor for t-digest.  Same as 1/\delta in the paper.
      * @param bufferSize  How many samples to retain before merging.
      */
-    public MergingDigest(TDigestArrays arrays, double compression, int bufferSize) {
+    private MergingDigest(TDigestArrays arrays, double compression, int bufferSize) {
         // we can guarantee that we only need ceiling(compression).
         this(arrays, compression, bufferSize, -1);
     }
@@ -136,7 +162,9 @@ public class MergingDigest extends AbstractTDigest {
      * @param bufferSize  Number of temporary centroids
      * @param size        Size of main buffer
      */
-    public MergingDigest(TDigestArrays arrays, double compression, int bufferSize, int size) {
+    private MergingDigest(TDigestArrays arrays, double compression, int bufferSize, int size) {
+        this.arrays = arrays;
+
         // ensure compression >= 10
         // default size = 2 * ceil(compression)
         // default bufferSize = 5 * size
@@ -210,16 +238,33 @@ public class MergingDigest extends AbstractTDigest {
             bufferSize = 2 * size;
         }
 
-        weight = arrays.newDoubleArray(size);
-        mean = arrays.newDoubleArray(size);
-
-        tempWeight = arrays.newDoubleArray(bufferSize);
-        tempMean = arrays.newDoubleArray(bufferSize);
-        order = arrays.newIntArray(bufferSize);
+        TDigestDoubleArray weight = null;
+        TDigestDoubleArray mean = null;
+        TDigestDoubleArray tempWeight = null;
+        TDigestDoubleArray tempMean = null;
+        TDigestIntArray order = null;
+
+        try {
+            this.weight = weight = arrays.newDoubleArray(size);
+            this.mean = mean = arrays.newDoubleArray(size);
+
+            this.tempWeight = tempWeight = arrays.newDoubleArray(bufferSize);
+            this.tempMean = tempMean = arrays.newDoubleArray(bufferSize);
+            this.order = order = arrays.newIntArray(bufferSize);
+        } catch (Exception e) {
+            Releasables.close(weight, mean, tempWeight, tempMean, order);
+            throw e;
+        }
 
         lastUsedCell = 0;
     }
 
+    @Override
+    public long ramBytesUsed() {
+        return SHALLOW_SIZE + weight.ramBytesUsed() + mean.ramBytesUsed() + tempWeight.ramBytesUsed() + tempMean.ramBytesUsed()
+            + order.ramBytesUsed();
+    }
+
     @Override
     public void add(double x, long w) {
         checkValue(x);
@@ -578,6 +623,10 @@ public class MergingDigest extends AbstractTDigest {
 
     @Override
     public void close() {
-        Releasables.close(weight, mean, tempWeight, tempMean, order);
+        if (closed == false) {
+            closed = true;
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            Releasables.close(weight, mean, tempWeight, tempMean, order);
+        }
     }
 }
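
With the constructors now private, the static create(...) factories are the only entry points; they reserve SHALLOW_SIZE on the breaker before construction and roll the reservation back if the constructor throws, so the accounting stays balanced either way. Caller-side, nothing changes beyond using the factory and closing the digest. A sketch, assuming arrays is some TDigestArrays instance:

    // Sketch: typical usage; close() releases the shallow size and the backing arrays.
    try (MergingDigest digest = TDigest.createMergingDigest(arrays, 100.0)) {
        digest.add(42.0);
        double median = digest.quantile(0.5);
    }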

+ 28 - 2
libs/tdigest/src/main/java/org/elasticsearch/tdigest/SortingDigest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 import org.elasticsearch.tdigest.arrays.TDigestDoubleArray;
@@ -33,16 +34,37 @@ import java.util.Iterator;
  * samples, at the expense of allocating much more memory.
  */
 public class SortingDigest extends AbstractTDigest {
+    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(SortingDigest.class);
+
+    private final TDigestArrays arrays;
+    private boolean closed = false;
+
     // Tracks all samples. Gets sorted on quantile and cdf calls.
     final TDigestDoubleArray values;
 
     // Indicates if all values have been sorted.
     private boolean isSorted = true;
 
-    public SortingDigest(TDigestArrays arrays) {
+    static SortingDigest create(TDigestArrays arrays) {
+        arrays.adjustBreaker(SHALLOW_SIZE);
+        try {
+            return new SortingDigest(arrays);
+        } catch (Exception e) {
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            throw e;
+        }
+    }
+
+    private SortingDigest(TDigestArrays arrays) {
+        this.arrays = arrays;
         values = arrays.newDoubleArray(0);
     }
 
+    @Override
+    public long ramBytesUsed() {
+        return SHALLOW_SIZE + values.ramBytesUsed();
+    }
+
     @Override
     public void add(double x, long w) {
         checkValue(x);
@@ -141,6 +163,10 @@ public class SortingDigest extends AbstractTDigest {
 
     @Override
     public void close() {
-        Releasables.close(values);
+        if (closed == false) {
+            closed = true;
+            arrays.adjustBreaker(-SHALLOW_SIZE);
+            Releasables.close(values);
+        }
     }
 }
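
After this change every digest reports ramBytesUsed() as its shallow size plus the bytes of its tracked arrays, which is exactly what is reserved on the breaker at any point in time. This is the invariant the new TDigestReleasingTests below verifies; in sketch form, assuming a MemoryTrackingTDigestArrays backed by a single CircuitBreaker:

    // Sketch: breaker accounting and Accountable reporting stay in lockstep.
    long reserved = breaker.getUsed();      // bytes currently held by the CircuitBreaker
    long reported = digest.ramBytesUsed();  // SHALLOW_SIZE + owned arrays
    assert reserved == reported;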

+ 10 - 9
libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java

@@ -21,6 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.tdigest.arrays.TDigestArrays;
 
@@ -38,7 +39,7 @@ import java.util.Locale;
  * - test coverage roughly at 90%
  * - easy to adapt for use with map-reduce
  */
-public abstract class TDigest implements Releasable {
+public abstract class TDigest implements Releasable, Accountable {
     protected ScaleFunction scale = ScaleFunction.K_2;
     double min = Double.POSITIVE_INFINITY;
     double max = Double.NEGATIVE_INFINITY;
@@ -51,8 +52,8 @@ public abstract class TDigest implements Releasable {
      *                    The number of centroids retained will be a smallish (usually less than 10) multiple of this number.
      * @return the MergingDigest
      */
-    public static TDigest createMergingDigest(TDigestArrays arrays, double compression) {
-        return new MergingDigest(arrays, compression);
+    public static MergingDigest createMergingDigest(TDigestArrays arrays, double compression) {
+        return MergingDigest.create(arrays, compression);
     }
 
     /**
@@ -64,8 +65,8 @@ public abstract class TDigest implements Releasable {
      *                    The number of centroids retained will be a smallish (usually less than 10) multiple of this number.
      * @return the AvlTreeDigest
      */
-    public static TDigest createAvlTreeDigest(TDigestArrays arrays, double compression) {
-        return new AVLTreeDigest(arrays, compression);
+    public static AVLTreeDigest createAvlTreeDigest(TDigestArrays arrays, double compression) {
+        return AVLTreeDigest.create(arrays, compression);
     }
 
     /**
@@ -74,8 +75,8 @@ public abstract class TDigest implements Releasable {
      *
      * @return the SortingDigest
      */
-    public static TDigest createSortingDigest(TDigestArrays arrays) {
-        return new SortingDigest(arrays);
+    public static SortingDigest createSortingDigest(TDigestArrays arrays) {
+        return SortingDigest.create(arrays);
     }
 
     /**
@@ -87,8 +88,8 @@ public abstract class TDigest implements Releasable {
      *                    The number of centroids retained will be a smallish (usually less than 10) multiple of this number.
      * @return the HybridDigest
      */
-    public static TDigest createHybridDigest(TDigestArrays arrays, double compression) {
-        return new HybridDigest(arrays, compression);
+    public static HybridDigest createHybridDigest(TDigestArrays arrays, double compression) {
+        return HybridDigest.create(arrays, compression);
     }
 
     /**

+ 2 - 0
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestArrays.java

@@ -25,6 +25,8 @@ package org.elasticsearch.tdigest.arrays;
  * Minimal interface for BigArrays-like classes used within TDigest.
  */
 public interface TDigestArrays {
+    void adjustBreaker(long size);
+
     TDigestDoubleArray newDoubleArray(int initialSize);
 
     TDigestIntArray newIntArray(int initialSize);
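
adjustBreaker(long) is the only addition to the interface: positive sizes reserve bytes (and may throw in breaker-backed implementations), negative sizes release them. A plausible breaker-backed implementation, simplified from what MemoryTrackingTDigestArrays does; the CircuitBreaker methods are real, while the actual class's error handling is omitted here:

    private final CircuitBreaker breaker;

    @Override
    public void adjustBreaker(long size) {
        if (size > 0) {
            // may throw CircuitBreakingException if the limit would be exceeded
            breaker.addEstimateBytesAndMaybeBreak(size, "tdigest");
        } else {
            breaker.addWithoutBreaking(size); // releases never fail
        }
    }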

+ 2 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestByteArray.java

@@ -21,12 +21,13 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.core.Releasable;
 
 /**
  * Minimal interface for ByteArray-like classes used within TDigest.
  */
-public interface TDigestByteArray extends Releasable {
+public interface TDigestByteArray extends Releasable, Accountable {
     int size();
 
     byte get(int index);

+ 2 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestDoubleArray.java

@@ -21,12 +21,13 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.core.Releasable;
 
 /**
  * Minimal interface for DoubleArray-like classes used within TDigest.
  */
-public interface TDigestDoubleArray extends Releasable {
+public interface TDigestDoubleArray extends Releasable, Accountable {
     int size();
 
     double get(int index);

+ 2 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestIntArray.java

@@ -21,12 +21,13 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.core.Releasable;
 
 /**
  * Minimal interface for IntArray-like classes used within TDigest.
  */
-public interface TDigestIntArray extends Releasable {
+public interface TDigestIntArray extends Releasable, Accountable {
     int size();
 
     int get(int index);

+ 2 - 1
libs/tdigest/src/main/java/org/elasticsearch/tdigest/arrays/TDigestLongArray.java

@@ -21,12 +21,13 @@
 
 package org.elasticsearch.tdigest.arrays;
 
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.core.Releasable;
 
 /**
  * Minimal interface for LongArray-like classes used within TDigest.
  */
-public interface TDigestLongArray extends Releasable {
+public interface TDigestLongArray extends Releasable, Accountable {
     int size();
 
     long get(int index);
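
All four array interfaces now extend Lucene's Accountable, whose single abstract method is long ramBytesUsed(); that is what lets the containers above sum their children's footprints. A sketch of what an implementer provides, on a hypothetical heap-backed wrapper that is not part of the commit:

    // Sketch: a trivial Accountable wrapper (hypothetical class, for illustration only).
    import org.apache.lucene.util.Accountable;
    import org.apache.lucene.util.RamUsageEstimator;

    final class TrackedIntBuffer implements Accountable {
        private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TrackedIntBuffer.class);
        private final int[] values;

        TrackedIntBuffer(int size) {
            this.values = new int[size];
        }

        @Override
        public long ramBytesUsed() {
            return SHALLOW_SIZE + RamUsageEstimator.sizeOf(values); // shallow + backing array
        }
    }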

+ 58 - 54
libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLGroupTreeTests.java

@@ -24,81 +24,85 @@ package org.elasticsearch.tdigest;
 public class AVLGroupTreeTests extends TDigestTestCase {
 
     public void testSimpleAdds() {
-        AVLGroupTree x = new AVLGroupTree(arrays());
-        assertEquals(IntAVLTree.NIL, x.floor(34));
-        assertEquals(IntAVLTree.NIL, x.first());
-        assertEquals(IntAVLTree.NIL, x.last());
-        assertEquals(0, x.size());
-        assertEquals(0, x.sum());
+        try (AVLGroupTree x = AVLGroupTree.create(arrays())) {
+            assertEquals(IntAVLTree.NIL, x.floor(34));
+            assertEquals(IntAVLTree.NIL, x.first());
+            assertEquals(IntAVLTree.NIL, x.last());
+            assertEquals(0, x.size());
+            assertEquals(0, x.sum());
 
-        x.add(new Centroid(1));
-        assertEquals(1, x.sum());
-        Centroid centroid = new Centroid(2);
-        centroid.add(3, 1);
-        centroid.add(4, 1);
-        x.add(centroid);
+            x.add(new Centroid(1));
+            assertEquals(1, x.sum());
+            Centroid centroid = new Centroid(2);
+            centroid.add(3, 1);
+            centroid.add(4, 1);
+            x.add(centroid);
 
-        assertEquals(2, x.size());
-        assertEquals(4, x.sum());
+            assertEquals(2, x.size());
+            assertEquals(4, x.sum());
+        }
     }
 
     public void testBalancing() {
-        AVLGroupTree x = new AVLGroupTree(arrays());
-        for (int i = 0; i < 101; i++) {
-            x.add(new Centroid(i));
-        }
+        try (AVLGroupTree x = AVLGroupTree.create(arrays())) {
+            for (int i = 0; i < 101; i++) {
+                x.add(new Centroid(i));
+            }
 
-        assertEquals(101, x.size());
-        assertEquals(101, x.sum());
+            assertEquals(101, x.size());
+            assertEquals(101, x.sum());
 
-        x.checkBalance();
-        x.checkAggregates();
+            x.checkBalance();
+            x.checkAggregates();
+        }
     }
 
     public void testFloor() {
         // mostly tested in other tests
-        AVLGroupTree x = new AVLGroupTree(arrays());
-        for (int i = 0; i < 101; i++) {
-            x.add(new Centroid(i / 2));
-        }
+        try (AVLGroupTree x = AVLGroupTree.create(arrays())) {
+            for (int i = 0; i < 101; i++) {
+                x.add(new Centroid(i / 2));
+            }
 
-        assertEquals(IntAVLTree.NIL, x.floor(-30));
+            assertEquals(IntAVLTree.NIL, x.floor(-30));
 
-        for (Centroid centroid : x) {
-            assertEquals(centroid.mean(), x.mean(x.floor(centroid.mean() + 0.1)), 0);
+            for (Centroid centroid : x) {
+                assertEquals(centroid.mean(), x.mean(x.floor(centroid.mean() + 0.1)), 0);
+            }
         }
     }
 
     public void testHeadSum() {
-        AVLGroupTree x = new AVLGroupTree(arrays());
-        for (int i = 0; i < 1000; ++i) {
-            x.add(randomDouble(), randomIntBetween(1, 10));
+        try (AVLGroupTree x = AVLGroupTree.create(arrays())) {
+            for (int i = 0; i < 1000; ++i) {
+                x.add(randomDouble(), randomIntBetween(1, 10));
+            }
+            long sum = 0;
+            long last = -1;
+            for (int node = x.first(); node != IntAVLTree.NIL; node = x.next(node)) {
+                assertEquals(sum, x.headSum(node));
+                sum += x.count(node);
+                last = x.count(node);
+            }
+            assertEquals(last, x.count(x.last()));
         }
-        long sum = 0;
-        long last = -1;
-        for (int node = x.first(); node != IntAVLTree.NIL; node = x.next(node)) {
-            assertEquals(sum, x.headSum(node));
-            sum += x.count(node);
-            last = x.count(node);
-        }
-        assertEquals(last, x.count(x.last()));
     }
 
     public void testFloorSum() {
-        AVLGroupTree x = new AVLGroupTree(arrays());
-        int total = 0;
-        for (int i = 0; i < 1000; ++i) {
-            int count = randomIntBetween(1, 10);
-            x.add(randomDouble(), count);
-            total += count;
-        }
-        assertEquals(IntAVLTree.NIL, x.floorSum(-1));
-        for (long i = 0; i < total + 10; ++i) {
-            final int floorNode = x.floorSum(i);
-            assertTrue(x.headSum(floorNode) <= i);
-            final int next = x.next(floorNode);
-            assertTrue(next == IntAVLTree.NIL || x.headSum(next) > i);
+        try (AVLGroupTree x = AVLGroupTree.create(arrays())) {
+            int total = 0;
+            for (int i = 0; i < 1000; ++i) {
+                int count = randomIntBetween(1, 10);
+                x.add(randomDouble(), count);
+                total += count;
+            }
+            assertEquals(IntAVLTree.NIL, x.floorSum(-1));
+            for (long i = 0; i < total + 10; ++i) {
+                final int floorNode = x.floorSum(i);
+                assertTrue(x.headSum(floorNode) <= i);
+                final int next = x.next(floorNode);
+                assertTrue(next == IntAVLTree.NIL || x.headSum(next) > i);
+            }
         }
     }
-
 }

+ 1 - 1
libs/tdigest/src/test/java/org/elasticsearch/tdigest/AVLTreeDigestTests.java

@@ -25,7 +25,7 @@ public class AVLTreeDigestTests extends TDigestTests {
 
     protected DigestFactory factory(final double compression) {
         return () -> {
-            AVLTreeDigest digest = new AVLTreeDigest(arrays(), compression);
+            AVLTreeDigest digest = AVLTreeDigest.create(arrays(), compression);
             digest.setRandomSeed(randomLong());
             return digest;
         };

+ 42 - 39
libs/tdigest/src/test/java/org/elasticsearch/tdigest/AlternativeMergeTests.java

@@ -34,48 +34,51 @@ public class AlternativeMergeTests extends TDigestTestCase {
     public void testMerges() {
         for (int n : new int[] { 100, 1000, 10000, 100000 }) {
             for (double compression : new double[] { 50, 100, 200, 400 }) {
-                MergingDigest mergingDigest = new MergingDigest(arrays(), compression);
-                AVLTreeDigest treeDigest = new AVLTreeDigest(arrays(), compression);
-                List<Double> data = new ArrayList<>();
-                Random gen = random();
-                for (int i = 0; i < n; i++) {
-                    double x = gen.nextDouble();
-                    data.add(x);
-                    mergingDigest.add(x);
-                    treeDigest.add(x);
-                }
-                Collections.sort(data);
-                List<Double> counts = new ArrayList<>();
-                double soFar = 0;
-                double current = 0;
-                for (Double x : data) {
-                    double q = (soFar + (current + 1.0) / 2) / n;
-                    if (current == 0 || current + 1 < n * Math.PI / compression * Math.sqrt(q * (1 - q))) {
-                        current += 1;
-                    } else {
+                try (
+                    MergingDigest mergingDigest = TDigest.createMergingDigest(arrays(), compression);
+                    AVLTreeDigest treeDigest = TDigest.createAvlTreeDigest(arrays(), compression);
+                ) {
+                    List<Double> data = new ArrayList<>();
+                    Random gen = random();
+                    for (int i = 0; i < n; i++) {
+                        double x = gen.nextDouble();
+                        data.add(x);
+                        mergingDigest.add(x);
+                        treeDigest.add(x);
+                    }
+                    Collections.sort(data);
+                    List<Double> counts = new ArrayList<>();
+                    double soFar = 0;
+                    double current = 0;
+                    for (Double x : data) {
+                        double q = (soFar + (current + 1.0) / 2) / n;
+                        if (current == 0 || current + 1 < n * Math.PI / compression * Math.sqrt(q * (1 - q))) {
+                            current += 1;
+                        } else {
+                            counts.add(current);
+                            soFar += current;
+                            current = 1;
+                        }
+                    }
+                    if (current > 0) {
                         counts.add(current);
-                        soFar += current;
-                        current = 1;
                     }
+                    soFar = 0;
+                    for (Double count : counts) {
+                        soFar += count;
+                    }
+                    assertEquals(n, soFar, 0);
+                    soFar = 0;
+                    for (Centroid c : mergingDigest.centroids()) {
+                        soFar += c.count();
+                    }
+                    assertEquals(n, soFar, 0);
+                    soFar = 0;
+                    for (Centroid c : treeDigest.centroids()) {
+                        soFar += c.count();
+                    }
+                    assertEquals(n, soFar, 0);
                 }
-                if (current > 0) {
-                    counts.add(current);
-                }
-                soFar = 0;
-                for (Double count : counts) {
-                    soFar += count;
-                }
-                assertEquals(n, soFar, 0);
-                soFar = 0;
-                for (Centroid c : mergingDigest.centroids()) {
-                    soFar += c.count();
-                }
-                assertEquals(n, soFar, 0);
-                soFar = 0;
-                for (Centroid c : treeDigest.centroids()) {
-                    soFar += c.count();
-                }
-                assertEquals(n, soFar, 0);
             }
         }
     }

+ 8 - 5
libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTests.java

@@ -24,11 +24,14 @@ package org.elasticsearch.tdigest;
 public abstract class BigCountTests extends TDigestTestCase {
 
     public void testBigMerge() {
-        TDigest digest = createDigest();
-        for (int i = 0; i < 5; i++) {
-            digest.add(getDigest());
-            double actual = digest.quantile(0.5);
-            assertEquals("Count = " + digest.size(), 3000, actual, 0.001);
+        try (TDigest digest = createDigest()) {
+            for (int i = 0; i < 5; i++) {
+                try (TDigest digestToMerge = getDigest()) {
+                    digest.add(digestToMerge);
+                }
+                double actual = digest.quantile(0.5);
+                assertEquals("Count = " + digest.size(), 3000, actual, 0.001);
+            }
         }
     }
 

+ 1 - 1
libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsMergingDigestTests.java

@@ -24,6 +24,6 @@ package org.elasticsearch.tdigest;
 public class BigCountTestsMergingDigestTests extends BigCountTests {
     @Override
     public TDigest createDigest() {
-        return new MergingDigest(arrays(), 100);
+        return TDigest.createMergingDigest(arrays(), 100);
     }
 }

+ 1 - 1
libs/tdigest/src/test/java/org/elasticsearch/tdigest/BigCountTestsTreeDigestTests.java

@@ -24,6 +24,6 @@ package org.elasticsearch.tdigest;
 public class BigCountTestsTreeDigestTests extends BigCountTests {
     @Override
     public TDigest createDigest() {
-        return new AVLTreeDigest(arrays(), 100);
+        return TDigest.createAvlTreeDigest(arrays(), 100);
     }
 }

+ 14 - 0
libs/tdigest/src/test/java/org/elasticsearch/tdigest/ComparisonTests.java

@@ -21,6 +21,8 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasables;
+
 import java.util.Arrays;
 import java.util.function.Supplier;
 
@@ -53,6 +55,10 @@ public class ComparisonTests extends TDigestTestCase {
         Arrays.sort(samples);
     }
 
+    private void releaseData() {
+        Releasables.close(avlTreeDigest, mergingDigest, sortingDigest, hybridDigest);
+    }
+
     public void testRandomDenseDistribution() {
         loadData(() -> random().nextDouble());
 
@@ -65,6 +71,8 @@ public class ComparisonTests extends TDigestTestCase {
             assertEquals(String.valueOf(percentile), expected, mergingDigest.quantile(q), accuracy);
             assertEquals(String.valueOf(percentile), expected, hybridDigest.quantile(q), accuracy);
         }
+
+        releaseData();
     }
 
     public void testRandomSparseDistribution() {
@@ -79,6 +87,8 @@ public class ComparisonTests extends TDigestTestCase {
             assertEquals(String.valueOf(percentile), expected, mergingDigest.quantile(q), accuracy);
             assertEquals(String.valueOf(percentile), expected, hybridDigest.quantile(q), accuracy);
         }
+
+        releaseData();
     }
 
     public void testDenseGaussianDistribution() {
@@ -99,6 +109,8 @@ public class ComparisonTests extends TDigestTestCase {
         assertEquals(expectedMedian, avlTreeDigest.quantile(0.5), 0.01);
         assertEquals(expectedMedian, mergingDigest.quantile(0.5), 0.01);
         assertEquals(expectedMedian, hybridDigest.quantile(0.5), 0.01);
+
+        releaseData();
     }
 
     public void testSparseGaussianDistribution() {
@@ -120,5 +132,7 @@ public class ComparisonTests extends TDigestTestCase {
         assertEquals(expectedMedian, avlTreeDigest.quantile(0.5), 5000);
         assertEquals(expectedMedian, mergingDigest.quantile(0.5), 5000);
         assertEquals(expectedMedian, hybridDigest.quantile(0.5), 5000);
+
+        releaseData();
     }
 }

+ 1 - 1
libs/tdigest/src/test/java/org/elasticsearch/tdigest/HybridDigestTests.java

@@ -24,6 +24,6 @@ package org.elasticsearch.tdigest;
 public class HybridDigestTests extends TDigestTests {
 
     protected DigestFactory factory(final double compression) {
-        return () -> new HybridDigest(arrays(), compression);
+        return () -> HybridDigest.create(arrays(), compression);
     }
 }

+ 40 - 37
libs/tdigest/src/test/java/org/elasticsearch/tdigest/IntAVLTreeTests.java

@@ -39,6 +39,8 @@ public class IntAVLTreeTests extends TDigestTestCase {
 
         IntegerBag(TDigestArrays arrays) {
             super(arrays);
+            // We adjust the breaker after creation as this is just a test class
+            arrays.adjustBreaker(IntAVLTree.SHALLOW_SIZE);
             values = new int[capacity()];
             counts = new int[capacity()];
         }
@@ -88,53 +90,54 @@ public class IntAVLTreeTests extends TDigestTestCase {
     public void testDualAdd() {
         Random r = random();
         TreeMap<Integer, Integer> map = new TreeMap<>();
-        IntegerBag bag = new IntegerBag(arrays());
-        for (int i = 0; i < 100000; ++i) {
-            final int v = r.nextInt(100000);
-            if (map.containsKey(v)) {
-                map.put(v, map.get(v) + 1);
-                assertFalse(bag.addValue(v));
-            } else {
-                map.put(v, 1);
-                assertTrue(bag.addValue(v));
+        try (IntegerBag bag = new IntegerBag(arrays())) {
+            for (int i = 0; i < 100000; ++i) {
+                final int v = r.nextInt(100000);
+                if (map.containsKey(v)) {
+                    map.put(v, map.get(v) + 1);
+                    assertFalse(bag.addValue(v));
+                } else {
+                    map.put(v, 1);
+                    assertTrue(bag.addValue(v));
+                }
             }
+            Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator();
+            for (int node = bag.first(bag.root()); node != IntAVLTree.NIL; node = bag.next(node)) {
+                final Map.Entry<Integer, Integer> next = it.next();
+                assertEquals(next.getKey().intValue(), bag.values[node]);
+                assertEquals(next.getValue().intValue(), bag.counts[node]);
+            }
+            assertFalse(it.hasNext());
         }
-        Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator();
-        for (int node = bag.first(bag.root()); node != IntAVLTree.NIL; node = bag.next(node)) {
-            final Map.Entry<Integer, Integer> next = it.next();
-            assertEquals(next.getKey().intValue(), bag.values[node]);
-            assertEquals(next.getValue().intValue(), bag.counts[node]);
-        }
-        assertFalse(it.hasNext());
     }
 
     public void testDualAddRemove() {
         Random r = random();
         TreeMap<Integer, Integer> map = new TreeMap<>();
-        IntegerBag bag = new IntegerBag(arrays());
-        for (int i = 0; i < 100000; ++i) {
-            final int v = r.nextInt(1000);
-            if (r.nextBoolean()) {
-                // add
-                if (map.containsKey(v)) {
-                    map.put(v, map.get(v) + 1);
-                    assertFalse(bag.addValue(v));
+        try (IntegerBag bag = new IntegerBag(arrays())) {
+            for (int i = 0; i < 100000; ++i) {
+                final int v = r.nextInt(1000);
+                if (r.nextBoolean()) {
+                    // add
+                    if (map.containsKey(v)) {
+                        map.put(v, map.get(v) + 1);
+                        assertFalse(bag.addValue(v));
+                    } else {
+                        map.put(v, 1);
+                        assertTrue(bag.addValue(v));
+                    }
                 } else {
-                    map.put(v, 1);
-                    assertTrue(bag.addValue(v));
+                    // remove
+                    assertEquals(map.remove(v) != null, bag.removeValue(v));
                 }
-            } else {
-                // remove
-                assertEquals(map.remove(v) != null, bag.removeValue(v));
             }
+            Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator();
+            for (int node = bag.first(bag.root()); node != IntAVLTree.NIL; node = bag.next(node)) {
+                final Map.Entry<Integer, Integer> next = it.next();
+                assertEquals(next.getKey().intValue(), bag.values[node]);
+                assertEquals(next.getValue().intValue(), bag.counts[node]);
+            }
+            assertFalse(it.hasNext());
         }
-        Iterator<Map.Entry<Integer, Integer>> it = map.entrySet().iterator();
-        for (int node = bag.first(bag.root()); node != IntAVLTree.NIL; node = bag.next(node)) {
-            final Map.Entry<Integer, Integer> next = it.next();
-            assertEquals(next.getKey().intValue(), bag.values[node]);
-            assertEquals(next.getValue().intValue(), bag.counts[node]);
-        }
-        assertFalse(it.hasNext());
     }
-
 }

+ 28 - 24
libs/tdigest/src/test/java/org/elasticsearch/tdigest/MedianTests.java

@@ -25,46 +25,50 @@ public class MedianTests extends TDigestTestCase {
 
     public void testAVL() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new AVLTreeDigest(arrays(), 100);
-        for (double value : data) {
-            digest.add(value);
-        }
+        try (TDigest digest = TDigest.createAvlTreeDigest(arrays(), 100)) {
+            for (double value : data) {
+                digest.add(value);
+            }
 
-        assertEquals(37.5, digest.quantile(0.5), 0);
-        assertEquals(0.5, digest.cdf(37.5), 0);
+            assertEquals(37.5, digest.quantile(0.5), 0);
+            assertEquals(0.5, digest.cdf(37.5), 0);
+        }
     }
 
     public void testMergingDigest() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new MergingDigest(arrays(), 100);
-        for (double value : data) {
-            digest.add(value);
-        }
+        try (TDigest digest = TDigest.createMergingDigest(arrays(), 100)) {
+            for (double value : data) {
+                digest.add(value);
+            }
 
-        assertEquals(37.5, digest.quantile(0.5), 0);
-        assertEquals(0.5, digest.cdf(37.5), 0);
+            assertEquals(37.5, digest.quantile(0.5), 0);
+            assertEquals(0.5, digest.cdf(37.5), 0);
+        }
     }
 
     public void testSortingDigest() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new SortingDigest(arrays());
-        for (double value : data) {
-            digest.add(value);
-        }
+        try (TDigest digest = TDigest.createSortingDigest(arrays())) {
+            for (double value : data) {
+                digest.add(value);
+            }
 
-        assertEquals(37.5, digest.quantile(0.5), 0);
-        assertEquals(0.5, digest.cdf(37.5), 0);
+            assertEquals(37.5, digest.quantile(0.5), 0);
+            assertEquals(0.5, digest.cdf(37.5), 0);
+        }
     }
 
     public void testHybridDigest() {
         double[] data = new double[] { 7, 15, 36, 39, 40, 41 };
-        TDigest digest = new HybridDigest(arrays(), 100);
-        for (double value : data) {
-            digest.add(value);
-        }
+        try (TDigest digest = TDigest.createHybridDigest(arrays(), 100)) {
+            for (double value : data) {
+                digest.add(value);
+            }
 
-        assertEquals(37.5, digest.quantile(0.5), 0);
-        assertEquals(0.5, digest.cdf(37.5), 0);
+            assertEquals(37.5, digest.quantile(0.5), 0);
+            assertEquals(0.5, digest.cdf(37.5), 0);
+        }
     }
 
     public void testReferenceWikipedia() {

+ 95 - 89
libs/tdigest/src/test/java/org/elasticsearch/tdigest/MergingDigestTests.java

@@ -21,6 +21,7 @@
 
 package org.elasticsearch.tdigest;
 
+import org.elasticsearch.core.Releasables;
 import org.junit.Assert;
 
 import java.util.ArrayList;
@@ -33,46 +34,47 @@ import java.util.Random;
 public class MergingDigestTests extends TDigestTests {
 
     protected DigestFactory factory(final double compression) {
-
-        return () -> new MergingDigest(arrays(), compression);
+        return () -> MergingDigest.create(arrays(), compression);
     }
 
     public void testNanDueToBadInitialization() {
         int compression = 100;
         int factor = 5;
-        MergingDigest md = new MergingDigest(arrays(), compression, (factor + 1) * compression, compression);
+        try (MergingDigest md = MergingDigest.create(arrays(), compression, (factor + 1) * compression, compression)) {
 
-        final int M = 10;
-        List<MergingDigest> mds = new ArrayList<>();
-        for (int i = 0; i < M; ++i) {
-            mds.add(new MergingDigest(arrays(), compression, (factor + 1) * compression, compression));
-        }
+            final int M = 10;
+            List<MergingDigest> mds = new ArrayList<>();
+            for (int i = 0; i < M; ++i) {
+                mds.add(MergingDigest.create(arrays(), compression, (factor + 1) * compression, compression));
+            }
 
-        // Fill all digests with values (0,10,20,...,80).
-        List<Double> raw = new ArrayList<>();
-        for (int i = 0; i < 9; ++i) {
-            double x = 10 * i;
-            md.add(x);
-            raw.add(x);
-            for (int j = 0; j < M; ++j) {
-                mds.get(j).add(x);
+            // Fill all digests with values (0,10,20,...,80).
+            List<Double> raw = new ArrayList<>();
+            for (int i = 0; i < 9; ++i) {
+                double x = 10 * i;
+                md.add(x);
                 raw.add(x);
+                for (int j = 0; j < M; ++j) {
+                    mds.get(j).add(x);
+                    raw.add(x);
+                }
             }
-        }
-        Collections.sort(raw);
+            Collections.sort(raw);
 
-        // Merge all mds one at a time into md.
-        for (int i = 0; i < M; ++i) {
-            md.add(mds.get(i));
-        }
-        Assert.assertFalse(Double.isNaN(md.quantile(0.01)));
-
-        for (double q : new double[] { 0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99 }) {
-            double est = md.quantile(q);
-            double actual = Dist.quantile(q, raw);
-            double qx = md.cdf(actual);
-            Assert.assertEquals(q, qx, 0.5);
-            Assert.assertEquals(est, actual, 3.8);
+            // Merge all mds one at a time into md.
+            for (int i = 0; i < M; ++i) {
+                md.add(mds.get(i));
+            }
+            Assert.assertFalse(Double.isNaN(md.quantile(0.01)));
+
+            for (double q : new double[] { 0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99 }) {
+                double est = md.quantile(q);
+                double actual = Dist.quantile(q, raw);
+                double qx = md.cdf(actual);
+                Assert.assertEquals(q, qx, 0.5);
+                Assert.assertEquals(est, actual, 3.8);
+            }
+            Releasables.close(mds);
         }
     }
 
@@ -80,86 +82,90 @@ public class MergingDigestTests extends TDigestTests {
      * Verifies interpolation between a singleton and a larger centroid.
      */
     public void testSingleMultiRange() {
-        TDigest digest = factory(100).create();
-        digest.setScaleFunction(ScaleFunction.K_0);
-        for (int i = 0; i < 100; i++) {
-            digest.add(1);
-            digest.add(2);
-            digest.add(3);
+        try (TDigest digest = factory(100).create()) {
+            digest.setScaleFunction(ScaleFunction.K_0);
+            for (int i = 0; i < 100; i++) {
+                digest.add(1);
+                digest.add(2);
+                digest.add(3);
+            }
+            // this check is, of course, true, but it also forces merging before we change scale
+            assertTrue(digest.centroidCount() < 300);
+            digest.add(0);
+            // we now have a digest with a singleton first, then a heavier centroid next
+            Iterator<Centroid> ix = digest.centroids().iterator();
+            Centroid first = ix.next();
+            Centroid second = ix.next();
+            assertEquals(1, first.count());
+            assertEquals(0, first.mean(), 0);
+            // assertTrue(second.count() > 1);
+            assertEquals(1.0, second.mean(), 0);
+
+            assertEquals(0.00166, digest.cdf(0), 1e-5);
+            assertEquals(0.00166, digest.cdf(1e-10), 1e-5);
+            assertEquals(0.0025, digest.cdf(0.25), 1e-5);
         }
-        // this check is, of course true, but it also forces merging before we change scale
-        assertTrue(digest.centroidCount() < 300);
-        digest.add(0);
-        // we now have a digest with a singleton first, then a heavier centroid next
-        Iterator<Centroid> ix = digest.centroids().iterator();
-        Centroid first = ix.next();
-        Centroid second = ix.next();
-        assertEquals(1, first.count());
-        assertEquals(0, first.mean(), 0);
-        // assertTrue(second.count() > 1);
-        assertEquals(1.0, second.mean(), 0);
-
-        assertEquals(0.00166, digest.cdf(0), 1e-5);
-        assertEquals(0.00166, digest.cdf(1e-10), 1e-5);
-        assertEquals(0.0025, digest.cdf(0.25), 1e-5);
     }
 
     /**
      * Make sure that the first and last centroids have unit weight
      */
     public void testSingletonsAtEnds() {
-        TDigest d = new MergingDigest(arrays(), 50);
-        Random gen = random();
-        double[] data = new double[100];
-        for (int i = 0; i < data.length; i++) {
-            data[i] = Math.floor(gen.nextGaussian() * 3);
-        }
-        for (int i = 0; i < 100; i++) {
-            for (double x : data) {
-                d.add(x);
+        try (TDigest d = MergingDigest.create(arrays(), 50)) {
+            Random gen = random();
+            double[] data = new double[100];
+            for (int i = 0; i < data.length; i++) {
+                data[i] = Math.floor(gen.nextGaussian() * 3);
             }
-        }
-        long last = 0;
-        for (Centroid centroid : d.centroids()) {
-            if (last == 0) {
-                assertEquals(1, centroid.count());
+            for (int i = 0; i < 100; i++) {
+                for (double x : data) {
+                    d.add(x);
+                }
             }
-            last = centroid.count();
+            long last = 0;
+            for (Centroid centroid : d.centroids()) {
+                if (last == 0) {
+                    assertEquals(1, centroid.count());
+                }
+                last = centroid.count();
+            }
+            assertEquals(1, last);
         }
-        assertEquals(1, last);
     }
 
     /**
      * Verify centroid sizes.
      */
     public void testFill() {
-        MergingDigest x = new MergingDigest(arrays(), 300);
-        Random gen = random();
-        ScaleFunction scale = x.getScaleFunction();
-        double compression = x.compression();
-        for (int i = 0; i < 1000000; i++) {
-            x.add(gen.nextGaussian());
-        }
-        double q0 = 0;
-        int i = 0;
-        for (Centroid centroid : x.centroids()) {
-            double q1 = q0 + (double) centroid.count() / x.size();
-            double dk = scale.k(q1, compression, x.size()) - scale.k(q0, compression, x.size());
-            if (centroid.count() > 1) {
-                assertTrue(String.format(Locale.ROOT, "K-size for centroid %d at %.3f is %.3f", i, centroid.mean(), dk), dk <= 1);
+        try (MergingDigest x = MergingDigest.create(arrays(), 300)) {
+            Random gen = random();
+            ScaleFunction scale = x.getScaleFunction();
+            double compression = x.compression();
+            for (int i = 0; i < 1000000; i++) {
+                x.add(gen.nextGaussian());
+            }
+            double q0 = 0;
+            int i = 0;
+            for (Centroid centroid : x.centroids()) {
+                double q1 = q0 + (double) centroid.count() / x.size();
+                double dk = scale.k(q1, compression, x.size()) - scale.k(q0, compression, x.size());
+                if (centroid.count() > 1) {
+                    assertTrue(String.format(Locale.ROOT, "K-size for centroid %d at %.3f is %.3f", i, centroid.mean(), dk), dk <= 1);
+                }
+                q0 = q1;
+                i++;
             }
-            q0 = q1;
-            i++;
         }
     }
 
     public void testLargeInputSmallCompression() {
-        MergingDigest td = new MergingDigest(arrays(), 10);
-        for (int i = 0; i < 10_000_000; i++) {
-            td.add(between(0, 3_600_000));
+        try (MergingDigest td = MergingDigest.create(arrays(), 10)) {
+            for (int i = 0; i < 10_000_000; i++) {
+                td.add(between(0, 3_600_000));
+            }
+            assertTrue(td.centroidCount() < 100);
+            assertTrue(td.quantile(0.00001) < 100_000);
+            assertTrue(td.quantile(0.99999) > 3_000_000);
         }
-        assertTrue(td.centroidCount() < 100);
-        assertTrue(td.quantile(0.00001) < 100_000);
-        assertTrue(td.quantile(0.99999) > 3_000_000);
     }
 }

+ 6 - 4
libs/tdigest/src/test/java/org/elasticsearch/tdigest/SortTests.java

@@ -35,6 +35,7 @@ public class SortTests extends TDigestTestCase {
         Sort.reverse(x, 0, x.size());
 
         // reverse stuff!
+        x.close();
         x = arrays().newIntArray(new int[] { 1, 2, 3, 4, 5 });
         Sort.reverse(x, 0, x.size());
         for (int i = 0; i < 5; i++) {
@@ -57,11 +58,13 @@ public class SortTests extends TDigestTestCase {
         assertEquals(4, x.get(3));
         assertEquals(1, x.get(4));
 
+        x.close();
         x = arrays().newIntArray(new int[] { 1, 2, 3, 4, 5, 6 });
         Sort.reverse(x, 0, x.size());
         for (int i = 0; i < 6; i++) {
             assertEquals(6 - i, x.get(i));
         }
+        x.close();
     }
 
     public void testEmpty() {
@@ -227,9 +230,8 @@ public class SortTests extends TDigestTestCase {
     }
 
     private void sort(int[] order, double[] values, int n) {
-        var wrappedOrder = arrays().newIntArray(order);
-        var wrappedValues = arrays().newDoubleArray(values);
-
-        Sort.stableSort(wrappedOrder, wrappedValues, n);
+        try (var wrappedOrder = arrays().newIntArray(order); var wrappedValues = arrays().newDoubleArray(values)) {
+            Sort.stableSort(wrappedOrder, wrappedValues, n);
+        }
     }
 }

+ 1 - 1
libs/tdigest/src/test/java/org/elasticsearch/tdigest/SortingDigestTests.java

@@ -24,7 +24,7 @@ package org.elasticsearch.tdigest;
 public class SortingDigestTests extends TDigestTests {
 
     protected DigestFactory factory(final double compression) {
-        return () -> new SortingDigest(arrays());
+        return () -> SortingDigest.create(arrays());
     }
 
     // Make this test a noop to avoid OOMs.

+ 89 - 0
libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestReleasingTests.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * This project is based on a modification of https://github.com/tdunning/t-digest which is licensed under the Apache 2.0 License.
+ */
+
+package org.elasticsearch.tdigest;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays;
+import org.elasticsearch.tdigest.arrays.TDigestArrays;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class TDigestReleasingTests extends ESTestCase {
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() {
+        return List.of(
+            makeTDigestParams("Hybrid", (arrays) -> TDigest.createHybridDigest(arrays, 100)),
+            makeTDigestParams("Merging", (arrays) -> TDigest.createMergingDigest(arrays, 100)),
+            makeTDigestParams("Sorting", TDigest::createSortingDigest),
+            makeTDigestParams("AvlTree", (arrays) -> TDigest.createAvlTreeDigest(arrays, 100))
+        );
+    }
+
+    public record TestCase(String name, CircuitBreaker breaker, Supplier<TDigest> tDigestSupplier) {
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    private static Object[] makeTDigestParams(String name, Function<TDigestArrays, TDigest> tDigestSupplier) {
+        var breaker = newLimitedBreaker(ByteSizeValue.ofMb(100));
+        return new Object[] { new TestCase(name, breaker, () -> tDigestSupplier.apply(new MemoryTrackingTDigestArrays(breaker))) };
+    }
+
+    private final TestCase testCase;
+
+    public TDigestReleasingTests(TestCase testCase) {
+        this.testCase = testCase;
+    }
+
+    public void testRelease() {
+        var breaker = testCase.breaker;
+        assertThat(breaker.getUsed(), equalTo(0L));
+
+        var tDigest = testCase.tDigestSupplier.get();
+        assertThat(breaker.getUsed(), greaterThan(0L));
+        assertThat(breaker.getUsed(), equalTo(tDigest.ramBytesUsed()));
+
+        for (int i = 0; i < 10_000; i++) {
+            tDigest.add(randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true));
+        }
+        assertThat(breaker.getUsed(), greaterThan(0L));
+        assertThat(breaker.getUsed(), equalTo(tDigest.ramBytesUsed()));
+
+        tDigest.close();
+        assertThat("close() must release all memory", breaker.getUsed(), equalTo(0L));
+
+        tDigest.close();
+        assertThat("close() must be idempotent", breaker.getUsed(), equalTo(0L));
+    }
+
+}
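
The parameterized test above pins down the contract end to end for all four digest types: breaker usage equals ramBytesUsed() immediately after creation, stays equal while samples are added (array growth goes through the same breaker), drops to zero on close(), and a second close() changes nothing.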

+ 7 - 2
libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTestCase.java

@@ -53,8 +53,8 @@ public abstract class TDigestTestCase extends ESTestCase {
      *     The arrays created by this method will be automatically released after the test.
      * </p>
      */
-    protected DelegatingTDigestArrays arrays() {
-        return new DelegatingTDigestArrays();
+    protected MemoryTrackingTDigestArrays arrays() {
+        return new MemoryTrackingTDigestArrays(newLimitedBreaker(ByteSizeValue.ofMb(100)));
     }
 
     /**
@@ -82,6 +82,11 @@ public abstract class TDigestTestCase extends ESTestCase {
             return register(delegate.newDoubleArray(data));
         }
 
+        @Override
+        public void adjustBreaker(long size) {
+            delegate.adjustBreaker(size);
+        }
+
         @Override
         public TDigestDoubleArray newDoubleArray(int size) {
             return register(delegate.newDoubleArray(size));

+ 277 - 256
libs/tdigest/src/test/java/org/elasticsearch/tdigest/TDigestTests.java

@@ -59,6 +59,7 @@ public abstract class TDigestTests extends TDigestTestCase {
         assertEquals(0.95, digest.cdf(500_000), 1e-5);
         assertEquals(0.975, digest.cdf(1_000_000), 1e-5);
 
+        digest.close();
         digest = factory(80).create();
         digest.setScaleFunction(ScaleFunction.K_0);
 
@@ -72,21 +73,23 @@ public abstract class TDigestTests extends TDigestTestCase {
         assertEquals(19.0, digest.quantile(0.915), 0.1);
         assertEquals(19.0, digest.quantile(0.935), 0.1);
         assertEquals(1_000_000.0, digest.quantile(0.965), 0.1);
+        digest.close();
     }
 
     public void testSmallCountQuantile() {
         List<Double> data = List.of(15.0, 20.0, 32.0, 60.0);
-        TDigest td = factory(200).create();
-        for (Double datum : data) {
-            td.add(datum);
+        try (TDigest td = factory(200).create()) {
+            for (Double datum : data) {
+                td.add(datum);
+            }
+            assertEquals(15.0, td.quantile(0.00), 1e-5);
+            assertEquals(16.0, td.quantile(0.10), 1.0);
+            assertEquals(18.0, td.quantile(0.25), 1.0);
+            assertEquals(26.0, td.quantile(0.50), 1e-5);
+            assertEquals(42.0, td.quantile(0.75), 4.0);
+            assertEquals(55.0, td.quantile(0.90), 5.0);
+            assertEquals(60.0, td.quantile(1.00), 1e-5);
         }
-        assertEquals(15.0, td.quantile(0.00), 1e-5);
-        assertEquals(16.0, td.quantile(0.10), 1.0);
-        assertEquals(18.0, td.quantile(0.25), 1.0);
-        assertEquals(26.0, td.quantile(0.50), 1e-5);
-        assertEquals(42.0, td.quantile(0.75), 4.0);
-        assertEquals(55.0, td.quantile(0.90), 5.0);
-        assertEquals(60.0, td.quantile(1.00), 1e-5);
     }
 
     public void testExplicitSkewedData() {
@@ -123,35 +126,37 @@ public abstract class TDigestTests extends TDigestTestCase {
             51242,
             54241 };
 
-        TDigest digest = factory().create();
-        for (double x : data) {
-            digest.add(x);
-        }
+        try (TDigest digest = factory().create()) {
+            for (double x : data) {
+                digest.add(x);
+            }
 
-        assertEquals(Dist.quantile(0.5, data), digest.quantile(0.5), 0);
+            assertEquals(Dist.quantile(0.5, data), digest.quantile(0.5), 0);
+        }
     }
 
     public void testQuantile() {
         double[] samples = new double[] { 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 3.0, 3.0, 3.0, 3.0, 4.0, 5.0, 6.0, 7.0 };
 
-        TDigest hist1 = factory().create();
-        List<Double> data = new ArrayList<>();
+        try (TDigest hist1 = factory().create(); TDigest hist2 = factory().create()) {
+            List<Double> data = new ArrayList<>();
 
-        for (int j = 0; j < 100; j++) {
-            for (double x : samples) {
-                data.add(x);
-                hist1.add(x);
+            for (int j = 0; j < 100; j++) {
+                for (double x : samples) {
+                    data.add(x);
+                    hist1.add(x);
+                }
             }
+
+            hist1.compress();
+            hist2.add(hist1);
+            Collections.sort(data);
+            hist2.compress();
+            double x1 = hist1.quantile(0.5);
+            double x2 = hist2.quantile(0.5);
+            assertEquals(Dist.quantile(0.5, data), x1, 0.25);
+            assertEquals(x1, x2, 0.01);
         }
-        TDigest hist2 = factory().create();
-        hist1.compress();
-        hist2.add(hist1);
-        Collections.sort(data);
-        hist2.compress();
-        double x1 = hist1.quantile(0.5);
-        double x2 = hist2.quantile(0.5);
-        assertEquals(Dist.quantile(0.5, data), x1, 0.25);
-        assertEquals(x1, x2, 0.01);
     }
 
     /**
@@ -159,43 +164,45 @@ public abstract class TDigestTests extends TDigestTestCase {
      */
     public void testSingletonQuantiles() {
         double[] data = new double[11];
-        TDigest digest = factory().create();
-        for (int i = 0; i < data.length; i++) {
-            digest.add(i);
-            data[i] = i;
-        }
+        try (TDigest digest = factory().create()) {
+            for (int i = 0; i < data.length; i++) {
+                digest.add(i);
+                data[i] = i;
+            }
 
-        for (double x = digest.getMin() - 0.1; x <= digest.getMax() + 0.1; x += 1e-3) {
-            assertEquals(String.valueOf(x), Dist.cdf(x, data), digest.cdf(x), 0.1);
-        }
+            for (double x = digest.getMin() - 0.1; x <= digest.getMax() + 0.1; x += 1e-3) {
+                assertEquals(String.valueOf(x), Dist.cdf(x, data), digest.cdf(x), 0.1);
+            }
 
-        for (int i = 0; i <= 1000; i++) {
-            double q = 0.001 * i;
-            double dist = Dist.quantile(q, data);
-            double td = digest.quantile(q);
-            assertEquals(String.valueOf(q), dist, td, 0.5);
+            for (int i = 0; i <= 1000; i++) {
+                double q = 0.001 * i;
+                double dist = Dist.quantile(q, data);
+                double td = digest.quantile(q);
+                assertEquals(String.valueOf(q), dist, td, 0.5);
+            }
         }
     }
 
     public void testCentroidsWithIncreasingWeights() {
         ArrayList<Double> data = new ArrayList<>();
-        TDigest digest = factory().create();
-        for (int i = 1; i <= 10; i++) {
-            digest.add(i, i);
-            for (int j = 0; j < i; j++) {
-                data.add((double) i);
+        try (TDigest digest = factory().create()) {
+            for (int i = 1; i <= 10; i++) {
+                digest.add(i, i);
+                for (int j = 0; j < i; j++) {
+                    data.add((double) i);
+                }
             }
-        }
 
-        for (double x = digest.getMin() - 0.1; x <= digest.getMax() + 0.1; x += 1e-3) {
-            assertEquals(String.valueOf(x), Dist.cdf(x, data), digest.cdf(x), 0.5);
-        }
+            for (double x = digest.getMin() - 0.1; x <= digest.getMax() + 0.1; x += 1e-3) {
+                assertEquals(String.valueOf(x), Dist.cdf(x, data), digest.cdf(x), 0.5);
+            }
 
-        for (int i = 0; i <= 1000; i++) {
-            double q = 0.001 * i;
-            double dist = Dist.quantile(q, data);
-            double td = digest.quantile(q);
-            assertEquals(String.valueOf(q), dist, td, 0.75);
+            for (int i = 0; i <= 1000; i++) {
+                double q = 0.001 * i;
+                double dist = Dist.quantile(q, data);
+                double td = digest.quantile(q);
+                assertEquals(String.valueOf(q), dist, td, 0.75);
+            }
         }
     }
 
@@ -203,15 +210,16 @@ public abstract class TDigestTests extends TDigestTestCase {
      * Verifies behavior involving interpolation between singleton centroids.
      */
     public void testSingleSingleRange() {
-        TDigest digest = factory().create();
-        digest.add(1);
-        digest.add(2);
-        digest.add(3);
+        try (TDigest digest = factory().create()) {
+            digest.add(1);
+            digest.add(2);
+            digest.add(3);
 
-        // verify the cdf is a step between singletons
-        assertEquals(0.5 / 3.0, digest.cdf(1), 0);
-        assertEquals(1.5 / 3.0, digest.cdf(2), 0);
-        assertEquals(2.5 / 3.0, digest.cdf(3), 0);
+            // verify the cdf is a step between singletons
+            assertEquals(0.5 / 3.0, digest.cdf(1), 0);
+            assertEquals(1.5 / 3.0, digest.cdf(2), 0);
+            assertEquals(2.5 / 3.0, digest.cdf(3), 0);
+        }
     }
 
     /**
@@ -240,6 +248,7 @@ public abstract class TDigestTests extends TDigestTestCase {
 
         // normally min == mean[0] because weight[0] == 1
         // we can force this not to be true for testing
+        digest.close();
         digest = factory().create();
         digest.setScaleFunction(ScaleFunction.K_0);
         for (int i = 0; i < 100; i++) {
@@ -278,219 +287,229 @@ public abstract class TDigestTests extends TDigestTestCase {
 
         assertEquals(4, digest.quantile(1), 0);
         assertEquals(last.mean(), 4, 0);
+        digest.close();
     }
 
     public void testFewRepeatedValues() {
-        TDigest d = factory().create();
-        for (int i = 0; i < 2; ++i) {
-            d.add(9000);
-        }
-        for (int i = 0; i < 11; ++i) {
-            d.add(3000);
-        }
-        for (int i = 0; i < 26; ++i) {
-            d.add(1000);
-        }
+        try (TDigest d = factory().create()) {
+            for (int i = 0; i < 2; ++i) {
+                d.add(9000);
+            }
+            for (int i = 0; i < 11; ++i) {
+                d.add(3000);
+            }
+            for (int i = 0; i < 26; ++i) {
+                d.add(1000);
+            }
 
-        assertEquals(3000.0, d.quantile(0.90), 1e-5);
-        assertEquals(4500.0, d.quantile(0.95), 2000);
-        assertEquals(8500.0, d.quantile(0.97), 500);
-        assertEquals(9000.0, d.quantile(0.98), 1e-5);
-        assertEquals(9000.0, d.quantile(1.00), 1e-5);
+            assertEquals(3000.0, d.quantile(0.90), 1e-5);
+            assertEquals(4500.0, d.quantile(0.95), 2000);
+            assertEquals(8500.0, d.quantile(0.97), 500);
+            assertEquals(9000.0, d.quantile(0.98), 1e-5);
+            assertEquals(9000.0, d.quantile(1.00), 1e-5);
+        }
     }
 
     public void testSingleValue() {
         Random rand = random();
-        final TDigest digest = factory().create();
-        final double value = rand.nextDouble() * 1000;
-        digest.add(value);
+        try (TDigest digest = factory().create()) {
+            final double value = rand.nextDouble() * 1000;
+            digest.add(value);
 
-        assertEquals(value, digest.quantile(0.0), 0);
-        assertEquals(value, digest.quantile(1.0), 0);
-        assertEquals(value, digest.quantile(rand.nextDouble()), 0);
+            assertEquals(value, digest.quantile(0.0), 0);
+            assertEquals(value, digest.quantile(1.0), 0);
+            assertEquals(value, digest.quantile(rand.nextDouble()), 0);
 
-        assertEquals(0.0, digest.cdf(value - 1e-5), 0.0);
-        assertEquals(1.0, digest.cdf(value + 1e5), 0.0);
-        assertEquals(0.5, digest.cdf(value), 0.0);
+            assertEquals(0.0, digest.cdf(value - 1e-5), 0.0);
+            assertEquals(1.0, digest.cdf(value + 1e-5), 0.0);
+            assertEquals(0.5, digest.cdf(value), 0.0);
+        }
     }
 
     public void testFewValues() {
         // When there are few values in the tree, quantiles should be exact
-        final TDigest digest = factory().create();
-        final Random r = random();
-        final int length = r.nextInt(10);
-        final List<Double> values = new ArrayList<>();
-        for (int i = 0; i < length; ++i) {
-            final double value;
-            if (i == 0 || r.nextBoolean()) {
-                value = r.nextDouble() * 100;
-            } else {
-                // introduce duplicates
-                value = values.get(i - 1);
+        try (TDigest digest = factory().create()) {
+            final Random r = random();
+            final int length = r.nextInt(10);
+            final List<Double> values = new ArrayList<>();
+            for (int i = 0; i < length; ++i) {
+                final double value;
+                if (i == 0 || r.nextBoolean()) {
+                    value = r.nextDouble() * 100;
+                } else {
+                    // introduce duplicates
+                    value = values.get(i - 1);
+                }
+                digest.add(value);
+                values.add(value);
+            }
+            Collections.sort(values);
+
+            // for this compression value, the tree shouldn't have merged any nodes
+            assertEquals(digest.centroids().size(), values.size());
+            for (double q : new double[] { 0, 1e-10, 0.5, 1 - 1e-10, 1 }) {
+                double q1 = Dist.quantile(q, values);
+                double q2 = digest.quantile(q);
+                assertEquals(String.valueOf(q), q1, q2, q1);
             }
-            digest.add(value);
-            values.add(value);
-        }
-        Collections.sort(values);
-
-        // for this value of the compression, the tree shouldn't have merged any node
-        assertEquals(digest.centroids().size(), values.size());
-        for (double q : new double[] { 0, 1e-10, 0.5, 1 - 1e-10, 1 }) {
-            double q1 = Dist.quantile(q, values);
-            double q2 = digest.quantile(q);
-            assertEquals(String.valueOf(q), q1, q2, q1);
         }
     }
 
     public void testEmptyDigest() {
-        TDigest digest = factory().create();
-        assertEquals(0, digest.centroids().size());
-        assertEquals(0, digest.size());
-        assertTrue(Double.isNaN(digest.quantile(random().nextDouble())));
-        assertTrue(Double.isNaN(digest.cdf(0)));
+        try (TDigest digest = factory().create()) {
+            assertEquals(0, digest.centroids().size());
+            assertEquals(0, digest.size());
+            assertTrue(Double.isNaN(digest.quantile(random().nextDouble())));
+            assertTrue(Double.isNaN(digest.cdf(0)));
+        }
     }
 
     public void testMoreThan2BValues() {
-        final TDigest digest = factory().create();
-        // carefully build a t-digest that is as if we added 3 uniform values from [0,1]
-        double n = 3e9;
-        double q0 = 0;
-        for (int i = 0; i < 200 && q0 < 1 - 1e-10; ++i) {
-            double k0 = digest.scale.k(q0, digest.compression(), n);
-            double q = digest.scale.q(k0 + 1, digest.compression(), n);
-            int m = (int) Math.max(1, n * (q - q0));
-            digest.add((q + q0) / 2, m);
-            q0 = q0 + m / n;
-        }
-        digest.compress();
-        assertEquals(3_000_000_000L, digest.size());
-        assertTrue(digest.size() > Integer.MAX_VALUE);
-        final double[] quantiles = new double[] { 0, 0.1, 0.5, 0.9, 1 };
-        double prev = Double.NEGATIVE_INFINITY;
-        for (double q : quantiles) {
-            final double v = digest.quantile(q);
-            assertTrue(String.format(Locale.ROOT, "q=%.1f, v=%.4f, pref=%.4f", q, v, prev), v >= prev);
-            prev = v;
+        try (TDigest digest = factory().create()) {
+            // carefully build a t-digest that is as if we added 3e9 uniform values from [0,1]
+            double n = 3e9;
+            double q0 = 0;
+            for (int i = 0; i < 200 && q0 < 1 - 1e-10; ++i) {
+                double k0 = digest.scale.k(q0, digest.compression(), n);
+                double q = digest.scale.q(k0 + 1, digest.compression(), n);
+                int m = (int) Math.max(1, n * (q - q0));
+                digest.add((q + q0) / 2, m);
+                q0 = q0 + m / n;
+            }
+            digest.compress();
+            assertEquals(3_000_000_000L, digest.size());
+            assertTrue(digest.size() > Integer.MAX_VALUE);
+            final double[] quantiles = new double[] { 0, 0.1, 0.5, 0.9, 1 };
+            double prev = Double.NEGATIVE_INFINITY;
+            for (double q : quantiles) {
+                final double v = digest.quantile(q);
+                assertTrue(String.format(Locale.ROOT, "q=%.1f, v=%.4f, prev=%.4f", q, v, prev), v >= prev);
+                prev = v;
+            }
         }
     }
 
     public void testSorted() {
-        final TDigest digest = factory().create();
-        Random gen = random();
-        for (int i = 0; i < 10000; ++i) {
-            int w = 1 + gen.nextInt(10);
-            double x = gen.nextDouble();
-            for (int j = 0; j < w; j++) {
-                digest.add(x);
+        try (TDigest digest = factory().create()) {
+            Random gen = random();
+            for (int i = 0; i < 10000; ++i) {
+                int w = 1 + gen.nextInt(10);
+                double x = gen.nextDouble();
+                for (int j = 0; j < w; j++) {
+                    digest.add(x);
+                }
             }
-        }
-        Centroid previous = null;
-        for (Centroid centroid : digest.centroids()) {
-            if (previous != null) {
-                if (previous.mean() <= centroid.mean()) {
-                    assertTrue(Double.compare(previous.mean(), centroid.mean()) <= 0);
+            Centroid previous = null;
+            for (Centroid centroid : digest.centroids()) {
+                if (previous != null) {
+                    // centroid means must be non-decreasing
+                    assertTrue(Double.compare(previous.mean(), centroid.mean()) <= 0);
                 }
+                previous = centroid;
             }
-            previous = centroid;
         }
     }
 
     public void testNaN() {
-        final TDigest digest = factory().create();
-        Random gen = random();
-        final int iters = gen.nextInt(100);
-        for (int i = 0; i < iters; ++i) {
-            digest.add(gen.nextDouble(), 1 + gen.nextInt(10));
-        }
-        try {
-            // both versions should fail
-            if (gen.nextBoolean()) {
-                digest.add(Double.NaN);
-            } else {
-                digest.add(Double.NaN, 1);
+        try (TDigest digest = factory().create()) {
+            Random gen = random();
+            final int iters = gen.nextInt(100);
+            for (int i = 0; i < iters; ++i) {
+                digest.add(gen.nextDouble(), 1 + gen.nextInt(10));
+            }
+            try {
+                // both versions should fail
+                if (gen.nextBoolean()) {
+                    digest.add(Double.NaN);
+                } else {
+                    digest.add(Double.NaN, 1);
+                }
+                fail("NaN should be an illegal argument");
+            } catch (IllegalArgumentException e) {
+                // expected
             }
-            fail("NaN should be an illegal argument");
-        } catch (IllegalArgumentException e) {
-            // expected
         }
     }
 
     public void testMidPointRule() {
-        TDigest dist = factory(200).create();
-        dist.add(1);
-        dist.add(2);
-
-        for (int i = 0; i < 1000; i++) {
+        try (TDigest dist = factory(200).create()) {
             dist.add(1);
             dist.add(2);
-            if (i % 8 == 0) {
-                String message = String.format(Locale.ROOT, "i = %d", i);
-                assertEquals(message, 0, dist.cdf(1 - 1e-9), 0);
-                assertEquals(message, 0.3, dist.cdf(1), 0.2);
-                assertEquals(message, 0.8, dist.cdf(2), 0.2);
-                assertEquals(message, 1, dist.cdf(2 + 1e-9), 0);
-
-                assertEquals(1.0, dist.quantile(0.0), 1e-5);
-                assertEquals(1.0, dist.quantile(0.1), 1e-5);
-                assertEquals(1.0, dist.quantile(0.2), 1e-5);
-
-                assertTrue(dist.quantile(0.5) > 1.0);
-                assertTrue(dist.quantile(0.5) < 2.0);
-
-                assertEquals(2.0, dist.quantile(0.7), 1e-5);
-                assertEquals(2.0, dist.quantile(0.8), 1e-5);
-                assertEquals(2.0, dist.quantile(0.9), 1e-5);
-                assertEquals(2.0, dist.quantile(1.0), 1e-5);
+
+            for (int i = 0; i < 1000; i++) {
+                dist.add(1);
+                dist.add(2);
+                if (i % 8 == 0) {
+                    String message = String.format(Locale.ROOT, "i = %d", i);
+                    assertEquals(message, 0, dist.cdf(1 - 1e-9), 0);
+                    assertEquals(message, 0.3, dist.cdf(1), 0.2);
+                    assertEquals(message, 0.8, dist.cdf(2), 0.2);
+                    assertEquals(message, 1, dist.cdf(2 + 1e-9), 0);
+
+                    assertEquals(1.0, dist.quantile(0.0), 1e-5);
+                    assertEquals(1.0, dist.quantile(0.1), 1e-5);
+                    assertEquals(1.0, dist.quantile(0.2), 1e-5);
+
+                    assertTrue(dist.quantile(0.5) > 1.0);
+                    assertTrue(dist.quantile(0.5) < 2.0);
+
+                    assertEquals(2.0, dist.quantile(0.7), 1e-5);
+                    assertEquals(2.0, dist.quantile(0.8), 1e-5);
+                    assertEquals(2.0, dist.quantile(0.9), 1e-5);
+                    assertEquals(2.0, dist.quantile(1.0), 1e-5);
+                }
             }
         }
-
     }
 
     public void testThreePointExample() {
-        TDigest tdigest = factory().create();
-        double x0 = 0.18615591526031494;
-        double x1 = 0.4241943657398224;
-        double x2 = 0.8813006281852722;
-
-        tdigest.add(x0);
-        tdigest.add(x1);
-        tdigest.add(x2);
-
-        double p10 = tdigest.quantile(0.1);
-        double p50 = tdigest.quantile(0.5);
-        double p90 = tdigest.quantile(0.9);
-        double p95 = tdigest.quantile(0.95);
-        double p99 = tdigest.quantile(0.99);
-
-        assertTrue(Double.compare(p10, p50) <= 0);
-        assertTrue(Double.compare(p50, p90) <= 0);
-        assertTrue(Double.compare(p90, p95) <= 0);
-        assertTrue(Double.compare(p95, p99) <= 0);
-
-        assertEquals(x0, tdigest.quantile(0.0), 0);
-        assertEquals(x2, tdigest.quantile(1.0), 0);
-
-        assertTrue(String.valueOf(p10), Double.compare(x0, p10) <= 0);
-        assertTrue(String.valueOf(p10), Double.compare(x1, p10) >= 0);
-        assertTrue(String.valueOf(p99), Double.compare(x1, p99) <= 0);
-        assertTrue(String.valueOf(p99), Double.compare(x2, p99) >= 0);
+        try (TDigest tdigest = factory().create()) {
+            double x0 = 0.18615591526031494;
+            double x1 = 0.4241943657398224;
+            double x2 = 0.8813006281852722;
+
+            tdigest.add(x0);
+            tdigest.add(x1);
+            tdigest.add(x2);
+
+            double p10 = tdigest.quantile(0.1);
+            double p50 = tdigest.quantile(0.5);
+            double p90 = tdigest.quantile(0.9);
+            double p95 = tdigest.quantile(0.95);
+            double p99 = tdigest.quantile(0.99);
+
+            assertTrue(Double.compare(p10, p50) <= 0);
+            assertTrue(Double.compare(p50, p90) <= 0);
+            assertTrue(Double.compare(p90, p95) <= 0);
+            assertTrue(Double.compare(p95, p99) <= 0);
+
+            assertEquals(x0, tdigest.quantile(0.0), 0);
+            assertEquals(x2, tdigest.quantile(1.0), 0);
+
+            assertTrue(String.valueOf(p10), Double.compare(x0, p10) <= 0);
+            assertTrue(String.valueOf(p10), Double.compare(x1, p10) >= 0);
+            assertTrue(String.valueOf(p99), Double.compare(x1, p99) <= 0);
+            assertTrue(String.valueOf(p99), Double.compare(x2, p99) >= 0);
+        }
     }
 
     public void testSingletonInACrowd() {
-        TDigest dist = factory().create();
-        for (int i = 0; i < 10000; i++) {
-            dist.add(10);
+        try (TDigest dist = factory().create()) {
+            for (int i = 0; i < 10000; i++) {
+                dist.add(10);
+            }
+            dist.add(20);
+            dist.compress();
+
+            // The actual numbers depend on how the digest gets constructed.
+            // A singleton on the right boundary yields much better accuracy, e.g. q(0.9999) == 10.
+            // Otherwise, quantiles above 0.9 use interpolation between 10 and 20, thus returning higher values.
+            assertEquals(10.0, dist.quantile(0), 0);
+            assertEquals(10.0, dist.quantile(0.9), 0);
+            assertEquals(19.0, dist.quantile(0.99999), 1);
+            assertEquals(20.0, dist.quantile(1), 0);
         }
-        dist.add(20);
-        dist.compress();
-
-        // The actual numbers depend on how the digest get constructed.
-        // A singleton on the right boundary yields much better accuracy, e.g. q(0.9999) == 10.
-        // Otherwise, quantiles above 0.9 use interpolation between 10 and 20, thus returning higher values.
-        assertEquals(10.0, dist.quantile(0), 0);
-        assertEquals(10.0, dist.quantile(0.9), 0);
-        assertEquals(19.0, dist.quantile(0.99999), 1);
-        assertEquals(20.0, dist.quantile(1), 0);
     }
 
     public void testScaling() {
@@ -503,41 +522,43 @@ public abstract class TDigestTests extends TDigestTestCase {
         Collections.sort(data);
 
         for (double compression : new double[] { 10, 20, 50, 100, 200, 500, 1000 }) {
-            TDigest dist = factory(compression).create();
-            for (Double x : data) {
-                dist.add(x);
-            }
-            dist.compress();
-
-            for (double q : new double[] { 0.001, 0.01, 0.1, 0.5 }) {
-                double estimate = dist.quantile(q);
-                double actual = data.get((int) (q * data.size()));
-                if (Double.compare(estimate, 0) != 0) {
-                    assertTrue(Double.compare(Math.abs(actual - estimate) / estimate, 1) < 0);
-                } else {
-                    assertEquals(Double.compare(estimate, 0), 0);
+            try (TDigest dist = factory(compression).create()) {
+                for (Double x : data) {
+                    dist.add(x);
+                }
+                dist.compress();
+
+                for (double q : new double[] { 0.001, 0.01, 0.1, 0.5 }) {
+                    double estimate = dist.quantile(q);
+                    double actual = data.get((int) (q * data.size()));
+                    if (Double.compare(estimate, 0) != 0) {
+                        assertTrue(Double.compare(Math.abs(actual - estimate) / estimate, 1) < 0);
+                    } else {
+                        assertEquals(Double.compare(estimate, 0), 0);
+                    }
                 }
             }
         }
     }
 
     public void testMonotonicity() {
-        TDigest digest = factory().create();
-        final Random gen = random();
-        for (int i = 0; i < 100000; i++) {
-            digest.add(gen.nextDouble());
-        }
+        try (TDigest digest = factory().create()) {
+            final Random gen = random();
+            for (int i = 0; i < 100000; i++) {
+                digest.add(gen.nextDouble());
+            }
 
-        double lastQuantile = -1;
-        double lastX = -1;
-        for (double z = 0; z <= 1; z += 1e-4) {
-            double x = digest.quantile(z);
-            assertTrue("q: " + z + " x: " + x + " last: " + lastX, Double.compare(x, lastX) >= 0);
-            lastX = x;
+            double lastQuantile = -1;
+            double lastX = -1;
+            for (double z = 0; z <= 1; z += 1e-4) {
+                double x = digest.quantile(z);
+                assertTrue("q: " + z + " x: " + x + " last: " + lastX, Double.compare(x, lastX) >= 0);
+                lastX = x;
 
-            double q = digest.cdf(z);
-            assertTrue("Q: " + z, Double.compare(q, lastQuantile) >= 0);
-            lastQuantile = q;
+                double q = digest.cdf(z);
+                assertTrue("Q: " + z, Double.compare(q, lastQuantile) >= 0);
+                lastQuantile = q;
+            }
         }
     }
 }
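
Every hunk in this file applies the same mechanical refactor: each digest now owns breaker-tracked arrays, so tests either wrap the digest in try-with-resources or call close() explicitly before reassigning. A minimal sketch of the pattern, assuming only the factory() helper from the surrounding test class and the releasable TDigest visible in the hunks above:

    // Sketch only: close the digest even if an assertion throws mid-test,
    // so its breaker-tracked array allocations are always released.
    try (TDigest digest = factory().create()) {
        digest.add(42.0);
        assertEquals(42.0, digest.quantile(0.5), 0);
    } // digest.close() runs here and returns its bytes to the breaker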

+ 0 - 1
server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalMedianAbsoluteDeviation.java

@@ -25,7 +25,6 @@ import java.util.Objects;
 public class InternalMedianAbsoluteDeviation extends InternalNumericMetricsAggregation.SingleValue implements MedianAbsoluteDeviation {
 
     public static double computeMedianAbsoluteDeviation(TDigestState valuesSketch) {
-
         if (valuesSketch.size() == 0) {
             return Double.NaN;
         } else {

+ 12 - 3
server/src/main/java/org/elasticsearch/search/aggregations/metrics/MemoryTrackingTDigestArrays.java

@@ -21,7 +21,6 @@ import org.elasticsearch.tdigest.arrays.TDigestIntArray;
 import org.elasticsearch.tdigest.arrays.TDigestLongArray;
 
 import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * TDigestArrays with raw arrays and circuit breaking.
@@ -34,6 +33,15 @@ public class MemoryTrackingTDigestArrays implements TDigestArrays {
         this.breaker = breaker;
     }
 
+    @Override
+    public void adjustBreaker(long size) {
+        if (size > 0) {
+            breaker.addEstimateBytesAndMaybeBreak(size, "tdigest-adjust-breaker");
+        } else {
+            breaker.addWithoutBreaking(size);
+        }
+    }
+
     @Override
     public MemoryTrackingTDigestDoubleArray newDoubleArray(int initialSize) {
         breaker.addEstimateBytesAndMaybeBreak(
@@ -80,7 +88,7 @@ public class MemoryTrackingTDigestArrays implements TDigestArrays {
 
     private abstract static class AbstractMemoryTrackingArray implements Releasable, Accountable {
         protected final CircuitBreaker breaker;
-        private final AtomicBoolean closed = new AtomicBoolean(false);
+        private boolean closed = false;
 
         AbstractMemoryTrackingArray(CircuitBreaker breaker) {
             this.breaker = breaker;
@@ -88,7 +96,8 @@ public class MemoryTrackingTDigestArrays implements TDigestArrays {
 
         @Override
         public final void close() {
-            if (closed.compareAndSet(false, true)) {
+            if (closed == false) {
+                closed = true;
                 breaker.addWithoutBreaking(-ramBytesUsed());
             }
         }
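
The new adjustBreaker method is intentionally asymmetric: positive deltas go through addEstimateBytesAndMaybeBreak so growth can trip the breaker, while negative deltas use addWithoutBreaking because releasing memory must never fail. A hedged sketch of a caller; the resize method and its fields are illustrative, not part of this change, and it reserves before allocating so a trip leaves the old buffer intact:

    // Illustrative caller of adjustBreaker (method and fields are hypothetical).
    void resize(int newLength) {
        long oldBytes = RamUsageEstimator.sizeOf(array);
        long newBytes = RamUsageEstimator.alignObjectSize(
            RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) newLength * Double.BYTES
        );
        arrays.adjustBreaker(newBytes - oldBytes); // may throw CircuitBreakingException
        double[] newArray = new double[newLength];
        System.arraycopy(array, 0, newArray, 0, Math.min(array.length, newLength));
        array = newArray;
    }

The AtomicBoolean-to-boolean change in AbstractMemoryTrackingArray drops the CAS, presumably because these arrays are confined to a single search thread; close() stays idempotent either way.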

+ 58 - 11
server/src/main/java/org/elasticsearch/search/aggregations/metrics/TDigestState.java

@@ -8,6 +8,8 @@
  */
 package org.elasticsearch.search.aggregations.metrics;
 
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
@@ -27,11 +29,13 @@ import java.util.Iterator;
  * through factory method params, providing one optimized for performance (e.g. MergingDigest or HybridDigest) by default, or optionally one
  * that produces highly accurate results regardless of input size but its construction over the sample population takes 2x-10x longer.
  */
-public class TDigestState implements Releasable {
+public class TDigestState implements Releasable, Accountable {
+    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(TDigestState.class);
 
-    protected static final CircuitBreaker DEFAULT_NOOP_BREAKER = new NoopCircuitBreaker("default-tdigest-state-noop-breaker");
+    private static final CircuitBreaker DEFAULT_NOOP_BREAKER = new NoopCircuitBreaker("default-tdigest-state-noop-breaker");
 
     private final CircuitBreaker breaker;
+    private boolean closed = false;
 
     private final double compression;
 
@@ -71,7 +75,23 @@ public class TDigestState implements Releasable {
      * @return a TDigestState object that's optimized for performance
      */
     public static TDigestState create(CircuitBreaker breaker, double compression) {
-        return new TDigestState(breaker, Type.defaultValue(), compression);
+        breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create");
+        try {
+            return new TDigestState(breaker, Type.defaultValue(), compression);
+        } catch (Exception e) {
+            breaker.addWithoutBreaking(-SHALLOW_SIZE);
+            throw e;
+        }
+    }
+
+    static TDigestState create(CircuitBreaker breaker, Type type, double compression) {
+        breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-with-type");
+        try {
+            return new TDigestState(breaker, type, compression);
+        } catch (Exception e) {
+            breaker.addWithoutBreaking(-SHALLOW_SIZE);
+            throw e;
+        }
     }
 
     /**
@@ -80,7 +100,13 @@ public class TDigestState implements Releasable {
      * @return a TDigestState object that's optimized for performance
      */
     static TDigestState createOptimizedForAccuracy(CircuitBreaker breaker, double compression) {
-        return new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
+        breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-optimized-for-accuracy");
+        try {
+            return new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
+        } catch (Exception e) {
+            breaker.addWithoutBreaking(-SHALLOW_SIZE);
+            throw e;
+        }
     }
 
     /**
@@ -114,7 +140,13 @@ public class TDigestState implements Releasable {
      * @return a TDigestState object
      */
     public static TDigestState createUsingParamsFrom(TDigestState state) {
-        return new TDigestState(state.breaker, state.type, state.compression);
+        state.breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-create-using-params-from");
+        try {
+            return new TDigestState(state.breaker, state.type, state.compression);
+        } catch (Exception e) {
+            state.breaker.addWithoutBreaking(-SHALLOW_SIZE);
+            throw e;
+        }
     }
 
     protected TDigestState(CircuitBreaker breaker, Type type, double compression) {
@@ -130,6 +162,11 @@ public class TDigestState implements Releasable {
         this.compression = compression;
     }
 
+    @Override
+    public long ramBytesUsed() {
+        return SHALLOW_SIZE + tdigest.ramBytesUsed();
+    }
+
     public final double compression() {
         return compression;
     }
@@ -161,11 +198,17 @@ public class TDigestState implements Releasable {
         double compression = in.readDouble();
         TDigestState state;
         long size = 0;
-        if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
-            state = new TDigestState(breaker, Type.valueOf(in.readString()), compression);
-            size = in.readVLong();
-        } else {
-            state = new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
+        breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE, "tdigest-state-read");
+        try {
+            if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
+                state = new TDigestState(breaker, Type.valueOf(in.readString()), compression);
+                size = in.readVLong();
+            } else {
+                state = new TDigestState(breaker, Type.valueForHighAccuracy(), compression);
+            }
+        } catch (Exception e) {
+            breaker.addWithoutBreaking(-SHALLOW_SIZE);
+            throw e;
         }
         int n = in.readVInt();
         if (size > 0) {
@@ -281,6 +324,10 @@ public class TDigestState implements Releasable {
 
     @Override
     public void close() {
-        Releasables.close(tdigest);
+        if (closed == false) {
+            closed = true;
+            breaker.addWithoutBreaking(-SHALLOW_SIZE);
+            Releasables.close(tdigest);
+        }
     }
 }
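
All the factory paths above share one reserve-then-construct discipline: reserve SHALLOW_SIZE up front (which may trip the breaker), roll the reservation back if construction throws, and release it exactly once in close(). Since TDigestState is now Accountable as well as Releasable, a holder can also meter a live state against its own breaker; a hedged sketch, where medianAndRelease and the accounting breaker are illustrative:

    // Illustrative: meter a state on a second breaker, then release everything.
    static double medianAndRelease(TDigestState state, CircuitBreaker accounting) {
        long bytes = state.ramBytesUsed(); // SHALLOW_SIZE plus the digest's arrays
        accounting.addEstimateBytesAndMaybeBreak(bytes, "tdigest-holder");
        try {
            return state.quantile(0.5);
        } finally {
            accounting.addWithoutBreaking(-bytes);
            state.close(); // idempotent; returns its bytes to the state's own breaker
        }
    }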

+ 59 - 0
server/src/test/java/org/elasticsearch/search/aggregations/metrics/TDigestStateReleasingTests.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.search.aggregations.metrics;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.Arrays;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class TDigestStateReleasingTests extends ESTestCase {
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() {
+        return Arrays.stream(TDigestState.Type.values()).map(type -> new Object[] { type }).toList();
+    }
+
+    private final TDigestState.Type digestType;
+
+    public TDigestStateReleasingTests(TDigestState.Type digestType) {
+        this.digestType = digestType;
+    }
+
+    /**
+     * Tests that a circuit breaker trip leaves no unreleased memory.
+     */
+    public void testCircuitBreakerTrip() {
+        for (int bytes = randomIntBetween(0, 16); bytes < 50_000; bytes += 17) {
+            CircuitBreaker breaker = newLimitedBreaker(ByteSizeValue.ofBytes(bytes));
+
+            try (TDigestState state = TDigestState.create(breaker, digestType, 100)) {
+                // Add some data to make it trip. It won't work in all digest types
+                for (int i = 0; i < 100; i++) {
+                    state.add(randomDoubleBetween(-Double.MAX_VALUE, Double.MAX_VALUE, true));
+                }
+
+                // Testing with more memory shouldn't change anything, we finished the test
+                return;
+            } catch (CircuitBreakingException e) {
+                // Expected
+            } finally {
+                assertThat("unreleased bytes with a " + bytes + " bytes limit", breaker.getUsed(), equalTo(0L));
+            }
+        }
+
+        fail("Test case didn't reach a non-tripping breaker limit");
+    }
+}
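
The loop above probes increasing limits: with a tiny limit the digest trips while adding values, and with a large enough one the test returns early; in both outcomes the finally block asserts the breaker ends at zero, which is what catches a leaked reservation. The same probe-until-it-fits shape works for any releasable, breaker-backed component; a condensed sketch, where build is a hypothetical factory and the other helpers are those used in the test above:

    // Condensed leak probe (build(...) is hypothetical).
    for (long limit = 16; limit < 1_000_000; limit *= 2) {
        CircuitBreaker breaker = newLimitedBreaker(ByteSizeValue.ofBytes(limit));
        try (Releasable releasable = build(breaker)) {
            return; // found a limit that fits; test is done
        } catch (CircuitBreakingException e) {
            // expected while the limit is too small
        } finally {
            assertThat(breaker.getUsed(), equalTo(0L)); // nothing leaked either way
        }
    }
    fail("never reached a non-tripping limit");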

+ 1 - 5
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/QuantileStates.java

@@ -18,7 +18,6 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.aggregations.metrics.InternalMedianAbsoluteDeviation;
 import org.elasticsearch.search.aggregations.metrics.TDigestState;
@@ -235,10 +234,7 @@ public final class QuantileStates {
 
         @Override
         public void close() {
-            Releasables.close(
-                Releasables.wrap(LongStream.range(0, digests.size()).mapToObj(i -> (Releasable) digests.get(i)).toList()),
-                digests
-            );
+            Releasables.close(Releasables.wrap(LongStream.range(0, digests.size()).mapToObj(i -> digests.get(i)).toList()), digests);
         }
     }
 }