Browse Source

Clean up serialization of terms aggregation results

Move to NamedWriteable and remove a lot of duplication.
Nik Everett 9 years ago
parent
commit
fe0f28965a
19 changed files with 517 additions and 493 deletions
  1. 0 5
      buildSrc/src/main/resources/checkstyle_suppressions.xml
  2. 6 5
      core/src/main/java/org/elasticsearch/search/SearchModule.java
  3. 3 3
      core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
  4. 4 4
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java
  5. 42 102
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java
  6. 10 14
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java
  7. 10 11
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
  8. 113 0
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java
  9. 99 76
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java
  10. 40 99
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java
  11. 12 12
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java
  12. 42 102
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java
  13. 6 6
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
  14. 6 3
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java
  15. 2 2
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
  16. 64 45
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java
  17. 4 4
      core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/BucketPriorityQueue.java
  18. 27 0
      modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/DoubleTermsTests.java
  19. 27 0
      modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/LongTermsTests.java

+ 0 - 5
buildSrc/src/main/resources/checkstyle_suppressions.xml

@@ -599,19 +599,14 @@
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]ScriptHeuristic.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]significant[/\\]heuristics[/\\]SignificanceHeuristic.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]AbstractTermsParametersParser.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]DoubleTerms.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]DoubleTermsAggregator.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]GlobalOrdinalsStringTermsAggregator.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]InternalOrder.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]InternalTerms.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]LongTerms.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]LongTermsAggregator.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]StringTerms.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]StringTermsAggregator.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]TermsAggregator.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]TermsAggregatorFactory.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]TermsParametersParser.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]UnmappedTerms.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]bucket[/\\]terms[/\\]support[/\\]IncludeExclude.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]ValuesSourceMetricsAggregationBuilder.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]metrics[/\\]cardinality[/\\]CardinalityAggregator.java" checks="LineLength" />

+ 6 - 5
core/src/main/java/org/elasticsearch/search/SearchModule.java

@@ -563,7 +563,12 @@ public class SearchModule extends AbstractModule {
                         .addResultReader(UnmappedSampler.NAME, UnmappedSampler::new));
         registerAggregation(DiversifiedAggregationBuilder::new, new DiversifiedSamplerParser(),
                 DiversifiedAggregationBuilder.AGGREGATION_NAME_FIELD);
-        registerAggregation(TermsAggregationBuilder::new, new TermsParser(), TermsAggregationBuilder.AGGREGATION_NAME_FIELD);
+        registerAggregation(
+                new AggregationSpec(TermsAggregationBuilder::new, new TermsParser(), TermsAggregationBuilder.AGGREGATION_NAME_FIELD)
+                    .addResultReader(StringTerms.NAME, StringTerms::new)
+                    .addResultReader(UnmappedTerms.NAME, UnmappedTerms::new)
+                    .addResultReader(LongTerms.NAME, LongTerms::new)
+                    .addResultReader(DoubleTerms.NAME, DoubleTerms::new));
         registerAggregation(SignificantTermsAggregationBuilder::new,
                 new SignificantTermsParser(significanceHeuristicParserRegistry, queryParserRegistry),
                 SignificantTermsAggregationBuilder.AGGREGATION_NAME_FIELD);
@@ -770,14 +775,10 @@ public class SearchModule extends AbstractModule {
 
     static {
         // buckets
-        StringTerms.registerStreams();
-        LongTerms.registerStreams();
         SignificantStringTerms.registerStreams();
         SignificantLongTerms.registerStreams();
         UnmappedSignificantTerms.registerStreams();
         InternalGeoHashGrid.registerStreams();
-        DoubleTerms.registerStreams();
-        UnmappedTerms.registerStreams();
         InternalRange.registerStream();
         InternalDateRange.registerStream();
         InternalBinaryRange.registerStream();

+ 3 - 3
core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

@@ -236,9 +236,9 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
     }
 
     /**
-     * Reduces the given addAggregation to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
-     * addAggregation are of the same type (the same type as this aggregation). For best efficiency, when implementing,
-     * try reusing an existing get instance (typically the first in the given list) to save on redundant object
+     * Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
+     * aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,
+     * try reusing an existing instance (typically the first in the given list) to save on redundant object
      * construction.
      */
     public final InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {

+ 4 - 4
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java

@@ -27,10 +27,11 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Collections.emptyList;
+
 abstract class AbstractStringTermsAggregator extends TermsAggregator {
 
     protected final boolean showTermDocCountError;
@@ -44,9 +45,8 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
 
     @Override
     public InternalAggregation buildEmptyAggregation() {
-        return new StringTerms(name, order, format, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(),
-                bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket> emptyList(), showTermDocCountError, 0, 0,
-                pipelineAggregators(), metaData());
+        return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                pipelineAggregators(), metaData(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
     }
 
 }

+ 42 - 102
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTerms.java

@@ -22,42 +22,21 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
- *
+ * Result of the {@link TermsAggregator} when the field is some kind of decimal number like a float, double, or distance.
  */
-public class DoubleTerms extends InternalTerms<DoubleTerms, DoubleTerms.Bucket> {
-
-    public static final Type TYPE = new Type("terms", "dterms");
-
-    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
-        @Override
-        public DoubleTerms readResult(StreamInput in) throws IOException {
-            DoubleTerms buckets = new DoubleTerms();
-            buckets.readFrom(in);
-            return buckets;
-        }
-    };
+public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bucket> {
+    public static final String NAME = "dterms";
 
-    public static void registerStreams() {
-        AggregationStreams.registerStream(STREAM, TYPE.stream());
-    }
-
-    static class Bucket extends InternalTerms.Bucket {
-
-        double term;
-
-        public Bucket(DocValueFormat format, boolean showDocCountError) {
-            super(format, showDocCountError);
-        }
+    static class Bucket extends InternalTerms.Bucket<Bucket> {
+        private final double term;
 
         public Bucket(double term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError,
                 DocValueFormat format) {
@@ -65,9 +44,22 @@ public class DoubleTerms extends InternalTerms<DoubleTerms, DoubleTerms.Bucket>
             this.term = term;
         }
 
+        /**
+         * Read from a stream.
+         */
+        public Bucket(StreamInput in, DocValueFormat format, boolean showDocCountError) throws IOException {
+            super(in, format, showDocCountError);
+            term = in.readDouble();
+        }
+
+        @Override
+        protected void writeTermTo(StreamOutput out) throws IOException {
+            out.writeDouble(term);
+        }
+
         @Override
         public String getKeyAsString() {
-            return String.valueOf(term);
+            return format.format(term);
         }
 
         @Override
@@ -90,27 +82,6 @@ public class DoubleTerms extends InternalTerms<DoubleTerms, DoubleTerms.Bucket>
             return new Bucket(term, docCount, aggs, showDocCountError, docCountError, format);
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            term = in.readDouble();
-            docCount = in.readVLong();
-            docCountError = -1;
-            if (showDocCountError) {
-                docCountError = in.readLong();
-            }
-            aggregations = InternalAggregations.readAggregations(in);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            out.writeDouble(term);
-            out.writeVLong(getDocCount());
-            if (showDocCountError) {
-                out.writeLong(docCountError);
-            }
-            aggregations.writeTo(out);
-        }
-
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
@@ -128,25 +99,29 @@ public class DoubleTerms extends InternalTerms<DoubleTerms, DoubleTerms.Bucket>
         }
     }
 
-    DoubleTerms() {
-    } // for serialization
+    public DoubleTerms(String name, Terms.Order order, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
+            Map<String, Object> metaData, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
+            List<Bucket> buckets, long docCountError) {
+        super(name, order, requiredSize, minDocCount, pipelineAggregators, metaData, format, shardSize, showTermDocCountError,
+                otherDocCount, buckets, docCountError);
+    }
 
-    public DoubleTerms(String name, Terms.Order order, DocValueFormat format, int requiredSize, int shardSize,
-            long minDocCount, List<? extends InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError,
-            long otherDocCount, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
-        super(name, order, format, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount, pipelineAggregators,
-                metaData);
+    /**
+     * Read from a stream.
+     */
+    public DoubleTerms(StreamInput in) throws IOException {
+        super(in, Bucket::new);
     }
 
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return NAME;
     }
 
     @Override
     public DoubleTerms create(List<Bucket> buckets) {
-        return new DoubleTerms(this.name, this.order, this.format, this.requiredSize, this.shardSize, this.minDocCount, buckets,
-                this.showTermDocCountError, this.docCountError, this.otherDocCount, this.pipelineAggregators(), this.metaData);
+        return new DoubleTerms(name, order, requiredSize, minDocCount, this.pipelineAggregators(), metaData, format, shardSize,
+                showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
     @Override
@@ -156,48 +131,9 @@ public class DoubleTerms extends InternalTerms<DoubleTerms, DoubleTerms.Bucket>
     }
 
     @Override
-    protected DoubleTerms create(String name, List<org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.Bucket> buckets,
-            long docCountError, long otherDocCount, InternalTerms prototype) {
-        return new DoubleTerms(name, prototype.order, ((DoubleTerms) prototype).format, prototype.requiredSize, prototype.shardSize,
-                prototype.minDocCount, buckets, prototype.showTermDocCountError, docCountError, otherDocCount, prototype.pipelineAggregators(),
-                prototype.getMetaData());
-    }
-
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        this.docCountError = in.readLong();
-        this.order = InternalOrder.Streams.readOrder(in);
-        this.format = in.readNamedWriteable(DocValueFormat.class);
-        this.requiredSize = readSize(in);
-        this.shardSize = readSize(in);
-        this.showTermDocCountError = in.readBoolean();
-        this.minDocCount = in.readVLong();
-        this.otherDocCount = in.readVLong();
-        int size = in.readVInt();
-        List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-            Bucket bucket = new Bucket(format, showTermDocCountError);
-            bucket.readFrom(in);
-            buckets.add(bucket);
-        }
-        this.buckets = buckets;
-        this.bucketMap = null;
-    }
-
-    @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        out.writeLong(docCountError);
-        InternalOrder.Streams.writeOrder(order, out);
-        out.writeNamedWriteable(format);
-        writeSize(requiredSize, out);
-        writeSize(shardSize, out);
-        out.writeBoolean(showTermDocCountError);
-        out.writeVLong(minDocCount);
-        out.writeVLong(otherDocCount);
-        out.writeVInt(buckets.size());
-        for (InternalTerms.Bucket bucket : buckets) {
-            bucket.writeTo(out);
-        }
+    protected DoubleTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
+        return new DoubleTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), getMetaData(), format,
+                shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
     @Override
@@ -205,11 +141,15 @@ public class DoubleTerms extends InternalTerms<DoubleTerms, DoubleTerms.Bucket>
         builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
         builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
         builder.startArray(CommonFields.BUCKETS);
-        for (InternalTerms.Bucket bucket : buckets) {
+        for (Bucket bucket : buckets) {
             bucket.toXContent(builder, params);
         }
         builder.endArray();
         return builder;
     }
 
+    @Override
+    protected Bucket[] createBucketsArray(int size) {
+        return new Bucket[size];
+    }
 }

+ 10 - 14
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/DoubleTermsAggregator.java

@@ -32,9 +32,9 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  *
@@ -66,20 +66,16 @@ public class DoubleTermsAggregator extends LongTermsAggregator {
         return convertToDouble(terms);
     }
 
-    private static DoubleTerms.Bucket convertToDouble(InternalTerms.Bucket bucket) {
-        final long term = ((Number) bucket.getKey()).longValue();
-        final double value = NumericUtils.sortableLongToDouble(term);
-        return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError, bucket.format);
-    }
-
     private static DoubleTerms convertToDouble(LongTerms terms) {
-        final InternalTerms.Bucket[] buckets = terms.getBuckets().toArray(new InternalTerms.Bucket[0]);
-        for (int i = 0; i < buckets.length; ++i) {
-            buckets[i] = convertToDouble(buckets[i]);
-        }
-        return new DoubleTerms(terms.getName(), terms.order, terms.format, terms.requiredSize, terms.shardSize, terms.minDocCount,
-                Arrays.asList(buckets), terms.showTermDocCountError, terms.docCountError, terms.otherDocCount, terms.pipelineAggregators(),
-                terms.getMetaData());
+        List<DoubleTerms.Bucket> buckets = terms.buckets.stream().map(DoubleTermsAggregator::convertToDouble).collect(Collectors.toList());
+        return new DoubleTerms(terms.getName(), terms.order, terms.requiredSize, terms.minDocCount, terms.pipelineAggregators(),
+                terms.getMetaData(), terms.format, terms.shardSize, terms.showTermDocCountError, terms.otherDocCount, buckets,
+                terms.docCountError);
     }
 
+    private static DoubleTerms.Bucket convertToDouble(LongTerms.Bucket bucket) {
+        double value = NumericUtils.sortableLongToDouble(bucket.term);
+        return new DoubleTerms.Bucket(value, bucket.docCount, bucket.aggregations, bucket.showDocCountError, bucket.docCountError,
+                bucket.format);
+    }
 }

+ 10 - 11
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

@@ -41,7 +41,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.Bucket;
 import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
 import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -156,7 +155,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
         }
         long otherDocCount = 0;
-        BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
+        BucketPriorityQueue<OrdBucket> ordered = new BucketPriorityQueue<>(size, order.comparator(this));
         OrdBucket spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
         for (long globalTermOrd = 0; globalTermOrd < globalOrds.getValueCount(); ++globalTermOrd) {
             if (includeExclude != null && !acceptedGlobalOrdinals.get(globalTermOrd)) {
@@ -172,7 +171,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
             spare.bucketOrd = bucketOrd;
             spare.docCount = bucketDocCount;
             if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
-                spare = (OrdBucket) ordered.insertWithOverflow(spare);
+                spare = ordered.insertWithOverflow(spare);
                 if (spare == null) {
                     spare = new OrdBucket(-1, 0, null, showTermDocCountError, 0);
                 }
@@ -180,7 +179,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         }
 
         // Get the top buckets
-        final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
+        final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
         long survivingBucketOrds[] = new long[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; --i) {
             final OrdBucket bucket = (OrdBucket) ordered.pop();
@@ -196,20 +195,20 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
 
         //Now build the aggs
         for (int i = 0; i < list.length; i++) {
-            Bucket bucket = list[i];
+            StringTerms.Bucket bucket = list[i];
             bucket.aggregations = bucket.docCount == 0 ? bucketEmptyAggregations() : bucketAggregations(bucket.bucketOrd);
             bucket.docCountError = 0;
         }
 
-        return new StringTerms(name, order, format, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(),
-                bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0, otherDocCount, pipelineAggregators(),
-                metaData());
+        return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                pipelineAggregators(), metaData(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
+                otherDocCount, Arrays.asList(list), 0);
     }
 
     /**
      * This is used internally only, just for compare using global ordinal instead of term bytes in the PQ
      */
-    static class OrdBucket extends InternalTerms.Bucket {
+    static class OrdBucket extends InternalTerms.Bucket<OrdBucket> {
         long globalOrd;
 
         OrdBucket(long globalOrd, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError) {
@@ -233,7 +232,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         }
 
         @Override
-        Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {
+        OrdBucket newBucket(long docCount, InternalAggregations aggs, long docCountError) {
             throw new UnsupportedOperationException();
         }
 
@@ -248,7 +247,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
         }
 
         @Override
-        public void writeTo(StreamOutput out) throws IOException {
+        protected void writeTermTo(StreamOutput out) throws IOException {
             throw new UnsupportedOperationException();
         }
 

+ 113 - 0
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedTerms.java

@@ -0,0 +1,113 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.terms;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Common superclass for results of the terms aggregation on mapped fields.
+ */
+public abstract class InternalMappedTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>> extends InternalTerms<A, B> {
+    protected final DocValueFormat format;
+    protected final int shardSize;
+    protected final boolean showTermDocCountError;
+    protected final long otherDocCount;
+    protected final List<B> buckets;
+    protected Map<String, B> bucketMap;
+
+    protected long docCountError;
+
+    protected InternalMappedTerms(String name, Terms.Order order, int requiredSize, long minDocCount,
+            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData, DocValueFormat format, int shardSize,
+            boolean showTermDocCountError, long otherDocCount, List<B> buckets, long docCountError) {
+        super(name, order, requiredSize, minDocCount, pipelineAggregators, metaData);
+        this.format = format;
+        this.shardSize = shardSize;
+        this.showTermDocCountError = showTermDocCountError;
+        this.otherDocCount = otherDocCount;
+        this.docCountError = docCountError;
+        this.buckets = buckets;
+    }
+
+    /**
+     * Read from a stream.
+     */
+    protected InternalMappedTerms(StreamInput in, Bucket.Reader<B> bucketReader) throws IOException {
+        super(in);
+        docCountError = in.readZLong();
+        format = in.readNamedWriteable(DocValueFormat.class);
+        shardSize = readSize(in);
+        showTermDocCountError = in.readBoolean();
+        otherDocCount = in.readVLong();
+        buckets = in.readList(stream -> bucketReader.read(stream, format, showTermDocCountError));
+    }
+
+    @Override
+    protected final void writeTermTypeInfoTo(StreamOutput out) throws IOException {
+        out.writeZLong(docCountError);
+        out.writeNamedWriteable(format);
+        writeSize(shardSize, out);
+        out.writeBoolean(showTermDocCountError);
+        out.writeVLong(otherDocCount);
+        out.writeList(buckets);
+    }
+
+    @Override
+    protected void setDocCountError(long docCountError) {
+        this.docCountError = docCountError; 
+    }
+
+    @Override
+    protected int getShardSize() {
+        return shardSize;
+    }
+
+    @Override
+    public long getDocCountError() {
+        return docCountError;
+    }
+
+    @Override
+    public long getSumOfOtherDocCounts() {
+        return otherDocCount;
+    }
+
+    @Override
+    public List<B> getBucketsInternal() {
+        return buckets;
+    }
+
+    @Override
+    public B getBucketByKey(String term) {
+        if (bucketMap == null) {
+            bucketMap = buckets.stream().collect(Collectors.toMap(Bucket::getKeyAsString, Function.identity()));
+        }
+        return bucketMap.get(term);
+    }
+}

+ 99 - 76
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java

@@ -18,7 +18,8 @@
  */
 package org.elasticsearch.search.aggregations.bucket.terms;
 
-import org.elasticsearch.common.io.stream.Streamable;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationExecutionException;
@@ -29,45 +30,73 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.terms.support.BucketPriorityQueue;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-/**
- *
- */
-public abstract class InternalTerms<A extends InternalTerms, B extends InternalTerms.Bucket> extends InternalMultiBucketAggregation<A, B>
-        implements Terms, ToXContent, Streamable {
+import static java.util.Collections.unmodifiableList;
+
+public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket<B>>
+        extends InternalMultiBucketAggregation<A, B> implements Terms, ToXContent {
 
     protected static final String DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME = "doc_count_error_upper_bound";
     protected static final String SUM_OF_OTHER_DOC_COUNTS = "sum_other_doc_count";
 
-    public abstract static class Bucket extends Terms.Bucket {
+    public abstract static class Bucket<B extends Bucket<B>> extends Terms.Bucket {
+        /**
+         * Reads a bucket. Should be a constructor reference.
+         */
+        @FunctionalInterface
+        public interface Reader<B extends Bucket<B>> {
+            B read(StreamInput in, DocValueFormat format, boolean showDocCountError) throws IOException;
+        }
 
         long bucketOrd;
 
         protected long docCount;
         protected long docCountError;
         protected InternalAggregations aggregations;
-        protected boolean showDocCountError;
-        final transient DocValueFormat format;
-
-        protected Bucket(DocValueFormat formatter, boolean showDocCountError) {
-            // for serialization
-            this.showDocCountError = showDocCountError;
-            this.format = formatter;
-        }
+        protected final boolean showDocCountError;
+        protected final DocValueFormat format;
 
         protected Bucket(long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError,
                 DocValueFormat formatter) {
-            this(formatter, showDocCountError);
+            this.showDocCountError = showDocCountError;
+            this.format = formatter;
             this.docCount = docCount;
             this.aggregations = aggregations;
             this.docCountError = docCountError;
         }
 
+        /**
+         * Read from a stream.
+         */
+        protected Bucket(StreamInput in, DocValueFormat formatter, boolean showDocCountError) throws IOException {
+            this.showDocCountError = showDocCountError;
+            this.format = formatter;
+            docCount = in.readVLong();
+            docCountError = -1;
+            if (showDocCountError) {
+                docCountError = in.readLong();
+            }
+            aggregations = InternalAggregations.readAggregations(in);
+        }
+
+        @Override
+        public final void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(getDocCount());
+            if (showDocCountError) {
+                out.writeLong(docCountError);
+            }
+            aggregations.writeTo(out);
+            writeTermTo(out);
+        }
+
+        protected abstract void writeTermTo(StreamOutput out) throws IOException;
+
         @Override
         public long getDocCount() {
             return docCount;
@@ -86,13 +115,13 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
             return aggregations;
         }
 
-        abstract Bucket newBucket(long docCount, InternalAggregations aggs, long docCountError);
+        abstract B newBucket(long docCount, InternalAggregations aggs, long docCountError);
 
-        public Bucket reduce(List<? extends Bucket> buckets, ReduceContext context) {
+        public B reduce(List<B> buckets, ReduceContext context) {
             long docCount = 0;
             long docCountError = 0;
             List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
-            for (Bucket bucket : buckets) {
+            for (B bucket : buckets) {
                 docCount += bucket.docCount;
                 if (docCountError != -1) {
                     if (bucket.docCountError == -1) {
@@ -108,72 +137,59 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
         }
     }
 
-    protected Terms.Order order;
-    protected int requiredSize;
-    protected DocValueFormat format;
-    protected int shardSize;
-    protected long minDocCount;
-    protected List<? extends Bucket> buckets;
-    protected Map<String, Bucket> bucketMap;
-    protected long docCountError;
-    protected boolean showTermDocCountError;
-    protected long otherDocCount;
-
-    protected InternalTerms() {} // for serialization
-
-    protected InternalTerms(String name, Terms.Order order, DocValueFormat format, int requiredSize, int shardSize, long minDocCount,
-            List<? extends Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount, List<PipelineAggregator> pipelineAggregators,
-            Map<String, Object> metaData) {
+    protected final Terms.Order order;
+    protected final int requiredSize;
+    protected final long minDocCount;
+
+    protected InternalTerms(String name, Terms.Order order, int requiredSize, long minDocCount,
+            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
         super(name, pipelineAggregators, metaData);
         this.order = order;
-        this.format = format;
         this.requiredSize = requiredSize;
-        this.shardSize = shardSize;
         this.minDocCount = minDocCount;
-        this.buckets = buckets;
-        this.showTermDocCountError = showTermDocCountError;
-        this.docCountError = docCountError;
-        this.otherDocCount = otherDocCount;
     }
 
-    @Override
-    public List<Terms.Bucket> getBuckets() {
-        Object o = buckets;
-        return (List<Terms.Bucket>) o;
+    /**
+     * Read from a stream.
+     */
+    protected InternalTerms(StreamInput in) throws IOException {
+       super(in);
+       order = InternalOrder.Streams.readOrder(in);
+       requiredSize = readSize(in);
+       minDocCount = in.readVLong();
     }
 
     @Override
-    public Terms.Bucket getBucketByKey(String term) {
-        if (bucketMap == null) {
-            bucketMap = new HashMap<>(buckets.size());
-            for (Bucket bucket : buckets) {
-                bucketMap.put(bucket.getKeyAsString(), bucket);
-            }
-        }
-        return bucketMap.get(term);
+    protected final void doWriteTo(StreamOutput out) throws IOException {
+        InternalOrder.Streams.writeOrder(order, out);
+        writeSize(requiredSize, out);
+        out.writeVLong(minDocCount);
+        writeTermTypeInfoTo(out);
     }
 
+    protected abstract void writeTermTypeInfoTo(StreamOutput out) throws IOException;
+
     @Override
-    public long getDocCountError() {
-        return docCountError;
+    public final List<Terms.Bucket> getBuckets() {
+        return unmodifiableList(getBucketsInternal());
     }
 
+    protected abstract List<B> getBucketsInternal();
+
     @Override
-    public long getSumOfOtherDocCounts() {
-        return otherDocCount;
-    }
+    public abstract B getBucketByKey(String term);
 
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
-
-        Map<Object, List<InternalTerms.Bucket>> buckets = new HashMap<>();
+        Map<Object, List<B>> buckets = new HashMap<>();
         long sumDocCountError = 0;
         long otherDocCount = 0;
         InternalTerms<A, B> referenceTerms = null;
         for (InternalAggregation aggregation : aggregations) {
+            @SuppressWarnings("unchecked")
             InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
-            if (referenceTerms == null && !terms.getClass().equals(UnmappedTerms.class)) {
-                referenceTerms = (InternalTerms<A, B>) aggregation;
+            if (referenceTerms == null && !aggregation.getClass().equals(UnmappedTerms.class)) {
+                referenceTerms = terms;
             }
             if (referenceTerms != null &&
                     !referenceTerms.getClass().equals(terms.getClass()) &&
@@ -190,10 +206,10 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
             }
             otherDocCount += terms.getSumOfOtherDocCounts();
             final long thisAggDocCountError;
-            if (terms.buckets.size() < this.shardSize || InternalOrder.isTermOrder(order)) {
+            if (terms.getBucketsInternal().size() < getShardSize() || InternalOrder.isTermOrder(order)) {
                 thisAggDocCountError = 0;
             } else if (InternalOrder.isCountDesc(this.order)) {
-                thisAggDocCountError = terms.buckets.get(terms.buckets.size() - 1).docCount;
+                thisAggDocCountError = terms.getBucketsInternal().get(terms.getBucketsInternal().size() - 1).docCount;
             } else {
                 thisAggDocCountError = -1;
             }
@@ -204,10 +220,10 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
                     sumDocCountError += thisAggDocCountError;
                 }
             }
-            terms.docCountError = thisAggDocCountError;
-            for (Bucket bucket : terms.buckets) {
+            setDocCountError(thisAggDocCountError);
+            for (B bucket : terms.getBucketsInternal()) {
                 bucket.docCountError = thisAggDocCountError;
-                List<Bucket> bucketList = buckets.get(bucket.getKey());
+                List<B> bucketList = buckets.get(bucket.getKey());
                 if (bucketList == null) {
                     bucketList = new ArrayList<>();
                     buckets.put(bucket.getKey(), bucketList);
@@ -217,9 +233,9 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
         }
 
         final int size = Math.min(requiredSize, buckets.size());
-        BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(null));
-        for (List<Bucket> sameTermBuckets : buckets.values()) {
-            final Bucket b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
+        BucketPriorityQueue<B> ordered = new BucketPriorityQueue<>(size, order.comparator(null));
+        for (List<B> sameTermBuckets : buckets.values()) {
+            final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext);
             if (b.docCountError != -1) {
                 if (sumDocCountError == -1) {
                     b.docCountError = -1;
@@ -228,15 +244,15 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
                 }
             }
             if (b.docCount >= minDocCount) {
-                Terms.Bucket removed = ordered.insertWithOverflow(b);
+                B removed = ordered.insertWithOverflow(b);
                 if (removed != null) {
                     otherDocCount += removed.getDocCount();
                 }
             }
         }
-        Bucket[] list = new Bucket[ordered.size()];
+        B[] list = createBucketsArray(ordered.size());
         for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = (Bucket) ordered.pop();
+            list[i] = ordered.pop();
         }
         long docCountError;
         if (sumDocCountError == -1) {
@@ -244,10 +260,17 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
         } else {
             docCountError = aggregations.size() == 1 ? 0 : sumDocCountError;
         }
-        return create(name, Arrays.asList(list), docCountError, otherDocCount, this);
+        return create(name, Arrays.asList(list), docCountError, otherDocCount);
     }
 
-    protected abstract A create(String name, List<InternalTerms.Bucket> buckets, long docCountError, long otherDocCount,
-            InternalTerms prototype);
+    protected abstract void setDocCountError(long docCountError);
+
+    protected abstract int getShardSize();
+
+    protected abstract A create(String name, List<B> buckets, long docCountError, long otherDocCount);
 
+    /**
+     * Create an array to hold some buckets. Used in collecting the results.
+     */
+    protected abstract B[] createBucketsArray(int size);
 }

+ 40 - 99
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTerms.java

@@ -22,49 +22,41 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
- *
+ * Result of the {@link TermsAggregator} when the field is some kind of whole number like a integer, long, or a date.
  */
-public class LongTerms extends InternalTerms<LongTerms, LongTerms.Bucket> {
-
-    public static final Type TYPE = new Type("terms", "lterms");
-
-    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
-        @Override
-        public LongTerms readResult(StreamInput in) throws IOException {
-            LongTerms buckets = new LongTerms();
-            buckets.readFrom(in);
-            return buckets;
-        }
-    };
-
-    public static void registerStreams() {
-        AggregationStreams.registerStream(STREAM, TYPE.stream());
-    }
-
-    static class Bucket extends InternalTerms.Bucket {
+public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket> {
+    public static final String NAME = "lterms";
 
+    public static class Bucket extends InternalTerms.Bucket<Bucket> {
         long term;
 
-        public Bucket(DocValueFormat format, boolean showDocCountError) {
-            super(format, showDocCountError);
-        }
-
         public Bucket(long term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError,
                 DocValueFormat format) {
             super(docCount, aggregations, showDocCountError, docCountError, format);
             this.term = term;
         }
 
+        /**
+         * Read from a stream.
+         */
+        public Bucket(StreamInput in, DocValueFormat format, boolean showDocCountError) throws IOException {
+            super(in, format, showDocCountError);
+            term = in.readLong();
+        }
+
+        @Override
+        protected void writeTermTo(StreamOutput out) throws IOException {
+            out.writeLong(term);
+        }
+
         @Override
         public String getKeyAsString() {
             return format.format(term);
@@ -90,27 +82,6 @@ public class LongTerms extends InternalTerms<LongTerms, LongTerms.Bucket> {
             return new Bucket(term, docCount, aggs, showDocCountError, docCountError, format);
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            term = in.readLong();
-            docCount = in.readVLong();
-            docCountError = -1;
-            if (showDocCountError) {
-                docCountError = in.readLong();
-            }
-            aggregations = InternalAggregations.readAggregations(in);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            out.writeLong(term);
-            out.writeVLong(getDocCount());
-            if (showDocCountError) {
-                out.writeLong(docCountError);
-            }
-            aggregations.writeTo(out);
-        }
-
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
@@ -128,24 +99,29 @@ public class LongTerms extends InternalTerms<LongTerms, LongTerms.Bucket> {
         }
     }
 
-    LongTerms() {} // for serialization
+    public LongTerms(String name, Terms.Order order, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
+            Map<String, Object> metaData, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
+            List<Bucket> buckets, long docCountError) {
+        super(name, order, requiredSize, minDocCount, pipelineAggregators, metaData, format, shardSize, showTermDocCountError,
+                otherDocCount, buckets, docCountError);
+    }
 
-    public LongTerms(String name, Terms.Order order, DocValueFormat format, int requiredSize, int shardSize, long minDocCount,
-            List<? extends InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount,
-            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
-        super(name, order, format, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount,
-                pipelineAggregators, metaData);
+    /**
+     * Read from a stream.
+     */
+    public LongTerms(StreamInput in) throws IOException {
+        super(in, Bucket::new);
     }
 
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return NAME;
     }
 
     @Override
     public LongTerms create(List<Bucket> buckets) {
-        return new LongTerms(this.name, this.order, this.format, this.requiredSize, this.shardSize, this.minDocCount, buckets,
-                this.showTermDocCountError, this.docCountError, this.otherDocCount, this.pipelineAggregators(), this.metaData);
+        return new LongTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData, format, shardSize,
+                showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
     @Override
@@ -155,48 +131,9 @@ public class LongTerms extends InternalTerms<LongTerms, LongTerms.Bucket> {
     }
 
     @Override
-    protected LongTerms create(String name, List<org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.Bucket> buckets,
-            long docCountError, long otherDocCount, InternalTerms prototype) {
-        return new LongTerms(name, prototype.order, ((LongTerms) prototype).format, prototype.requiredSize, prototype.shardSize,
-                prototype.minDocCount, buckets, prototype.showTermDocCountError, docCountError, otherDocCount, prototype.pipelineAggregators(),
-                prototype.getMetaData());
-    }
-
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        this.docCountError = in.readLong();
-        this.order = InternalOrder.Streams.readOrder(in);
-        this.format = in.readNamedWriteable(DocValueFormat.class);
-        this.requiredSize = readSize(in);
-        this.shardSize = readSize(in);
-        this.showTermDocCountError = in.readBoolean();
-        this.minDocCount = in.readVLong();
-        this.otherDocCount = in.readVLong();
-        int size = in.readVInt();
-        List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-            Bucket bucket = new Bucket(format, showTermDocCountError);
-            bucket.readFrom(in);
-            buckets.add(bucket);
-        }
-        this.buckets = buckets;
-        this.bucketMap = null;
-    }
-
-    @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        out.writeLong(docCountError);
-        InternalOrder.Streams.writeOrder(order, out);
-        out.writeNamedWriteable(format);
-        writeSize(requiredSize, out);
-        writeSize(shardSize, out);
-        out.writeBoolean(showTermDocCountError);
-        out.writeVLong(minDocCount);
-        out.writeVLong(otherDocCount);
-        out.writeVInt(buckets.size());
-        for (InternalTerms.Bucket bucket : buckets) {
-            bucket.writeTo(out);
-        }
+    protected LongTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
+        return new LongTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), getMetaData(), format, shardSize,
+                showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
     @Override
@@ -204,11 +141,15 @@ public class LongTerms extends InternalTerms<LongTerms, LongTerms.Bucket> {
         builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
         builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
         builder.startArray(CommonFields.BUCKETS);
-        for (InternalTerms.Bucket bucket : buckets) {
+        for (Bucket bucket : buckets) {
             bucket.toXContent(builder, params);
         }
         builder.endArray();
         return builder;
     }
 
+    @Override
+    protected Bucket[] createBucketsArray(int size) {
+        return new Bucket[size];
+    }
 }

+ 12 - 12
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongTermsAggregator.java

@@ -37,10 +37,11 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Collections.emptyList;
+
 /**
  *
  */
@@ -124,7 +125,7 @@ public class LongTermsAggregator extends TermsAggregator {
         final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
 
         long otherDocCount = 0;
-        BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
+        BucketPriorityQueue<LongTerms.Bucket> ordered = new BucketPriorityQueue<>(size, order.comparator(this));
         LongTerms.Bucket spare = null;
         for (long i = 0; i < bucketOrds.size(); i++) {
             if (spare == null) {
@@ -140,7 +141,7 @@ public class LongTermsAggregator extends TermsAggregator {
         }
 
         // Get the top buckets
-        final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
+        final LongTerms.Bucket[] list = new LongTerms.Bucket[ordered.size()];
         long survivingBucketOrds[] = new long[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; --i) {
             final LongTerms.Bucket bucket = (LongTerms.Bucket) ordered.pop();
@@ -151,22 +152,21 @@ public class LongTermsAggregator extends TermsAggregator {
 
         runDeferredCollections(survivingBucketOrds);
 
-        //Now build the aggs
+        // Now build the aggs
         for (int i = 0; i < list.length; i++) {
-          list[i].aggregations = bucketAggregations(list[i].bucketOrd);
-          list[i].docCountError = 0;
+            list[i].aggregations = bucketAggregations(list[i].bucketOrd);
+            list[i].docCountError = 0;
         }
 
-        return new LongTerms(name, order, format, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(),
-                bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0, otherDocCount, pipelineAggregators(),
-                metaData());
+        return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                pipelineAggregators(), metaData(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
+                Arrays.asList(list), 0);
     }
 
     @Override
     public InternalAggregation buildEmptyAggregation() {
-        return new LongTerms(name, order, format, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(),
-                bucketCountThresholds.getMinDocCount(), Collections.<InternalTerms.Bucket> emptyList(), showTermDocCountError, 0, 0,
-                pipelineAggregators(), metaData());
+        return new LongTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                pipelineAggregators(), metaData(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
     }
 
     @Override

+ 42 - 102
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTerms.java

@@ -23,50 +23,40 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
-import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 /**
- *
+ * Result of the {@link TermsAggregator} when the field is a String.
  */
-public class StringTerms extends InternalTerms<StringTerms, StringTerms.Bucket> {
-
-    public static final InternalAggregation.Type TYPE = new Type("terms", "sterms");
-
-    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
-        @Override
-        public StringTerms readResult(StreamInput in) throws IOException {
-            StringTerms buckets = new StringTerms();
-            buckets.readFrom(in);
-            return buckets;
-        }
-    };
-
-    public static void registerStreams() {
-        AggregationStreams.registerStream(STREAM, TYPE.stream());
-    }
-
-    public static class Bucket extends InternalTerms.Bucket {
-
+public class StringTerms extends InternalMappedTerms<StringTerms, StringTerms.Bucket> {
+    public static final String NAME = "sterms";
+    public static class Bucket extends InternalTerms.Bucket<Bucket> {
         BytesRef termBytes;
 
-        public Bucket(DocValueFormat format, boolean showDocCountError) {
-            super(format, showDocCountError);
-        }
-
         public Bucket(BytesRef term, long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError,
                 DocValueFormat format) {
             super(docCount, aggregations, showDocCountError, docCountError, format);
             this.termBytes = term;
         }
 
+        /**
+         * Read from a stream.
+         */
+        public Bucket(StreamInput in, DocValueFormat format, boolean showDocCountError) throws IOException {
+            super(in, format, showDocCountError);
+            termBytes = in.readBytesRef();
+        }
+
+        @Override
+        protected void writeTermTo(StreamOutput out) throws IOException {
+            out.writeBytesRef(termBytes);
+        }
+
         @Override
         public Object getKey() {
             return getKeyAsString();
@@ -93,27 +83,6 @@ public class StringTerms extends InternalTerms<StringTerms, StringTerms.Bucket>
             return new Bucket(termBytes, docCount, aggs, showDocCountError, docCountError, format);
         }
 
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            termBytes = in.readBytesRef();
-            docCount = in.readVLong();
-            docCountError = -1;
-            if (showDocCountError) {
-                docCountError = in.readLong();
-            }
-            aggregations = InternalAggregations.readAggregations(in);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            out.writeBytesRef(termBytes);
-            out.writeVLong(getDocCount());
-            if (showDocCountError) {
-                out.writeLong(docCountError);
-            }
-            aggregations.writeTo(out);
-        }
-
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
@@ -128,74 +97,41 @@ public class StringTerms extends InternalTerms<StringTerms, StringTerms.Bucket>
         }
     }
 
-    StringTerms() {
-    } // for serialization
+    public StringTerms(String name, Terms.Order order, int requiredSize, long minDocCount, List<PipelineAggregator> pipelineAggregators,
+            Map<String, Object> metaData, DocValueFormat format, int shardSize, boolean showTermDocCountError, long otherDocCount,
+            List<Bucket> buckets, long docCountError) {
+        super(name, order, requiredSize, minDocCount, pipelineAggregators, metaData, format,
+                shardSize, showTermDocCountError, otherDocCount, buckets, docCountError);
+    }
 
-    public StringTerms(String name, Terms.Order order, DocValueFormat format, int requiredSize, int shardSize, long minDocCount,
-            List<? extends InternalTerms.Bucket> buckets, boolean showTermDocCountError, long docCountError, long otherDocCount,
-            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
-        super(name, order, format, requiredSize, shardSize, minDocCount, buckets, showTermDocCountError, docCountError, otherDocCount, pipelineAggregators,
-                metaData);
+    /**
+     * Read from a stream.
+     */
+    public StringTerms(StreamInput in) throws IOException {
+        super(in, Bucket::new);
     }
 
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return NAME;
     }
 
     @Override
     public StringTerms create(List<Bucket> buckets) {
-        return new StringTerms(this.name, this.order, this.format, this.requiredSize, this.shardSize, this.minDocCount, buckets,
-                this.showTermDocCountError, this.docCountError, this.otherDocCount, this.pipelineAggregators(), this.metaData);
+        return new StringTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData, format, shardSize,
+                showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
     @Override
     public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) {
-        return new Bucket(prototype.termBytes, prototype.docCount, aggregations, prototype.showDocCountError, prototype.docCountError, prototype.format);
+        return new Bucket(prototype.termBytes, prototype.docCount, aggregations, prototype.showDocCountError, prototype.docCountError,
+                prototype.format);
     }
 
     @Override
-    protected StringTerms create(String name, List<org.elasticsearch.search.aggregations.bucket.terms.InternalTerms.Bucket> buckets,
-            long docCountError, long otherDocCount, InternalTerms prototype) {
-        return new StringTerms(name, prototype.order, prototype.format, prototype.requiredSize, prototype.shardSize, prototype.minDocCount, buckets,
-                prototype.showTermDocCountError, docCountError, otherDocCount, prototype.pipelineAggregators(), prototype.getMetaData());
-    }
-
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        this.docCountError = in.readLong();
-        this.order = InternalOrder.Streams.readOrder(in);
-        this.format = in.readNamedWriteable(DocValueFormat.class);
-        this.requiredSize = readSize(in);
-        this.shardSize = readSize(in);
-        this.showTermDocCountError = in.readBoolean();
-        this.minDocCount = in.readVLong();
-        this.otherDocCount = in.readVLong();
-        int size = in.readVInt();
-        List<InternalTerms.Bucket> buckets = new ArrayList<>(size);
-        for (int i = 0; i < size; i++) {
-            Bucket bucket = new Bucket(format, showTermDocCountError);
-            bucket.readFrom(in);
-            buckets.add(bucket);
-        }
-        this.buckets = buckets;
-        this.bucketMap = null;
-    }
-
-    @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        out.writeLong(docCountError);
-        InternalOrder.Streams.writeOrder(order, out);
-        out.writeNamedWriteable(format);
-        writeSize(requiredSize, out);
-        writeSize(shardSize, out);
-        out.writeBoolean(showTermDocCountError);
-        out.writeVLong(minDocCount);
-        out.writeVLong(otherDocCount);
-        out.writeVInt(buckets.size());
-        for (InternalTerms.Bucket bucket : buckets) {
-            bucket.writeTo(out);
-        }
+    protected StringTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
+        return new StringTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), getMetaData(), format, shardSize,
+                showTermDocCountError, otherDocCount, buckets, docCountError);
     }
 
     @Override
@@ -203,11 +139,15 @@ public class StringTerms extends InternalTerms<StringTerms, StringTerms.Bucket>
         builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
         builder.field(SUM_OF_OTHER_DOC_COUNTS, otherDocCount);
         builder.startArray(CommonFields.BUCKETS);
-        for (InternalTerms.Bucket bucket : buckets) {
+        for (Bucket bucket : buckets) {
             bucket.toXContent(builder, params);
         }
         builder.endArray();
         return builder;
     }
 
+    @Override
+    protected Bucket[] createBucketsArray(int size) {
+        return new Bucket[size];
+    }
 }

+ 6 - 6
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java

@@ -129,7 +129,7 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
         final int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
 
         long otherDocCount = 0;
-        BucketPriorityQueue ordered = new BucketPriorityQueue(size, order.comparator(this));
+        BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(size, order.comparator(this));
         StringTerms.Bucket spare = null;
         for (int i = 0; i < bucketOrds.size(); i++) {
             if (spare == null) {
@@ -140,12 +140,12 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
             otherDocCount += spare.docCount;
             spare.bucketOrd = i;
             if (bucketCountThresholds.getShardMinDocCount() <= spare.docCount) {
-                spare = (StringTerms.Bucket) ordered.insertWithOverflow(spare);
+                spare = ordered.insertWithOverflow(spare);
             }
         }
 
         // Get the top buckets
-        final InternalTerms.Bucket[] list = new InternalTerms.Bucket[ordered.size()];
+        final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
         long survivingBucketOrds[] = new long[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; --i) {
             final StringTerms.Bucket bucket = (StringTerms.Bucket) ordered.pop();
@@ -164,9 +164,9 @@ public class StringTermsAggregator extends AbstractStringTermsAggregator {
           bucket.docCountError = 0;
         }
 
-        return new StringTerms(name, order, format, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getShardSize(),
-                bucketCountThresholds.getMinDocCount(), Arrays.asList(list), showTermDocCountError, 0, otherDocCount, pipelineAggregators(),
-                metaData());
+        return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                pipelineAggregators(), metaData(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount,
+                Arrays.asList(list), 0);
     }
 
     @Override

+ 6 - 3
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregationBuilder.java

@@ -23,8 +23,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
 import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
+import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
 import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude;
 import org.elasticsearch.search.aggregations.support.AggregationContext;
@@ -39,7 +41,8 @@ import java.util.List;
 import java.util.Objects;
 
 public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<ValuesSource, TermsAggregationBuilder> {
-    public static final String NAME = StringTerms.TYPE.name();
+    public static final String NAME = "terms";
+    private static final InternalAggregation.Type TYPE = new Type("terms");
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     public static final ParseField EXECUTION_HINT_FIELD_NAME = new ParseField("execution_hint");
@@ -62,14 +65,14 @@ public class TermsAggregationBuilder extends ValuesSourceAggregationBuilder<Valu
     private boolean showTermDocCountError = false;
 
     public TermsAggregationBuilder(String name, ValueType valueType) {
-        super(name, StringTerms.TYPE, ValuesSourceType.ANY, valueType);
+        super(name, TYPE, ValuesSourceType.ANY, valueType);
     }
 
     /**
      * Read from a stream.
      */
     public TermsAggregationBuilder(StreamInput in) throws IOException {
-        super(in, StringTerms.TYPE, ValuesSourceType.ANY);
+        super(in, TYPE, ValuesSourceType.ANY);
         bucketCountThresholds = new BucketCountThresholds(in);
         collectMode = in.readOptionalWriteable(SubAggCollectionMode::readFromStream);
         executionHint = in.readOptionalString();

+ 2 - 2
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

@@ -69,8 +69,8 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory<Values
     @Override
     protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
             throws IOException {
-        final InternalAggregation aggregation = new UnmappedTerms(name, order, config.format(), bucketCountThresholds.getRequiredSize(),
-                bucketCountThresholds.getShardSize(), bucketCountThresholds.getMinDocCount(), pipelineAggregators, metaData);
+        final InternalAggregation aggregation = new UnmappedTerms(name, order, bucketCountThresholds.getRequiredSize(),
+                bucketCountThresholds.getMinDocCount(), pipelineAggregators, metaData);
         return new NonCollectingAggregator(name, context, parent, factories, pipelineAggregators, metaData) {
             {
                 // even in the case of an unmapped aggregator, validate the

+ 64 - 45
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/UnmappedTerms.java

@@ -22,81 +22,67 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Collections.emptyList;
+
 /**
- *
+ * Result of the {@link TermsAggregator} when the field is unmapped.
  */
-public class UnmappedTerms extends InternalTerms<UnmappedTerms, InternalTerms.Bucket> {
-
-    public static final Type TYPE = new Type("terms", "umterms");
-
-    private static final List<Bucket> BUCKETS = Collections.emptyList();
-    private static final Map<String, Bucket> BUCKETS_MAP = Collections.emptyMap();
-
-    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
-        @Override
-        public UnmappedTerms readResult(StreamInput in) throws IOException {
-            UnmappedTerms buckets = new UnmappedTerms();
-            buckets.readFrom(in);
-            return buckets;
+public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bucket> {
+    public static final String NAME = "umterms";
+
+    /**
+     * Concrete type that can't be built because Java needs a concrent type so {@link InternalTerms.Bucket} can have a self type but
+     * {@linkplain UnmappedTerms} doesn't ever need to build it because it never returns any buckets.
+     */
+    protected abstract static class Bucket extends InternalTerms.Bucket<Bucket> {
+        private Bucket(long docCount, InternalAggregations aggregations, boolean showDocCountError, long docCountError,
+                DocValueFormat formatter) {
+            super(docCount, aggregations, showDocCountError, docCountError, formatter);
         }
-    };
-
-    public static void registerStreams() {
-        AggregationStreams.registerStream(STREAM, TYPE.stream());
     }
 
-    UnmappedTerms() {} // for serialization
-
-    public UnmappedTerms(String name, Terms.Order order, DocValueFormat format, int requiredSize, int shardSize, long minDocCount,
+    public UnmappedTerms(String name, Terms.Order order, int requiredSize, long minDocCount,
             List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
-        super(name, order, format, requiredSize, shardSize, minDocCount, BUCKETS, false, 0, 0, pipelineAggregators, metaData);
+        super(name, order, requiredSize, minDocCount, pipelineAggregators, metaData);
     }
 
-    @Override
-    public Type type() {
-        return TYPE;
+    /**
+     * Read from a stream.
+     */
+    public UnmappedTerms(StreamInput in) throws IOException {
+        super(in);
     }
 
     @Override
-    public UnmappedTerms create(List<InternalTerms.Bucket> buckets) {
-        return new UnmappedTerms(this.name, this.order, this.format, this.requiredSize, this.shardSize, this.minDocCount, this.pipelineAggregators(), this.metaData);
+    protected void writeTermTypeInfoTo(StreamOutput out) throws IOException {
     }
 
     @Override
-    public InternalTerms.Bucket createBucket(InternalAggregations aggregations, InternalTerms.Bucket prototype) {
-        throw new UnsupportedOperationException("not supported for UnmappedTerms");
+    public String getWriteableName() {
+        return NAME;
     }
 
     @Override
-    protected UnmappedTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount, InternalTerms prototype) {
-        throw new UnsupportedOperationException("not supported for UnmappedTerms");
+    public UnmappedTerms create(List<Bucket> buckets) {
+        return new UnmappedTerms(name, order, requiredSize, minDocCount, pipelineAggregators(), metaData);
     }
 
     @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        this.docCountError = 0;
-        this.order = InternalOrder.Streams.readOrder(in);
-        this.requiredSize = readSize(in);
-        this.minDocCount = in.readVLong();
-        this.buckets = BUCKETS;
-        this.bucketMap = BUCKETS_MAP;
+    public Bucket createBucket(InternalAggregations aggregations, Bucket prototype) {
+        throw new UnsupportedOperationException("not supported for UnmappedTerms");
     }
 
     @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        InternalOrder.Streams.writeOrder(order, out);
-        writeSize(requiredSize, out);
-        out.writeVLong(minDocCount);
+    protected UnmappedTerms create(String name, List<Bucket> buckets, long docCountError, long otherDocCount) {
+        throw new UnsupportedOperationException("not supported for UnmappedTerms");
     }
 
     @Override
@@ -111,10 +97,43 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, InternalTerms.Bu
 
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
-        builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, docCountError);
+        builder.field(InternalTerms.DOC_COUNT_ERROR_UPPER_BOUND_FIELD_NAME, 0);
         builder.field(SUM_OF_OTHER_DOC_COUNTS, 0);
         builder.startArray(CommonFields.BUCKETS).endArray();
         return builder;
     }
 
+    @Override
+    protected void setDocCountError(long docCountError) {
+    }
+
+    @Override
+    protected int getShardSize() {
+        return 0;
+    }
+
+    @Override
+    public long getDocCountError() {
+        return 0;
+    }
+
+    @Override
+    public long getSumOfOtherDocCounts() {
+        return 0;
+    }
+
+    @Override
+    protected List<Bucket> getBucketsInternal() {
+        return emptyList();
+    }
+
+    @Override
+    public Bucket getBucketByKey(String term) {
+        return null;
+    }
+
+    @Override
+    protected Bucket[] createBucketsArray(int size) {
+        return new Bucket[size];
+    }
 }

+ 4 - 4
core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/support/BucketPriorityQueue.java

@@ -23,17 +23,17 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 
 import java.util.Comparator;
 
-public class BucketPriorityQueue extends PriorityQueue<Terms.Bucket> {
+public class BucketPriorityQueue<B extends Terms.Bucket> extends PriorityQueue<B> {
 
-    private final Comparator<Terms.Bucket> comparator;
+    private final Comparator<? super B> comparator;
 
-    public BucketPriorityQueue(int size, Comparator<Terms.Bucket> comparator) {
+    public BucketPriorityQueue(int size, Comparator<? super B> comparator) {
         super(size);
         this.comparator = comparator;
     }
 
     @Override
-    protected boolean lessThan(Terms.Bucket a, Terms.Bucket b) {
+    protected boolean lessThan(B a, B b) {
         return comparator.compare(a, b) > 0; // reverse, since we reverse again when adding to a list
     }
 }

+ 27 - 0
modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/DoubleTermsTests.java

@@ -48,6 +48,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -624,6 +625,32 @@ public class DoubleTermsTests extends AbstractTermsTestCase {
         }
     }
 
+    public void testPartiallyUnmappedWithFormat() throws Exception {
+        SearchResponse response = client().prepareSearch("idx_unmapped", "idx").setTypes("type")
+                .addAggregation(terms("terms")
+                        .field(SINGLE_VALUED_FIELD_NAME)
+                        .collectMode(randomFrom(SubAggCollectionMode.values()))
+                        .format("0000.00"))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        assertThat(terms.getBuckets().size(), equalTo(5));
+
+        for (int i = 0; i < 5; i++) {
+            String key = String.format(Locale.ROOT, "%07.2f", (double) i);
+            Terms.Bucket bucket = terms.getBucketByKey(key);
+            assertThat(bucket, notNullValue());
+            assertThat(key(bucket), equalTo(key));
+            assertThat(bucket.getKeyAsNumber().intValue(), equalTo(i));
+            assertThat(bucket.getDocCount(), equalTo(1L));
+        }
+    }
+
     public void testEmptyAggregation() throws Exception {
         SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx")
                 .setQuery(matchAllQuery())

+ 27 - 0
modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/LongTermsTests.java

@@ -47,6 +47,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -629,6 +630,32 @@ public class LongTermsTests extends AbstractTermsTestCase {
         }
     }
 
+    public void testPartiallyUnmappedWithFormat() throws Exception {
+        SearchResponse response = client().prepareSearch("idx_unmapped", "idx").setTypes("type")
+                .addAggregation(terms("terms")
+                        .field(SINGLE_VALUED_FIELD_NAME)
+                        .collectMode(randomFrom(SubAggCollectionMode.values()))
+                        .format("0000"))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        assertThat(terms.getBuckets().size(), equalTo(5));
+
+        for (int i = 0; i < 5; i++) {
+            String key = String.format(Locale.ROOT, "%04d", i);
+            Terms.Bucket bucket = terms.getBucketByKey(key);
+            assertThat(bucket, notNullValue());
+            assertThat(key(bucket), equalTo(key));
+            assertThat(bucket.getKeyAsNumber().intValue(), equalTo(i));
+            assertThat(bucket.getDocCount(), equalTo(1L));
+        }
+    }
+
     public void testEmptyAggregation() throws Exception {
         SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx")
                 .setQuery(matchAllQuery())