Browse Source

Reset state recovery after successful recovery (#42576)

The problem this commit addresses is that state recovery is not reset on a node that then becomes
master with a cluster state that has a state not recovered flag in it. The situation that was observed
in a failed test run of MinimumMasterNodesIT.testThreeNodesNoMasterBlock (see below) is that we
have 3 master nodes (node_t0, node_t1, node_t2), two of them are shut down (node_t2 remains),
when the first one comes back (renamed to node_t4) it becomes leader in term 2 and sends state
(with state_not_recovered_block) to node_t2, which accepts. node_t2 becomes leader in term 3, and
as it was previously leader in term1 and successfully completed state recovery, does never retry
state recovery in term 3.

Closes #39172
Yannick Welsch 6 years ago
parent
commit
692245cc44

+ 2 - 1
server/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -297,7 +297,8 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
     public String toString() {
         StringBuilder sb = new StringBuilder();
         final String TAB = "   ";
-        sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
+        sb.append("cluster uuid: ").append(metaData.clusterUUID())
+            .append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n");
         sb.append("version: ").append(version).append("\n");
         sb.append("state uuid: ").append(stateUUID).append("\n");
         sb.append("from_diff: ").append(wasReadFromDiff).append("\n");

+ 0 - 1
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -699,7 +699,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
             assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
             assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
             assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
-            assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
             assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
                 : preVoteCollector + " vs " + getPreVoteResponse();
 

+ 7 - 4
server/src/main/java/org/elasticsearch/gateway/GatewayService.java

@@ -85,7 +85,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
 
     private final Runnable recoveryRunnable;
 
-    private final AtomicBoolean recovered = new AtomicBoolean();
+    private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
     private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
 
     @Inject
@@ -211,7 +211,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
 
                     @Override
                     protected void doRun() {
-                        if (recovered.compareAndSet(false, true)) {
+                        if (recoveryInProgress.compareAndSet(false, true)) {
                             logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
                             recoveryRunnable.run();
                         }
@@ -219,7 +219,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
                 }, recoverAfterTime, ThreadPool.Names.GENERIC);
             }
         } else {
-            if (recovered.compareAndSet(false, true)) {
+            if (recoveryInProgress.compareAndSet(false, true)) {
                 threadPool.generic().execute(new AbstractRunnable() {
                     @Override
                     public void onFailure(final Exception e) {
@@ -237,7 +237,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
     }
 
     private void resetRecoveredFlags() {
-        recovered.set(false);
+        recoveryInProgress.set(false);
         scheduledRecovery.set(false);
     }
 
@@ -256,6 +256,9 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
         @Override
         public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
             logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
+            // reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
+            // not-recovered state, that we again do another state recovery.
+            resetRecoveredFlags();
         }
 
         @Override

+ 58 - 3
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -45,6 +45,7 @@ import org.elasticsearch.cluster.metadata.Manifest;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode.Role;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
@@ -69,6 +70,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.gateway.ClusterStateUpdaters;
+import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.MockGatewayMetaState;
 import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
@@ -130,6 +133,7 @@ import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MAS
 import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
 import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
 import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
+import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
 import static org.hamcrest.Matchers.containsString;
@@ -190,6 +194,45 @@ public class CoordinatorTests extends ESTestCase {
         assertEquals(result1, result2);
     }
 
+    /**
+     * This test was added to verify that state recovery is properly reset on a node after it has become master and successfully
+     * recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows:
+     * 3 master-eligible nodes (leader, follower1, follower2), the followers are shut down (leader remains), when followers come back
+     * one of them becomes leader and publishes first state (with STATE_NOT_RECOVERED_BLOCK) to old leader, which accepts it.
+     * Old leader is initiating an election at the same time, and wins election. It becomes leader again, but as it previously
+     * successfully completed state recovery, is never reset to a state where state recovery can be retried.
+     */
+    public void testStateRecoveryResetAfterPreviousLeadership() {
+        final Cluster cluster = new Cluster(3);
+        cluster.runRandomly();
+        cluster.stabilise();
+
+        final ClusterNode leader = cluster.getAnyLeader();
+        final ClusterNode follower1 = cluster.getAnyNodeExcept(leader);
+        final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1);
+
+        // restart follower1 and follower2
+        for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) {
+            clusterNode.close();
+            cluster.clusterNodes.forEach(
+                cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            cn.transportService.disconnectFromNode(clusterNode.getLocalNode());
+                        }
+
+                        @Override
+                        public String toString() {
+                            return "disconnect from " + clusterNode.getLocalNode() + " after shutdown";
+                        }
+                    })));
+            cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? cn.restartedNode() : cn);
+        }
+
+        cluster.stabilise();
+    }
+
     public void testCanUpdateClusterStateAfterStabilisation() {
         final Cluster cluster = new Cluster(randomIntBetween(1, 5));
         cluster.runRandomly();
@@ -1524,6 +1567,10 @@ public class CoordinatorTests extends ESTestCase {
 
             assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet());
             assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
+            assertThat(leaderId + " has no NO_MASTER_BLOCK",
+                leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
+            assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK",
+                leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
             assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);
 
             for (final ClusterNode clusterNode : clusterNodes) {
@@ -1555,6 +1602,8 @@ public class CoordinatorTests extends ESTestCase {
                         equalTo(leader.getLocalNode()));
                     assertThat(nodeId + " has no NO_MASTER_BLOCK",
                         clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
+                    assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK",
+                        clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
                 } else {
                     assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
                     assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue());
@@ -1724,7 +1773,8 @@ public class CoordinatorTests extends ESTestCase {
                     } else {
                         nodeEnvironment = null;
                         delegate = new InMemoryPersistedState(0L,
-                                clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
+                            ClusterStateUpdaters.addStateNotRecoveredBlock(
+                                clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)));
                     }
                 } catch (IOException e) {
                     throw new UncheckedIOException("Unable to create MockPersistedState", e);
@@ -1764,8 +1814,9 @@ public class CoordinatorTests extends ESTestCase {
                         clusterState.writeTo(outStream);
                         StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
                                 new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
+                        // adapt cluster state to new localNode instance and add blocks
                         delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
-                                ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
+                            ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
                     }
                 } catch (IOException e) {
                     throw new UncheckedIOException("Unable to create MockPersistedState", e);
@@ -1869,15 +1920,19 @@ public class CoordinatorTests extends ESTestCase {
                         transportService));
                 final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
                     Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
+                final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
                 coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
-                    ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
+                    allocationService, masterService, this::getPersistedState,
                     Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
                 masterService.setClusterStatePublisher(coordinator);
+                final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
+                    deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
 
                 logger.trace("starting up [{}]", localNode);
                 transportService.start();
                 transportService.acceptIncomingRequests();
                 coordinator.start();
+                gatewayService.start();
                 clusterService.start();
                 coordinator.startInitialJoin();
             }