Преглед на файлове

Migrate remaining aggregations to NamedWriteable

After this we'll be able to remove AggregationStreams and
PipelineAggregatorStreams.
Nik Everett преди 9 години
родител
ревизия
9e2221cae5
променени са 24 файла, в които са добавени 256 реда и са изтрити 459 реда
  1. 67 64
      core/src/main/java/org/elasticsearch/search/SearchModule.java
  2. 7 32
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java
  3. 1 1
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java
  4. 2 16
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java
  5. 20 37
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/InternalBucketMetricValue.java
  6. 2 2
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregationBuilder.java
  7. 9 23
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregator.java
  8. 2 2
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregationBuilder.java
  9. 9 23
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregator.java
  10. 2 2
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregationBuilder.java
  11. 10 25
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregator.java
  12. 22 39
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java
  13. 2 2
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregationBuilder.java
  14. 16 35
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java
  15. 2 2
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregationBuilder.java
  16. 9 23
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregator.java
  17. 3 4
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregationBuilder.java
  18. 26 44
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregator.java
  19. 3 4
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregationBuilder.java
  20. 20 39
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregator.java
  21. 3 3
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregationBuilder.java
  22. 16 31
      core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java
  23. 2 4
      core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java
  24. 1 2
      core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/PercentilesBucketTests.java

+ 67 - 64
core/src/main/java/org/elasticsearch/search/SearchModule.java

@@ -211,6 +211,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucke
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.InternalPercentilesBucket;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.InternalStatsBucket;
@@ -437,18 +438,16 @@ public class SearchModule extends AbstractModule {
             pipelineAggregationParserRegistry.register(spec.parser, spec.name);
         }
         namedWriteableRegistry.register(PipelineAggregationBuilder.class, spec.name.getPreferredName(), spec.builderReader);
-        for (Map.Entry<String, Writeable.Reader<? extends PipelineAggregator>> resultReader : spec.resultReaders.entrySet()) {
-            namedWriteableRegistry.register(PipelineAggregator.class, resultReader.getKey(), resultReader.getValue());
-        }
-        for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders : spec.bucketReaders.entrySet()) {
-            namedWriteableRegistry.register(InternalAggregation.class, bucketReaders.getKey(), bucketReaders.getValue());
+        namedWriteableRegistry.register(PipelineAggregator.class, spec.name.getPreferredName(), spec.aggregatorReader);
+        for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.resultReaders.entrySet()) {
+            namedWriteableRegistry.register(InternalAggregation.class, resultReader.getKey(), resultReader.getValue());
         }
     }
 
     public static class PipelineAggregationSpec {
-        private final Map<String, Writeable.Reader<? extends PipelineAggregator>> resultReaders = new TreeMap<>();
-        private final Map<String, Writeable.Reader<? extends InternalAggregation>> bucketReaders = new TreeMap<>();
+        private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
         private final Writeable.Reader<? extends PipelineAggregationBuilder> builderReader;
+        private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;
         private final PipelineAggregator.Parser parser;
         private final ParseField name;
 
@@ -456,13 +455,16 @@ public class SearchModule extends AbstractModule {
          * Register a pipeline aggregation.
          *
          * @param builderReader reads the {@link PipelineAggregationBuilder} from a stream
+         * @param aggregatorReader reads the {@link PipelineAggregator} from a stream
          * @param parser reads the aggregation builder from XContent
          * @param name names by which the aggregation may be parsed. The first name is special because it is the name that the reader is
          *        registered under.
          */
         public PipelineAggregationSpec(Reader<? extends PipelineAggregationBuilder> builderReader,
+                Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
                 PipelineAggregator.Parser parser, ParseField name) {
             this.builderReader = builderReader;
+            this.aggregatorReader = aggregatorReader;
             this.parser = parser;
             this.name = name;
         }
@@ -471,33 +473,17 @@ public class SearchModule extends AbstractModule {
          * Add a reader for the shard level results of the aggregation with {@linkplain #name}'s {@link ParseField#getPreferredName()} as
          * the {@link NamedWriteable#getWriteableName()}.
          */
-        public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends PipelineAggregator> resultReader) {
+        public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
             return addResultReader(name.getPreferredName(), resultReader);
         }
 
         /**
          * Add a reader for the shard level results of the aggregation.
          */
-        public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends PipelineAggregator> resultReader) {
+        public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
             resultReaders.put(writeableName, resultReader);
             return this;
         }
-
-        /**
-         * Add a reader for the shard level bucket results of the aggregation with {@linkplain name}'s {@link ParseField#getPreferredName()}
-         * as the {@link NamedWriteable#getWriteableName()}.
-         */
-        public PipelineAggregationSpec addBucketReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
-            return addBucketReader(name.getPreferredName(), resultReader);
-        }
-
-        /**
-         * Add a reader for the shard level results of the aggregation.
-         */
-        public PipelineAggregationSpec addBucketReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
-            bucketReaders.put(writeableName, resultReader);
-            return this;
-        }
     }
 
     public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
@@ -606,49 +592,80 @@ public class SearchModule extends AbstractModule {
 
         registerPipelineAggregation(new PipelineAggregationSpec(
                 DerivativePipelineAggregationBuilder::new,
+                DerivativePipelineAggregator::new,
                 DerivativePipelineAggregationBuilder::parse,
                 DerivativePipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
-                    .addResultReader(DerivativePipelineAggregator::new)
-                    .addBucketReader(InternalDerivative::new));
-        registerPipelineAggregation(MaxBucketPipelineAggregationBuilder::new, MaxBucketPipelineAggregationBuilder.PARSER,
-                MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
-        registerPipelineAggregation(MinBucketPipelineAggregationBuilder::new, MinBucketPipelineAggregationBuilder.PARSER,
-                MinBucketPipelineAggregationBuilder.AGGREGATION_FIELD_NAME);
-        registerPipelineAggregation(AvgBucketPipelineAggregationBuilder::new, AvgBucketPipelineAggregationBuilder.PARSER,
-                AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
-        registerPipelineAggregation(SumBucketPipelineAggregationBuilder::new, SumBucketPipelineAggregationBuilder.PARSER,
-                SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+                    .addResultReader(InternalDerivative::new));
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                MaxBucketPipelineAggregationBuilder::new,
+                MaxBucketPipelineAggregator::new,
+                MaxBucketPipelineAggregationBuilder.PARSER,
+                MaxBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+                    // This bucket is used by many pipeline aggreations.
+                    .addResultReader(InternalBucketMetricValue.NAME, InternalBucketMetricValue::new));
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                MinBucketPipelineAggregationBuilder::new,
+                MinBucketPipelineAggregator::new,
+                MinBucketPipelineAggregationBuilder.PARSER,
+                MinBucketPipelineAggregationBuilder.AGGREGATION_FIELD_NAME)
+                    /* Uses InternalBucketMetricValue */);
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                AvgBucketPipelineAggregationBuilder::new,
+                AvgBucketPipelineAggregator::new,
+                AvgBucketPipelineAggregationBuilder.PARSER,
+                AvgBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+                    // This bucket is used by many pipeline aggreations.
+                    .addResultReader(InternalSimpleValue.NAME, InternalSimpleValue::new));
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                SumBucketPipelineAggregationBuilder::new,
+                SumBucketPipelineAggregator::new,
+                SumBucketPipelineAggregationBuilder.PARSER,
+                SumBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+                    /* Uses InternalSimpleValue */);
         registerPipelineAggregation(new PipelineAggregationSpec(
                 StatsBucketPipelineAggregationBuilder::new,
+                StatsBucketPipelineAggregator::new,
                 StatsBucketPipelineAggregationBuilder.PARSER,
                 StatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
-                    .addResultReader(StatsBucketPipelineAggregator::new)
-                    .addBucketReader(InternalStatsBucket::new));
+                    .addResultReader(InternalStatsBucket::new));
         registerPipelineAggregation(new PipelineAggregationSpec(
                 ExtendedStatsBucketPipelineAggregationBuilder::new,
+                ExtendedStatsBucketPipelineAggregator::new,
                 new ExtendedStatsBucketParser(),
                 ExtendedStatsBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
-                    .addResultReader(ExtendedStatsBucketPipelineAggregator::new)
-                    .addBucketReader(InternalExtendedStatsBucket::new));
-        registerPipelineAggregation(PercentilesBucketPipelineAggregationBuilder::new, PercentilesBucketPipelineAggregationBuilder.PARSER,
-                PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+                    .addResultReader(InternalExtendedStatsBucket::new));
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                PercentilesBucketPipelineAggregationBuilder::new,
+                PercentilesBucketPipelineAggregator::new,
+                PercentilesBucketPipelineAggregationBuilder.PARSER,
+                PercentilesBucketPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
+                    .addResultReader(InternalPercentilesBucket::new));
         registerPipelineAggregation(new PipelineAggregationSpec(
                 MovAvgPipelineAggregationBuilder::new,
+                MovAvgPipelineAggregator::new,
                 (n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c),
                 MovAvgPipelineAggregationBuilder.AGGREGATION_FIELD_NAME)
-                    .addResultReader(MovAvgPipelineAggregator::new)
                     /* Uses InternalHistogram for buckets */);
-        registerPipelineAggregation(CumulativeSumPipelineAggregationBuilder::new, CumulativeSumPipelineAggregationBuilder::parse,
-                CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
-        registerPipelineAggregation(BucketScriptPipelineAggregationBuilder::new, BucketScriptPipelineAggregationBuilder::parse,
-                BucketScriptPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
-        registerPipelineAggregation(BucketSelectorPipelineAggregationBuilder::new, BucketSelectorPipelineAggregationBuilder::parse,
-                BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                CumulativeSumPipelineAggregationBuilder::new,
+                CumulativeSumPipelineAggregator::new,
+                CumulativeSumPipelineAggregationBuilder::parse,
+                CumulativeSumPipelineAggregationBuilder.AGGREGATION_NAME_FIELD));
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                BucketScriptPipelineAggregationBuilder::new,
+                BucketScriptPipelineAggregator::new,
+                BucketScriptPipelineAggregationBuilder::parse,
+                BucketScriptPipelineAggregationBuilder.AGGREGATION_NAME_FIELD));
+        registerPipelineAggregation(new PipelineAggregationSpec(
+                BucketSelectorPipelineAggregationBuilder::new,
+                BucketSelectorPipelineAggregator::new,
+                BucketSelectorPipelineAggregationBuilder::parse,
+                BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD));
         registerPipelineAggregation(new PipelineAggregationSpec(
                 SerialDiffPipelineAggregationBuilder::new,
+                SerialDiffPipelineAggregator::new,
                 SerialDiffPipelineAggregationBuilder::parse,
-                SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD)
-                    .addResultReader(SerialDiffPipelineAggregator::new));
+                SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD));
     }
 
     protected void configureSearch() {
@@ -879,18 +896,4 @@ public class SearchModule extends AbstractModule {
             registerQuery(GeoShapeQueryBuilder::new, GeoShapeQueryBuilder::fromXContent, GeoShapeQueryBuilder.QUERY_NAME_FIELD);
         }
     }
-
-    static {
-        // Pipeline Aggregations
-        InternalSimpleValue.registerStreams();
-        InternalBucketMetricValue.registerStreams();
-        MaxBucketPipelineAggregator.registerStreams();
-        MinBucketPipelineAggregator.registerStreams();
-        AvgBucketPipelineAggregator.registerStreams();
-        SumBucketPipelineAggregator.registerStreams();
-        PercentilesBucketPipelineAggregator.registerStreams();
-        CumulativeSumPipelineAggregator.registerStreams();
-        BucketScriptPipelineAggregator.registerStreams();
-        BucketSelectorPipelineAggregator.registerStreams();
-    }
 }

+ 7 - 32
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java

@@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
@@ -33,26 +32,8 @@ import java.util.List;
 import java.util.Map;
 
 public class InternalSimpleValue extends InternalNumericMetricsAggregation.SingleValue implements SimpleValue {
-
-    public static final Type TYPE = new Type("simple_value");
-
-    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
-        @Override
-        public InternalSimpleValue readResult(StreamInput in) throws IOException {
-            InternalSimpleValue result = new InternalSimpleValue();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        AggregationStreams.registerStream(STREAM, TYPE.stream());
-    }
-
-    private double value;
-
-    protected InternalSimpleValue() { // NORELEASE remove and make value final if possible
-    } // for serialization
+    public static final String NAME = "simple_value";
+    private final double value;
 
     public InternalSimpleValue(String name, double value, DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
             Map<String, Object> metaData) {
@@ -70,18 +51,17 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
         value = in.readDouble();
     }
 
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        format = in.readNamedWriteable(DocValueFormat.class);
-        value = in.readDouble();
-    }
-
     @Override
     protected void doWriteTo(StreamOutput out) throws IOException {
         out.writeNamedWriteable(format);
         out.writeDouble(value);
     }
 
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+
     @Override
     public double value() {
         return value;
@@ -91,11 +71,6 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
         return value;
     }
 
-    @Override
-    public Type type() {
-        return TYPE;
-    }
-
     @Override
     public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         throw new UnsupportedOperationException("Not supported");

+ 1 - 1
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java

@@ -37,7 +37,7 @@ import java.util.stream.StreamSupport;
 
 public abstract class SiblingPipelineAggregator extends PipelineAggregator {
 
-    protected SiblingPipelineAggregator() { // for Serialisation
+    protected SiblingPipelineAggregator() { // NOCOMMIT remove me
         super();
     }
 

+ 2 - 16
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsPipelineAggregator.java

@@ -45,12 +45,8 @@ import java.util.Map;
  */
 public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAggregator {
 
-    protected DocValueFormat format;
-    protected GapPolicy gapPolicy;
-
-    public BucketMetricsPipelineAggregator() {
-        super();
-    }
+    protected final DocValueFormat format;
+    protected final GapPolicy gapPolicy;
 
     protected BucketMetricsPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat format,
             Map<String, Object> metaData) {
@@ -68,16 +64,6 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
         gapPolicy = GapPolicy.readFrom(in);
     }
 
-    @Override
-    public final void doReadFrom(StreamInput in) throws IOException {
-        format = in.readNamedWriteable(DocValueFormat.class);
-        gapPolicy = GapPolicy.readFrom(in);
-        innerReadFrom(in);
-    }
-
-    protected void innerReadFrom(StreamInput in) throws IOException {
-    }
-
     @Override
     public final void doWriteTo(StreamOutput out) throws IOException {
         out.writeNamedWriteable(format);

+ 20 - 37
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/InternalBucketMetricValue.java

@@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -33,30 +32,11 @@ import java.util.List;
 import java.util.Map;
 
 public class InternalBucketMetricValue extends InternalNumericMetricsAggregation.SingleValue {
-
-    public static final Type TYPE = new Type("bucket_metric_value");
-
-    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
-        @Override
-        public InternalBucketMetricValue readResult(StreamInput in) throws IOException {
-            InternalBucketMetricValue result = new InternalBucketMetricValue();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        AggregationStreams.registerStream(STREAM, TYPE.stream());
-    }
+    public static final String NAME = "bucket_metric_value";
 
     private double value;
-
     private String[] keys;
 
-    protected InternalBucketMetricValue() {
-        super();
-    }
-
     public InternalBucketMetricValue(String name, String[] keys, double value, DocValueFormat formatter,
             List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
         super(name, pipelineAggregators, metaData);
@@ -65,9 +45,26 @@ public class InternalBucketMetricValue extends InternalNumericMetricsAggregation
         this.format = formatter;
     }
 
+    /**
+     * Read from a stream.
+     */
+    public InternalBucketMetricValue(StreamInput in) throws IOException {
+        super(in);
+        format = in.readNamedWriteable(DocValueFormat.class);
+        value = in.readDouble();
+        keys = in.readStringArray();
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteable(format);
+        out.writeDouble(value);
+        out.writeStringArray(keys);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
     }
 
     @Override
@@ -97,20 +94,6 @@ public class InternalBucketMetricValue extends InternalNumericMetricsAggregation
         }
     }
 
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        format = in.readNamedWriteable(DocValueFormat.class);
-        value = in.readDouble();
-        keys = in.readStringArray();
-    }
-
-    @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        out.writeNamedWriteable(format);
-        out.writeDouble(value);
-        out.writeStringArray(keys);
-    }
-
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
         boolean hasValue = !Double.isInfinite(value);

+ 2 - 2
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregationBuilder.java

@@ -34,11 +34,11 @@ import java.util.List;
 import java.util.Map;
 
 public class AvgBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<AvgBucketPipelineAggregationBuilder> {
-    public static final String NAME = AvgBucketPipelineAggregator.TYPE.name();
+    public static final String NAME = "avg_bucket";
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     public AvgBucketPipelineAggregationBuilder(String name, String bucketsPath) {
-        super(name, AvgBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+        super(name, NAME, new String[] { bucketsPath });
     }
 
     /**

+ 9 - 23
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/avg/AvgBucketPipelineAggregator.java

@@ -22,11 +22,9 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
 
 import java.io.IOException;
@@ -34,36 +32,24 @@ import java.util.List;
 import java.util.Map;
 
 public class AvgBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
-
-    public static final Type TYPE = new Type("avg_bucket");
-
-    public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
-        @Override
-        public AvgBucketPipelineAggregator readResult(StreamInput in) throws IOException {
-            AvgBucketPipelineAggregator result = new AvgBucketPipelineAggregator();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-    }
-
     private int count = 0;
     private double sum = 0;
 
-    private AvgBucketPipelineAggregator() {
-    }
-
     protected AvgBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat format,
             Map<String, Object> metaData) {
         super(name, bucketsPaths, gapPolicy, format, metaData);
     }
 
+    /**
+     * Read from a stream.
+     */
+    public AvgBucketPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return AvgBucketPipelineAggregationBuilder.NAME;
     }
 
     @Override

+ 2 - 2
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregationBuilder.java

@@ -34,11 +34,11 @@ import java.util.List;
 import java.util.Map;
 
 public class MaxBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<MaxBucketPipelineAggregationBuilder> {
-    public static final String NAME = MaxBucketPipelineAggregator.TYPE.name();
+    public static final String NAME = "max_bucket";
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     public MaxBucketPipelineAggregationBuilder(String name, String bucketsPath) {
-        super(name, MaxBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+        super(name, NAME, new String[] { bucketsPath });
     }
 
     /**

+ 9 - 23
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/max/MaxBucketPipelineAggregator.java

@@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue;
 
@@ -36,36 +34,24 @@ import java.util.List;
 import java.util.Map;
 
 public class MaxBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
-
-    public static final Type TYPE = new Type("max_bucket");
-
-    public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
-        @Override
-        public MaxBucketPipelineAggregator readResult(StreamInput in) throws IOException {
-            MaxBucketPipelineAggregator result = new MaxBucketPipelineAggregator();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-    }
-
     private List<String> maxBucketKeys;
     private double maxValue;
 
-    private MaxBucketPipelineAggregator() {
-    }
-
     protected MaxBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat formatter,
             Map<String, Object> metaData) {
         super(name, bucketsPaths, gapPolicy, formatter, metaData);
     }
 
+    /**
+     * Read from a stream.
+     */
+    public MaxBucketPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return MaxBucketPipelineAggregationBuilder.NAME;
     }
 
     @Override

+ 2 - 2
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregationBuilder.java

@@ -34,11 +34,11 @@ import java.util.List;
 import java.util.Map;
 
 public class MinBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<MinBucketPipelineAggregationBuilder> {
-    public static final String NAME = MinBucketPipelineAggregator.TYPE.name();
+    public static final String NAME = "min_bucket";
     public static final ParseField AGGREGATION_FIELD_NAME = new ParseField(NAME);
 
     public MinBucketPipelineAggregationBuilder(String name, String bucketsPath) {
-        super(name, MinBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+        super(name, NAME, new String[] { bucketsPath });
     }
 
     /**

+ 10 - 25
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/min/MinBucketPipelineAggregator.java

@@ -22,10 +22,8 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.InternalBucketMetricValue;
 
@@ -36,36 +34,24 @@ import java.util.List;
 import java.util.Map;
 
 public class MinBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
-
-    public static final Type TYPE = new Type("min_bucket");
-
-    public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
-        @Override
-        public MinBucketPipelineAggregator readResult(StreamInput in) throws IOException {
-            MinBucketPipelineAggregator result = new MinBucketPipelineAggregator();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-    }
-
     private List<String> minBucketKeys;
     private double minValue;
 
-    private MinBucketPipelineAggregator() {
-    }
-
     protected MinBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat formatter,
             Map<String, Object> metaData) {
         super(name, bucketsPaths, gapPolicy, formatter, metaData);
     }
 
+    /**
+     * Read from a stream.
+     */
+    public MinBucketPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return MinBucketPipelineAggregationBuilder.NAME;
     }
 
     @Override
@@ -86,8 +72,7 @@ public class MinBucketPipelineAggregator extends BucketMetricsPipelineAggregator
     }
 
     @Override
-    protected InternalAggregation buildAggregation(java.util.List<PipelineAggregator> pipelineAggregators,
-            java.util.Map<String, Object> metadata) {
+    protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
         String[] keys = minBucketKeys.toArray(new String[minBucketKeys.size()]);
         return new InternalBucketMetricValue(name(), keys, minValue, format, Collections.emptyList(), metaData());
     }

+ 22 - 39
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java

@@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.AggregationStreams;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
@@ -38,28 +37,9 @@ import java.util.List;
 import java.util.Map;
 
 public class InternalPercentilesBucket extends InternalNumericMetricsAggregation.MultiValue implements PercentilesBucket {
-
-    public static final Type TYPE = new Type("percentiles_bucket");
-
-    public static final AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
-        @Override
-        public InternalPercentilesBucket readResult(StreamInput in) throws IOException {
-            InternalPercentilesBucket result = new InternalPercentilesBucket();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        AggregationStreams.registerStream(STREAM, TYPE.stream());
-    }
-
     private double[] percentiles;
     private double[] percents;
 
-    protected InternalPercentilesBucket() {
-    } // for serialization
-
     public InternalPercentilesBucket(String name, double[] percents, double[] percentiles,
                                      DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators,
                                      Map<String, Object> metaData) {
@@ -69,6 +49,28 @@ public class InternalPercentilesBucket extends InternalNumericMetricsAggregation
         this.percents = percents;
     }
 
+    /**
+     * Read from a stream.
+     */
+    public InternalPercentilesBucket(StreamInput in) throws IOException {
+        super(in);
+        format = in.readNamedWriteable(DocValueFormat.class);
+        percentiles = in.readDoubleArray();
+        percents = in.readDoubleArray();
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteable(format);
+        out.writeDoubleArray(percentiles);
+        out.writeDoubleArray(percents);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return PercentilesBucketPipelineAggregationBuilder.NAME;
+    }
+
     @Override
     public double percentile(double percent) throws IllegalArgumentException {
         int index = Arrays.binarySearch(percents, percent);
@@ -94,30 +96,11 @@ public class InternalPercentilesBucket extends InternalNumericMetricsAggregation
         return percentile(Double.parseDouble(name));
     }
 
-    @Override
-    public Type type() {
-        return TYPE;
-    }
-
     @Override
     public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         throw new UnsupportedOperationException("Not supported");
     }
 
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        format = in.readNamedWriteable(DocValueFormat.class);
-        percentiles = in.readDoubleArray();
-        percents = in.readDoubleArray();
-    }
-
-    @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        out.writeNamedWriteable(format);
-        out.writeDoubleArray(percentiles);
-        out.writeDoubleArray(percents);
-    }
-
     @Override
     public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
         builder.startObject("values");

+ 2 - 2
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregationBuilder.java

@@ -40,7 +40,7 @@ import java.util.Objects;
 
 public class PercentilesBucketPipelineAggregationBuilder
         extends BucketMetricsPipelineAggregationBuilder<PercentilesBucketPipelineAggregationBuilder> {
-    public static final String NAME = PercentilesBucketPipelineAggregator.TYPE.name();
+    public static final String NAME = "percentiles_bucket";
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     private static final ParseField PERCENTS_FIELD = new ParseField("percents");
@@ -48,7 +48,7 @@ public class PercentilesBucketPipelineAggregationBuilder
     private double[] percents = new double[] { 1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0 };
 
     public PercentilesBucketPipelineAggregationBuilder(String name, String bucketsPath) {
-        super(name, PercentilesBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+        super(name, NAME, new String[] { bucketsPath });
     }
 
     /**

+ 16 - 35
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java

@@ -24,10 +24,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
 
 import java.io.IOException;
@@ -37,39 +35,33 @@ import java.util.List;
 import java.util.Map;
 
 public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
-
-    public static final Type TYPE = new Type("percentiles_bucket");
     public final ParseField PERCENTS_FIELD = new ParseField("percents");
 
-    public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
-        @Override
-        public PercentilesBucketPipelineAggregator readResult(StreamInput in) throws IOException {
-            PercentilesBucketPipelineAggregator result = new PercentilesBucketPipelineAggregator();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-        InternalPercentilesBucket.registerStreams();
-    }
-
-    private double[] percents;
+    private final double[] percents;
     private List<Double> data;
 
-    private PercentilesBucketPipelineAggregator() {
-    }
-
     protected PercentilesBucketPipelineAggregator(String name, double[] percents, String[] bucketsPaths, GapPolicy gapPolicy,
                                                   DocValueFormat formatter, Map<String, Object> metaData) {
         super(name, bucketsPaths, gapPolicy, formatter, metaData);
         this.percents = percents;
     }
 
+    /**
+     * Read from a stream.
+     */
+    public PercentilesBucketPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+        percents = in.readDoubleArray();
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    public void innerWriteTo(StreamOutput out) throws IOException {
+        out.writeDoubleArray(percents);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return PercentilesBucketPipelineAggregationBuilder.NAME;
     }
 
     @Override
@@ -105,15 +97,4 @@ public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAg
 
         return new InternalPercentilesBucket(name(), percents, percentiles, format, pipelineAggregators, metadata);
     }
-
-    @Override
-    public void innerReadFrom(StreamInput in) throws IOException {
-        percents = in.readDoubleArray();
-    }
-
-    @Override
-    public void innerWriteTo(StreamOutput out) throws IOException {
-        out.writeDoubleArray(percents);
-    }
-
 }

+ 2 - 2
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregationBuilder.java

@@ -34,11 +34,11 @@ import java.util.List;
 import java.util.Map;
 
 public class SumBucketPipelineAggregationBuilder extends BucketMetricsPipelineAggregationBuilder<SumBucketPipelineAggregationBuilder> {
-    public static final String NAME = SumBucketPipelineAggregator.TYPE.name();
+    public static final String NAME = "sum_bucket";
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     public SumBucketPipelineAggregationBuilder(String name, String bucketsPath) {
-        super(name, SumBucketPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+        super(name, NAME, new String[] { bucketsPath });
     }
 
     /**

+ 9 - 23
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/sum/SumBucketPipelineAggregator.java

@@ -22,11 +22,9 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
 
 import java.io.IOException;
@@ -34,35 +32,23 @@ import java.util.List;
 import java.util.Map;
 
 public class SumBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
-
-    public static final Type TYPE = new Type("sum_bucket");
-
-    public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
-        @Override
-        public SumBucketPipelineAggregator readResult(StreamInput in) throws IOException {
-            SumBucketPipelineAggregator result = new SumBucketPipelineAggregator();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-    }
-
     private double sum = 0;
 
-    private SumBucketPipelineAggregator() {
-    }
-
     protected SumBucketPipelineAggregator(String name, String[] bucketsPaths, GapPolicy gapPolicy, DocValueFormat formatter,
             Map<String, Object> metaData) {
         super(name, bucketsPaths, gapPolicy, formatter, metaData);
     }
 
+    /**
+     * Read from a stream.
+     */
+    public SumBucketPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return SumBucketPipelineAggregationBuilder.NAME;
     }
 
     @Override

+ 3 - 4
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregationBuilder.java

@@ -47,7 +47,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
 import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
 
 public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<BucketScriptPipelineAggregationBuilder> {
-    public static final String NAME = BucketScriptPipelineAggregator.TYPE.name();
+    public static final String NAME = "bucket_script";
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     private final Script script;
@@ -56,8 +56,7 @@ public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggr
     private GapPolicy gapPolicy = GapPolicy.SKIP;
 
     public BucketScriptPipelineAggregationBuilder(String name, Map<String, String> bucketsPathsMap, Script script) {
-        super(name, BucketScriptPipelineAggregator.TYPE.name(), new TreeMap<>(bucketsPathsMap).values()
-                .toArray(new String[bucketsPathsMap.size()]));
+        super(name, NAME, new TreeMap<>(bucketsPathsMap).values().toArray(new String[bucketsPathsMap.size()]));
         this.bucketsPathsMap = bucketsPathsMap;
         this.script = script;
     }
@@ -70,7 +69,7 @@ public class BucketScriptPipelineAggregationBuilder extends AbstractPipelineAggr
      * Read from a stream.
      */
     public BucketScriptPipelineAggregationBuilder(StreamInput in) throws IOException {
-        super(in, BucketScriptPipelineAggregator.TYPE.name());
+        super(in, NAME);
         int mapSize = in.readVInt();
         bucketsPathsMap = new HashMap<String, String>(mapSize);
         for (int i = 0; i < mapSize; i++) {

+ 26 - 44
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketscript/BucketScriptPipelineAggregator.java

@@ -29,14 +29,12 @@ import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.AggregationExecutionException;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,28 +48,10 @@ import java.util.stream.StreamSupport;
 import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
 
 public class BucketScriptPipelineAggregator extends PipelineAggregator {
-
-    public static final Type TYPE = new Type("bucket_script");
-
-    public static final PipelineAggregatorStreams.Stream STREAM = in -> {
-        BucketScriptPipelineAggregator result = new BucketScriptPipelineAggregator();
-        result.readFrom(in);
-        return result;
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-    }
-
-    private DocValueFormat formatter;
-    private GapPolicy gapPolicy;
-
-    private Script script;
-
-    private Map<String, String> bucketsPathsMap;
-
-    public BucketScriptPipelineAggregator() {
-    }
+    private final DocValueFormat formatter;
+    private final GapPolicy gapPolicy;
+    private final Script script;
+    private final Map<String, String> bucketsPathsMap;
 
     public BucketScriptPipelineAggregator(String name, Map<String, String> bucketsPathsMap, Script script, DocValueFormat formatter,
             GapPolicy gapPolicy, Map<String, Object> metadata) {
@@ -82,9 +62,29 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
         this.gapPolicy = gapPolicy;
     }
 
+    /**
+     * Read from a stream.
+     */
+    @SuppressWarnings("unchecked")
+    public BucketScriptPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+        script = new Script(in);
+        formatter = in.readNamedWriteable(DocValueFormat.class);
+        gapPolicy = GapPolicy.readFrom(in);
+        bucketsPathsMap = (Map<String, String>) in.readGenericValue();
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        script.writeTo(out);
+        out.writeNamedWriteable(formatter);
+        gapPolicy.writeTo(out);
+        out.writeGenericValue(bucketsPathsMap);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return BucketScriptPipelineAggregationBuilder.NAME;
     }
 
     @Override
@@ -136,22 +136,4 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
         }
         return originalAgg.create(newBuckets);
     }
-
-    @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        script.writeTo(out);
-        out.writeNamedWriteable(formatter);
-        gapPolicy.writeTo(out);
-        out.writeGenericValue(bucketsPathsMap);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        script = new Script(in);
-        formatter = in.readNamedWriteable(DocValueFormat.class);
-        gapPolicy = GapPolicy.readFrom(in);
-        bucketsPathsMap = (Map<String, String>) in.readGenericValue();
-    }
-
 }

+ 3 - 4
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregationBuilder.java

@@ -45,7 +45,7 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
 import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
 
 public class BucketSelectorPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<BucketSelectorPipelineAggregationBuilder> {
-    public static final String NAME = BucketSelectorPipelineAggregator.TYPE.name();
+    public static final String NAME = "bucket_selector";
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     private final Map<String, String> bucketsPathsMap;
@@ -53,8 +53,7 @@ public class BucketSelectorPipelineAggregationBuilder extends AbstractPipelineAg
     private GapPolicy gapPolicy = GapPolicy.SKIP;
 
     public BucketSelectorPipelineAggregationBuilder(String name, Map<String, String> bucketsPathsMap, Script script) {
-        super(name, BucketSelectorPipelineAggregator.TYPE.name(), new TreeMap<>(bucketsPathsMap).values()
-                .toArray(new String[bucketsPathsMap.size()]));
+        super(name, NAME, new TreeMap<>(bucketsPathsMap).values().toArray(new String[bucketsPathsMap.size()]));
         this.bucketsPathsMap = bucketsPathsMap;
         this.script = script;
     }
@@ -67,7 +66,7 @@ public class BucketSelectorPipelineAggregationBuilder extends AbstractPipelineAg
      * Read from a stream.
      */
     public BucketSelectorPipelineAggregationBuilder(StreamInput in) throws IOException {
-        super(in, BucketSelectorPipelineAggregator.TYPE.name());
+        super(in, NAME);
         int mapSize = in.readVInt();
         bucketsPathsMap = new HashMap<String, String>(mapSize);
         for (int i = 0; i < mapSize; i++) {

+ 20 - 39
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketselector/BucketSelectorPipelineAggregator.java

@@ -28,12 +28,10 @@ import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptContext;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,31 +43,12 @@ import java.util.Map;
 import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
 
 public class BucketSelectorPipelineAggregator extends PipelineAggregator {
-
-    public static final Type TYPE = new Type("bucket_selector");
-
-    public static final PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
-        @Override
-        public BucketSelectorPipelineAggregator readResult(StreamInput in) throws IOException {
-            BucketSelectorPipelineAggregator result = new BucketSelectorPipelineAggregator();
-            result.readFrom(in);
-            return result;
-        }
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-    }
-
     private GapPolicy gapPolicy;
 
     private Script script;
 
     private Map<String, String> bucketsPathsMap;
 
-    public BucketSelectorPipelineAggregator() {
-    }
-
     public BucketSelectorPipelineAggregator(String name, Map<String, String> bucketsPathsMap, Script script, GapPolicy gapPolicy,
             Map<String, Object> metadata) {
         super(name, bucketsPathsMap.values().toArray(new String[bucketsPathsMap.size()]), metadata);
@@ -78,9 +57,27 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator {
         this.gapPolicy = gapPolicy;
     }
 
+    /**
+     * Read from a stream.
+     */
+    @SuppressWarnings("unchecked")
+    public BucketSelectorPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+        script = new Script(in);
+        gapPolicy = GapPolicy.readFrom(in);
+        bucketsPathsMap = (Map<String, String>) in.readGenericValue();
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        script.writeTo(out);
+        gapPolicy.writeTo(out);
+        out.writeGenericValue(bucketsPathsMap);
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return BucketSelectorPipelineAggregationBuilder.NAME;
     }
 
     @Override
@@ -119,20 +116,4 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator {
         }
         return originalAgg.create(newBuckets);
     }
-
-    @Override
-    protected void doWriteTo(StreamOutput out) throws IOException {
-        script.writeTo(out);
-        gapPolicy.writeTo(out);
-        out.writeGenericValue(bucketsPathsMap);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected void doReadFrom(StreamInput in) throws IOException {
-        script = new Script(in);
-        gapPolicy = GapPolicy.readFrom(in);
-        bucketsPathsMap = (Map<String, String>) in.readGenericValue();
-    }
-
 }

+ 3 - 3
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregationBuilder.java

@@ -44,20 +44,20 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.
 import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
 
 public class CumulativeSumPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<CumulativeSumPipelineAggregationBuilder> {
-    public static final String NAME = CumulativeSumPipelineAggregator.TYPE.name();
+    public static final String NAME = "cumulative_sum";
     public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
 
     private String format;
 
     public CumulativeSumPipelineAggregationBuilder(String name, String bucketsPath) {
-        super(name, CumulativeSumPipelineAggregator.TYPE.name(), new String[] { bucketsPath });
+        super(name, NAME, new String[] { bucketsPath });
     }
 
     /**
      * Read from a stream.
      */
     public CumulativeSumPipelineAggregationBuilder(StreamInput in) throws IOException {
-        super(in, CumulativeSumPipelineAggregator.TYPE.name());
+        super(in, NAME);
         format = in.readOptionalString();
     }
 

+ 16 - 31
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java

@@ -24,13 +24,11 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
-import org.elasticsearch.search.aggregations.InternalAggregation.Type;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
 import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
 import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,23 +40,7 @@ import java.util.stream.StreamSupport;
 import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
 
 public class CumulativeSumPipelineAggregator extends PipelineAggregator {
-
-    public static final Type TYPE = new Type("cumulative_sum");
-
-    public static final PipelineAggregatorStreams.Stream STREAM = in -> {
-        CumulativeSumPipelineAggregator result = new CumulativeSumPipelineAggregator();
-        result.readFrom(in);
-        return result;
-    };
-
-    public static void registerStreams() {
-        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
-    }
-
-    private DocValueFormat formatter;
-
-    public CumulativeSumPipelineAggregator() {
-    }
+    private final DocValueFormat formatter;
 
     public CumulativeSumPipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter,
             Map<String, Object> metadata) {
@@ -66,9 +48,22 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator {
         this.formatter = formatter;
     }
 
+    /**
+     * Read from a stream.
+     */
+    public CumulativeSumPipelineAggregator(StreamInput in) throws IOException {
+        super(in);
+        formatter = in.readNamedWriteable(DocValueFormat.class);
+    }
+
+    @Override
+    public void doWriteTo(StreamOutput out) throws IOException {
+        out.writeNamedWriteable(formatter);
+    }
+
     @Override
-    public Type type() {
-        return TYPE;
+    public String getWriteableName() {
+        return CumulativeSumPipelineAggregationBuilder.NAME;
     }
 
     @Override
@@ -92,14 +87,4 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator {
         }
         return factory.create(newBuckets, histo);
     }
-
-    @Override
-    public void doReadFrom(StreamInput in) throws IOException {
-        formatter = in.readNamedWriteable(DocValueFormat.class);
-    }
-
-    @Override
-    public void doWriteTo(StreamOutput out) throws IOException {
-        out.writeNamedWriteable(formatter);
-    }
 }

+ 2 - 4
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java

@@ -1,5 +1,3 @@
-package org.elasticsearch.search.aggregations.pipeline;
-
 /*
  * Licensed to Elasticsearch under one or more contributor
  * license agreements. See the NOTICE file distributed with
@@ -19,7 +17,8 @@ package org.elasticsearch.search.aggregations.pipeline;
  * under the License.
  */
 
-import org.elasticsearch.ElasticsearchException;
+package org.elasticsearch.search.aggregations.pipeline;
+
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
@@ -45,7 +44,6 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
 import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.percentilesBucket;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
-import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.IsNull.notNullValue;

+ 1 - 2
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/PercentilesBucketTests.java

@@ -22,7 +22,6 @@ package org.elasticsearch.search.aggregations.pipeline.bucketmetrics;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.index.query.QueryParseContext;
-import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregationBuilder;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -56,7 +55,7 @@ public class PercentilesBucketTests extends AbstractBucketMetricsTestCase<Percen
         parser.nextToken(); // skip object start
 
         PercentilesBucketPipelineAggregationBuilder builder = (PercentilesBucketPipelineAggregationBuilder) aggParsers
-            .pipelineParser(PercentilesBucketPipelineAggregator.TYPE.name(), parseFieldMatcher)
+            .pipelineParser(PercentilesBucketPipelineAggregationBuilder.NAME, parseFieldMatcher)
             .parse("test", parseContext);
 
         assertThat(builder.percents(), equalTo(new double[]{0.0, 20.0, 50.0, 75.99}));