Browse Source

Optimize no-op update of SafeCommitInfo (#106066)

Today we always read the last safe commit on flush in order to get its
document count, so we can expire any peer retention leases which are
lagging unreasonably. There's no need to do any IO here if nothing has
changed: we can just re-use the info from the previous commit.
David Turner 1 year ago
parent
commit
3f8504e782

+ 32 - 13
server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

@@ -97,21 +97,13 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
         assert Thread.holdsLock(this) == false : "should not block concurrent acquire or release";
         final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
         final IndexCommit safeCommit = commits.get(keptPosition);
-        int totalDocsOfSafeCommit;
-        try {
-            totalDocsOfSafeCommit = getDocCountOfCommit(safeCommit);
-        } catch (IOException ex) {
-            logger.info("failed to get the total docs from the safe commit; use the total docs from the previous safe commit", ex);
-            totalDocsOfSafeCommit = safeCommitInfo.docCount;
-        }
-        IndexCommit newCommit = null;
-        IndexCommit previousLastCommit = null;
+        final var newSafeCommitInfo = getNewSafeCommitInfo(safeCommit);
+        final IndexCommit newCommit;
+        final IndexCommit previousLastCommit;
         List<IndexCommit> deletedCommits = null;
         synchronized (this) {
-            this.safeCommitInfo = new SafeCommitInfo(
-                Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
-                totalDocsOfSafeCommit
-            );
+            // we are synchronized on the IndexWriter in this method so nothing concurrently changed safeCommitInfo since the previous read
+            this.safeCommitInfo = newSafeCommitInfo;
             previousLastCommit = this.lastCommit;
             this.lastCommit = commits.get(commits.size() - 1);
             this.safeCommit = safeCommit;
@@ -123,6 +115,8 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
             }
             if (commitsListener != null && previousLastCommit != this.lastCommit) {
                 newCommit = acquireIndexCommit(false);
+            } else {
+                newCommit = null;
             }
             for (int i = 0; i < keptPosition; i++) {
                 final IndexCommit commit = commits.get(i);
@@ -149,6 +143,31 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
         }
     }
 
+    /**
+     * Builds the {@link SafeCommitInfo} describing the given safe commit, re-using the currently-held info where possible so that the
+     * doc count of the commit is only read when the local checkpoint recorded in the commit differs from the one we already hold.
+     *
+     * @param newSafeCommit the commit that is about to become the safe commit
+     * @return the currently-held info if the local checkpoint cannot be read or is unchanged; otherwise a new info carrying the new
+     *         local checkpoint and either the freshly-read doc count or, if reading it fails, the previously-known doc count
+     */
+    private SafeCommitInfo getNewSafeCommitInfo(IndexCommit newSafeCommit) {
+        final var previousInfo = this.safeCommitInfo;
+
+        final long localCheckpoint;
+        try {
+            final String rawLocalCheckpoint = newSafeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY);
+            localCheckpoint = Long.parseLong(rawLocalCheckpoint);
+        } catch (Exception ex) {
+            // best-effort: whatever went wrong reading or parsing the checkpoint, keep what we already know
+            logger.info("failed to get the local checkpoint from the safe commit; use the info from the previous safe commit", ex);
+            return previousInfo;
+        }
+
+        if (localCheckpoint == previousInfo.localCheckpoint) {
+            // Same LCP does not strictly imply same doc count (there may be extra operations between the LCP and the MSN of the new
+            // commit), but that state is transient because the LCP eventually advances. The doc count only feeds the heuristics for
+            // expiring excessively-lagging retention leases, so being slightly off is acceptable and lets us skip the read entirely.
+            return previousInfo;
+        }
+
+        int docCount;
+        try {
+            docCount = getDocCountOfCommit(newSafeCommit);
+        } catch (IOException ex) {
+            logger.info("failed to get the total docs from the safe commit; use the total docs from the previous safe commit", ex);
+            docCount = previousInfo.docCount;
+        }
+        return new SafeCommitInfo(localCheckpoint, docCount);
+    }
+
+
     private boolean assertSafeCommitUnchanged(IndexCommit safeCommit) {
         // This is protected from concurrent calls by a lock on the IndexWriter, but this assertion makes sure that we notice if that ceases
         // to be true in future. It is not disastrous if safeCommitInfo refers to an older safeCommit, it just means that we might retain a

+ 31 - 0
server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
@@ -97,6 +98,36 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         );
     }
 
+    /**
+     * Verifies that a second {@code onCommit} whose safe commit carries the same local checkpoint as the previous one does not read the
+     * doc count of the commit again.
+     */
+    public void testReusePreviousSafeCommitInfo() throws Exception {
+        final AtomicLong globalCheckpoint = new AtomicLong();
+        // counts how often the policy asks for the doc count of a commit
+        final AtomicInteger docCountReads = new AtomicInteger();
+        final CombinedDeletionPolicy policy = new CombinedDeletionPolicy(
+            logger,
+            new TranslogDeletionPolicy(),
+            new SoftDeletesPolicy(globalCheckpoint::get, NO_OPS_PERFORMED, between(0, 100), () -> RetentionLeases.EMPTY),
+            globalCheckpoint::get,
+            null
+        ) {
+            @Override
+            protected int getDocCountOfCommit(IndexCommit indexCommit) {
+                docCountReads.incrementAndGet();
+                return between(0, 1000);
+            }
+        };
+
+        final long checkpoint = between(1, 10000);
+        final var translogUUID = UUID.randomUUID();
+        final List<IndexCommit> commits = new ArrayList<>();
+
+        // first commit: the doc count has to be read once
+        commits.add(mockIndexCommit(checkpoint, checkpoint, translogUUID));
+        globalCheckpoint.set(checkpoint);
+        policy.onCommit(commits);
+        assertEquals(1, docCountReads.get());
+
+        // second commit with an identical local checkpoint: the previous info is re-used, so no further read happens
+        commits.add(mockIndexCommit(checkpoint, checkpoint, translogUUID));
+        policy.onCommit(commits);
+        assertEquals(1, docCountReads.get());
+    }
+
+
     public void testAcquireIndexCommit() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong();
         final int extraRetainedOps = between(0, 100);