Browse Source

Skip TRANSLOG stage for searchable snapshots recovery state (#70311)

This commit changes searchable snapshot recoveries so that they
skip the RecoveryState TRANSLOG stage. Since #65531 was introduced, the
cleanFiles peer recovery phase is blocked until the prewarming
completes (this is done to avoid search latency spikes caused by a cold
cache). During that phase, the RecoveryState stage is reported as
TRANSLOG, which can be confusing because we don't replay
any ops during searchable snapshot recoveries. To avoid
that confusion, the recovery state now transitions directly to the
FINALIZE stage.
Francisco Fernández Castaño 4 years ago
parent
commit
d3611c5ccb

+ 2 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1442,7 +1442,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
         }
         try {
         try {
             maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
             maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
-            recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
+            recoveryState.setLocalTranslogStage();
             if (safeCommit.isPresent() == false) {
             if (safeCommit.isPresent() == false) {
                 logger.trace("skip local recovery as no safe commit found");
                 logger.trace("skip local recovery as no safe commit found");
                 return UNASSIGNED_SEQ_NO;
                 return UNASSIGNED_SEQ_NO;
@@ -1608,7 +1608,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     public void openEngineAndRecoverFromTranslog() throws IOException {
     public void openEngineAndRecoverFromTranslog() throws IOException {
         recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
         recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
         maybeCheckIndex();
         maybeCheckIndex();
-        recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
+        recoveryState.setLocalTranslogStage();
         final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
         final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
         final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
         final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
             translogRecoveryStats.totalOperations(snapshot.totalOperations());
             translogRecoveryStats.totalOperations(snapshot.totalOperations());

+ 8 - 0
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

@@ -222,6 +222,14 @@ public class RecoveryState implements ToXContentFragment, Writeable {
         return this;
         return this;
     }
     }
 
 
+    public synchronized RecoveryState setLocalTranslogStage() {
+        return setStage(Stage.TRANSLOG);
+    }
+
+    public synchronized RecoveryState setRemoteTranslogStage() {
+        return setStage(Stage.TRANSLOG);
+    }
+
     public Index getIndex() {
     public Index getIndex() {
         return index;
         return index;
     }
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -452,7 +452,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
                     assert indexShard.assertRetentionLeasesPersisted();
                     assert indexShard.assertRetentionLeasesPersisted();
                 }
                 }
                 indexShard.maybeCheckIndex();
                 indexShard.maybeCheckIndex();
-                state().setStage(RecoveryState.Stage.TRANSLOG);
+                state().setRemoteTranslogStage();
             } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
             } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
                 // this is a fatal exception at this stage.
                 // this is a fatal exception at this stage.
                 // this means we transferred files from the remote that have not been checksummed and they are
                 // this means we transferred files from the remote that have not been checksummed and they are

+ 2 - 2
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRelocationIntegTests.java

@@ -90,7 +90,7 @@ public class SearchableSnapshotsRelocationIntegTests extends BaseSearchableSnaps
             assertEquals(secondDataNode, shardRecoveryState.getTargetNode().getName());
             assertEquals(secondDataNode, shardRecoveryState.getTargetNode().getName());
         });
         });
 
 
-        assertBusy(() -> assertSame(RecoveryState.Stage.TRANSLOG, getActiveRelocations(restoredIndex).get(0).getStage()));
+        assertBusy(() -> assertSame(RecoveryState.Stage.FINALIZE, getActiveRelocations(restoredIndex).get(0).getStage()));
         final Index restoredIdx = clusterAdmin().prepareState().get().getState().metadata().index(restoredIndex).getIndex();
         final Index restoredIdx = clusterAdmin().prepareState().get().getState().metadata().index(restoredIndex).getIndex();
         final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, secondDataNode);
         final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, secondDataNode);
         assertEquals(1, indicesService.indexService(restoredIdx).getShard(0).outstandingCleanFilesConditions());
         assertEquals(1, indicesService.indexService(restoredIdx).getShard(0).outstandingCleanFilesConditions());
@@ -126,7 +126,7 @@ public class SearchableSnapshotsRelocationIntegTests extends BaseSearchableSnaps
             .stream()
             .stream()
             // filter for relocations that are not in stage FINALIZE (they could end up in this stage without progress for good if the
             // filter for relocations that are not in stage FINALIZE (they could end up in this stage without progress for good if the
             // target node does not have enough cache space available to hold the primary completely)
             // target node does not have enough cache space available to hold the primary completely)
-            .filter(recoveryState -> recoveryState.getSourceNode() != null && recoveryState.getStage() != RecoveryState.Stage.FINALIZE)
+            .filter(recoveryState -> recoveryState.getSourceNode() != null)
             .collect(Collectors.toList());
             .collect(Collectors.toList());
     }
     }
 }
 }

+ 35 - 1
x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/indices/recovery/SearchableSnapshotRecoveryState.java

@@ -16,6 +16,7 @@ import java.util.Set;
 
 
 public final class SearchableSnapshotRecoveryState extends RecoveryState {
 public final class SearchableSnapshotRecoveryState extends RecoveryState {
     private boolean preWarmComplete;
     private boolean preWarmComplete;
+    private boolean remoteTranslogSet;
 
 
     public SearchableSnapshotRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) {
     public SearchableSnapshotRecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) {
         super(shardRouting, targetNode, sourceNode, new Index());
         super(shardRouting, targetNode, sourceNode, new Index());
@@ -24,7 +25,7 @@ public final class SearchableSnapshotRecoveryState extends RecoveryState {
     @Override
     @Override
     public synchronized RecoveryState setStage(Stage stage) {
     public synchronized RecoveryState setStage(Stage stage) {
         // The transition to the final state was done by #prewarmCompleted, just ignore the transition
         // The transition to the final state was done by #prewarmCompleted, just ignore the transition
-        if (getStage() == Stage.DONE) {
+        if (getStage() == Stage.DONE || stage == Stage.FINALIZE && remoteTranslogSet) {
             return this;
             return this;
         }
         }
 
 
@@ -35,9 +36,42 @@ public final class SearchableSnapshotRecoveryState extends RecoveryState {
             return this;
             return this;
         }
         }
 
 
+        if (stage == Stage.INIT) {
+            remoteTranslogSet = false;
+        }
+
         return super.setStage(stage);
         return super.setStage(stage);
     }
     }
 
 
+    @Override
+    public synchronized RecoveryState setRemoteTranslogStage() {
+        remoteTranslogSet = true;
+        super.setStage(Stage.TRANSLOG);
+        return super.setStage(Stage.FINALIZE);
+    }
+
+    @Override
+    public synchronized void validateCurrentStage(Stage expected) {
+        if (remoteTranslogSet == false) {
+            super.validateCurrentStage(expected);
+        } else {
+            final Stage stage = getStage();
+            // For small indices it's possible that pre-warming finished shortly
+            // after transitioning to FINALIZE stage
+            if (stage != Stage.FINALIZE && stage != Stage.DONE) {
+                assert false : "expected stage [" + Stage.FINALIZE + " || " + Stage.DONE + "]; but current stage is [" + stage + "]";
+                throw new IllegalStateException(
+                    "expected stage [" + Stage.FINALIZE + " || " + Stage.DONE + "]; " + "but current stage is [" + stage + "]"
+                );
+            }
+        }
+    }
+
+    // Visible for tests
+    boolean isRemoteTranslogSet() {
+        return remoteTranslogSet;
+    }
+
     public synchronized void setPreWarmComplete() {
     public synchronized void setPreWarmComplete() {
         // For small shards it's possible that the
         // For small shards it's possible that the
         // cache is pre-warmed before the stage has transitioned
         // cache is pre-warmed before the stage has transitioned

+ 13 - 0
x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/indices/recovery/SearchableSnapshotsRecoveryStateTests.java

@@ -128,6 +128,19 @@ public class SearchableSnapshotsRecoveryStateTests extends ESTestCase {
         assertThat(recoveryState.getIndex().getFileDetails("non_pre_warmed_file"), is(nullValue()));
         assertThat(recoveryState.getIndex().getFileDetails("non_pre_warmed_file"), is(nullValue()));
     }
     }
 
 
+    public void testResetAfterRemoteTranslogIsSetResetsFlag() {
+        SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
+        recoveryState.getIndex().setFileDetailsComplete();
+
+        recoveryState.setStage(RecoveryState.Stage.INDEX).setStage(RecoveryState.Stage.VERIFY_INDEX).setRemoteTranslogStage();
+
+        assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.FINALIZE));
+        assertThat(recoveryState.isRemoteTranslogSet(), equalTo(true));
+
+        recoveryState.setStage(RecoveryState.Stage.INIT);
+        assertThat(recoveryState.isRemoteTranslogSet(), equalTo(false));
+    }
+
     private SearchableSnapshotRecoveryState createRecoveryState() {
     private SearchableSnapshotRecoveryState createRecoveryState() {
         ShardRouting shardRouting = TestShardRouting.newShardRouting(
         ShardRouting shardRouting = TestShardRouting.newShardRouting(
             randomAlphaOfLength(10),
             randomAlphaOfLength(10),