Переглянути джерело

Revert "Reduce InternalBinaryRange and InternalRandomSampler in a streaming fashion (#105381)" (#105539)

This reverts commit ef9609b33433ae9c06094e85638a9234d136ee7d.
Ignacio Vera 1 рік тому
батько
коміт
e64cf991a9

+ 39 - 15
server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java

@@ -18,11 +18,12 @@ import org.elasticsearch.search.aggregations.AggregatorReducer;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
-import org.elasticsearch.search.aggregations.bucket.FixedMultiBucketAggregatorsReducer;
 import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -247,27 +248,50 @@ public final class InternalBinaryRange extends InternalMultiBucketAggregation<In
 
         return new AggregatorReducer() {
 
-            final FixedMultiBucketAggregatorsReducer<Bucket> reducer = new FixedMultiBucketAggregatorsReducer<>(
-                reduceContext,
-                size,
-                getBuckets()
-            ) {
-
-                @Override
-                protected Bucket createBucket(Bucket proto, long docCount, InternalAggregations aggregations) {
-                    return new Bucket(proto.format, proto.keyed, proto.key, proto.from, proto.to, docCount, aggregations);
-                }
-            };
+            final List<InternalBinaryRange> aggregations = new ArrayList<>(size);
 
             @Override
             public void accept(InternalAggregation aggregation) {
-                InternalBinaryRange binaryRange = (InternalBinaryRange) aggregation;
-                reducer.accept(binaryRange.getBuckets());
+                aggregations.add((InternalBinaryRange) aggregation);
             }
 
             @Override
             public InternalAggregation get() {
-                return new InternalBinaryRange(name, format, keyed, reducer.get(), metadata);
+                reduceContext.consumeBucketsAndMaybeBreak(buckets.size());
+                long[] docCounts = new long[buckets.size()];
+                InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][];
+                for (int i = 0; i < aggs.length; ++i) {
+                    aggs[i] = new InternalAggregations[aggregations.size()];
+                }
+                for (int i = 0; i < aggregations.size(); ++i) {
+                    InternalBinaryRange range = aggregations.get(i);
+                    if (range.buckets.size() != buckets.size()) {
+                        throw new IllegalStateException(
+                            "Expected [" + buckets.size() + "] buckets, but got [" + range.buckets.size() + "]"
+                        );
+                    }
+                    for (int j = 0; j < buckets.size(); ++j) {
+                        Bucket bucket = range.buckets.get(j);
+                        docCounts[j] += bucket.docCount;
+                        aggs[j][i] = bucket.aggregations;
+                    }
+                }
+                List<Bucket> buckets = new ArrayList<>(getBuckets().size());
+                for (int i = 0; i < getBuckets().size(); ++i) {
+                    Bucket b = getBuckets().get(i);
+                    buckets.add(
+                        new Bucket(
+                            format,
+                            keyed,
+                            b.key,
+                            b.from,
+                            b.to,
+                            docCounts[i],
+                            InternalAggregations.reduce(Arrays.asList(aggs[i]), reduceContext)
+                        )
+                    );
+                }
+                return new InternalBinaryRange(name, format, keyed, buckets, metadata);
             }
         };
     }

+ 6 - 4
server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/InternalRandomSampler.java

@@ -12,7 +12,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.AggregatorReducer;
-import org.elasticsearch.search.aggregations.AggregatorsReducer;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
@@ -21,6 +20,8 @@ import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 public class InternalRandomSampler extends InternalSingleBucketAggregation implements Sampler {
@@ -78,21 +79,22 @@ public class InternalRandomSampler extends InternalSingleBucketAggregation imple
     protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
         return new AggregatorReducer() {
             long docCount = 0L;
-            final AggregatorsReducer subAggregatorReducer = new AggregatorsReducer(reduceContext, size);
+            final List<InternalAggregations> subAggregationsList = new ArrayList<>(size);
 
             @Override
             public void accept(InternalAggregation aggregation) {
                 docCount += ((InternalSingleBucketAggregation) aggregation).getDocCount();
-                subAggregatorReducer.accept(((InternalSingleBucketAggregation) aggregation).getAggregations());
+                subAggregationsList.add(((InternalSingleBucketAggregation) aggregation).getAggregations());
             }
 
             @Override
             public InternalAggregation get() {
-                InternalAggregations aggs = subAggregatorReducer.get();
+                InternalAggregations aggs = InternalAggregations.reduce(subAggregationsList, reduceContext);
                 if (reduceContext.isFinalReduce() && aggs != null) {
                     SamplingContext context = buildContext();
                     aggs = InternalAggregations.from(aggs.asList().stream().map(agg -> agg.finalizeSampling(context)).toList());
                 }
+
                 return newAggregation(getName(), docCount, aggs);
             }
         };