Browse Source

Ensure IndexingPressure memory is re-adjusted once (#67673)

We have seen a case where the memory of IndexingPressure was 
re-adjusted twice. With this commit, we will log that error with a 
stack trace so that we can figure out the source of the issue.
Nhat Nguyen 4 years ago
parent
commit
f5dfa7b432

+ 25 - 8
server/src/main/java/org/elasticsearch/index/IndexingPressure.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.index;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -26,6 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.stats.IndexingPressureStats;
 
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class IndexingPressure {
@@ -33,6 +36,8 @@ public class IndexingPressure {
     public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES =
         Setting.memorySizeSetting("indexing_pressure.memory.limit", "10%", Setting.Property.NodeScope);
 
+    private static final Logger logger = LogManager.getLogger(IndexingPressure.class);
+
     private final AtomicLong currentCombinedCoordinatingAndPrimaryBytes = new AtomicLong(0);
     private final AtomicLong currentCoordinatingBytes = new AtomicLong(0);
     private final AtomicLong currentPrimaryBytes = new AtomicLong(0);
@@ -63,6 +68,18 @@ public class IndexingPressure {
         this.replicaLimits = (long) (this.primaryAndCoordinatingLimits * 1.5);
     }
 
+    private static Releasable wrapReleasable(Releasable releasable) {
+        final AtomicBoolean called = new AtomicBoolean();
+        return () -> {
+            if (called.compareAndSet(false, true)) {
+                releasable.close();
+            } else {
+                logger.error("IndexingPressure memory is adjusted twice", new IllegalStateException("Releasable is called twice"));
+                assert false : "IndexingPressure is adjusted twice";
+            }
+        };
+    }
+
     public Releasable markCoordinatingOperationStarted(int operations, long bytes, boolean forceExecution) {
         long combinedBytes = this.currentCombinedCoordinatingAndPrimaryBytes.addAndGet(bytes);
         long replicaWriteBytes = this.currentReplicaBytes.get();
@@ -84,11 +101,11 @@ public class IndexingPressure {
         totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
         totalCoordinatingBytes.getAndAdd(bytes);
         totalCoordinatingOps.getAndAdd(operations);
-        return () -> {
+        return wrapReleasable(() -> {
             this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
             this.currentCoordinatingBytes.getAndAdd(-bytes);
             this.currentCoordinatingOps.getAndAdd(-operations);
-        };
+        });
     }
 
     public Releasable markPrimaryOperationLocalToCoordinatingNodeStarted(int operations, long bytes) {
@@ -96,10 +113,10 @@ public class IndexingPressure {
         currentPrimaryOps.getAndAdd(operations);
         totalPrimaryBytes.getAndAdd(bytes);
         totalPrimaryOps.getAndAdd(operations);
-        return () -> {
+        return wrapReleasable(() -> {
             this.currentPrimaryBytes.getAndAdd(-bytes);
             this.currentPrimaryOps.getAndAdd(-operations);
-        };
+        });
     }
 
     public Releasable markPrimaryOperationStarted(int operations, long bytes, boolean forceExecution) {
@@ -123,11 +140,11 @@ public class IndexingPressure {
         totalCombinedCoordinatingAndPrimaryBytes.getAndAdd(bytes);
         totalPrimaryBytes.getAndAdd(bytes);
         totalPrimaryOps.getAndAdd(operations);
-        return () -> {
+        return wrapReleasable(() -> {
             this.currentCombinedCoordinatingAndPrimaryBytes.getAndAdd(-bytes);
             this.currentPrimaryBytes.getAndAdd(-bytes);
             this.currentPrimaryOps.getAndAdd(-operations);
-        };
+        });
     }
 
     public Releasable markReplicaOperationStarted(int operations, long bytes, boolean forceExecution) {
@@ -144,10 +161,10 @@ public class IndexingPressure {
         currentReplicaOps.getAndAdd(operations);
         totalReplicaBytes.getAndAdd(bytes);
         totalReplicaOps.getAndAdd(operations);
-        return () -> {
+        return wrapReleasable(() -> {
             this.currentReplicaBytes.getAndAdd(-bytes);
             this.currentReplicaOps.getAndAdd(-operations);
-        };
+        });
     }
 
     public IndexingPressureStats stats() {

+ 0 - 2
server/src/test/java/org/elasticsearch/index/IndexingPressureTests.java

@@ -140,9 +140,7 @@ public class IndexingPressureTests extends ESTestCase {
             assertEquals(1, indexingPressure.stats().getReplicaRejections());
             assertEquals(1024 * 14, indexingPressure.stats().getCurrentReplicaBytes());
             forced.close();
-
             replica2.close();
-            forced.close();
         }
 
         assertEquals(1024 * 14, indexingPressure.stats().getTotalReplicaBytes());