浏览代码

[ML] fix weird change_point bug where all data values are equivalent (#97588)

If a user calls the change_point aggregation and all the bucket values are exactly the same, we may run into weird floating point errors when calculating statistics. When there is no variance and the standard deviation is 0, we should indicate that there is no change point and that it's a stationary set of data.
Benjamin Trent 2 年之前
父节点
当前提交
6a15ecfa38

+ 5 - 0
docs/changelog/97588.yaml

@@ -0,0 +1,5 @@
+pr: 97588
+summary: Fix weird `change_point` bug where all data values are equivalent
+area: Machine Learning
+type: bug
+issues: []

+ 10 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregator.java

@@ -7,9 +7,12 @@
 
 package org.elasticsearch.xpack.ml.aggs.changepoint;
 
+import org.apache.commons.math3.exception.NotStrictlyPositiveException;
 import org.apache.commons.math3.special.Beta;
 import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
 import org.apache.commons.math3.stat.regression.SimpleRegression;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.aggregations.AggregationReduceContext;
 import org.elasticsearch.search.aggregations.Aggregations;
@@ -32,6 +35,8 @@ import static org.elasticsearch.xpack.ml.aggs.MlAggsHelper.extractDoubleBucketed
 
 public class ChangePointAggregator extends SiblingPipelineAggregator {
 
+    private static final Logger logger = LogManager.getLogger(ChangePointAggregator.class);
+
     static final double P_VALUE_THRESHOLD = 0.025;
     private static final int MINIMUM_BUCKETS = 10;
     private static final int MAXIMUM_CANDIDATE_CHANGE_POINTS = 1000;
@@ -85,7 +90,11 @@ public class ChangePointAggregator extends SiblingPipelineAggregator {
         Tuple<int[], Integer> candidatePoints = candidateChangePoints(bucketValues.getValues());
         ChangeType changeType = changePValue(bucketValues, candidatePoints, P_VALUE_THRESHOLD);
         if (changeType.pValue() > P_VALUE_THRESHOLD) {
-            changeType = maxDeviationKdePValue(bucketValues, P_VALUE_THRESHOLD);
+            try {
+                changeType = maxDeviationKdePValue(bucketValues, P_VALUE_THRESHOLD);
+            } catch (NotStrictlyPositiveException nspe) {
+                logger.debug("failure calculating spikes", nspe);
+            }
         }
         ChangePointBucket changePointBucket = null;
         if (changeType.changePoint() >= 0) {

+ 5 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/changepoint/ChangePointAggregatorTests.java

@@ -168,6 +168,11 @@ public class ChangePointAggregatorTests extends AggregatorTestCase {
         );
     }
 
+    public void testZeroDeviation() throws IOException {
+        double[] bucketValues = DoubleStream.generate(() -> 4243.1621621621625).limit(30).toArray();
+        testChangeType(bucketValues, changeType -> { assertThat(changeType, instanceOf(ChangeType.Stationary.class)); });
+    }
+
     public void testStepChangeEdgeCaseScenarios() throws IOException {
         double[] bucketValues = new double[] {
             214505.0,