Prechádzať zdrojové kódy

Add logging to index commit deletion policy (#28448)

This would help us to figure out which index commit that an engine 
started with or used in peer-recovery.

Relates #28405
Nhat Nguyen 7 rokov pred
rodič
commit
5e0be61774

+ 27 - 4
server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.index.engine;
 
 import com.carrotsearch.hppc.ObjectIntHashMap;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
 import org.apache.lucene.store.Directory;
@@ -31,6 +32,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.LongSupplier;
 
@@ -42,6 +44,7 @@ import java.util.function.LongSupplier;
  * the current global checkpoint except the index commit which has the highest max sequence number among those.
  */
 public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
+    private final Logger logger;
     private final TranslogDeletionPolicy translogDeletionPolicy;
     private final EngineConfig.OpenMode openMode;
     private final LongSupplier globalCheckpointSupplier;
@@ -50,9 +53,10 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
     private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
     private volatile IndexCommit lastCommit; // the most recent commit point
 
-    CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
+    CombinedDeletionPolicy(EngineConfig.OpenMode openMode, Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
                            LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
         this.openMode = openMode;
+        this.logger = logger;
         this.translogDeletionPolicy = translogDeletionPolicy;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
         this.startingCommit = startingCommit;
@@ -104,8 +108,12 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
      * (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
      * the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
      */
-    private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) {
-        commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
+    private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
+        for (IndexCommit commit : commits) {
+            if (startingCommit.equals(commit) == false) {
+                this.deleteCommit(commit);
+            }
+        }
         assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
         lastCommit = startingCommit;
         safeCommit = startingCommit;
@@ -118,14 +126,22 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
         safeCommit = commits.get(keptPosition);
         for (int i = 0; i < keptPosition; i++) {
             if (snapshottedCommits.containsKey(commits.get(i)) == false) {
-                commits.get(i).delete();
+                deleteCommit(commits.get(i));
             }
         }
         updateTranslogDeletionPolicy();
     }
 
+    private void deleteCommit(IndexCommit commit) throws IOException {
+        assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice";
+        logger.debug("Delete index commit [{}]", commitDescription(commit));
+        commit.delete();
+        assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed";
+    }
+
     private void updateTranslogDeletionPolicy() throws IOException {
         assert Thread.holdsLock(this);
+        logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
         assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
         final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
         assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
@@ -229,6 +245,13 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
         return false;
     }
 
+    /**
+     * Returns a description for a given {@link IndexCommit}. This should be only used for logging and debugging.
+     */
+    public static String commitDescription(IndexCommit commit) throws IOException {
+        return String.format(Locale.ROOT, "CommitPoint{segment[%s], userData[%s]}", commit.getSegmentsFileName(), commit.getUserData());
+    }
+
     /**
      * A wrapper of an index commit that prevents it from being deleted.
      */

+ 1 - 1
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -184,7 +184,7 @@ public class InternalEngine extends Engine {
                 assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
                     "Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
                 this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
-                this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
+                this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, logger, translogDeletionPolicy,
                     translog::getLastSyncedGlobalCheckpoint, startingCommit);
                 writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
                 updateMaxUnsafeAutoIdTimestampFromWriter(writer);

+ 12 - 2
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.index.DirectoryReader;
@@ -64,6 +65,7 @@ import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.StringJoiner;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -320,7 +322,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
 
         final long startingSeqNo;
         if (metadataSnapshot.size() > 0) {
-            startingSeqNo = getStartingSeqNo(recoveryTarget);
+            startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
         } else {
             startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
         }
@@ -354,12 +356,20 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
      * @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
      * failed
      */
-    public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
+    public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
         try {
             final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
             final List<IndexCommit> existingCommits = DirectoryReader.listCommits(recoveryTarget.store().directory());
             final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
             final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(safeCommit);
+            if (logger.isTraceEnabled()) {
+                final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
+                for (IndexCommit commit : existingCommits) {
+                    descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
+                }
+                logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]",
+                    globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits);
+            }
             if (seqNoStats.maxSeqNo <= globalCheckpoint) {
                 assert seqNoStats.localCheckpoint <= globalCheckpoint;
                 /*

+ 18 - 8
server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -55,7 +55,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final AtomicLong globalCheckpoint = new AtomicLong();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
-            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
+            OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
 
         final LongArrayList maxSeqNoList = new LongArrayList();
         final LongArrayList translogGenList = new LongArrayList();
@@ -95,7 +95,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final UUID translogUUID = UUID.randomUUID();
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
-            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
+            OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
         long lastMaxSeqNo = between(1, 1000);
         long lastTranslogGen = between(1, 20);
         int safeIndex = 0;
@@ -116,6 +116,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
             long upper = safeIndex == commitList.size() - 1 ? lastMaxSeqNo :
                 Long.parseLong(commitList.get(safeIndex + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1;
             globalCheckpoint.set(randomLongBetween(lower, upper));
+            commitList.forEach(this::resetDeletion);
             indexPolicy.onCommit(commitList);
             // Captures and releases some commits
             int captures = between(0, 5);
@@ -144,6 +145,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         }
         snapshottingCommits.forEach(indexPolicy::releaseCommit);
         globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
+        commitList.forEach(this::resetDeletion);
         indexPolicy.onCommit(commitList);
         for (int i = 0; i < commitList.size() - 1; i++) {
             assertThat(commitList.get(i).isDeleted(), equalTo(true));
@@ -159,7 +161,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
 
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
-            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
+            OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
 
         long legacyTranslogGen = randomNonNegativeLong();
         IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
@@ -180,6 +182,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen));
 
         // Make the fresh commit safe.
+        resetDeletion(legacyCommit);
         globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE));
         indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit));
         verify(legacyCommit, times(2)).delete();
@@ -192,7 +195,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
         TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
-            OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
+            OPEN_INDEX_CREATE_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
 
         final int invalidCommits = between(1, 10);
         final List<IndexCommit> commitList = new ArrayList<>();
@@ -230,7 +233,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         }
         final IndexCommit startingCommit = randomFrom(commitList);
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
-            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit);
+            OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, startingCommit);
         indexPolicy.onInit(commitList);
         for (IndexCommit commit : commitList) {
             if (commit.equals(startingCommit) == false) {
@@ -249,7 +252,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         final UUID translogUUID = UUID.randomUUID();
         final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
         CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
-            OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
+            OPEN_INDEX_AND_TRANSLOG, logger, translogPolicy, globalCheckpoint::get, null);
         final List<IndexCommit> commitList = new ArrayList<>();
         int totalCommits = between(2, 20);
         long lastMaxSeqNo = between(1, 1000);
@@ -261,6 +264,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         }
         IndexCommit safeCommit = randomFrom(commitList);
         globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
+        commitList.forEach(this::resetDeletion);
         indexPolicy.onCommit(commitList);
         if (safeCommit == commitList.get(commitList.size() - 1)) {
             // Safe commit is the last commit - no need to clean up
@@ -274,6 +278,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
             // Advanced enough
             globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE));
             assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(true));
+            commitList.forEach(this::resetDeletion);
             indexPolicy.onCommit(commitList);
             // Safe commit is the last commit - no need to clean up
             assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
@@ -287,17 +292,21 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
         userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
         userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
-        final AtomicBoolean deleted = new AtomicBoolean();
         final IndexCommit commit = mock(IndexCommit.class);
         final Directory directory = mock(Directory.class);
         when(commit.getUserData()).thenReturn(userData);
         when(commit.getDirectory()).thenReturn(directory);
+        resetDeletion(commit);
+        return commit;
+    }
+
+    void resetDeletion(IndexCommit commit) {
+        final AtomicBoolean deleted = new AtomicBoolean();
         when(commit.isDeleted()).thenAnswer(args -> deleted.get());
         doAnswer(arg -> {
             deleted.set(true);
             return null;
         }).when(commit).delete();
-        return commit;
     }
 
     IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException {
@@ -306,6 +315,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
         userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
         final IndexCommit commit = mock(IndexCommit.class);
         when(commit.getUserData()).thenReturn(userData);
+        resetDeletion(commit);
         return commit;
     }
 }

+ 4 - 4
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java

@@ -33,7 +33,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
             {
                 recoveryEmptyReplica(replica);
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(0L));
+                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(0L));
                 recoveryTarget.decRef();
             }
             // Last commit is good - use it.
@@ -49,7 +49,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
                 replica.updateGlobalCheckpointOnReplica(initDocs - 1, "test");
                 replica.getTranslog().sync();
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs));
+                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
                 recoveryTarget.decRef();
             }
             // Global checkpoint does not advance, last commit is not good - use the previous commit
@@ -63,7 +63,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
                 }
                 flushShard(replica);
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs));
+                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs));
                 recoveryTarget.decRef();
             }
             // Advances the global checkpoint, a safe commit also advances
@@ -71,7 +71,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
                 replica.updateGlobalCheckpointOnReplica(initDocs + moreDocs - 1, "test");
                 replica.getTranslog().sync();
                 final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-                assertThat(PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget), equalTo(initDocs + moreDocs));
+                assertThat(PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget), equalTo(initDocs + moreDocs));
                 recoveryTarget.decRef();
             }
         } finally {

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

@@ -462,7 +462,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         final Store.MetadataSnapshot snapshot = getMetadataSnapshotOrEmpty(replica);
         final long startingSeqNo;
         if (snapshot.size() > 0) {
-            startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(recoveryTarget);
+            startingSeqNo = PeerRecoveryTargetService.getStartingSeqNo(logger, recoveryTarget);
         } else {
             startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
         }