Browse Source

Use correct cluster state version for node fault detection (#30810)

Since its introduction in ES 1.4, node fault detection has been using the wrong cluster state version to send
as part of the ping request, by using always the constant -1 (ClusterState.UNKNOWN_VERSION). This can, in an
unfortunate series of events, lead to a situation where a previous stale master can regain its authority and
revert the cluster to an older state.

This commit makes NodesFaultDetection use the correct current cluster state for sending ping requests, avoiding
the situation where a stale master possibly forces a newer master to step down and rejoin the stale one.
Yannick Welsch 7 years ago
parent
commit
cfa2b069f3

+ 12 - 5
server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java

@@ -44,6 +44,7 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
 
 
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
 
 
@@ -66,13 +67,16 @@ public class NodesFaultDetection extends FaultDetection {
 
 
     private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
     private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
 
 
-    private volatile long clusterStateVersion = ClusterState.UNKNOWN_VERSION;
+    private final Supplier<ClusterState> clusterStateSupplier;
 
 
     private volatile DiscoveryNode localNode;
     private volatile DiscoveryNode localNode;
 
 
-    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
+    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
+                               Supplier<ClusterState> clusterStateSupplier, ClusterName clusterName) {
         super(settings, threadPool, transportService, clusterName);
         super(settings, threadPool, transportService, clusterName);
 
 
+        this.clusterStateSupplier = clusterStateSupplier;
+
         logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
         logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
             pingRetryCount);
             pingRetryCount);
 
 
@@ -208,15 +212,18 @@ public class NodesFaultDetection extends FaultDetection {
             return NodeFD.this.equals(nodesFD.get(node));
             return NodeFD.this.equals(nodesFD.get(node));
         }
         }
 
 
+        private PingRequest newPingRequest() {
+            return new PingRequest(node, clusterName, localNode, clusterStateSupplier.get().version());
+        }
+
         @Override
         @Override
         public void run() {
         public void run() {
             if (!running()) {
             if (!running()) {
                 return;
                 return;
             }
             }
-            final PingRequest pingRequest = new PingRequest(node, clusterName, localNode, clusterStateVersion);
             final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
             final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
                 .withTimeout(pingRetryTimeout).build();
                 .withTimeout(pingRetryTimeout).build();
-            transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new TransportResponseHandler<PingResponse>() {
+            transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler<PingResponse>() {
                         @Override
                         @Override
                         public PingResponse newInstance() {
                         public PingResponse newInstance() {
                             return new PingResponse();
                             return new PingResponse();
@@ -254,7 +261,7 @@ public class NodesFaultDetection extends FaultDetection {
                                 }
                                 }
                             } else {
                             } else {
                                 // resend the request, not reschedule, rely on send timeout
                                 // resend the request, not reschedule, rely on send timeout
-                                transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, this);
+                                transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, this);
                             }
                             }
                         }
                         }
 
 

+ 1 - 1
server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -205,7 +205,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
 
 
         this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
         this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
         this.masterFD.addListener(new MasterNodeFailureListener());
         this.masterFD.addListener(new MasterNodeFailureListener());
-        this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
+        this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName);
         this.nodesFD.addListener(new NodeFaultDetectionListener());
         this.nodesFD.addListener(new NodeFaultDetectionListener());
         this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings));
         this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings));
 
 

+ 5 - 3
server/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java

@@ -175,17 +175,19 @@ public class ZenFaultDetectionTests extends ESTestCase {
         final Settings pingSettings = Settings.builder()
         final Settings pingSettings = Settings.builder()
             .put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry)
             .put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry)
             .put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m").build();
             .put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m").build();
-        ClusterState clusterState = ClusterState.builder(new ClusterName("test")).nodes(buildNodesForA(true)).build();
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test")).version(randomNonNegativeLong())
+            .nodes(buildNodesForA(true)).build();
         NodesFaultDetection nodesFDA = new NodesFaultDetection(Settings.builder().put(settingsA).put(pingSettings).build(),
         NodesFaultDetection nodesFDA = new NodesFaultDetection(Settings.builder().put(settingsA).put(pingSettings).build(),
-            threadPool, serviceA, clusterState.getClusterName());
+            threadPool, serviceA, () -> clusterState, clusterState.getClusterName());
         nodesFDA.setLocalNode(nodeA);
         nodesFDA.setLocalNode(nodeA);
         NodesFaultDetection nodesFDB = new NodesFaultDetection(Settings.builder().put(settingsB).put(pingSettings).build(),
         NodesFaultDetection nodesFDB = new NodesFaultDetection(Settings.builder().put(settingsB).put(pingSettings).build(),
-            threadPool, serviceB, clusterState.getClusterName());
+            threadPool, serviceB, () -> clusterState, clusterState.getClusterName());
         nodesFDB.setLocalNode(nodeB);
         nodesFDB.setLocalNode(nodeB);
         final CountDownLatch pingSent = new CountDownLatch(1);
         final CountDownLatch pingSent = new CountDownLatch(1);
         nodesFDB.addListener(new NodesFaultDetection.Listener() {
         nodesFDB.addListener(new NodesFaultDetection.Listener() {
             @Override
             @Override
             public void onPingReceived(NodesFaultDetection.PingRequest pingRequest) {
             public void onPingReceived(NodesFaultDetection.PingRequest pingRequest) {
+                assertThat(pingRequest.clusterStateVersion(), equalTo(clusterState.version()));
                 pingSent.countDown();
                 pingSent.countDown();
             }
             }
         });
         });