Browse Source

Add keep_values gap policy (#73297)

Adds a new keep_values gap policy that works like skip, except that when the
metric calculated on an empty bucket yields a non-null, non-NaN value, that
value is used for the bucket instead of the bucket being skipped.

Fixes #27377

Co-authored-by: Mark Tozzi <mark.tozzi@gmail.com>
Igor Motov 4 years ago
parent
commit
db36b6c89a

+ 4 - 0
docs/reference/aggregations/pipeline.asciidoc

@@ -267,6 +267,10 @@ _insert_zeros_::
                 This option will replace missing values with a zero (`0`) and pipeline aggregation computation will
                 proceed as normal.
 
+_keep_values_::
+                This option is similar to skip, except that if the metric provides a non-null, non-NaN value, this value is
+                used; otherwise, the empty bucket is skipped.
+
 include::pipeline/avg-bucket-aggregation.asciidoc[]
 
 include::pipeline/bucket-script-aggregation.asciidoc[]

+ 1 - 1
server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java

@@ -557,7 +557,7 @@ public class DerivativeIT extends ESIntegTestCase {
             checkBucketKeyAndDocCount("InternalBucket " + i, bucket, i, valueCounts_empty_rnd[i]);
             Sum sum = bucket.getAggregations().get("sum");
             double thisSumValue = sum.value();
-            if (bucket.getDocCount() == 0) {
+            if (bucket.getDocCount() == 0 && gapPolicy != GapPolicy.KEEP_VALUES) {
                 thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN;
             }
             SimpleValue sumDeriv = bucket.getAggregations().get("deriv");

+ 7 - 6
server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java

@@ -127,7 +127,7 @@ public class SerialDiffIT extends ESIntegTestCase {
         numBuckets = randomIntBetween(10, 80);
         lag = randomIntBetween(1, numBuckets / 2);
 
-        gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS;
+        gapPolicy = randomFrom(BucketHelpers.GapPolicy.values());
         metric = randomMetric("the_metric", VALUE_FIELD);
         mockHisto = PipelineAggregationHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble());
 
@@ -171,6 +171,12 @@ public class SerialDiffIT extends ESIntegTestCase {
                     metricValue = 0.0;
                 } else {
                     metricValue = PipelineAggregationHelperTests.calculateMetric(docValues, metric);
+                    if (gapPolicy.equals(BucketHelpers.GapPolicy.KEEP_VALUES)) {
+                        if (Double.isInfinite(metricValue) || Double.isNaN(metricValue)) {
+                            // serial diff ignores these values and replaces them with null
+                            metricValue = Double.NaN;
+                        }
+                    }
                 }
 
             } else {
@@ -204,13 +210,8 @@ public class SerialDiffIT extends ESIntegTestCase {
             }
 
             lagWindow.add(metricValue);
-
-
-
-
         }
 
-
         testValues.put(target.toString(), values);
     }
 

+ 41 - 12
server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java

@@ -41,9 +41,40 @@ public class BucketHelpers {
      *
      * "insert_zeros": empty buckets will be filled with zeros for all metrics
      * "skip": empty buckets will simply be ignored
+     * "keep_values": for empty buckets the values provided by the metrics will still be used if they are available
      */
     public enum GapPolicy implements Writeable {
-        INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip");
+        INSERT_ZEROS((byte) 0, "insert_zeros", false) {
+            @Override
+            public Double processValue(long docCount, Double value) {
+                if (Double.isInfinite(value) || Double.isNaN(value) || docCount == 0) {
+                    return 0.0;
+                } else {
+                    return value;
+                }
+            }
+        },
+
+        SKIP((byte) 1, "skip", true) {
+            @Override
+            public Double processValue(long docCount, Double value) {
+                if (Double.isInfinite(value) || docCount == 0) {
+                    return Double.NaN;
+                } else {
+                    return value;
+                }
+            }
+        },
+
+        KEEP_VALUES((byte) 2, "keep_values", true) {
+            public Double processValue(long docCount, Double value) {
+                if (Double.isInfinite(value) || Double.isNaN(value)) {
+                    return Double.NaN;
+                } else {
+                    return value;
+                }
+            }
+        };
 
         /**
          * Parse a string GapPolicy into the byte enum
@@ -76,10 +107,12 @@ public class BucketHelpers {
 
         private final byte id;
         private final ParseField parseField;
+        public final boolean isSkippable;
 
-        GapPolicy(byte id, String name) {
+        GapPolicy(byte id, String name, boolean isSkippable) {
             this.id = id;
             this.parseField = new ParseField(name);
+            this.isSkippable = isSkippable;
         }
 
         /**
@@ -113,6 +146,8 @@ public class BucketHelpers {
         public String getName() {
             return parseField.getPreferredName();
         }
+
+        public abstract Double processValue(long docCount, Double value);
     }
 
     /**
@@ -161,18 +196,12 @@ public class BucketHelpers {
                     throw formatResolutionError(agg, aggPathAsList, propertyValue);
                 }
                 // doc count never has missing values so gap policy doesn't apply here
-                boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0));
-                if (Double.isInfinite(value) || Double.isNaN(value) || (bucket.getDocCount() == 0 && isDocCountProperty == false)) {
-                    switch (gapPolicy) {
-                    case INSERT_ZEROS:
-                        return 0.0;
-                    case SKIP:
-                    default:
-                        return Double.NaN;
-                    }
-                } else {
+                if (aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0))) {
                     return value;
+                } else {
+                    return gapPolicy.processValue(bucket.getDocCount(), value);
                 }
+
             }
         } catch (InvalidAggregationPathException e) {
             return null;

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java

@@ -60,7 +60,7 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
                 String varName = entry.getKey();
                 String bucketsPath = entry.getValue();
                 Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
-                if (GapPolicy.SKIP == gapPolicy && (value == null || Double.isNaN(value))) {
+                if (gapPolicy.isSkippable && (value == null || Double.isNaN(value))) {
                     skipBucket = true;
                     break;
                 }

+ 1 - 1
server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java

@@ -95,7 +95,7 @@ public class BucketSortPipelineAggregator extends PipelineAggregator {
                     resolved.put(sort, (Comparable<Object>) internalBucket.getKey());
                 } else {
                     Double bucketValue = BucketHelpers.resolveBucketValue(parentAgg, internalBucket, sortField, gapPolicy);
-                    if (GapPolicy.SKIP == gapPolicy && Double.isNaN(bucketValue)) {
+                    if (gapPolicy.isSkippable && Double.isNaN(bucketValue)) {
                         continue;
                     }
                     resolved.put(sort, (Comparable<Object>) (Object) bucketValue);

+ 9 - 1
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeAggregatorTests.java

@@ -586,7 +586,15 @@ public class DerivativeAggregatorTests extends AggregatorTestCase {
                     Sum sum = bucket.getAggregations().get("sum");
                     double thisSumValue = sum.value();
                     if (bucket.getDocCount() == 0) {
-                        thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN;
+                        switch (gapPolicy) {
+                            case INSERT_ZEROS:
+                                thisSumValue = 0;
+                                break;
+                            case KEEP_VALUES:
+                                break;
+                            default:
+                                thisSumValue = Double.NaN;
+                        }
                     }
                     SimpleValue sumDeriv = bucket.getAggregations().get("deriv");
                     if (i == 0) {

+ 5 - 1
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/GapPolicyTests.java

@@ -25,26 +25,30 @@ public class GapPolicyTests extends AbstractWriteableEnumTestCase {
     public void testValidOrdinals() {
         assertThat(BucketHelpers.GapPolicy.INSERT_ZEROS.ordinal(), equalTo(0));
         assertThat(BucketHelpers.GapPolicy.SKIP.ordinal(), equalTo(1));
+        assertThat(BucketHelpers.GapPolicy.KEEP_VALUES.ordinal(), equalTo(2));
     }
 
     @Override
     public void testFromString() {
         assertThat(BucketHelpers.GapPolicy.parse("insert_zeros", null), equalTo(BucketHelpers.GapPolicy.INSERT_ZEROS));
         assertThat(BucketHelpers.GapPolicy.parse("skip", null), equalTo(BucketHelpers.GapPolicy.SKIP));
+        assertThat(BucketHelpers.GapPolicy.parse("keep_values", null), equalTo(BucketHelpers.GapPolicy.KEEP_VALUES));
         ParsingException e = expectThrows(ParsingException.class, () -> BucketHelpers.GapPolicy.parse("does_not_exist", null));
         assertThat(e.getMessage(),
-            equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip]"));
+            equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip, keep_values]"));
     }
 
     @Override
     public void testReadFrom() throws IOException {
         assertReadFromStream(0, BucketHelpers.GapPolicy.INSERT_ZEROS);
         assertReadFromStream(1, BucketHelpers.GapPolicy.SKIP);
+        assertReadFromStream(2, BucketHelpers.GapPolicy.KEEP_VALUES);
     }
 
     @Override
     public void testWriteTo() throws IOException {
         assertWriteToStream(BucketHelpers.GapPolicy.INSERT_ZEROS, 0);
         assertWriteToStream(BucketHelpers.GapPolicy.SKIP, 1);
+        assertWriteToStream(BucketHelpers.GapPolicy.KEEP_VALUES, 2);
     }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java

@@ -129,7 +129,7 @@ public class PipelineAggregationHelperTests extends ESTestCase {
             for (double value : values) {
                 accumulator += value;
             }
-            return accumulator / values.length;
+            return values.length == 0 ? Double.NaN : accumulator / values.length ;
         }
 
         return 0.0;