Browse Source

Wait till node is part of cluster state for join process
When a node sends a join request to the master, the master only sends back the response after the node has been added to the master's cluster state and that state has been published.
This fixes the rare cases where, today, a join request can return while the master, since it's under load, has not yet added the node to its cluster state; the node that joined then starts fault detection against the master, which fails since the node is not yet part of the cluster state.
Since the join request now takes longer, also increase the default join request timeout.
closes #6480

Shay Banon 11 years ago
parent
commit
2330421816

+ 3 - 2
docs/reference/modules/discovery/zen.asciidoc

@@ -73,8 +73,9 @@ elected or joined to. This is done automatically. The
 `discovery.zen.ping_timeout` (which defaults to `3s`) allows to
 configure the election to handle cases of slow or congested networks
 (higher values assure less chance of failure). Once a node joins, it
- will send a join request to the master (`discovery.zen.join_timeout`)
- with a timeout defaulting at 10 times the ping timeout.
+will send a join request to the master (`discovery.zen.join_timeout`)
+with a timeout defaulting at 20 times the ping timeout.
+coming[1.3.0,Previously defaulted to 10 times the ping timeout].
 
 Nodes can be excluded from becoming a master by setting `node.master` to
 `false`. Note, once a node is a client node (`node.client` set to

+ 14 - 10
src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -135,7 +135,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
 
         // also support direct discovery.zen settings, for cases when it gets extended
         this.pingTimeout = settings.getAsTime("discovery.zen.ping.timeout", settings.getAsTime("discovery.zen.ping_timeout", componentSettings.getAsTime("ping_timeout", componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3)))));
-        this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 10));
+        this.joinTimeout = settings.getAsTime("discovery.zen.join_timeout", TimeValue.timeValueMillis(pingTimeout.millis() * 20));
         this.sendLeaveRequest = componentSettings.getAsBoolean("send_leave_request", true);
 
         this.masterElectionFilterClientNodes = settings.getAsBoolean("discovery.zen.master_election.filter_client", true);
@@ -592,7 +592,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
                 newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster"));
             } else {
                 if (currentJoinThread != null) {
-                    logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
+                    logger.trace("got a new state from master node while joining the cluster, this is a valid state during the last phase of the join process");
                 }
 
                 final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
@@ -699,30 +699,29 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         }
     }
 
-    private ClusterState handleJoinRequest(final DiscoveryNode node) {
+    private void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
         if (!master) {
             throw new ElasticsearchIllegalStateException("Node [" + localNode + "] not master for join request from [" + node + "]");
         }
 
-        ClusterState state = clusterService.state();
         if (!transportService.addressSupported(node.address().getClass())) {
             // TODO, what should we do now? Maybe inform that node that its crap?
             logger.warn("received a wrong address type from [{}], ignoring...", node);
         } else {
             // try and connect to the node, if it fails, we can raise an exception back to the client...
             transportService.connectToNode(node);
-            state = clusterService.state();
+            ClusterState state = clusterService.state();
 
             // validate the join request, will throw a failure if it fails, which will get back to the
             // node calling the join request
             membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
 
-            clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
+            clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
                     if (currentState.nodes().nodeExists(node.id())) {
                         // the node already exists in the cluster
-                        logger.warn("received a join request for an existing node [{}]", node);
+                        logger.info("received a join request for an existing node [{}]", node);
                         // still send a new cluster state, so it will be re published and possibly update the other node
                         return ClusterState.builder(currentState).build();
                     }
@@ -741,10 +740,15 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
                 @Override
                 public void onFailure(String source, Throwable t) {
                     logger.error("unexpected failure during [{}]", t, source);
+                    callback.onFailure(t);
+                }
+
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    callback.onSuccess(newState);
                 }
             });
         }
-        return state;
     }
 
     private DiscoveryNode findMaster() {
@@ -869,8 +873,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
 
     private class MembershipListener implements MembershipAction.MembershipListener {
         @Override
-        public ClusterState onJoin(DiscoveryNode node) {
-            return handleJoinRequest(node);
+        public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
+            handleJoinRequest(node, callback);
         }
 
         @Override

+ 31 - 8
src/main/java/org/elasticsearch/discovery/zen/membership/MembershipAction.java

@@ -39,8 +39,14 @@ import java.util.concurrent.TimeUnit;
  */
 public class MembershipAction extends AbstractComponent {
 
+    public static interface JoinCallback {
+        void onSuccess(ClusterState state);
+
+        void onFailure(Throwable t);
+    }
+
     public static interface MembershipListener {
-        ClusterState onJoin(DiscoveryNode node);
+        void onJoin(DiscoveryNode node, JoinCallback callback);
 
         void onLeave(DiscoveryNode node);
     }
@@ -160,13 +166,30 @@ public class MembershipAction extends AbstractComponent {
         }
 
         @Override
-        public void messageReceived(JoinRequest request, TransportChannel channel) throws Exception {
-            ClusterState clusterState = listener.onJoin(request.node);
-            if (request.withClusterState) {
-                channel.sendResponse(new JoinResponse(clusterState));
-            } else {
-                channel.sendResponse(TransportResponse.Empty.INSTANCE);
-            }
+        public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
+            listener.onJoin(request.node, new JoinCallback() {
+                @Override
+                public void onSuccess(ClusterState state) {
+                    try {
+                        if (request.withClusterState) {
+                            channel.sendResponse(new JoinResponse(state));
+                        } else {
+                            channel.sendResponse(TransportResponse.Empty.INSTANCE);
+                        }
+                    } catch (Throwable t) {
+                        onFailure(t);
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    try {
+                        channel.sendResponse(t);
+                    } catch (Throwable e) {
+                        logger.warn("failed to send back failure on join request", e);
+                    }
+                }
+            });
         }
 
         @Override