浏览代码

Aggregations: Moving average forecasts should not include current datapoint.

- Fixes tests, and removes a few special snowflake, fragile tests.
- Removes concrete implementation of predict() and moves it into
  each model so that the logic is clearer.  Because there is some
  shared checks/assertions, those remain in predict() and the main
  prediction happens in doPredict()
Zachary Tong 10 年之前
父节点
当前提交
5d94febcb1

+ 2 - 1
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java

@@ -120,7 +120,6 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
             InternalHistogram.Bucket newBucket = bucket;
 
             if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) {
-                values.offer(thisBucketValue);
 
                 // Some models (e.g. HoltWinters) have certain preconditions that must be met
                 if (model.hasValue(values.size())) {
@@ -142,6 +141,8 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
                     }
                     lastValidPosition = counter;
                 }
+
+                values.offer(thisBucketValue);
             }
             counter += 1;
             newBuckets.add(newBucket);

+ 10 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java

@@ -29,6 +29,7 @@ import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
@@ -50,6 +51,15 @@ public class EwmaModel extends MovAvgModel {
         this.alpha = alpha;
     }
 
+    @Override
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
+        double[] predictions = new double[numPredictions];
+
+        // EWMA just emits the same final prediction repeatedly.
+        Arrays.fill(predictions, next(values));
+
+        return predictions;
+    }
 
     @Override
     public <T extends Number> double next(Collection<T> values) {

+ 1 - 1
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java

@@ -67,7 +67,7 @@ public class HoltLinearModel extends MovAvgModel {
      * @return                  Returns an array of doubles, since most smoothing methods operate on floating points
      */
     @Override
-    public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
         return next(values, numPredictions);
     }
 

+ 1 - 1
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java

@@ -176,7 +176,7 @@ public class HoltWintersModel extends MovAvgModel {
      * @return                  Returns an array of doubles, since most smoothing methods operate on floating points
      */
     @Override
-    public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
         return next(values, numPredictions);
     }
 

+ 11 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java

@@ -30,6 +30,7 @@ import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
@@ -41,6 +42,16 @@ public class LinearModel extends MovAvgModel {
 
     protected static final ParseField NAME_FIELD = new ParseField("linear");
 
+    @Override
+    protected  <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
+        double[] predictions = new double[numPredictions];
+
+        // EWMA just emits the same final prediction repeatedly.
+        Arrays.fill(predictions, next(values));
+
+        return predictions;
+    }
+
     @Override
     public <T extends Number> double next(Collection<T> values) {
         double avg = 0;

+ 20 - 27
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java

@@ -19,8 +19,6 @@
 
 package org.elasticsearch.search.aggregations.pipeline.movavg.models;
 
-import com.google.common.collect.EvictingQueue;
-
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.SearchParseException;
@@ -44,7 +42,7 @@ public abstract class MovAvgModel {
      */
     public boolean hasValue(int windowLength) {
         // Default implementation can always provide a next() value
-        return true;
+        return windowLength > 0;
     }
 
     /**
@@ -57,9 +55,7 @@ public abstract class MovAvgModel {
     public abstract <T extends Number> double next(Collection<T> values);
 
     /**
-     * Predicts the next `n` values in the series, using the smoothing model to generate new values.
-     * Default prediction mode is to simply continuing calling <code>next()</code> and adding the
-     * predicted value back into the windowed buffer.
+     * Predicts the next `n` values in the series.
      *
      * @param values            Collection of numerics to movingAvg, usually windowed
      * @param numPredictions    Number of newly generated predictions to return
@@ -67,34 +63,31 @@ public abstract class MovAvgModel {
      * @return                  Returns an array of doubles, since most smoothing methods operate on floating points
      */
     public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
-        double[] predictions = new double[numPredictions];
+        assert(numPredictions >= 1);
 
         // If there are no values, we can't do anything.  Return an array of NaNs.
-        if (values.size() == 0) {
+        if (values.isEmpty()) {
             return emptyPredictions(numPredictions);
         }
 
-        // special case for one prediction, avoids allocation
-        if (numPredictions < 1) {
-            throw new IllegalArgumentException("numPredictions may not be less than 1.");
-        } else if (numPredictions == 1){
-            predictions[0] = next(values);
-            return predictions;
-        }
-
-        Collection<Number> predictionBuffer = EvictingQueue.create(values.size());
-        predictionBuffer.addAll(values);
-
-        for (int i = 0; i < numPredictions; i++) {
-            predictions[i] = next(predictionBuffer);
-
-            // Add the last value to the buffer, so we can keep predicting
-            predictionBuffer.add(predictions[i]);
-        }
-
-        return predictions;
+        return doPredict(values, numPredictions);
     }
 
+    /**
+     * Calls to the model-specific implementation which actually generates the predictions
+     *
+     * @param values            Collection of numerics to movingAvg, usually windowed
+     * @param numPredictions    Number of newly generated predictions to return
+     * @param <T>               Type of numeric
+     * @return                  Returns an array of doubles, since most smoothing methods operate on floating points
+     */
+    protected abstract <T extends Number> double[] doPredict(Collection<T> values, int numPredictions);
+
+    /**
+     * Returns an empty set of predictions, filled with NaNs
+     * @param numPredictions
+     * @return
+     */
     protected double[] emptyPredictions(int numPredictions) {
         double[] predictions = new double[numPredictions];
         Arrays.fill(predictions, Double.NaN);

+ 11 - 0
core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java

@@ -29,6 +29,7 @@ import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
@@ -39,6 +40,16 @@ public class SimpleModel extends MovAvgModel {
 
     protected static final ParseField NAME_FIELD = new ParseField("simple");
 
+    @Override
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
+        double[] predictions = new double[numPredictions];
+
+        // EWMA just emits the same final prediction repeatedly.
+        Arrays.fill(predictions, next(values));
+
+        return predictions;
+    }
+
     @Override
     public <T extends Number> double next(Collection<T> values) {
         double avg = 0;

+ 14 - 1
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java

@@ -50,6 +50,7 @@ public class PipelineAggregationHelperTests extends ElasticsearchTestCase {
         ArrayList<MockBucket> values = new ArrayList<>(size);
 
         boolean lastWasGap = false;
+        boolean emptyHisto = true;
 
         for (int i = 0; i < size; i++) {
             MockBucket bucket = new MockBucket();
@@ -70,15 +71,27 @@ public class PipelineAggregationHelperTests extends ElasticsearchTestCase {
                 bucket.count = randomIntBetween(1, 50);
                 bucket.docValues = new double[bucket.count];
                 for (int j = 0; j < bucket.count; j++) {
-                    bucket.docValues[j] = randomDouble() * randomIntBetween(-20,20);
+                    bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
                 }
                 lastWasGap = false;
+                emptyHisto = false;
             }
 
             bucket.key = i * interval;
             values.add(bucket);
         }
 
+        if (emptyHisto) {
+            int idx = randomIntBetween(0, values.size()-1);
+            MockBucket bucket = values.get(idx);
+            bucket.count = randomIntBetween(1, 50);
+            bucket.docValues = new double[bucket.count];
+            for (int j = 0; j < bucket.count; j++) {
+                bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
+            }
+            values.set(idx, bucket);
+        }
+
         return values;
     }
 

+ 55 - 381
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java

@@ -148,15 +148,6 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
             }
         }
 
-        // Used for specially crafted gap tests
-        builders.add(client().prepareIndex("idx", "gap_type").setSource(jsonBuilder().startObject()
-                .field(INTERVAL_FIELD, 0)
-                .field(GAP_FIELD, 1).endObject()));
-
-        builders.add(client().prepareIndex("idx", "gap_type").setSource(jsonBuilder().startObject()
-                .field(INTERVAL_FIELD, 49)
-                .field(GAP_FIELD, 1).endObject()));
-
         for (int i = -10; i < 10; i++) {
             builders.add(client().prepareIndex("neg_idx", "type").setSource(
                     jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD, 10).endObject()));
@@ -204,31 +195,36 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
                 metricValue = target.equals(MetricTarget.VALUE) ? PipelineAggregationHelperTests.calculateMetric(docValues, metric) : mockBucket.count;
             }
 
-            window.offer(metricValue);
-            switch (type) {
-                case SIMPLE:
-                    values.add(simple(window));
-                    break;
-                case LINEAR:
-                    values.add(linear(window));
-                    break;
-                case EWMA:
-                    values.add(ewma(window));
-                    break;
-                case HOLT:
-                    values.add(holt(window));
-                    break;
-                case HOLT_WINTERS:
-                    // HW needs at least 2 periods of data to start
-                    if (window.size() >= period * 2) {
-                        values.add(holtWinters(window));
-                    } else {
-                        values.add(null);
-                    }
-
-                    break;
+            if (window.size() > 0) {
+                switch (type) {
+                    case SIMPLE:
+                        values.add(simple(window));
+                        break;
+                    case LINEAR:
+                        values.add(linear(window));
+                        break;
+                    case EWMA:
+                        values.add(ewma(window));
+                        break;
+                    case HOLT:
+                        values.add(holt(window));
+                        break;
+                    case HOLT_WINTERS:
+                        // HW needs at least 2 periods of data to start
+                        if (window.size() >= period * 2) {
+                            values.add(holtWinters(window));
+                        } else {
+                            values.add(null);
+                        }
+
+                        break;
+                }
+            } else {
+                values.add(null);
             }
 
+            window.offer(metricValue);
+
         }
         testValues.put(type.toString() + "_" + target.toString(), values);
     }
@@ -685,7 +681,10 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         List<? extends Bucket> buckets = histo.getBuckets();
         assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(25));
 
-        for (int i = 0; i < 20; i++) {
+        SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
+        assertThat(current, nullValue());
+
+        for (int i = 1; i < 20; i++) {
             Bucket bucket = buckets.get(i);
             assertThat(bucket, notNullValue());
             assertThat((long) bucket.getKey(), equalTo((long) i - 10));
@@ -699,7 +698,6 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         }
 
         for (int i = 20; i < 25; i++) {
-            System.out.println(i);
             Bucket bucket = buckets.get(i);
             assertThat(bucket, notNullValue());
             assertThat((long) bucket.getKey(), equalTo((long) i - 10));
@@ -877,350 +875,6 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         }
     }
 
-    /**
-     * This test uses the "gap" dataset, which is simply a doc at the beginning and end of
-     * the INTERVAL_FIELD range.  These docs have a value of 1 in GAP_FIELD.
-     * This test verifies that large gaps don't break things, and that the mov avg roughly works
-     * in the correct manner (checks direction of change, but not actual values)
-     */
-    @Test
-    public void testGiantGap() {
-
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                .subAggregation(min("the_metric").field(GAP_FIELD))
-                                .subAggregation(movingAvg("movavg_values")
-                                        .window(windowSize)
-                                        .modelBuilder(randomModelBuilder())
-                                        .gapPolicy(gapPolicy)
-                                        .setBucketsPaths("the_metric"))
-                ).execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-
-        double lastValue = ((SimpleValue)(buckets.get(0).getAggregations().get("movavg_values"))).value();
-        assertThat(Double.compare(lastValue, 0.0d), greaterThanOrEqualTo(0));
-
-        double currentValue;
-        for (int i = 1; i < 49; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                // Since there are only two values in this test, at the beginning and end, the moving average should
-                // decrease every step (until it reaches zero).  Crude way to check that it's doing the right thing
-                // without actually verifying the computed values.  Should work for all types of moving avgs and
-                // gap policies
-                assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-
-
-        SimpleValue current = buckets.get(49).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-        currentValue = current.value();
-
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            // if we are ignoring, movavg could go up (holt) or stay the same (simple, linear, ewma)
-            assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-        } else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
-            // If we insert zeros, this should always increase the moving avg since the last bucket has a real value
-            assertThat(Double.compare(lastValue, currentValue), equalTo(-1));
-        }
-    }
-
-    /**
-     * Big gap, but with prediction at the end.
-     */
-    @Test
-    public void testGiantGapWithPredict() {
-        int numPredictions = randomIntBetween(1, 10);
-
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                .subAggregation(min("the_metric").field(GAP_FIELD))
-                                .subAggregation(movingAvg("movavg_values")
-                                        .window(windowSize)
-                                        .modelBuilder(randomModelBuilder())
-                                        .gapPolicy(gapPolicy)
-                                        .setBucketsPaths("the_metric")
-                                        .predict(numPredictions))
-                ).execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions));
-
-
-        double lastValue = ((SimpleValue)(buckets.get(0).getAggregations().get("movavg_values"))).value();
-        assertThat(Double.compare(lastValue, 0.0d), greaterThanOrEqualTo(0));
-
-        double currentValue;
-        for (int i = 1; i < 49; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                // Since there are only two values in this test, at the beginning and end, the moving average should
-                // decrease every step (until it reaches zero).  Crude way to check that it's doing the right thing
-                // without actually verifying the computed values.  Should work for all types of moving avgs and
-                // gap policies
-                assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-
-        SimpleValue current = buckets.get(49).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-        currentValue = current.value();
-
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            // if we are ignoring, movavg could go up (holt) or stay the same (simple, linear, ewma)
-            assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-        } else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
-            // If we insert zeros, this should always increase the moving avg since the last bucket has a real value
-            assertThat(Double.compare(lastValue, currentValue), equalTo(-1));
-        }
-
-        // Now check predictions
-        for (int i = 50; i < 50 + numPredictions; i++) {
-            // Unclear at this point which direction the predictions will go, just verify they are
-            // not null, and that we don't have the_metric anymore
-            assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            assertThat((buckets.get(i).getAggregations().get("the_metric")), nullValue());
-        }
-    }
-
-    /**
-     * This test filters the "gap" data so that the first doc is excluded.  This leaves a long stretch of empty
-     * buckets until the final bucket.  The moving avg should be zero up until the last bucket, and should work
-     * regardless of mov avg type or gap policy.
-     */
-    @Test
-    public void testLeftGap() {
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).from(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric"))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-
-        double lastValue = 0;
-
-        double currentValue;
-        for (int i = 0; i < 50; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-    }
-
-    @Test
-    public void testLeftGapWithPredict() {
-        int numPredictions = randomIntBetween(1, 10);
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).from(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric")
-                                                .predict(numPredictions))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions));
-
-
-        double lastValue = 0;
-
-        double currentValue;
-        for (int i = 0; i < 50; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-
-        // Now check predictions
-        for (int i = 50; i < 50 + numPredictions; i++) {
-            // Unclear at this point which direction the predictions will go, just verify they are
-            // not null, and that we don't have the_metric anymore
-            assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            assertThat((buckets.get(i).getAggregations().get("the_metric")), nullValue());
-        }
-    }
-
-    /**
-     * This test filters the "gap" data so that the last doc is excluded.  This leaves a long stretch of empty
-     * buckets after the first bucket.
-     */
-    @Test
-    public void testRightGap() {
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).to(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric"))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-
-
-        SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-
-        double lastValue = current.value();
-
-        double currentValue;
-        for (int i = 1; i < 50; i++) {
-            current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-    }
-
-    @Test
-    public void testRightGapWithPredict() {
-        int numPredictions = randomIntBetween(1, 10);
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).to(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric")
-                                                .predict(numPredictions))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-
-        // If we are skipping, there will only be predictions at the very beginning and won't append any new buckets
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-        } else {
-            assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions));
-        }
-
-        // Unlike left-gap tests, we cannot check the slope of prediction for right-gap. E.g. linear will
-        // converge on zero, but holt-linear may trend upwards based on the first value
-        // Just check for non-nullness
-        SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-
-        // If we are skipping, there will only be predictions at the very beginning and won't append any new buckets
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            // Now check predictions
-            for (int i = 1; i < 1 + numPredictions; i++) {
-                // Unclear at this point which direction the predictions will go, just verify they are
-                // not null
-                assertThat(buckets.get(i).getDocCount(), equalTo(0L));
-                assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            }
-        } else {
-            // Otherwise we'll have some predictions at the end
-            for (int i = 50; i < 50 + numPredictions; i++) {
-                // Unclear at this point which direction the predictions will go, just verify they are
-                // not null
-                assertThat(buckets.get(i).getDocCount(), equalTo(0L));
-                assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            }
-        }
-
-    }
-
     @Test
     public void testHoltWintersNotEnoughData() {
         try {
@@ -1288,8 +942,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         assertThat(avgAgg.value(), equalTo(10d));
 
         SimpleValue movAvgAgg = bucket.getAggregations().get("avg_movavg");
-        assertThat(movAvgAgg, notNullValue());
-        assertThat(movAvgAgg.value(), equalTo(10d));
+        assertThat(movAvgAgg, nullValue());
 
         Derivative deriv = bucket.getAggregations().get("deriv");
         assertThat(deriv, nullValue());
@@ -1297,7 +950,28 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         SimpleValue derivMovAvg = bucket.getAggregations().get("deriv_movavg");
         assertThat(derivMovAvg, nullValue());
 
-        for (int i = 1; i < 12; i++) {
+        // Second bucket
+        bucket = buckets.get(1);
+        assertThat(bucket, notNullValue());
+        assertThat((long) bucket.getKey(), equalTo(1L));
+        assertThat(bucket.getDocCount(), equalTo(1l));
+
+        avgAgg = bucket.getAggregations().get("avg");
+        assertThat(avgAgg, notNullValue());
+        assertThat(avgAgg.value(), equalTo(10d));
+
+        deriv = bucket.getAggregations().get("deriv");
+        assertThat(deriv, notNullValue());
+        assertThat(deriv.value(), equalTo(0d));
+
+        movAvgAgg = bucket.getAggregations().get("avg_movavg");
+        assertThat(movAvgAgg, notNullValue());
+        assertThat(movAvgAgg.value(), equalTo(10d));
+
+        derivMovAvg = bucket.getAggregations().get("deriv_movavg");
+        assertThat(derivMovAvg, Matchers.nullValue());                 // still null because of movavg delay
+
+        for (int i = 2; i < 12; i++) {
             bucket = buckets.get(i);
             assertThat(bucket, notNullValue());
             assertThat((long) bucket.getKey(), equalTo((long) i));

+ 47 - 35
core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java

@@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.moving.avg;
 
 import com.google.common.collect.EvictingQueue;
 
-import org.elasticsearch.search.SearchParseException;
 import org.elasticsearch.search.aggregations.pipeline.movavg.models.*;
 import org.elasticsearch.test.ElasticsearchTestCase;
 
@@ -47,7 +46,10 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double randValue = randomDouble();
             double expected = 0;
 
-            window.offer(randValue);
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             for (double value : window) {
                 expected += value;
@@ -56,6 +58,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
 
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }
 
@@ -64,7 +67,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         MovAvgModel model = new SimpleModel();
 
         int windowSize = randomIntBetween(1, 50);
-        int numPredictions = randomIntBetween(1,50);
+        int numPredictions = randomIntBetween(1, 50);
 
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < windowSize; i++) {
@@ -73,13 +76,12 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         double actual[] = model.predict(window, numPredictions);
 
         double expected[] = new double[numPredictions];
-        for (int i = 0; i < numPredictions; i++) {
-            for (double value : window) {
-                expected[i] += value;
-            }
-            expected[i] /= window.size();
-            window.offer(expected[i]);
+        double t = 0;
+        for (double value : window) {
+            t += value;
         }
+        t /= window.size();
+        Arrays.fill(expected, t);
 
         for (int i = 0; i < numPredictions; i++) {
             assertThat(Double.compare(expected[i], actual[i]), equalTo(0));
@@ -96,7 +98,11 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < numValues; i++) {
             double randValue = randomDouble();
-            window.offer(randValue);
+
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             double avg = 0;
             long totalWeight = 1;
@@ -110,6 +116,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double expected = avg / totalWeight;
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }
 
@@ -127,19 +134,17 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         double actual[] = model.predict(window, numPredictions);
         double expected[] = new double[numPredictions];
 
-        for (int i = 0; i < numPredictions; i++) {
-            double avg = 0;
-            long totalWeight = 1;
-            long current = 1;
+        double avg = 0;
+        long totalWeight = 1;
+        long current = 1;
 
-            for (double value : window) {
-                avg += value * current;
-                totalWeight += current;
-                current += 1;
-            }
-            expected[i] = avg / totalWeight;
-            window.offer(expected[i]);
+        for (double value : window) {
+            avg += value * current;
+            totalWeight += current;
+            current += 1;
         }
+        avg = avg / totalWeight;
+        Arrays.fill(expected, avg);
 
         for (int i = 0; i < numPredictions; i++) {
             assertThat(Double.compare(expected[i], actual[i]), equalTo(0));
@@ -157,7 +162,11 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < numValues; i++) {
             double randValue = randomDouble();
-            window.offer(randValue);
+
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             double avg = 0;
             boolean first = true;
@@ -173,6 +182,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double expected = avg;
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }
 
@@ -191,21 +201,18 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         double actual[] = model.predict(window, numPredictions);
         double expected[] = new double[numPredictions];
 
-        for (int i = 0; i < numPredictions; i++) {
-            double avg = 0;
-            boolean first = true;
+        double avg = 0;
+        boolean first = true;
 
-            for (double value : window) {
-                if (first) {
-                    avg = value;
-                    first = false;
-                } else {
-                    avg = (value * alpha) + (avg * (1 - alpha));
-                }
+        for (double value : window) {
+            if (first) {
+                avg = value;
+                first = false;
+            } else {
+                avg = (value * alpha) + (avg * (1 - alpha));
             }
-            expected[i] = avg;
-            window.offer(expected[i]);
         }
+        Arrays.fill(expected, avg);
 
         for (int i = 0; i < numPredictions; i++) {
             assertThat(Double.compare(expected[i], actual[i]), equalTo(0));
@@ -224,7 +231,11 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < numValues; i++) {
             double randValue = randomDouble();
-            window.offer(randValue);
+
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             double s = 0;
             double last_s = 0;
@@ -253,6 +264,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double expected = s + (0 * b) ;
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }