Browse Source

Merge pull request #13186 from polyfractal/feature/percentile_pipeline

Aggregations: Add percentiles_bucket pipeline aggregation
Zachary Tong 10 years ago
parent
commit
f8ad4da61c

+ 4 - 3
core/src/main/java/org/elasticsearch/search/SearchModule.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.search;
 
-import org.elasticsearch.common.Classes;
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.inject.multibindings.Multibinder;
 import org.elasticsearch.common.settings.Settings;
@@ -110,6 +109,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucke
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketParser;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketParser;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser;
@@ -143,8 +144,6 @@ import org.elasticsearch.search.highlight.HighlightPhase;
 import org.elasticsearch.search.highlight.Highlighter;
 import org.elasticsearch.search.highlight.Highlighters;
 import org.elasticsearch.search.query.QueryPhase;
-import org.elasticsearch.search.suggest.SuggestParseElement;
-import org.elasticsearch.search.suggest.SuggestPhase;
 import org.elasticsearch.search.suggest.Suggester;
 import org.elasticsearch.search.suggest.Suggesters;
 
@@ -301,6 +300,7 @@ public class SearchModule extends AbstractModule {
         multibinderPipelineAggParser.addBinding().to(MinBucketParser.class);
         multibinderPipelineAggParser.addBinding().to(AvgBucketParser.class);
         multibinderPipelineAggParser.addBinding().to(SumBucketParser.class);
+        multibinderPipelineAggParser.addBinding().to(PercentilesBucketParser.class);
         multibinderPipelineAggParser.addBinding().to(MovAvgParser.class);
         multibinderPipelineAggParser.addBinding().to(CumulativeSumParser.class);
         multibinderPipelineAggParser.addBinding().to(BucketScriptParser.class);
@@ -393,6 +393,7 @@ public class SearchModule extends AbstractModule {
         MinBucketPipelineAggregator.registerStreams();
         AvgBucketPipelineAggregator.registerStreams();
         SumBucketPipelineAggregator.registerStreams();
+        PercentilesBucketPipelineAggregator.registerStreams();
         MovAvgPipelineAggregator.registerStreams();
         CumulativeSumPipelineAggregator.registerStreams();
         BucketScriptPipelineAggregator.registerStreams();

+ 5 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java

@@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.pipeline;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder;
 import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder;
@@ -55,6 +56,10 @@ public final class PipelineAggregatorBuilders {
         return new SumBucketBuilder(name);
     }
 
+    public static final PercentilesBucketBuilder percentilesBucket(String name) {
+        return new PercentilesBucketBuilder(name);
+    }
+
     public static final MovAvgBuilder movingAvg(String name) {
         return new MovAvgBuilder(name);
     }

+ 1 - 1
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java

@@ -61,7 +61,7 @@ public abstract class BucketMetricsBuilder<B extends BucketMetricsBuilder<B>> ex
         return builder;
     }
 
-    protected void doInternalXContent(XContentBuilder builder, Params params) {
+    protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException {
     }
 
 }

+ 2 - 1
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java

@@ -105,7 +105,8 @@ public abstract class BucketMetricsParser implements PipelineAggregator.Parser {
     protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
             ValueFormatter formatter);
 
-    protected boolean doParse(String pipelineAggregatorName, String currentFieldName, Token token, XContentParser parser, SearchContext context) {
+    protected boolean doParse(String pipelineAggregatorName, String currentFieldName, Token token,
+                              XContentParser parser, SearchContext context) throws IOException {
         return false;
     }
 

+ 163 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java

@@ -0,0 +1,163 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.AggregationStreams;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
+import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
+import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile;
+import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class InternalPercentilesBucket extends InternalNumericMetricsAggregation.MultiValue implements PercentilesBucket {
+
+    public final static Type TYPE = new Type("percentiles_bucket");
+
+    public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
+        @Override
+        public InternalPercentilesBucket readResult(StreamInput in) throws IOException {
+            InternalPercentilesBucket result = new InternalPercentilesBucket();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        AggregationStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    private double[] percentiles;
+    private double[] percents;
+
+    protected InternalPercentilesBucket() {
+    } // for serialization
+
+    public InternalPercentilesBucket(String name, double[] percents, double[] percentiles,
+                                     ValueFormatter formatter, List<PipelineAggregator> pipelineAggregators,
+                                     Map<String, Object> metaData) {
+        super(name, pipelineAggregators, metaData);
+        this.valueFormatter = formatter;
+        this.percentiles = percentiles;
+        this.percents = percents;
+    }
+
+    @Override
+    public double percentile(double percent) throws IllegalArgumentException {
+        int index = Arrays.binarySearch(percents, percent);
+        if (index < 0) {
+            throw new IllegalArgumentException("Percent requested [" + String.valueOf(percent) + "] was not" +
+                    " one of the computed percentiles.  Available keys are: " + Arrays.toString(percents));
+        }
+        return percentiles[index];
+    }
+
+    @Override
+    public String percentileAsString(double percent) {
+        return valueFormatter.format(percentile(percent));
+    }
+
+    @Override
+    public Iterator<Percentile> iterator() {
+        return new Iter(percents, percentiles);
+    }
+
+    @Override
+    public double value(String name) {
+        return percentile(Double.parseDouble(name));
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
+        throw new UnsupportedOperationException("Not supported");
+    }
+
+    @Override
+    protected void doReadFrom(StreamInput in) throws IOException {
+        valueFormatter = ValueFormatterStreams.readOptional(in);
+        percentiles = in.readDoubleArray();
+        percents = in.readDoubleArray();
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) throws IOException {
+        ValueFormatterStreams.writeOptional(valueFormatter, out);
+        out.writeDoubleArray(percentiles);
+        out.writeDoubleArray(percents);
+    }
+
+    @Override
+    public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("values");
+        for (double percent : percents) {
+            double value = percentile(percent);
+            boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value));
+            String key = String.valueOf(percent);
+            builder.field(key, hasValue ? value : null);
+            if (hasValue && !(valueFormatter instanceof ValueFormatter.Raw)) {
+                builder.field(key + "_as_string", percentileAsString(percent));
+            }
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    public static class Iter extends UnmodifiableIterator<Percentile> {
+
+        private final double[] percents;
+        private final double[] percentiles;
+        private int i;
+
+        public Iter(double[] percents, double[] percentiles) {
+            this.percents = percents;
+            this.percentiles = percentiles;
+            i = 0;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return i < percents.length;
+        }
+
+        @Override
+        public Percentile next() {
+            final Percentile next = new InternalPercentile(percents[i], percentiles[i]);
+            ++i;
+            return next;
+        }
+    }
+}

+ 25 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
+
+import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
+
+public interface PercentilesBucket extends Percentiles {
+}

+ 49 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java

@@ -0,0 +1,49 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder;
+
+import java.io.IOException;
+
+public class PercentilesBucketBuilder extends BucketMetricsBuilder<PercentilesBucketBuilder> {
+
+    Double[] percents;
+
+    public PercentilesBucketBuilder(String name) {
+        super(name, PercentilesBucketPipelineAggregator.TYPE.name());
+    }
+
+    public PercentilesBucketBuilder percents(Double[] percents) {
+        this.percents = percents;
+        return this;
+    }
+
+    @Override
+    protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException {
+        if (percents != null) {
+            builder.field(PercentilesBucketParser.PERCENTS.getPreferredName(), percents);
+        }
+    }
+
+
+}
+

+ 67 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
+
+import com.google.common.primitives.Doubles;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
+
+
+public class PercentilesBucketParser extends BucketMetricsParser {
+
+    public static final ParseField PERCENTS = new ParseField("percents");
+    double[] percents = new double[] { 1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0 };
+
+    @Override
+    public String type() {
+        return PercentilesBucketPipelineAggregator.TYPE.name();
+    }
+
+    @Override
+    protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
+                                                     ValueFormatter formatter) {
+        return new PercentilesBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter, percents);
+    }
+
+    @Override
+    protected boolean doParse(String pipelineAggregatorName, String currentFieldName,
+                              XContentParser.Token token, XContentParser parser, SearchContext context) throws IOException {
+        if (context.parseFieldMatcher().match(currentFieldName, PERCENTS)) {
+
+            List<Double> parsedPercents = new ArrayList<>();
+            while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                parsedPercents.add(parser.doubleValue());
+            }
+            percents = Doubles.toArray(parsedPercents);
+            return true;
+        }
+        return false;
+    }
+}

+ 155 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java

@@ -0,0 +1,155 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregation.Type;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
+
+public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
+
+    public final static Type TYPE = new Type("percentiles_bucket");
+
+    public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
+        @Override
+        public PercentilesBucketPipelineAggregator readResult(StreamInput in) throws IOException {
+            PercentilesBucketPipelineAggregator result = new PercentilesBucketPipelineAggregator();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
+        InternalPercentilesBucket.registerStreams();
+    }
+
+    private double[] percents;
+    private List<Double> data;
+
+    private PercentilesBucketPipelineAggregator() {
+    }
+
+    protected PercentilesBucketPipelineAggregator(String name, double[] percents, String[] bucketsPaths, GapPolicy gapPolicy,
+                                                  ValueFormatter formatter, Map<String, Object> metaData) {
+        super(name, bucketsPaths, gapPolicy, formatter, metaData);
+        this.percents = percents;
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    protected void preCollection() {
+       data = new ArrayList<>(1024);
+    }
+
+    @Override
+    protected void collectBucketValue(String bucketKey, Double bucketValue) {
+        data.add(bucketValue);
+    }
+
+    @Override
+    protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
+
+        // Perform the sorting and percentile collection now that all the data
+        // has been collected.
+        Collections.sort(data);
+
+        double[] percentiles = new double[percents.length];
+        if (data.size() == 0) {
+            for (int i = 0; i < percents.length; i++) {
+                percentiles[i] = Double.NaN;
+            }
+        } else {
+            for (int i = 0; i < percents.length; i++) {
+                int index = (int)((percents[i] / 100.0) * data.size());
+                percentiles[i] = data.get(index);
+            }
+        }
+
+        // todo need postCollection() to clean up temp sorted data?
+
+        return new InternalPercentilesBucket(name(), percents, percentiles, formatter, pipelineAggregators, metadata);
+    }
+
+    @Override
+    public void doReadFrom(StreamInput in) throws IOException {
+        super.doReadFrom(in);
+        percents = in.readDoubleArray();
+    }
+
+    @Override
+    public void doWriteTo(StreamOutput out) throws IOException {
+        super.doWriteTo(out);
+        out.writeDoubleArray(percents);
+    }
+
+    public static class Factory extends PipelineAggregatorFactory {
+
+        private final ValueFormatter formatter;
+        private final GapPolicy gapPolicy;
+        private final double[] percents;
+
+        public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter, double[] percents) {
+            super(name, TYPE.name(), bucketsPaths);
+            this.gapPolicy = gapPolicy;
+            this.formatter = formatter;
+            this.percents = percents;
+        }
+
+        @Override
+        protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
+            return new PercentilesBucketPipelineAggregator(name, percents, bucketsPaths, gapPolicy, formatter, metaData);
+        }
+
+        @Override
+        public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories,
+                               List<PipelineAggregatorFactory> pipelineAggregatorFactories) {
+            if (bucketsPaths.length != 1) {
+                throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+                        + " must contain a single entry for aggregation [" + name + "]");
+            }
+
+            for (Double p : percents) {
+                if (p == null || p < 0.0 || p > 100.0) {
+                    throw new IllegalStateException(PercentilesBucketParser.PERCENTS.getPreferredName()
+                            + " must only contain non-null doubles from 0.0-100.0 inclusive");
+                }
+            }
+        }
+
+    }
+
+}

+ 625 - 0
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java

@@ -0,0 +1,625 @@
+package org.elasticsearch.search.aggregations.pipeline;
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucket;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
+import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.percentilesBucket;
+import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.sumBucket;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.IsNull.notNullValue;
+
+@ESIntegTestCase.SuiteScopeTestCase
+public class PercentilesBucketIT extends ESIntegTestCase {
+
+    private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
+    private static final Double[] PERCENTS = {1.0, 25.0, 50.0, 75.0, 99.0};
+    static int numDocs;
+    static int interval;
+    static int minRandomValue;
+    static int maxRandomValue;
+    static int numValueBuckets;
+    static long[] valueCounts;
+
+    @Override
+    public void setupSuiteScopeCluster() throws Exception {
+        createIndex("idx");
+        createIndex("idx_unmapped");
+
+        numDocs = randomIntBetween(6, 20);
+        interval = randomIntBetween(2, 5);
+
+        minRandomValue = 0;
+        maxRandomValue = 20;
+
+        numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
+        valueCounts = new long[numValueBuckets];
+
+        List<IndexRequestBuilder> builders = new ArrayList<>();
+
+        for (int i = 0; i < numDocs; i++) {
+            int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
+            builders.add(client().prepareIndex("idx", "type").setSource(
+                    jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval))
+                            .endObject()));
+            final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
+            valueCounts[bucket]++;
+        }
+
+        assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
+        for (int i = 0; i < 2; i++) {
+            builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
+                    jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
+        }
+        indexRandom(true, builders);
+        ensureSearchable();
+    }
+
+    @Test
+    public void testDocCount_topLevel() throws Exception {
+        SearchResponse response = client().prepareSearch("idx")
+                .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                        .extendedBounds((long) minRandomValue, (long) maxRandomValue))
+                .addAggregation(percentilesBucket("percentiles_bucket")
+                        .setBucketsPaths("histo>_count")
+                        .percents(PERCENTS)).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Histogram histo = response.getAggregations().get("histo");
+        assertThat(histo, notNullValue());
+        assertThat(histo.getName(), equalTo("histo"));
+        List<? extends Histogram.Bucket> buckets = histo.getBuckets();
+        assertThat(buckets.size(), equalTo(numValueBuckets));
+
+        double[] values = new double[numValueBuckets];
+        for (int i = 0; i < numValueBuckets; ++i) {
+            Histogram.Bucket bucket = buckets.get(i);
+            assertThat(bucket, notNullValue());
+            assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
+            assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
+            values[i] = bucket.getDocCount();
+        }
+
+        Arrays.sort(values);
+
+        PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
+        assertThat(percentilesBucketValue, notNullValue());
+        assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+        for (Double p : PERCENTS) {
+            double expected = values[(int)((p / 100) * values.length)];
+            assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+        }
+
+    }
+
+    @Test
+    public void testDocCount_asSubAgg() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(
+                        terms("terms")
+                                .field("tag")
+                                .order(Terms.Order.term(true))
+                                .subAggregation(
+                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue))
+                                .subAggregation(percentilesBucket("percentiles_bucket")
+                                        .setBucketsPaths("histo>_count")
+                                        .percents(PERCENTS))).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> termsBuckets = terms.getBuckets();
+        assertThat(termsBuckets.size(), equalTo(interval));
+
+        for (int i = 0; i < interval; ++i) {
+            Terms.Bucket termsBucket = termsBuckets.get(i);
+            assertThat(termsBucket, notNullValue());
+            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
+
+            Histogram histo = termsBucket.getAggregations().get("histo");
+            assertThat(histo, notNullValue());
+            assertThat(histo.getName(), equalTo("histo"));
+            List<? extends Histogram.Bucket> buckets = histo.getBuckets();
+
+            double[] values = new double[numValueBuckets];
+            for (int j = 0; j < numValueBuckets; ++j) {
+                Histogram.Bucket bucket = buckets.get(j);
+                assertThat(bucket, notNullValue());
+                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
+                values[j] = bucket.getDocCount();
+            }
+
+            Arrays.sort(values);
+
+            PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket");
+            assertThat(percentilesBucketValue, notNullValue());
+            assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+            for (Double p : PERCENTS) {
+                double expected = values[(int)((p / 100) * values.length)];
+                assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testMetric_topLevel() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
+                .addAggregation(percentilesBucket("percentiles_bucket")
+                        .setBucketsPaths("terms>sum")
+                        .percents(PERCENTS)).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> buckets = terms.getBuckets();
+        assertThat(buckets.size(), equalTo(interval));
+
+        double[] values = new double[interval];
+        for (int i = 0; i < interval; ++i) {
+            Terms.Bucket bucket = buckets.get(i);
+            assertThat(bucket, notNullValue());
+            assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
+            assertThat(bucket.getDocCount(), greaterThan(0l));
+            Sum sum = bucket.getAggregations().get("sum");
+            assertThat(sum, notNullValue());
+            values[i] = sum.value();
+        }
+
+        Arrays.sort(values);
+
+        PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
+        assertThat(percentilesBucketValue, notNullValue());
+        assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+        for (Double p : PERCENTS) {
+            double expected = values[(int)((p / 100) * values.length)];
+            assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+        }
+    }
+
+    @Test
+    public void testMetric_topLevelDefaultPercents() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
+                .addAggregation(percentilesBucket("percentiles_bucket")
+                        .setBucketsPaths("terms>sum")).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> buckets = terms.getBuckets();
+        assertThat(buckets.size(), equalTo(interval));
+
+        double[] values = new double[interval];
+        for (int i = 0; i < interval; ++i) {
+            Terms.Bucket bucket = buckets.get(i);
+            assertThat(bucket, notNullValue());
+            assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
+            assertThat(bucket.getDocCount(), greaterThan(0l));
+            Sum sum = bucket.getAggregations().get("sum");
+            assertThat(sum, notNullValue());
+            values[i] = sum.value();
+        }
+
+        Arrays.sort(values);
+
+        PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
+        assertThat(percentilesBucketValue, notNullValue());
+        assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+        for (Percentile p : percentilesBucketValue) {
+            double expected = values[(int)((p.getPercent() / 100) * values.length)];
+            assertThat(percentilesBucketValue.percentile(p.getPercent()), equalTo(expected));
+            assertThat(p.getValue(), equalTo(expected));
+        }
+    }
+
+    @Test
+    public void testMetric_asSubAgg() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(
+                        terms("terms")
+                                .field("tag")
+                                .order(Terms.Order.term(true))
+                                .subAggregation(
+                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue)
+                                                .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
+                                .subAggregation(percentilesBucket("percentiles_bucket")
+                                        .setBucketsPaths("histo>sum")
+                                        .percents(PERCENTS))).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> termsBuckets = terms.getBuckets();
+        assertThat(termsBuckets.size(), equalTo(interval));
+
+        for (int i = 0; i < interval; ++i) {
+            Terms.Bucket termsBucket = termsBuckets.get(i);
+            assertThat(termsBucket, notNullValue());
+            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
+
+            Histogram histo = termsBucket.getAggregations().get("histo");
+            assertThat(histo, notNullValue());
+            assertThat(histo.getName(), equalTo("histo"));
+            List<? extends Histogram.Bucket> buckets = histo.getBuckets();
+
+            List<Double> values = new ArrayList<>(numValueBuckets);
+            for (int j = 0; j < numValueBuckets; ++j) {
+                Histogram.Bucket bucket = buckets.get(j);
+                assertThat(bucket, notNullValue());
+                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
+                if (bucket.getDocCount() != 0) {
+                    Sum sum = bucket.getAggregations().get("sum");
+                    assertThat(sum, notNullValue());
+                    values.add(sum.value());
+                }
+            }
+
+            Collections.sort(values);
+
+            PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket");
+            assertThat(percentilesBucketValue, notNullValue());
+            assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+            for (Double p : PERCENTS) {
+                double expected = values.get((int) ((p / 100) * values.size()));
+                assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testMetric_asSubAggWithInsertZeros() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(
+                        terms("terms")
+                                .field("tag")
+                                .order(Terms.Order.term(true))
+                                .subAggregation(
+                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue)
+                                                .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
+                                .subAggregation(percentilesBucket("percentiles_bucket")
+                                        .setBucketsPaths("histo>sum")
+                                        .gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS)
+                                        .percents(PERCENTS)))
+                .execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> termsBuckets = terms.getBuckets();
+        assertThat(termsBuckets.size(), equalTo(interval));
+
+        for (int i = 0; i < interval; ++i) {
+            Terms.Bucket termsBucket = termsBuckets.get(i);
+            assertThat(termsBucket, notNullValue());
+            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
+
+            Histogram histo = termsBucket.getAggregations().get("histo");
+            assertThat(histo, notNullValue());
+            assertThat(histo.getName(), equalTo("histo"));
+            List<? extends Histogram.Bucket> buckets = histo.getBuckets();
+
+            double[] values = new double[numValueBuckets];
+            for (int j = 0; j < numValueBuckets; ++j) {
+                Histogram.Bucket bucket = buckets.get(j);
+                assertThat(bucket, notNullValue());
+                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
+                Sum sum = bucket.getAggregations().get("sum");
+                assertThat(sum, notNullValue());
+
+                values[j] = sum.value();
+            }
+
+            Arrays.sort(values);
+
+            PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket");
+            assertThat(percentilesBucketValue, notNullValue());
+            assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+            for (Double p : PERCENTS) {
+                double expected = values[(int)((p / 100) * values.length)];
+                assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testNoBuckets() throws Exception {
+        SearchResponse response = client().prepareSearch("idx")
+                .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
+                .addAggregation(percentilesBucket("percentiles_bucket")
+                        .setBucketsPaths("terms>sum")
+                        .percents(PERCENTS)).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> buckets = terms.getBuckets();
+        assertThat(buckets.size(), equalTo(0));
+
+        PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
+        assertThat(percentilesBucketValue, notNullValue());
+        assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+        for (Double p : PERCENTS) {
+            assertThat(percentilesBucketValue.percentile(p), equalTo(Double.NaN));
+        }
+    }
+
+    @Test
+    public void testWrongPercents() throws Exception {
+        SearchResponse response = client().prepareSearch("idx")
+                .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
+                .addAggregation(percentilesBucket("percentiles_bucket")
+                        .setBucketsPaths("terms>sum")
+                        .percents(PERCENTS)).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> buckets = terms.getBuckets();
+        assertThat(buckets.size(), equalTo(0));
+
+        PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
+        assertThat(percentilesBucketValue, notNullValue());
+        assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
+
+        try {
+            percentilesBucketValue.percentile(2.0);
+            fail("2.0 was not a valid percent, should have thrown exception");
+        } catch (IllegalArgumentException exception) {
+            // All good
+        }
+    }
+
+    @Test
+    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/13179")
+    public void testBadPercents() throws Exception {
+        Double[] badPercents = {-1.0, 110.0};
+
+        try {
+            SearchResponse response = client().prepareSearch("idx")
+                    .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
+                    .addAggregation(percentilesBucket("percentiles_bucket")
+                            .setBucketsPaths("terms>sum")
+                            .percents(badPercents)).execute().actionGet();
+
+            assertSearchResponse(response);
+
+            Terms terms = response.getAggregations().get("terms");
+            assertThat(terms, notNullValue());
+            assertThat(terms.getName(), equalTo("terms"));
+            List<Terms.Bucket> buckets = terms.getBuckets();
+            assertThat(buckets.size(), equalTo(0));
+
+            PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
+
+            fail("Illegal percent's were provided but no exception was thrown.");
+        } catch (SearchPhaseExecutionException exception) {
+            // All good
+        }
+
+    }
+
+    @Test
+    public void testBadPercents_asSubAgg() throws Exception {
+        Double[] badPercents = {-1.0, 110.0};
+
+        try {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(
+                        terms("terms")
+                                .field("tag")
+                                .order(Terms.Order.term(true))
+                                .subAggregation(
+                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue))
+                                .subAggregation(percentilesBucket("percentiles_bucket")
+                                        .setBucketsPaths("histo>_count")
+                                        .percents(badPercents))).execute().actionGet();
+
+            PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
+
+            fail("Illegal percent's were provided but no exception was thrown.");
+        } catch (SearchPhaseExecutionException exception) {
+            // All good
+        }
+
+    }
+
+    @Test
+    public void testNested() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(
+                        terms("terms")
+                                .field("tag")
+                                .order(Terms.Order.term(true))
+                                .subAggregation(
+                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue))
+                                .subAggregation(percentilesBucket("percentile_histo_bucket").setBucketsPaths("histo>_count")))
+                .addAggregation(percentilesBucket("percentile_terms_bucket")
+                        .setBucketsPaths("terms>percentile_histo_bucket.50")
+                        .percents(PERCENTS)).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> termsBuckets = terms.getBuckets();
+        assertThat(termsBuckets.size(), equalTo(interval));
+
+        double[] values = new double[termsBuckets.size()];
+        for (int i = 0; i < interval; ++i) {
+            Terms.Bucket termsBucket = termsBuckets.get(i);
+            assertThat(termsBucket, notNullValue());
+            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
+
+            Histogram histo = termsBucket.getAggregations().get("histo");
+            assertThat(histo, notNullValue());
+            assertThat(histo.getName(), equalTo("histo"));
+            List<? extends Histogram.Bucket> buckets = histo.getBuckets();
+
+            double[] innerValues = new double[numValueBuckets];
+            for (int j = 0; j < numValueBuckets; ++j) {
+                Histogram.Bucket bucket = buckets.get(j);
+                assertThat(bucket, notNullValue());
+                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
+
+                innerValues[j] = bucket.getDocCount();
+            }
+            Arrays.sort(innerValues);
+
+            PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket");
+            assertThat(percentilesBucketValue, notNullValue());
+            assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket"));
+            for (Double p : PERCENTS) {
+                double expected = innerValues[(int)((p / 100) * innerValues.length)];
+                assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+            }
+            values[i] = percentilesBucketValue.percentile(50.0);
+        }
+
+        Arrays.sort(values);
+
+        PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket");
+        assertThat(percentilesBucketValue, notNullValue());
+        assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket"));
+        for (Double p : PERCENTS) {
+            double expected = values[(int)((p / 100) * values.length)];
+            assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+        }
+    }
+
+    @Test
+    public void testNestedWithDecimal() throws Exception {
+        Double[] percent = {99.9};
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(
+                        terms("terms")
+                                .field("tag")
+                                .order(Terms.Order.term(true))
+                                .subAggregation(
+                                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                                .extendedBounds((long) minRandomValue, (long) maxRandomValue))
+                                .subAggregation(percentilesBucket("percentile_histo_bucket")
+                                        .percents(percent)
+                                        .setBucketsPaths("histo>_count")))
+                .addAggregation(percentilesBucket("percentile_terms_bucket")
+                        .setBucketsPaths("terms>percentile_histo_bucket[99.9]")
+                        .percents(percent)).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Terms terms = response.getAggregations().get("terms");
+        assertThat(terms, notNullValue());
+        assertThat(terms.getName(), equalTo("terms"));
+        List<Terms.Bucket> termsBuckets = terms.getBuckets();
+        assertThat(termsBuckets.size(), equalTo(interval));
+
+        double[] values = new double[termsBuckets.size()];
+        for (int i = 0; i < interval; ++i) {
+            Terms.Bucket termsBucket = termsBuckets.get(i);
+            assertThat(termsBucket, notNullValue());
+            assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
+
+            Histogram histo = termsBucket.getAggregations().get("histo");
+            assertThat(histo, notNullValue());
+            assertThat(histo.getName(), equalTo("histo"));
+            List<? extends Histogram.Bucket> buckets = histo.getBuckets();
+
+            double[] innerValues = new double[numValueBuckets];
+            for (int j = 0; j < numValueBuckets; ++j) {
+                Histogram.Bucket bucket = buckets.get(j);
+                assertThat(bucket, notNullValue());
+                assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
+
+                innerValues[j] = bucket.getDocCount();
+            }
+            Arrays.sort(innerValues);
+
+            PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket");
+            assertThat(percentilesBucketValue, notNullValue());
+            assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket"));
+            for (Double p : percent) {
+                double expected = innerValues[(int)((p / 100) * innerValues.length)];
+                assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+            }
+            values[i] = percentilesBucketValue.percentile(99.9);
+        }
+
+        Arrays.sort(values);
+
+        PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket");
+        assertThat(percentilesBucketValue, notNullValue());
+        assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket"));
+        for (Double p : percent) {
+            double expected = values[(int)((p / 100) * values.length)];
+            assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
+        }
+    }
+}

+ 1 - 0
docs/reference/aggregations/pipeline.asciidoc

@@ -163,6 +163,7 @@ include::pipeline/derivative-aggregation.asciidoc[]
 include::pipeline/max-bucket-aggregation.asciidoc[]
 include::pipeline/min-bucket-aggregation.asciidoc[]
 include::pipeline/sum-bucket-aggregation.asciidoc[]
+include::pipeline/percentiles-bucket-aggregation.asciidoc[]
 include::pipeline/movavg-aggregation.asciidoc[]
 include::pipeline/cumulative-sum-aggregation.asciidoc[]
 include::pipeline/bucket-script-aggregation.asciidoc[]

+ 121 - 0
docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc

@@ -0,0 +1,121 @@
+[[search-aggregations-pipeline-percentiles-bucket-aggregation]]
+=== Percentiles Bucket Aggregation
+
+coming[2.1.0]
+
+experimental[]
+
+A sibling pipeline aggregation which calculates percentiles across all bucket of a specified metric in a sibling aggregation.
+The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation.
+
+==== Syntax
+
+A `percentiles_bucket` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+    "percentiles_bucket": {
+        "buckets_path": "the_sum"
+    }
+}
+--------------------------------------------------
+
+.`sum_bucket` Parameters
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |The path to the buckets we wish to find the sum for (see <<bucket-path-syntax>> for more
+ details) |Required |
+|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
+ details)|Optional | `skip`
+|`format` |format to apply to the output value of this aggregation |Optional | `null`
+|`percents` |The list of percentiles to calculate |Optional | `[ 1, 5, 25, 50, 75, 95, 99 ]`
+|===
+
+The following snippet calculates the sum of all the total monthly `sales` buckets:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sales_per_month" : {
+            "date_histogram" : {
+                "field" : "date",
+                "interval" : "month"
+            },
+            "aggs": {
+                "sales": {
+                    "sum": {
+                        "field": "price"
+                    }
+                }
+            }
+        },
+        "sum_monthly_sales": {
+            "percentiles_bucket": {
+                "buckets_paths": "sales_per_month>sales", <1>
+                "percents": [ 25.0, 50.0, 75.0 ] <2>
+            }
+        }
+    }
+}
+--------------------------------------------------
+<1> `bucket_paths` instructs this percentiles_bucket aggregation that we want to calculate percentiles for
+the `sales` aggregation in the `sales_per_month` date histogram.
+<2> `percents` specifies which percentiles we wish to calculate, in this case, the 25th, 50th and 75th percentil
+
+And the following may be the response:
+
+[source,js]
+--------------------------------------------------
+{
+   "aggregations": {
+      "sales_per_month": {
+         "buckets": [
+            {
+               "key_as_string": "2015/01/01 00:00:00",
+               "key": 1420070400000,
+               "doc_count": 3,
+               "sales": {
+                  "value": 550
+               }
+            },
+            {
+               "key_as_string": "2015/02/01 00:00:00",
+               "key": 1422748800000,
+               "doc_count": 2,
+               "sales": {
+                  "value": 60
+               }
+            },
+            {
+               "key_as_string": "2015/03/01 00:00:00",
+               "key": 1425168000000,
+               "doc_count": 2,
+               "sales": {
+                  "value": 375
+               }
+            }
+         ]
+      },
+      "percentiles_monthly_sales": {
+        "values" : {
+            "25.0": 60,
+            "50.0": 375",
+            "75.0": 550
+         }
+      }
+   }
+}
+--------------------------------------------------
+
+
+==== Percentiles_bucket implementation
+
+The Percentile Bucket returns the nearest input data point that is not greater than the requested percentile; it does not
+interpolate between data points.
+
+The percentiles are calculated exactly and is not an approximation (unlike the Percentiles Metric). This means
+the implementation maintains an in-memory, sorted list of your data to compute the percentiles, before discarding the
+data.  You may run into memory pressure issues if you attempt to calculate percentiles over many millions of
+data-points in a single `percentiles_bucket`.