Browse Source

Aggregations: Adds cumulative sum aggregation

This adds a new pipeline aggregation, the cumulative sum aggregation. This is a parent aggregation which must be specified as a sub-aggregation to a histogram or date_histogram aggregation. It will add a new aggregation to each bucket containing the sum of a specified metrics over this and all previous buckets.
Colin Goodheart-Smithe 10 years ago
parent
commit
f21924ae0d

+ 2 - 0
core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java

@@ -61,6 +61,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucke
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketParser;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketParser;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser;
+import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumParser;
 import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser;
 import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParser;
 import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser;
@@ -115,6 +116,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
         pipelineAggParsers.add(AvgBucketParser.class);
         pipelineAggParsers.add(SumBucketParser.class);
         pipelineAggParsers.add(MovAvgParser.class);
+        pipelineAggParsers.add(CumulativeSumParser.class);
         pipelineAggParsers.add(BucketScriptParser.class);
     }
 

+ 2 - 0
core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java

@@ -65,6 +65,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucke
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative;
@@ -128,6 +129,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
         AvgBucketPipelineAggregator.registerStreams();
         SumBucketPipelineAggregator.registerStreams();
         MovAvgPipelineAggregator.registerStreams();
+        CumulativeSumPipelineAggregator.registerStreams();
         BucketScriptPipelineAggregator.registerStreams();
     }
 

+ 5 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java

@@ -23,6 +23,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucke
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder;
+import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder;
 import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder;
 import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder;
 import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder;
@@ -59,4 +60,8 @@ public final class PipelineAggregatorBuilders {
     public static final BucketScriptBuilder seriesArithmetic(String name) {
         return new BucketScriptBuilder(name);
     }
+
+    public static final CumulativeSumBuilder cumulativeSum(String name) {
+        return new CumulativeSumBuilder(name);
+    }
 }

+ 48 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumBuilder.java

@@ -0,0 +1,48 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.cumulativesum;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;
+
+import java.io.IOException;
+
+public class CumulativeSumBuilder extends PipelineAggregatorBuilder<CumulativeSumBuilder> {
+
+    private String format;
+
+    public CumulativeSumBuilder(String name) {
+        super(name, CumulativeSumPipelineAggregator.TYPE.name());
+    }
+
+    public CumulativeSumBuilder format(String format) {
+        this.format = format;
+        return this;
+    }
+
+    @Override
+    protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
+        if (format != null) {
+            builder.field(CumulativeSumParser.FORMAT.getPreferredName(), format);
+        }
+        return builder;
+    }
+
+}

+ 96 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumParser.java

@@ -0,0 +1,96 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.cumulativesum;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.format.ValueFormat;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CumulativeSumParser implements PipelineAggregator.Parser {
+
+    public static final ParseField FORMAT = new ParseField("format");
+    public static final ParseField GAP_POLICY = new ParseField("gap_policy");
+    public static final ParseField UNIT = new ParseField("unit");
+
+    @Override
+    public String type() {
+        return CumulativeSumPipelineAggregator.TYPE.name();
+    }
+
+    @Override
+    public PipelineAggregatorFactory parse(String pipelineAggregatorName, XContentParser parser, SearchContext context) throws IOException {
+        XContentParser.Token token;
+        String currentFieldName = null;
+        String[] bucketsPaths = null;
+        String format = null;
+
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (token == XContentParser.Token.VALUE_STRING) {
+                if (FORMAT.match(currentFieldName)) {
+                    format = parser.text();
+                } else if (BUCKETS_PATH.match(currentFieldName)) {
+                    bucketsPaths = new String[] { parser.text() };
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: ["
+                            + currentFieldName + "].", parser.getTokenLocation());
+                }
+            } else if (token == XContentParser.Token.START_ARRAY) {
+                if (BUCKETS_PATH.match(currentFieldName)) {
+                    List<String> paths = new ArrayList<>();
+                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                        String path = parser.text();
+                        paths.add(path);
+                    }
+                    bucketsPaths = paths.toArray(new String[paths.size()]);
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: ["
+                            + currentFieldName + "].", parser.getTokenLocation());
+                }
+            } else {
+                throw new SearchParseException(context, "Unexpected token " + token + " in [" + pipelineAggregatorName + "].",
+                        parser.getTokenLocation());
+            }
+        }
+
+        if (bucketsPaths == null) {
+            throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+                    + "] for derivative aggregation [" + pipelineAggregatorName + "]", parser.getTokenLocation());
+        }
+
+        ValueFormatter formatter = null;
+        if (format != null) {
+            formatter = ValueFormat.Patternable.Number.format(format).formatter();
+        }
+
+        return new CumulativeSumPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter);
+    }
+
+}

+ 146 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/cumulativesum/CumulativeSumPipelineAggregator.java

@@ -0,0 +1,146 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.cumulativesum;
+
+import com.google.common.collect.Lists;
+
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.InternalAggregation.Type;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
+import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
+import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
+
+public class CumulativeSumPipelineAggregator extends PipelineAggregator {
+
+    public final static Type TYPE = new Type("cumulative_sum");
+
+    public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
+        @Override
+        public CumulativeSumPipelineAggregator readResult(StreamInput in) throws IOException {
+            CumulativeSumPipelineAggregator result = new CumulativeSumPipelineAggregator();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    private ValueFormatter formatter;
+
+    public CumulativeSumPipelineAggregator() {
+    }
+
+    public CumulativeSumPipelineAggregator(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter,
+            Map<String, Object> metadata) {
+        super(name, bucketsPaths, metadata);
+        this.formatter = formatter;
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
+        InternalHistogram histo = (InternalHistogram) aggregation;
+        List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
+        InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();
+
+        List newBuckets = new ArrayList<>();
+        double sum = 0;
+        for (InternalHistogram.Bucket bucket : buckets) {
+            Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.INSERT_ZEROS);
+            sum += thisBucketValue;
+            List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(),
+                    AGGREGATION_TRANFORM_FUNCTION));
+            aggs.add(new InternalSimpleValue(name(), sum, formatter, new ArrayList<PipelineAggregator>(), metaData()));
+            InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(),
+                    new InternalAggregations(aggs), bucket.getKeyed(), bucket.getFormatter());
+            newBuckets.add(newBucket);
+        }
+        return factory.create(newBuckets, histo);
+    }
+
+    @Override
+    public void doReadFrom(StreamInput in) throws IOException {
+        formatter = ValueFormatterStreams.readOptional(in);
+    }
+
+    @Override
+    public void doWriteTo(StreamOutput out) throws IOException {
+        ValueFormatterStreams.writeOptional(formatter, out);
+    }
+
+    public static class Factory extends PipelineAggregatorFactory {
+
+        private final ValueFormatter formatter;
+
+        public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter) {
+            super(name, TYPE.name(), bucketsPaths);
+            this.formatter = formatter;
+        }
+
+        @Override
+        protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
+            return new CumulativeSumPipelineAggregator(name, bucketsPaths, formatter, metaData);
+        }
+
+        @Override
+        public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List<PipelineAggregatorFactory> pipelineAggregatorFactories) {
+            if (bucketsPaths.length != 1) {
+                throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+                        + " must contain a single entry for aggregation [" + name + "]");
+            }
+            if (!(parent instanceof HistogramAggregator.Factory)) {
+                throw new IllegalStateException("cumulative sum aggregation [" + name
+                        + "] must have a histogram or date_histogram as parent");
+            } else {
+                HistogramAggregator.Factory histoParent = (HistogramAggregator.Factory) parent;
+                if (histoParent.minDocCount() != 0) {
+                    throw new IllegalStateException("parent histogram of cumulative sum aggregation [" + name
+                            + "] must have min_doc_count of 0");
+                }
+            }
+        }
+
+    }
+}

+ 169 - 0
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumTests.java

@@ -0,0 +1,169 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
+import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.cumulativeSum;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+
+@ElasticsearchIntegrationTest.SuiteScopeTest
+public class CumulativeSumTests extends ElasticsearchIntegrationTest {
+
+    private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
+
+    static int numDocs;
+    static int interval;
+    static int minRandomValue;
+    static int maxRandomValue;
+    static int numValueBuckets;
+    static long[] valueCounts;
+
+    @Override
+    public void setupSuiteScopeCluster() throws Exception {
+        createIndex("idx");
+        createIndex("idx_unmapped");
+
+        numDocs = randomIntBetween(6, 20);
+        interval = randomIntBetween(2, 5);
+
+        minRandomValue = 0;
+        maxRandomValue = 20;
+
+        numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
+        valueCounts = new long[numValueBuckets];
+
+        List<IndexRequestBuilder> builders = new ArrayList<>();
+
+        for (int i = 0; i < numDocs; i++) {
+            int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
+            builders.add(client().prepareIndex("idx", "type").setSource(
+                    jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval))
+                            .endObject()));
+            final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
+            valueCounts[bucket]++;
+        }
+
+        assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
+        for (int i = 0; i < 2; i++) {
+            builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
+                    jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
+        }
+        indexRandom(true, builders);
+        ensureSearchable();
+    }
+
+    @Test
+    public void testDocCount() throws Exception {
+        SearchResponse response = client().prepareSearch("idx")
+                .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                .extendedBounds((long) minRandomValue, (long) maxRandomValue)
+                                .subAggregation(cumulativeSum("cumulative_sum").setBucketsPaths("_count"))).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Histogram histo = response.getAggregations().get("histo");
+        assertThat(histo, notNullValue());
+        assertThat(histo.getName(), equalTo("histo"));
+        List<? extends Bucket> buckets = histo.getBuckets();
+        assertThat(buckets.size(), equalTo(numValueBuckets));
+
+        double sum = 0;
+        for (int i = 0; i < numValueBuckets; ++i) {
+            Histogram.Bucket bucket = buckets.get(i);
+            assertThat(bucket, notNullValue());
+            assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
+            assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
+            sum += bucket.getDocCount();
+            InternalSimpleValue cumulativeSumValue = bucket.getAggregations().get("cumulative_sum");
+            assertThat(cumulativeSumValue, notNullValue());
+            assertThat(cumulativeSumValue.getName(), equalTo("cumulative_sum"));
+            assertThat(cumulativeSumValue.value(), equalTo(sum));
+        }
+
+    }
+
+    @Test
+    public void testMetric() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .addAggregation(
+                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))
+                                .subAggregation(cumulativeSum("cumulative_sum").setBucketsPaths("sum"))).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Histogram histo = response.getAggregations().get("histo");
+        assertThat(histo, notNullValue());
+        assertThat(histo.getName(), equalTo("histo"));
+        List<? extends Bucket> buckets = histo.getBuckets();
+
+        double bucketSum = 0;
+        for (int i = 0; i < buckets.size(); ++i) {
+            Bucket bucket = buckets.get(i);
+            assertThat(bucket, notNullValue());
+            assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
+            Sum sum = bucket.getAggregations().get("sum");
+            assertThat(sum, notNullValue());
+            bucketSum += sum.value();
+
+            InternalSimpleValue sumBucketValue = bucket.getAggregations().get("cumulative_sum");
+            assertThat(sumBucketValue, notNullValue());
+            assertThat(sumBucketValue.getName(), equalTo("cumulative_sum"));
+            assertThat(sumBucketValue.value(), equalTo(bucketSum));
+        }
+    }
+
+    @Test
+    public void testNoBuckets() throws Exception {
+        SearchResponse response = client()
+                .prepareSearch("idx")
+                .setQuery(rangeQuery(SINGLE_VALUED_FIELD_NAME).lt(minRandomValue))
+                .addAggregation(
+                        histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
+                                .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))
+                                .subAggregation(cumulativeSum("cumulative_sum").setBucketsPaths("sum"))).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        Histogram histo = response.getAggregations().get("histo");
+        assertThat(histo, notNullValue());
+        assertThat(histo.getName(), equalTo("histo"));
+        List<? extends Bucket> buckets = histo.getBuckets();
+        assertThat(buckets.size(), equalTo(0));
+    }
+}

+ 1 - 0
docs/reference/aggregations/pipeline.asciidoc

@@ -160,4 +160,5 @@ include::pipeline/max-bucket-aggregation.asciidoc[]
 include::pipeline/min-bucket-aggregation.asciidoc[]
 include::pipeline/sum-bucket-aggregation.asciidoc[]
 include::pipeline/movavg-aggregation.asciidoc[]
+include::pipeline/cumulative-sum-aggregation.asciidoc[]
 include::pipeline/bucket-script-aggregation.asciidoc[]

+ 104 - 0
docs/reference/aggregations/pipeline/cumulative-sum-aggregation.asciidoc

@@ -0,0 +1,104 @@
+[[search-aggregations-pipeline-cumulative-sum-aggregation]]
+=== Cumulative Sum Aggregation
+
+A parent pipeline aggregation which calculates the cumulative sum of a specified metric in a parent histogram (or date_histogram) 
+aggregation. The specified metric must be numeric and the enclosing histogram must have `min_doc_count` set to `0` (default
+for `histogram` aggregations).
+
+==== Syntax
+
+A `cumulative_sum` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+    "cumulative_sum": {
+        "buckets_path": "the_sum"
+    }
+}
+--------------------------------------------------
+
+.`cumulative_sum` Parameters
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |The path to the buckets we wish to find the cumulative sum for (see <<bucket-path-syntax>> for more
+ details) |Required |
+ |`format` |format to apply to the output value of this aggregation |Optional, defaults to `null` |
+|===
+
+The following snippet calculates the cumulative sum of the total monthly `sales`:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sales_per_month" : {
+            "date_histogram" : {
+                "field" : "date",
+                "interval" : "month"
+            },
+            "aggs": {
+                "sales": {
+                    "sum": {
+                        "field": "price"
+                    }
+                },
+                "cumulative_sales": {
+                    "cumulative_sum": {
+                        "buckets_paths": "sales" <1>
+                    }
+                }
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+<1> `bucket_paths` instructs this cumulative sum aggregation to use the output of the `sales` aggregation for the cumulative sum
+
+And the following may be the response:
+
+[source,js]
+--------------------------------------------------
+{
+   "aggregations": {
+      "sales_per_month": {
+         "buckets": [
+            {
+               "key_as_string": "2015/01/01 00:00:00",
+               "key": 1420070400000,
+               "doc_count": 3,
+               "sales": {
+                  "value": 550
+               },
+               "cumulative_sales": {
+                  "value": 550
+               }
+            },
+            {
+               "key_as_string": "2015/02/01 00:00:00",
+               "key": 1422748800000,
+               "doc_count": 2,
+               "sales": {
+                  "value": 60
+               },
+               "cumulative_sales": {
+                  "value": 610 
+               }
+            },
+            {
+               "key_as_string": "2015/03/01 00:00:00",
+               "key": 1425168000000,
+               "doc_count": 2,
+               "sales": {
+                  "value": 375
+               },
+               "cumulative_sales": {
+                  "value": 985
+               }
+            }
+         ]
+      }
+   }
+}
+--------------------------------------------------