Browse Source

Avoid return negative value in CounterMetric (#71446)

A CounterMetric is used to track the number of completed and outstanding 
items, for example, the number of executed refreshes, the currently used
memory by indexing, the current pending search requests. In all cases,
the current count of CounterMetric is always non-negative.

However, as this metric is implemented using a LongAdder, the returned
count is NOT an atomic snapshot; invocation in the absence of concurrent
updates returns an accurate result, but concurrent updates that occur
while the sum is being calculated might not be incorporated.

We can replace LongAdder with AtomicLong, but this commit chooses to 
continue using LongAdder but returns 0 when the sum value is negative.

Relates #52411
Closes #70968
Nhat Nguyen 4 years ago
parent
commit
f887cf28d1

+ 38 - 2
server/src/main/java/org/elasticsearch/common/metrics/CounterMetric.java

@@ -8,29 +8,65 @@
 
 package org.elasticsearch.common.metrics;
 
+import org.elasticsearch.Assertions;
+
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
-public class CounterMetric implements Metric {
 
+/**
+ * A {@link CounterMetric} is used to track the number of completed and outstanding items, for example, the number of executed refreshes,
+ * the currently used memory by indexing, the current pending search requests. In both cases, the current {@link CounterMetric#count} is
+ * always non-negative.
+ */
+public final class CounterMetric {
     private final LongAdder counter = new LongAdder();
+    private final AtomicLong assertingCounter = Assertions.ENABLED ? new AtomicLong() : null;
+
+    private boolean assertNonNegative(long n) {
+        assert n >= 0 : "CounterMetric value must always be non-negative; got: " + n;
+        return true;
+    }
 
     public void inc() {
         counter.increment();
+        assert assertNonNegative(assertingCounter.incrementAndGet());
     }
 
     public void inc(long n) {
         counter.add(n);
+        assert assertNonNegative(assertingCounter.addAndGet(n));
     }
 
     public void dec() {
         counter.decrement();
+        assert assertNonNegative(assertingCounter.decrementAndGet());
     }
 
     public void dec(long n) {
         counter.add(-n);
+        assert assertNonNegative(assertingCounter.addAndGet(-n));
     }
 
+    /**
+     * Returns the current count of this metric. The returned value is always non-negative.
+     * <p>
+     * As this metric is implemented using a {@link LongAdder}, the returned value is NOT an atomic snapshot;
+     * invocation in the absence of concurrent updates returns an accurate result, but concurrent updates that
+     * occur while the sum is being calculated might not be incorporated.
+     *
+     * @see LongAdder#sum()
+     */
     public long count() {
-        return counter.sum();
+        // The `counter.sum()` value is expected to always be non-negative. And if it's negative, then some concurrent updates
+        // aren't incorporated yet. In this case, we can immediately return 0L; but here we choose to retry several times
+        // to hopefully have a more accurate value than 0L.
+        for (int i = 0; i < 5; i++) {
+            final long count = counter.sum();
+            if (count >= 0L) {
+                return count;
+            }
+        }
+        return 0L;
     }
 }

+ 8 - 12
server/src/main/java/org/elasticsearch/common/metrics/MeanMetric.java

@@ -10,7 +10,7 @@ package org.elasticsearch.common.metrics;
 
 import java.util.concurrent.atomic.LongAdder;
 
-public class MeanMetric implements Metric {
+public final class MeanMetric {
 
     private final LongAdder counter = new LongAdder();
     private final LongAdder sum = new LongAdder();
@@ -20,13 +20,14 @@ public class MeanMetric implements Metric {
         sum.add(n);
     }
 
-    public void dec(long n) {
-        counter.decrement();
-        sum.add(-n);
-    }
-
+    /**
+     * Returns the current count of this metric. This metric supports only {@link #inc(long)} that increases the counter
+     * whenever it's invoked; hence, the returned count is always non-negative.
+     */
     public long count() {
-        return counter.sum();
+        final long count = counter.sum();
+        assert count >= 0 : "Count of MeanMetric must always be non-negative; got " + count;
+        return count;
     }
 
     public long sum() {
@@ -40,9 +41,4 @@ public class MeanMetric implements Metric {
         }
         return 0.0;
     }
-
-    public void clear() {
-        counter.reset();
-        sum.reset();
-    }
 }

+ 0 - 12
server/src/main/java/org/elasticsearch/common/metrics/Metric.java

@@ -1,12 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-package org.elasticsearch.common.metrics;
-
-public interface Metric {
-}

+ 0 - 5
server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java

@@ -71,10 +71,8 @@ public final class ShardSearchStats implements SearchOperationListener {
         computeStats(searchContext, statsHolder -> {
             if (searchContext.hasOnlySuggest()) {
                 statsHolder.suggestCurrent.dec();
-                assert statsHolder.suggestCurrent.count() >= 0;
             } else {
                 statsHolder.queryCurrent.dec();
-                assert statsHolder.queryCurrent.count() >= 0;
             }
         });
     }
@@ -85,11 +83,9 @@ public final class ShardSearchStats implements SearchOperationListener {
             if (searchContext.hasOnlySuggest()) {
                 statsHolder.suggestMetric.inc(tookInNanos);
                 statsHolder.suggestCurrent.dec();
-                assert statsHolder.suggestCurrent.count() >= 0;
             } else {
                 statsHolder.queryMetric.inc(tookInNanos);
                 statsHolder.queryCurrent.dec();
-                assert statsHolder.queryCurrent.count() >= 0;
             }
         });
     }
@@ -109,7 +105,6 @@ public final class ShardSearchStats implements SearchOperationListener {
         computeStats(searchContext, statsHolder -> {
             statsHolder.fetchMetric.inc(tookInNanos);
             statsHolder.fetchCurrent.dec();
-            assert statsHolder.fetchCurrent.count() >= 0;
         });
     }
 

+ 8 - 8
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/slm/SnapshotLifecycleStatsTests.java

@@ -23,10 +23,10 @@ public class SnapshotLifecycleStatsTests extends AbstractSerializingTestCase<Sna
 
     public static SnapshotLifecycleStats.SnapshotPolicyStats randomPolicyStats(String policyId) {
         return new SnapshotLifecycleStats.SnapshotPolicyStats(policyId,
-            randomBoolean() ? 0 : randomNonNegativeLong(),
-            randomBoolean() ? 0 : randomNonNegativeLong(),
-            randomBoolean() ? 0 : randomNonNegativeLong(),
-            randomBoolean() ? 0 : randomNonNegativeLong());
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE),
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE),
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE),
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE));
     }
 
     public static SnapshotLifecycleStats randomLifecycleStats() {
@@ -37,10 +37,10 @@ public class SnapshotLifecycleStatsTests extends AbstractSerializingTestCase<Sna
             policyStats.put(policy, randomPolicyStats(policy));
         }
         return new SnapshotLifecycleStats(
-            randomBoolean() ? 0 : randomNonNegativeLong(),
-            randomBoolean() ? 0 : randomNonNegativeLong(),
-            randomBoolean() ? 0 : randomNonNegativeLong(),
-            randomBoolean() ? 0 : randomNonNegativeLong(),
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE),
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE),
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE),
+            randomBoolean() ? 0 : randomIntBetween(0, Integer.MAX_VALUE),
             policyStats);
     }