Browse Source

Account for archive memory usage in LVM (#102752)

Move memory accounting for VersionLookup into its class to make it easier
to get the VersionLookup memory usage before/after aVersionLookup#merge.

Relates ES-5921
Pooya Salehi 1 year ago
parent
commit
8040b86d0e

+ 49 - 22
server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

@@ -64,11 +64,18 @@ public final class LiveVersionMap implements ReferenceManager.RefreshListener, A
 
         // Modifies the map of this instance by merging with the given VersionLookup
         public void merge(VersionLookup versionLookup) {
+            long existingEntriesSize = 0;
+            for (var entry : versionLookup.map.entrySet()) {
+                var existingValue = map.get(entry.getKey());
+                existingEntriesSize += existingValue == null ? 0 : mapEntryBytesUsed(entry.getKey(), existingValue);
+            }
             map.putAll(versionLookup.map);
+            adjustRam(versionLookup.ramBytesUsed() - existingEntriesSize);
             minDeleteTimestamp.accumulateAndGet(versionLookup.minDeleteTimestamp(), Math::min);
         }
 
-        private VersionLookup(Map<BytesRef, VersionValue> map) {
+        // Visible for testing
+        VersionLookup(Map<BytesRef, VersionValue> map) {
             this.map = map;
         }
 
@@ -77,7 +84,11 @@ public final class LiveVersionMap implements ReferenceManager.RefreshListener, A
         }
 
         VersionValue put(BytesRef key, VersionValue value) {
-            return map.put(key, value);
+            long ramAccounting = mapEntryBytesUsed(key, value);
+            VersionValue previousValue = map.put(key, value);
+            ramAccounting += previousValue == null ? 0 : -mapEntryBytesUsed(key, previousValue);
+            adjustRam(ramAccounting);
+            return previousValue;
         }
 
         public boolean isEmpty() {
@@ -96,8 +107,12 @@ public final class LiveVersionMap implements ReferenceManager.RefreshListener, A
             unsafe = true;
         }
 
-        public VersionValue remove(BytesRef uid) {
-            return map.remove(uid);
+        VersionValue remove(BytesRef uid) {
+            VersionValue previousValue = map.remove(uid);
+            if (previousValue != null) {
+                adjustRam(-mapEntryBytesUsed(uid, previousValue));
+            }
+            return previousValue;
         }
 
         public void updateMinDeletedTimestamp(DeleteVersionValue delete) {
@@ -107,6 +122,26 @@ public final class LiveVersionMap implements ReferenceManager.RefreshListener, A
         public long minDeleteTimestamp() {
             return minDeleteTimestamp.get();
         }
+
+        void adjustRam(long value) {
+            if (value != 0) {
+                long v = ramBytesUsed.addAndGet(value);
+                assert v >= 0 : "bytes=" + v;
+            }
+        }
+
+        public long ramBytesUsed() {
+            return ramBytesUsed.get();
+        }
+
+        public static long mapEntryBytesUsed(BytesRef key, VersionValue value) {
+            return (BASE_BYTES_PER_BYTESREF + key.bytes.length) + (BASE_BYTES_PER_CHM_ENTRY + value.ramBytesUsed());
+        }
+
+        // Used only for testing
+        Map<BytesRef, VersionValue> getMap() {
+            return map;
+        }
     }
 
     private static final class Maps {
@@ -170,27 +205,12 @@ public final class LiveVersionMap implements ReferenceManager.RefreshListener, A
         }
 
         void put(BytesRef uid, VersionValue version) {
-            long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
-            long ramAccounting = BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed;
-            VersionValue previousValue = current.put(uid, version);
-            ramAccounting += previousValue == null ? 0 : -(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed);
-            adjustRam(ramAccounting);
-        }
-
-        void adjustRam(long value) {
-            if (value != 0) {
-                long v = current.ramBytesUsed.addAndGet(value);
-                assert v >= 0 : "bytes=" + v;
-            }
+            current.put(uid, version);
         }
 
         void remove(BytesRef uid, DeleteVersionValue deleted) {
-            VersionValue previousValue = current.remove(uid);
+            current.remove(uid);
             current.updateMinDeletedTimestamp(deleted);
-            if (previousValue != null) {
-                long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
-                adjustRam(-(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed));
-            }
             if (old != VersionLookup.EMPTY) {
                 // we also need to remove it from the old map here to make sure we don't read this stale value while
                 // we are in the middle of a refresh. Most of the time the old map is an empty map so we can skip it there.
@@ -452,7 +472,7 @@ public final class LiveVersionMap implements ReferenceManager.RefreshListener, A
 
     @Override
     public long ramBytesUsed() {
-        return maps.ramBytesUsed() + ramBytesUsedTombstones.get();
+        return maps.ramBytesUsed() + ramBytesUsedTombstones.get() + ramBytesUsedForArchive();
     }
 
     /**
@@ -463,6 +483,13 @@ public final class LiveVersionMap implements ReferenceManager.RefreshListener, A
         return maps.current.ramBytesUsed.get();
     }
 
+    /**
+     * Returns how much RAM would be freed up by cleaning out the LiveVersionMapArchive.
+     */
+    long ramBytesUsedForArchive() {
+        return archive.getMemoryBytesUsed();
+    }
+
     /**
      * Returns how much RAM is current being freed up by refreshing. This is the RAM usage of the previous version map that needs to stay
      * around until operations are safely recorded in the Lucene index.

+ 8 - 0
server/src/main/java/org/elasticsearch/index/engine/LiveVersionMapArchive.java

@@ -39,6 +39,14 @@ public interface LiveVersionMapArchive {
         return false;
     }
 
+    /**
+     * Returns how much memory is currently being used by the archive and would be freed up after
+     * unpromotables are refreshed.
+     */
+    default long getMemoryBytesUsed() {
+        return 0L;
+    }
+
     LiveVersionMapArchive NOOP_ARCHIVE = new LiveVersionMapArchive() {
         @Override
         public void afterRefresh(LiveVersionMap.VersionLookup old) {}

+ 6 - 2
server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTestUtils.java

@@ -61,11 +61,11 @@ public class LiveVersionMapTestUtils {
         map.pruneTombstones(maxTimestampToPrune, maxSeqNoToPrune);
     }
 
-    static IndexVersionValue randomIndexVersionValue() {
+    public static IndexVersionValue randomIndexVersionValue() {
         return new IndexVersionValue(randomTranslogLocation(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
     }
 
-    static Translog.Location randomTranslogLocation() {
+    public static Translog.Location randomTranslogLocation() {
         if (randomBoolean()) {
             return null;
         } else {
@@ -93,6 +93,10 @@ public class LiveVersionMapTestUtils {
         return map.isSafeAccessRequired();
     }
 
+    public static void enforceSafeAccess(LiveVersionMap map) {
+        map.enforceSafeAccess();
+    }
+
     public static LiveVersionMapArchive getArchive(LiveVersionMap map) {
         return map.getArchive();
     }

+ 54 - 1
server/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java

@@ -13,7 +13,9 @@ import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.lucene.util.Constants;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.test.ESTestCase;
 
@@ -23,11 +25,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
 
+import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency;
+import static org.elasticsearch.core.Tuple.tuple;
 import static org.elasticsearch.index.engine.LiveVersionMapTestUtils.randomIndexVersionValue;
 import static org.elasticsearch.index.engine.LiveVersionMapTestUtils.randomTranslogLocation;
 import static org.hamcrest.Matchers.empty;
@@ -36,7 +43,6 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.nullValue;
 
 public class LiveVersionMapTests extends ESTestCase {
-
     public void testRamBytesUsed() throws Exception {
         LiveVersionMap map = new LiveVersionMap();
         for (int i = 0; i < 100000; ++i) {
@@ -442,4 +448,51 @@ public class LiveVersionMapTests extends ESTestCase {
             }
         }
     }
+
+    public void testVersionLookupRamBytesUsed() {
+        var vl = new LiveVersionMap.VersionLookup(newConcurrentMapWithAggressiveConcurrency());
+        assertEquals(0, vl.ramBytesUsed());
+        Set<BytesRef> existingKeys = new HashSet<>();
+        Supplier<Tuple<BytesRef, IndexVersionValue>> randomEntry = () -> {
+            var key = randomBoolean() || existingKeys.isEmpty() ? uid(randomIdentifier()) : randomFrom(existingKeys);
+            return tuple(key, randomIndexVersionValue());
+        };
+        IntStream.range(0, randomIntBetween(10, 100)).forEach(i -> {
+            switch (randomIntBetween(0, 2)) {
+                case 0: // put
+                    var entry = randomEntry.get();
+                    var previousValue = vl.put(entry.v1(), entry.v2());
+                    if (existingKeys.contains(entry.v1())) {
+                        assertNotNull(previousValue);
+                    } else {
+                        assertNull(previousValue);
+                        existingKeys.add(entry.v1());
+                    }
+                    break;
+                case 1: // remove
+                    if (existingKeys.isEmpty() == false) {
+                        var key = randomFrom(existingKeys);
+                        assertNotNull(vl.remove(key));
+                        existingKeys.remove(key);
+                    }
+                    break;
+                case 2: // merge
+                    var toMerge = new LiveVersionMap.VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency());
+                    IntStream.range(0, randomIntBetween(1, 100))
+                        .mapToObj(n -> randomEntry.get())
+                        .forEach(kv -> toMerge.put(kv.v1(), kv.v2()));
+                    vl.merge(toMerge);
+                    existingKeys.addAll(toMerge.getMap().keySet());
+                    break;
+                default:
+                    throw new IllegalStateException("branch value unexpected");
+            }
+        });
+        long actualRamBytesUsed = vl.getMap()
+            .entrySet()
+            .stream()
+            .mapToLong(entry -> LiveVersionMap.VersionLookup.mapEntryBytesUsed(entry.getKey(), entry.getValue()))
+            .sum();
+        assertEquals(actualRamBytesUsed, vl.ramBytesUsed());
+    }
 }