Browse Source

Move InternalAggregations to Writeable (#41841)

Relates to #34389
Luca Cavanna 6 years ago
parent
commit
3eb85751d2
18 changed files with 46 additions and 59 deletions
  1. 6 5
      server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java
  2. 20 34
      server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java
  3. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java
  4. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java
  5. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java
  6. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java
  7. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java
  8. 2 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java
  9. 2 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
  10. 2 2
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
  11. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java
  12. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java
  13. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java
  14. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java
  15. 1 1
      server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
  16. 2 2
      server/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java
  17. 1 1
      server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java
  18. 1 1
      server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

+ 6 - 5
server/src/main/java/org/elasticsearch/search/aggregations/Aggregations.java

@@ -34,6 +34,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableMap;
 import static org.elasticsearch.common.xcontent.XContentParserUtils.parseTypedKeysObject;
 
@@ -44,14 +45,14 @@ public class Aggregations implements Iterable<Aggregation>, ToXContentFragment {
 
     public static final String AGGREGATIONS_FIELD = "aggregations";
 
-    protected List<? extends Aggregation> aggregations = Collections.emptyList();
-    protected Map<String, Aggregation> aggregationsAsMap;
-
-    protected Aggregations() {
-    }
+    protected final List<? extends Aggregation> aggregations;
+    private Map<String, Aggregation> aggregationsAsMap;
 
     public Aggregations(List<? extends Aggregation> aggregations) {
         this.aggregations = aggregations;
+        if (aggregations.isEmpty()) {
+            aggregationsAsMap = emptyMap();
+        }
     }
 
     /**

+ 20 - 34
server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java

@@ -20,7 +20,7 @@ package org.elasticsearch.search.aggregations;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
@@ -34,14 +34,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-import static java.util.Collections.emptyMap;
-
 /**
  * An internal implementation of {@link Aggregations}.
  */
-public final class InternalAggregations extends Aggregations implements Streamable {
+public final class InternalAggregations extends Aggregations implements Writeable {
+
+    public static final InternalAggregations EMPTY = new InternalAggregations(Collections.emptyList());
 
-    public static final InternalAggregations EMPTY = new InternalAggregations();
     private static final Comparator<InternalAggregation> INTERNAL_AGG_COMPARATOR = (agg1, agg2) -> {
         if (agg1.isMapped() == agg2.isMapped()) {
             return 0;
@@ -52,16 +51,14 @@ public final class InternalAggregations extends Aggregations implements Streamab
         }
     };
 
-    private List<SiblingPipelineAggregator> topLevelPipelineAggregators = Collections.emptyList();
-
-    private InternalAggregations() {
-    }
+    private final List<SiblingPipelineAggregator> topLevelPipelineAggregators;
 
     /**
      * Constructs a new aggregation.
      */
     public InternalAggregations(List<InternalAggregation> aggregations) {
         super(aggregations);
+        this.topLevelPipelineAggregators = Collections.emptyList();
     }
 
     /**
@@ -72,6 +69,19 @@ public final class InternalAggregations extends Aggregations implements Streamab
         this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators);
     }
 
+    public InternalAggregations(StreamInput in) throws IOException {
+        super(in.readList(stream -> in.readNamedWriteable(InternalAggregation.class)));
+        this.topLevelPipelineAggregators = in.readList(
+            stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
+        out.writeNamedWriteableList(topLevelPipelineAggregators);
+    }
+
     /**
      * Returns the top-level pipeline aggregators.
      * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
@@ -86,8 +96,7 @@ public final class InternalAggregations extends Aggregations implements Streamab
      * {@link InternalAggregations} object found in the list.
      * Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
      */
-    public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
-                                              ReduceContext context) {
+    public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
         if (aggregationsList.isEmpty()) {
             return null;
         }
@@ -123,27 +132,4 @@ public final class InternalAggregations extends Aggregations implements Streamab
         }
         return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
     }
-
-    public static InternalAggregations readAggregations(StreamInput in) throws IOException {
-        InternalAggregations result = new InternalAggregations();
-        result.readFrom(in);
-        return result;
-    }
-
-    @Override
-    public void readFrom(StreamInput in) throws IOException {
-        aggregations = in.readList(stream -> in.readNamedWriteable(InternalAggregation.class));
-        if (aggregations.isEmpty()) {
-            aggregationsAsMap = emptyMap();
-        }
-        this.topLevelPipelineAggregators = in.readList(
-                stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class));
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void writeTo(StreamOutput out) throws IOException {
-        out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
-        out.writeNamedWriteableList(topLevelPipelineAggregators);
-    }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java

@@ -60,7 +60,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
     protected InternalSingleBucketAggregation(StreamInput in) throws IOException {
         super(in);
         docCount = in.readVLong();
-        aggregations = InternalAggregations.readAggregations(in);
+        aggregations = new InternalAggregations(in);
     }
 
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java

@@ -58,7 +58,7 @@ public class InternalAdjacencyMatrix
         public InternalBucket(StreamInput in) throws IOException {
             key = in.readOptionalString();
             docCount = in.readVLong();
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

@@ -237,7 +237,7 @@ public class InternalComposite
         InternalBucket(StreamInput in, List<String> sourceNames, List<DocValueFormat> formats, int[] reverseMuls) throws IOException {
             this.key = new CompositeKey(in);
             this.docCount = in.readVLong();
-            this.aggregations = InternalAggregations.readAggregations(in);
+            this.aggregations = new InternalAggregations(in);
             this.reverseMuls = reverseMuls;
             this.sourceNames = sourceNames;
             this.formats = formats;

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java

@@ -57,7 +57,7 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
             this.keyed = keyed;
             key = in.readOptionalString();
             docCount = in.readVLong();
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java

@@ -51,7 +51,7 @@ public abstract class InternalGeoGridBucket<B extends InternalGeoGridBucket>
     public InternalGeoGridBucket(StreamInput in) throws IOException {
         hashAsLong = in.readLong();
         docCount = in.readVLong();
-        aggregations = InternalAggregations.readAggregations(in);
+        aggregations = new InternalAggregations(in);
     }
 
     @Override

+ 2 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

@@ -73,7 +73,7 @@ public final class InternalAutoDateHistogram extends
             this.format = format;
             key = in.readLong();
             docCount = in.readVLong();
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override
@@ -175,7 +175,7 @@ public final class InternalAutoDateHistogram extends
                 roundingInfos[i] = new RoundingInfo(in);
             }
             roundingIdx = in.readVInt();
-            emptySubAggregations = InternalAggregations.readAggregations(in);
+            emptySubAggregations = new InternalAggregations(in);
         }
 
         void writeTo(StreamOutput out) throws IOException {

+ 2 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

@@ -77,7 +77,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
             this.keyed = keyed;
             key = in.readLong();
             docCount = in.readVLong();
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override
@@ -186,7 +186,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
 
         EmptyBucketInfo(StreamInput in) throws IOException {
             rounding = Rounding.read(in);
-            subAggregations = InternalAggregations.readAggregations(in);
+            subAggregations = new InternalAggregations(in);
             bounds = in.readOptionalWriteable(ExtendedBounds::new);
         }
 

+ 2 - 2
server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

@@ -73,7 +73,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
             this.keyed = keyed;
             key = in.readDouble();
             docCount = in.readVLong();
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override
@@ -178,7 +178,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
         }
 
         EmptyBucketInfo(StreamInput in) throws IOException {
-            this(in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble(), InternalAggregations.readAggregations(in));
+            this(in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble(), new InternalAggregations(in));
         }
 
         public void writeTo(StreamOutput out) throws IOException {

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java

@@ -77,7 +77,7 @@ public final class InternalBinaryRange
             BytesRef from = in.readBoolean() ? in.readBytesRef() : null;
             BytesRef to = in.readBoolean() ? in.readBytesRef() : null;
             long docCount = in.readLong();
-            InternalAggregations aggregations = InternalAggregations.readAggregations(in);
+            InternalAggregations aggregations = new InternalAggregations(in);
 
             return new Bucket(format, keyed, key, from, to, docCount, aggregations);
         }

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java

@@ -256,7 +256,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
         for (int i = 0; i < size; i++) {
             String key = in.readString();
             ranges.add(getFactory().createBucket(key, in.readDouble(), in.readDouble(), in.readVLong(),
-                    InternalAggregations.readAggregations(in), keyed, format));
+                    new InternalAggregations(in), keyed, format));
         }
         this.ranges = ranges;
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantLongTerms.java

@@ -59,7 +59,7 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
             supersetDf = in.readVLong();
             term = in.readLong();
             score = in.readDouble();
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/SignificantStringTerms.java

@@ -57,7 +57,7 @@ public class SignificantStringTerms extends InternalMappedSignificantTerms<Signi
             subsetDf = in.readVLong();
             supersetDf = in.readVLong();
             score = in.readDouble();
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java

@@ -85,7 +85,7 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends Int
             if (showDocCountError) {
                 docCountError = in.readLong();
             }
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
 
         @Override

+ 2 - 2
server/src/main/java/org/elasticsearch/search/internal/InternalSearchResponse.java

@@ -52,7 +52,7 @@ public class InternalSearchResponse extends SearchResponseSections implements Wr
     public InternalSearchResponse(StreamInput in) throws IOException {
         super(
                 new SearchHits(in),
-                in.readBoolean() ? InternalAggregations.readAggregations(in) : null,
+                in.readBoolean() ? new InternalAggregations(in) : null,
                 in.readBoolean() ? new Suggest(in) : null,
                 in.readBoolean(),
                 in.readOptionalBoolean(),
@@ -64,7 +64,7 @@ public class InternalSearchResponse extends SearchResponseSections implements Wr
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         hits.writeTo(out);
-        out.writeOptionalStreamable((InternalAggregations)aggregations);
+        out.writeOptionalWriteable((InternalAggregations)aggregations);
         out.writeOptionalWriteable(suggest);
         out.writeBoolean(timedOut);
         out.writeOptionalBoolean(terminatedEarly);

+ 1 - 1
server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

@@ -284,7 +284,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
         }
         setTopDocs(readTopDocs(in));
         if (hasAggs = in.readBoolean()) {
-            aggregations = InternalAggregations.readAggregations(in);
+            aggregations = new InternalAggregations(in);
         }
         if (in.getVersion().before(Version.V_7_2_0)) {
             List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationsTests.java

@@ -132,7 +132,7 @@ public class InternalAggregationsTests extends ESTestCase {
             aggregations.writeTo(out);
             try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), registry)) {
                 in.setVersion(version);
-                InternalAggregations deserialized = InternalAggregations.readAggregations(in);
+                InternalAggregations deserialized = new InternalAggregations(in);
                 assertEquals(aggregations.aggregations, deserialized.aggregations);
                 if (aggregations.getTopLevelPipelineAggregators() == null) {
                     assertEquals(0, deserialized.getTopLevelPipelineAggregators().size());