Browse Source

Release aggregations early during the aggregation phase (#96220)

Ignacio Vera 2 years ago
parent
commit
7b212cd072

+ 5 - 0
benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java

@@ -294,6 +294,11 @@ public class AggConstructionContentionBenchmark {
             releaseMe.add(aggregator);
         }
 
+        @Override
+        public void removeReleasable(Aggregator aggregator) {
+            releaseMe.remove(aggregator);
+        }
+
         @Override
         public int maxBuckets() {
             return Integer.MAX_VALUE;

+ 3 - 0
server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

@@ -671,6 +671,9 @@ public class TransportSearchIT extends ESIntegTestCase {
             return new InternalAggregation[] { buildEmptyAggregation() };
         }
 
+        @Override
+        public void releaseAggregations() {}
+
         @Override
         public InternalAggregation buildEmptyAggregation() {
             return new Max(name(), Double.NaN, DocValueFormat.RAW, null);

+ 5 - 0
server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java

@@ -50,6 +50,11 @@ public abstract class AdaptingAggregator extends Aggregator {
      */
     protected abstract InternalAggregation adapt(InternalAggregation delegateResult) throws IOException;
 
+    @Override
+    public void releaseAggregations() {
+        delegate.releaseAggregations();
+    }
+
     @Override
     public final void close() {
         delegate.close();

+ 2 - 0
server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

@@ -103,6 +103,8 @@ public class AggregationPhase {
             } catch (IOException e) {
                 throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
             }
+            // release the aggregator to claim the used bytes as we don't need it anymore
+            aggregator.releaseAggregations();
         }
         context.queryResult().aggregations(InternalAggregations.from(aggregations));
 

+ 5 - 0
server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java

@@ -143,6 +143,11 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
      */
     public abstract InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException;
 
+    /**
+     * Release this aggregation and its sub-aggregations.
+     */
+    public abstract void releaseAggregations();
+
     /**
      * Build the result of this aggregation if it is at the "top level"
      * of the aggregation tree. If, instead, it is a sub-aggregation of

+ 9 - 0
server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java

@@ -273,6 +273,15 @@ public abstract class AggregatorBase extends Aggregator {
         return subAggregatorbyName.get(aggName);
     }
 
+    @Override
+    public void releaseAggregations() {
+        // release sub aggregations
+        Arrays.stream(subAggregators).forEach(Aggregator::releaseAggregations);
+        // release this aggregation
+        close();
+        context.removeReleasable(this);
+    }
+
     /**
      * Called after collection of all document is done.
      * <p>

+ 5 - 0
server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java

@@ -83,6 +83,11 @@ public abstract class DeferringBucketCollector extends BucketCollector {
             return in.buildAggregations(owningBucketOrds);
         }
 
+        @Override
+        public void releaseAggregations() {
+            in.releaseAggregations();
+        }
+
         @Override
         public InternalAggregation buildEmptyAggregation() {
             return in.buildEmptyAggregation();

+ 14 - 0
server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java

@@ -229,6 +229,11 @@ public abstract class AggregationContext implements Releasable {
      */
     public abstract void addReleasable(Aggregator aggregator);
 
+    /**
+     * Cause this aggregation to be released when the search is finished.
+     */
+    public abstract void removeReleasable(Aggregator aggregator);
+
     /**
      * Max buckets provided by the search.max_buckets setting
      */
@@ -517,9 +522,18 @@ public abstract class AggregationContext implements Releasable {
 
         @Override
         public void addReleasable(Aggregator aggregator) {
+            assert releaseMe.contains(aggregator) == false
+                : "adding aggregator [" + aggregator.name() + "] twice in the aggregation context";
             releaseMe.add(aggregator);
         }
 
+        @Override
+        public void removeReleasable(Aggregator aggregator) {
+            assert releaseMe.contains(aggregator)
+                : "removing non-existing aggregator [" + aggregator.name() + "] from the the aggregation context";
+            releaseMe.remove(aggregator);
+        }
+
         @Override
         public int maxBuckets() {
             return maxBuckets;

+ 5 - 0
server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java

@@ -81,6 +81,11 @@ public class ProfilingAggregator extends Aggregator {
         return result;
     }
 
+    @Override
+    public void releaseAggregations() {
+        delegate.releaseAggregations();
+    }
+
     @Override
     public InternalAggregation buildEmptyAggregation() {
         return delegate.buildEmptyAggregation();

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java

@@ -489,6 +489,11 @@ public abstract class MapperServiceTestCase extends ESTestCase {
                 // TODO we'll have to handle this in the tests eventually
             }
 
+            @Override
+            public void removeReleasable(Aggregator aggregator) {
+                // TODO we'll have to handle this in the tests eventually
+            }
+
             @Override
             public int maxBuckets() {
                 return Integer.MAX_VALUE;