Browse Source

[ML] Fix the position of the spike, dip and distribution change bucket when the sibling aggregation includes empty buckets (#106472)

There was a bug in how we indexed the bucket reported for a spike, dip or distribution change.
Specifically, we were not mapping the index into the values array, which skips empty buckets, back
to the aggregation bucket index. As a result, the reported bucket index was too low (shifted towards
earlier buckets) whenever the spike, dip or distribution change occurred after one or more empty buckets.
Tom Veasey 1 year ago
parent
commit
628689f0eb

+ 6 - 0
docs/changelog/106472.yaml

@@ -0,0 +1,6 @@
+pr: 106472
+summary: "Fix the position of spike, dip and distribution changes bucket when the\
+  \ sibling aggregation includes empty buckets"
+area: Machine Learning
+type: bug
+issues: []

+ 5 - 5
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java

@@ -93,13 +93,13 @@ public class ChangePointAggregator extends SiblingPipelineAggregator {
 
     @Override
     public InternalAggregation doReduce(InternalAggregations aggregations, AggregationReduceContext context) {
-        Optional<MlAggsHelper.DoubleBucketValues> maybeBucketsValue = extractDoubleBucketedValues(
+        Optional<MlAggsHelper.DoubleBucketValues> maybeBucketValues = extractDoubleBucketedValues(
             bucketsPaths()[0],
             aggregations,
             BucketHelpers.GapPolicy.SKIP,
             true
         );
-        if (maybeBucketsValue.isEmpty()) {
+        if (maybeBucketValues.isEmpty()) {
             return new InternalChangePointAggregation(
                 name(),
                 metadata(),
@@ -107,7 +107,7 @@ public class ChangePointAggregator extends SiblingPipelineAggregator {
                 new ChangeType.Indeterminable("unable to find valid bucket values in bucket path [" + bucketsPaths()[0] + "]")
             );
         }
-        MlAggsHelper.DoubleBucketValues bucketValues = maybeBucketsValue.get();
+        MlAggsHelper.DoubleBucketValues bucketValues = maybeBucketValues.get();
         if (bucketValues.getValues().length < (2 * MINIMUM_BUCKETS) + 2) {
             return new InternalChangePointAggregation(
                 name(),
@@ -146,7 +146,7 @@ public class ChangePointAggregator extends SiblingPipelineAggregator {
     static ChangeType testForSpikeOrDip(MlAggsHelper.DoubleBucketValues bucketValues, double pValueThreshold) {
         try {
             SpikeAndDipDetector detect = new SpikeAndDipDetector(bucketValues.getValues());
-            ChangeType result = detect.at(pValueThreshold);
+            ChangeType result = detect.at(pValueThreshold, bucketValues);
             logger.trace("spike or dip p-value: [{}]", result.pValue());
             return result;
         } catch (NotStrictlyPositiveException nspe) {
@@ -552,7 +552,7 @@ public class ChangePointAggregator extends SiblingPipelineAggregator {
                 case TREND_CHANGE:
                     return new ChangeType.TrendChange(pValueVsStationary(), rSquared(), bucketValues.getBucketIndex(changePoint));
                 case DISTRIBUTION_CHANGE:
-                    return new ChangeType.DistributionChange(pValue, changePoint);
+                    return new ChangeType.DistributionChange(pValue, bucketValues.getBucketIndex(changePoint));
             }
             throw new RuntimeException("Unknown change type [" + type + "].");
         }

+ 10 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetector.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.xpack.ml.aggs.changepoint;
 
+import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
+
 import java.util.Arrays;
 
 /**
@@ -133,29 +135,29 @@ final class SpikeAndDipDetector {
         spikeTestKDE = new KDE(spikeKDEValues, 1.36);
     }
 
-    ChangeType at(double pValueThreshold) {
+    ChangeType at(double pValueThreshold, MlAggsHelper.DoubleBucketValues bucketValues) {
         if (dipIndex == -1 || spikeIndex == -1) {
             return new ChangeType.Indeterminable(
                 "not enough buckets to check for dip or spike. Requires at least [3]; found [" + numValues + "]"
             );
         }
 
-        KDE.ValueAndMagnitude dipLeftLeftTailTest = dipTestKDE.cdf(dipValue);
+        KDE.ValueAndMagnitude dipLeftTailTest = dipTestKDE.cdf(dipValue);
         KDE.ValueAndMagnitude spikeRightTailTest = spikeTestKDE.sf(spikeValue);
-        double dipPValue = dipLeftLeftTailTest.pValue(numValues);
+        double dipPValue = dipLeftTailTest.pValue(numValues);
         double spikePValue = spikeRightTailTest.pValue(numValues);
 
         if (dipPValue < pValueThreshold && spikePValue < pValueThreshold) {
-            if (dipLeftLeftTailTest.isMoreSignificant(spikeRightTailTest)) {
-                return new ChangeType.Dip(dipPValue, dipIndex);
+            if (dipLeftTailTest.isMoreSignificant(spikeRightTailTest)) {
+                return new ChangeType.Dip(dipPValue, bucketValues.getBucketIndex(dipIndex));
             }
-            return new ChangeType.Spike(spikePValue, spikeIndex);
+            return new ChangeType.Spike(spikePValue, bucketValues.getBucketIndex(spikeIndex));
         }
         if (dipPValue < pValueThreshold) {
-            return new ChangeType.Dip(dipPValue, dipIndex);
+            return new ChangeType.Dip(dipPValue, bucketValues.getBucketIndex(dipIndex));
         }
         if (spikePValue < pValueThreshold) {
-            return new ChangeType.Spike(spikePValue, spikeIndex);
+            return new ChangeType.Spike(spikePValue, bucketValues.getBucketIndex(spikeIndex));
         }
         return new ChangeType.Stationary();
     }

+ 25 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/SpikeAndDipDetectorTests.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.ml.aggs.changepoint;
 
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
 
 import java.util.Arrays;
 
@@ -19,10 +20,13 @@ public class SpikeAndDipDetectorTests extends ESTestCase {
 
     public void testTooLittleData() {
         for (int i = 0; i < 4; i++) {
+            long[] docCounts = new long[i];
             double[] values = new double[i];
+            Arrays.fill(docCounts, 1);
             Arrays.fill(values, 1.0);
+            MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values);
             SpikeAndDipDetector detect = new SpikeAndDipDetector(values);
-            assertThat(detect.at(0.01), instanceOf(ChangeType.Indeterminable.class));
+            assertThat(detect.at(0.01, bucketValues), instanceOf(ChangeType.Indeterminable.class));
         }
     }
 
@@ -142,24 +146,42 @@ public class SpikeAndDipDetectorTests extends ESTestCase {
         // Check vs some expected values.
 
         {
+            long[] docCounts = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
             double[] values = new double[] { 0.1, 3.1, 1.2, 1.7, 0.9, 2.3, -0.8, 3.2, 1.2, 1.3, 1.1, 1.0, 8.5, 0.5, 2.6, 0.7 };
+            MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values);
 
             SpikeAndDipDetector detect = new SpikeAndDipDetector(values);
 
-            ChangeType change = detect.at(0.05);
+            ChangeType change = detect.at(0.05, bucketValues);
 
             assertThat(change, instanceOf(ChangeType.Spike.class));
             assertThat(change.pValue(), closeTo(3.0465e-12, 1e-15));
         }
         {
+            long[] docCounts = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
             double[] values = new double[] { 0.1, 3.1, 1.2, 1.7, 0.9, 2.3, -4.2, 3.2, 1.2, 1.3, 1.1, 1.0, 3.5, 0.5, 2.6, 0.7 };
+            MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values);
 
             SpikeAndDipDetector detect = new SpikeAndDipDetector(values);
 
-            ChangeType change = detect.at(0.05);
+            ChangeType change = detect.at(0.05, bucketValues);
 
             assertThat(change, instanceOf(ChangeType.Dip.class));
             assertThat(change.pValue(), closeTo(1.2589e-08, 1e-11));
         }
     }
+
+    public void testMissingBuckets() {
+        long[] docCounts = new long[] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
+        double[] values = new double[] { 1.0, 2.0, 0.7, 1.0, 1.5, 1.1, 2.2, 10.0, 0.3, 0.4, 0.7, 0.9, 1.4, 2.1, 1.2, 1.0 };
+        int[] buckets = new int[] { 0, 2, 3, 6, 7, 8, 9, 10, 11, 12, 13, 15, 17, 18, 19, 20 };
+        MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(docCounts, values, buckets);
+
+        SpikeAndDipDetector detect = new SpikeAndDipDetector(values);
+
+        ChangeType change = detect.at(0.01, bucketValues);
+
+        assertThat(change, instanceOf(ChangeType.Spike.class));
+        assertThat(change.changePoint(), equalTo(10));
+    }
 }