Bläddra i källkod

Aggregations: add serial differencing pipeline aggregation

Zachary Tong 10 år sedan
förälder
incheckning
e3f9d561e4

+ 2 - 0
core/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java

@@ -67,6 +67,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeParse
 import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorParser;
 import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgParser;
 import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelModule;
+import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffParser;
 
 import java.util.List;
 
@@ -120,6 +121,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
         pipelineAggParsers.add(CumulativeSumParser.class);
         pipelineAggParsers.add(BucketScriptParser.class);
         pipelineAggParsers.add(BucketSelectorParser.class);
+        pipelineAggParsers.add(SerialDiffParser.class);
     }
 
     /**

+ 2 - 0
core/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java

@@ -72,6 +72,7 @@ import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivat
 import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.movavg.models.TransportMovAvgModelModule;
+import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
 
 /**
  * A module that registers all the transport streams for the addAggregation
@@ -133,6 +134,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
         CumulativeSumPipelineAggregator.registerStreams();
         BucketScriptPipelineAggregator.registerStreams();
         BucketSelectorPipelineAggregator.registerStreams();
+        SerialDiffPipelineAggregator.registerStreams();
     }
 
     @Override

+ 5 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java

@@ -28,6 +28,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptB
 import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativeBuilder;
 import org.elasticsearch.search.aggregations.pipeline.having.BucketSelectorBuilder;
 import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgBuilder;
+import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffBuilder;
 
 public final class PipelineAggregatorBuilders {
 
@@ -69,4 +70,8 @@ public final class PipelineAggregatorBuilders {
     public static final CumulativeSumBuilder cumulativeSum(String name) {
         return new CumulativeSumBuilder(name);
     }
+
+    public static final SerialDiffBuilder diff(String name) {
+        return new SerialDiffBuilder(name);
+    }
 }

+ 67 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffBuilder.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.serialdiff;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilder;
+
+import java.io.IOException;
+
+public class SerialDiffBuilder extends PipelineAggregatorBuilder<SerialDiffBuilder> {
+
+    private String format;
+    private GapPolicy gapPolicy;
+    private Integer lag;
+
+    public SerialDiffBuilder(String name) {
+        super(name, SerialDiffPipelineAggregator.TYPE.name());
+    }
+
+    public SerialDiffBuilder format(String format) {
+        this.format = format;
+        return this;
+    }
+
+    public SerialDiffBuilder gapPolicy(GapPolicy gapPolicy) {
+        this.gapPolicy = gapPolicy;
+        return this;
+    }
+
+    public SerialDiffBuilder lag(Integer lag) {
+        this.lag = lag;
+        return this;
+    }
+
+    @Override
+    protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
+        if (format != null) {
+            builder.field(SerialDiffParser.FORMAT.getPreferredName(), format);
+        }
+        if (gapPolicy != null) {
+            builder.field(SerialDiffParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
+        }
+        if (lag != null) {
+            builder.field(SerialDiffParser.LAG.getPreferredName(), lag);
+        }
+        return builder;
+    }
+
+}

+ 116 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffParser.java

@@ -0,0 +1,116 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.serialdiff;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.format.ValueFormat;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
+
+public class SerialDiffParser implements PipelineAggregator.Parser {
+
+    public static final ParseField FORMAT = new ParseField("format");
+    public static final ParseField GAP_POLICY = new ParseField("gap_policy");
+    public static final ParseField LAG = new ParseField("lag");
+
+    @Override
+    public String type() {
+        return SerialDiffPipelineAggregator.TYPE.name();
+    }
+
+    @Override
+    public PipelineAggregatorFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
+        XContentParser.Token token;
+        String currentFieldName = null;
+        String[] bucketsPaths = null;
+        String format = null;
+        GapPolicy gapPolicy = GapPolicy.SKIP;
+        int lag = 1;
+
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (token == XContentParser.Token.VALUE_STRING) {
+                if (context.parseFieldMatcher().match(currentFieldName, FORMAT)) {
+                    format = parser.text();
+                } else if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
+                    bucketsPaths = new String[] { parser.text() };
+                } else if (context.parseFieldMatcher().match(currentFieldName, GAP_POLICY)) {
+                    gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+                            + currentFieldName + "].", parser.getTokenLocation());
+                }
+            } else if (token == XContentParser.Token.VALUE_NUMBER) {
+                if (context.parseFieldMatcher().match(currentFieldName, LAG)) {
+                    lag = parser.intValue(true);
+                    if (lag <= 0) {
+                        throw new SearchParseException(context, "Lag must be a positive, non-zero integer.  Value supplied was" +
+                                lag + " in [" + reducerName + "]: ["
+                                + currentFieldName + "].", parser.getTokenLocation());
+                    }
+                }  else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+                            + currentFieldName + "].", parser.getTokenLocation());
+                }
+            } else if (token == XContentParser.Token.START_ARRAY) {
+                if (context.parseFieldMatcher().match(currentFieldName, BUCKETS_PATH)) {
+                    List<String> paths = new ArrayList<>();
+                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                        String path = parser.text();
+                        paths.add(path);
+                    }
+                    bucketsPaths = paths.toArray(new String[paths.size()]);
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+                            + currentFieldName + "].", parser.getTokenLocation());
+                }
+            } else {
+                throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
+                        parser.getTokenLocation());
+            }
+        }
+
+        if (bucketsPaths == null) {
+            throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+                    + "] for derivative aggregation [" + reducerName + "]", parser.getTokenLocation());
+        }
+
+        ValueFormatter formatter;
+        if (format != null) {
+            formatter = ValueFormat.Patternable.Number.format(format).formatter();
+        }  else {
+            formatter = ValueFormatter.RAW;
+        }
+
+        return new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths, formatter, gapPolicy, lag);
+    }
+
+}

+ 161 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffPipelineAggregator.java

@@ -0,0 +1,161 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.serialdiff;
+
+import com.google.common.collect.EvictingQueue;
+import com.google.common.collect.Lists;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.InternalAggregation.Type;
+import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
+import org.elasticsearch.search.aggregations.pipeline.*;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
+import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
+
+public class SerialDiffPipelineAggregator extends PipelineAggregator {
+
+    public final static Type TYPE = new Type("serial_diff");
+
+    public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
+        @Override
+        public SerialDiffPipelineAggregator readResult(StreamInput in) throws IOException {
+            SerialDiffPipelineAggregator result = new SerialDiffPipelineAggregator();
+            result.readFrom(in);
+            return result;
+        }
+    };
+
+    public static void registerStreams() {
+        PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
+    }
+
+    private ValueFormatter formatter;
+    private GapPolicy gapPolicy;
+    private int lag;
+
+    public SerialDiffPipelineAggregator() {
+    }
+
+    public SerialDiffPipelineAggregator(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy,
+                                        int lag, Map<String, Object> metadata) {
+        super(name, bucketsPaths, metadata);
+        this.formatter = formatter;
+        this.gapPolicy = gapPolicy;
+        this.lag = lag;
+    }
+
+    @Override
+    public Type type() {
+        return TYPE;
+    }
+
+    @Override
+    public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
+        InternalHistogram histo = (InternalHistogram) aggregation;
+        List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
+        InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();
+
+        List newBuckets = new ArrayList<>();
+        EvictingQueue<Double> lagWindow = EvictingQueue.create(lag);
+        int counter = 0;
+
+        for (InternalHistogram.Bucket bucket : buckets) {
+            Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
+            InternalHistogram.Bucket newBucket = bucket;
+
+            counter += 1;
+
+            // Still under the initial lag period, add nothing and move on
+            Double lagValue;
+            if (counter <= lag) {
+                lagValue = Double.NaN;
+            } else {
+                lagValue = lagWindow.peek();  // Peek here, because we rely on add'ing to always move the window
+            }
+
+            // Normalize null's to NaN
+            if (thisBucketValue == null) {
+                thisBucketValue = Double.NaN;
+            }
+
+            // Both have values, calculate diff and replace the "empty" bucket
+            if (!Double.isNaN(thisBucketValue) && !Double.isNaN(lagValue)) {
+                double diff = thisBucketValue - lagValue;
+
+                List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
+                aggs.add(new InternalSimpleValue(name(), diff, formatter, new ArrayList<PipelineAggregator>(), metaData()));
+                newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
+                        aggs), bucket.getKeyed(), bucket.getFormatter());
+            }
+
+
+            newBuckets.add(newBucket);
+            lagWindow.add(thisBucketValue);
+
+        }
+        return factory.create(newBuckets, histo);
+    }
+
+    @Override
+    public void doReadFrom(StreamInput in) throws IOException {
+        formatter = ValueFormatterStreams.readOptional(in);
+        gapPolicy = GapPolicy.readFrom(in);
+        lag = in.readVInt();
+    }
+
+    @Override
+    public void doWriteTo(StreamOutput out) throws IOException {
+        ValueFormatterStreams.writeOptional(formatter, out);
+        gapPolicy.writeTo(out);
+        out.writeVInt(lag);
+    }
+
+    public static class Factory extends PipelineAggregatorFactory {
+
+        private final ValueFormatter formatter;
+        private GapPolicy gapPolicy;
+        private int lag;
+
+        public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, int lag) {
+            super(name, TYPE.name(), bucketsPaths);
+            this.formatter = formatter;
+            this.gapPolicy = gapPolicy;
+            this.lag = lag;
+        }
+
+        @Override
+        protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
+            return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, lag, metaData);
+        }
+
+    }
+}

+ 291 - 0
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/serialdiff/SerialDiffTests.java

@@ -0,0 +1,291 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.pipeline.serialdiff;
+
+import com.google.common.collect.EvictingQueue;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
+import org.elasticsearch.search.aggregations.metrics.ValuesSourceMetricsAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregationHelperTests;
+import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
+import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.diff;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.hamcrest.core.IsNull.nullValue;
+
+@ElasticsearchIntegrationTest.SuiteScopeTest
+public class SerialDiffTests extends ElasticsearchIntegrationTest {
+    private static final String INTERVAL_FIELD = "l_value";
+    private static final String VALUE_FIELD = "v_value";
+
+    static int interval;
+    static int numBuckets;
+    static int lag;
+    static BucketHelpers.GapPolicy gapPolicy;
+    static ValuesSourceMetricsAggregationBuilder metric;
+    static List<PipelineAggregationHelperTests.MockBucket> mockHisto;
+
+    static Map<String, ArrayList<Double>> testValues;
+
+    enum MetricTarget {
+        VALUE ("value"), COUNT("count");
+
+        private final String name;
+
+        MetricTarget(String s) {
+            name = s;
+        }
+
+        public String toString(){
+            return name;
+        }
+    }
+
+    private ValuesSourceMetricsAggregationBuilder randomMetric(String name, String field) {
+        int rand = randomIntBetween(0,3);
+
+        switch (rand) {
+            case 0:
+                return min(name).field(field);
+            case 2:
+                return max(name).field(field);
+            case 3:
+                return avg(name).field(field);
+            default:
+                return avg(name).field(field);
+        }
+    }
+
+    private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) {
+        if (!expectedBucketIter.hasNext()) {
+            fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch");
+        }
+        if (!expectedCountsIter.hasNext()) {
+            fail("`expectedCountsIter` iterator ended before `actual` iterator, size mismatch");
+        }
+        if (!expectedValuesIter.hasNext()) {
+            fail("`expectedValuesIter` iterator ended before `actual` iterator, size mismatch");
+        }
+    }
+
+    private void assertBucketContents(Histogram.Bucket actual, Double expectedCount, Double expectedValue) {
+        // This is a gap bucket
+        SimpleValue countDiff = actual.getAggregations().get("diff_counts");
+        if (expectedCount == null) {
+            assertThat("[_count] diff is not null", countDiff, nullValue());
+        } else {
+            assertThat("[_count] diff is null", countDiff, notNullValue());
+            assertThat("[_count] diff does not match expected [" + countDiff.value() + " vs " + expectedCount + "]",
+                    countDiff.value(), closeTo(expectedCount, 0.1));
+        }
+
+        // This is a gap bucket
+        SimpleValue valuesDiff = actual.getAggregations().get("diff_values");
+        if (expectedValue == null) {
+            assertThat("[value] diff is not null", valuesDiff, Matchers.nullValue());
+        } else {
+            assertThat("[value] diff is null", valuesDiff, notNullValue());
+            assertThat("[value] diff does not match expected [" + valuesDiff.value() + " vs " + expectedValue + "]",
+                    valuesDiff.value(), closeTo(expectedValue, 0.1));
+        }
+    }
+
+
+    @Override
+    public void setupSuiteScopeCluster() throws Exception {
+        createIndex("idx");
+        createIndex("idx_unmapped");
+        List<IndexRequestBuilder> builders = new ArrayList<>();
+
+
+        interval = 5;
+        numBuckets = randomIntBetween(10, 80);
+        lag = randomIntBetween(1, numBuckets / 2);
+
+        gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS;
+        metric = randomMetric("the_metric", VALUE_FIELD);
+        mockHisto = PipelineAggregationHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble());
+
+        testValues = new HashMap<>(8);
+
+        for (MetricTarget target : MetricTarget.values()) {
+            setupExpected(target);
+        }
+
+        for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) {
+            for (double value : mockBucket.docValues) {
+                builders.add(client().prepareIndex("idx", "type").setSource(jsonBuilder().startObject()
+                        .field(INTERVAL_FIELD, mockBucket.key)
+                        .field(VALUE_FIELD, value).endObject()));
+            }
+        }
+
+        indexRandom(true, builders);
+        ensureSearchable();
+    }
+
+    /**
+     * @param target    The document field "target", e.g. _count or a field value
+     */
+    private void setupExpected(MetricTarget target) {
+        ArrayList<Double> values = new ArrayList<>(numBuckets);
+        EvictingQueue<Double> lagWindow = EvictingQueue.create(lag);
+
+        int counter = 0;
+        for (PipelineAggregationHelperTests.MockBucket mockBucket : mockHisto) {
+            Double metricValue;
+            double[] docValues = mockBucket.docValues;
+
+            // Gaps only apply to metric values, not doc _counts
+            if (mockBucket.count == 0 && target.equals(MetricTarget.VALUE)) {
+                // If there was a gap in doc counts and we are ignoring, just skip this bucket
+                if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
+                    metricValue = null;
+                } else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
+                    // otherwise insert a zero instead of the true value
+                    metricValue = 0.0;
+                } else {
+                    metricValue = PipelineAggregationHelperTests.calculateMetric(docValues, metric);
+                }
+
+            } else {
+                // If this isn't a gap, or is a _count, just insert the value
+                metricValue = target.equals(MetricTarget.VALUE) ? PipelineAggregationHelperTests.calculateMetric(docValues, metric) : mockBucket.count;
+            }
+
+            counter += 1;
+
+            // Still under the initial lag period, add nothing and move on
+            Double lagValue;
+            if (counter <= lag) {
+                lagValue = Double.NaN;
+            } else {
+                lagValue = lagWindow.peek();  // Peek here, because we rely on add'ing to always move the window
+            }
+
+            // Normalize null's to NaN
+            if (metricValue == null) {
+                metricValue = Double.NaN;
+            }
+
+            // Both have values, calculate diff and replace the "empty" bucket
+            if (!Double.isNaN(metricValue) && !Double.isNaN(lagValue)) {
+                double diff = metricValue - lagValue;
+                values.add(diff);
+            } else {
+                values.add(null);   // The tests need null, even though the agg doesn't
+            }
+
+            lagWindow.add(metricValue);
+
+
+
+
+        }
+
+
+        testValues.put(target.toString(), values);
+    }
+
+    @Test
+    public void basicDiff() {
+
+        SearchResponse response = client()
+                .prepareSearch("idx").setTypes("type")
+                .addAggregation(
+                        histogram("histo").field(INTERVAL_FIELD).interval(interval)
+                                .extendedBounds(0L, (long) (interval * (numBuckets - 1)))
+                                .subAggregation(metric)
+                                .subAggregation(diff("diff_counts")
+                                        .lag(lag)
+                                        .gapPolicy(gapPolicy)
+                                        .setBucketsPaths("_count"))
+                                .subAggregation(diff("diff_values")
+                                        .lag(lag)
+                                        .gapPolicy(gapPolicy)
+                                        .setBucketsPaths("the_metric"))
+                ).execute().actionGet();
+
+        assertSearchResponse(response);
+
+        InternalHistogram<InternalHistogram.Bucket> histo = response.getAggregations().get("histo");
+        assertThat(histo, notNullValue());
+        assertThat(histo.getName(), equalTo("histo"));
+        List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
+        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size()));
+
+        List<Double> expectedCounts = testValues.get(MetricTarget.COUNT.toString());
+        List<Double> expectedValues = testValues.get(MetricTarget.VALUE.toString());
+
+        Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator();
+        Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator();
+        Iterator<Double> expectedCountsIter = expectedCounts.iterator();
+        Iterator<Double> expectedValuesIter = expectedValues.iterator();
+
+        while (actualIter.hasNext()) {
+            assertValidIterators(expectedBucketIter, expectedCountsIter, expectedValuesIter);
+
+            Histogram.Bucket actual = actualIter.next();
+            PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next();
+            Double expectedCount = expectedCountsIter.next();
+            Double expectedValue = expectedValuesIter.next();
+
+            assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key));
+            assertThat("doc counts do not match", actual.getDocCount(), equalTo((long)expected.count));
+
+            assertBucketContents(actual, expectedCount, expectedValue);
+        }
+    }
+
+    @Test
+    public void invalidLagSize() {
+        try {
+            client()
+                .prepareSearch("idx").setTypes("type")
+                .addAggregation(
+                        histogram("histo").field(INTERVAL_FIELD).interval(interval)
+                                .extendedBounds(0L, (long) (interval * (numBuckets - 1)))
+                                .subAggregation(metric)
+                                .subAggregation(diff("diff_counts")
+                                        .lag(-1)
+                                        .gapPolicy(gapPolicy)
+                                        .setBucketsPaths("_count"))
+                ).execute().actionGet();
+        } catch (SearchPhaseExecutionException e) {
+            // All good
+        }
+    }
+
+
+}

+ 1 - 0
docs/reference/aggregations/pipeline.asciidoc

@@ -163,3 +163,4 @@ include::pipeline/movavg-aggregation.asciidoc[]
 include::pipeline/cumulative-sum-aggregation.asciidoc[]
 include::pipeline/bucket-script-aggregation.asciidoc[]
 include::pipeline/bucket-selector-aggregation.asciidoc[]
+include::pipeline/series-arithmetic-aggregation.asciidoc[]

+ 105 - 0
docs/reference/aggregations/pipeline/serial-diff-aggregation.asciidoc

@@ -0,0 +1,105 @@
+[[search-aggregations-pipeline-serialdiff-aggregation]]
+=== Serial Differencing Aggregation
+
+coming[2.0.0]
+
+experimental[]
+
+Serial differencing is a technique where values in a time series are subtracted from itself at
+different time lags or periods. For example, the datapoint f(x) = f(x~t~) - f(x~t-n~), where n is the period being used.
+
+A period of 1 is equivalent to a derivative with no time normalization: it is simply the change from one point to the
+next. Single periods are useful for removing constant, linear trends.
+
+Single periods are also useful for transforming data into a stationary series. In this example, the Dow Jones is
+plotted over ~250 days. The raw data is not stationary, which would make it difficult to use with some techniques.
+
+By calculating the first-difference, we de-trend the data (e.g. remove a constant, linear trend).  We can see that the
+data becomes a stationary series (e.g. the first difference is randomly distributed around zero, and doesn't seem to
+exhibit any pattern/behavior). The transformation reveals that the dataset is following a random-walk; the value is the
+previous value +/- a random amount.  This insight allows selection of further tools for analysis.
+
+[[serialdiff_dow]]
+.Dow Jones plotted and made stationary with first-differencing
+image::images/pipeline_serialdiff/dow.png[]
+
+Larger periods can be used to remove seasonal / cyclic behavior. In this example, a population of lemmings was
+synthetically generated with a sine wave + constant linear trend + random noise. The sine wave has a period of 30 days.
+
+The first-difference removes the constant trend, leaving just a sine wave. The 30th-difference is then applied to the
+first-difference to remove the cyclic behavior, leaving a stationary series which is amenable to other analysis.
+
+[[serialdiff_lemmings]]
+.Lemmings data plotted made stationary with 1st and 30th difference
+image::images/pipeline_serialdiff/lemmings.png[]
+
+
+
+==== Syntax
+
+A `serial_diff` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+    "serial_diff": {
+        "buckets_path": "the_sum",
+        "lag": "7"
+    }
+}
+--------------------------------------------------
+
+.`moving_avg` Parameters
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |Path to the metric of interest (see <<bucket-path-syntax, `buckets_path` Syntax>> for more details |Required |
+|`lag` |The historical bucket to subtract from the current value. E.g. a lag of 7 will subtract the current value from
+ the value 7 buckets ago. Must be a positive, non-zero integer |Optional |`1`
+|`gap_policy` |Determines what should happen when a gap in the data is encountered. |Optional |`insert_zero`
+|`format` |Format to apply to the output value of this aggregation |Optional | `null`
+|===
+
+`serial_diff` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation:
+
+[source,js]
+--------------------------------------------------
+{
+   "aggs": {
+      "my_date_histo": {                  <1>
+         "date_histogram": {
+            "field": "timestamp",
+            "interval": "day"
+         },
+         "aggs": {
+            "the_sum": {
+               "sum": {
+                  "field": "lemmings"     <2>
+               }
+            },
+            "thirtieth_difference": {
+               "serial_diff": {                <3>
+                  "buckets_path": "lemmings",
+                  "lag" : 30
+               }
+            }
+         }
+      }
+   }
+}
+--------------------------------------------------
+<1> A `date_histogram` named "my_date_histo" is constructed on the "timestamp" field, with one-day intervals
+<2> A `sum` metric is used to calculate the sum of a field.  This could be any metric (sum, min, max, etc)
+<3> Finally, we specify a `serial_diff` aggregation which uses "the_sum" metric as its input.
+
+Serial differences are built by first specifying a `histogram` or `date_histogram` over a field.  You can then optionally
+add normal metrics, such as a `sum`, inside of that histogram.  Finally, the `serial_diff` is embedded inside the histogram.
+The `buckets_path` parameter is then used to "point" at one of the sibling metrics inside of the histogram (see
+<<bucket-path-syntax>> for a description of the syntax for `buckets_path`.
+
+
+
+
+
+
+
+

BIN
docs/reference/images/pipeline_serialdiff/dow.png


BIN
docs/reference/images/pipeline_serialdiff/lemmings.png