Browse Source

Discovery: wait on incoming joins before electing local node as master

During master election each node pings in order to discover other nodes and validate the liveness of existing nodes. Based on this information the node either discovers an existing master or, if enough nodes are found (based on `discovery.zen.minimum_master_nodes>>) a new master will be elected.

Currently, the node that is elected as master will currently update it the cluster state to indicate the result of the election. Other nodes will submit a join request to the newly elected master node. Instead of immediately processing the election result, the elected master
node should wait for the incoming joins from other nodes, thus validating the elections result is properly applied. As soon as enough nodes have sent their joins request (based on the `minimum_master_nodes` settings) the cluster state is modified.

Note that if `minimum_master_nodes` is not set, this change has no effect.

Closes #12161
Boaz Leskes 10 years ago
parent
commit
1e35bf3171

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java

@@ -134,7 +134,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
     }
 
     // visible for testing
-    void performReroute(String reason) {
+    protected void performReroute(String reason) {
         try {
             if (lifecycle.stopped()) {
                 return;

+ 398 - 0
core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

@@ -0,0 +1,398 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.discovery.zen;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
+import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.discovery.DiscoverySettings;
+import org.elasticsearch.discovery.zen.membership.MembershipAction;
+
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes
+ * are directly added to the cluster state or are accumulated during master election.
+ */
+public class NodeJoinController extends AbstractComponent {
+
+    final ClusterService clusterService;
+    final RoutingService routingService;
+    final DiscoverySettings discoverySettings;
+    final AtomicBoolean accumulateJoins = new AtomicBoolean(false);
+
+    // this is site while trying to become a master
+    final AtomicReference<ElectionContext> electionContext = new AtomicReference<>();
+
+
+    protected final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> pendingJoinRequests = new HashMap<>();
+
+    public NodeJoinController(ClusterService clusterService, RoutingService routingService, DiscoverySettings discoverySettings, Settings settings) {
+        super(settings);
+        this.clusterService = clusterService;
+        this.routingService = routingService;
+        this.discoverySettings = discoverySettings;
+    }
+
+    /**
+     * waits for enough incoming joins from master eligible nodes to complete the master election
+     * <p/>
+     * You must start accumulating joins before calling this method. See {@link #startAccumulatingJoins()}
+     * <p/>
+     * The method will return once the local node has been elected as master or some failure/timeout has happened.
+     * The exact outcome is communicated via the callback parameter, which is guaranteed to be called.
+     *
+     * @param requiredMasterJoins the number of joins from master eligible needed to complete the election
+     * @param timeValue           how long to wait before failing. a timeout is communicated via the callback's onFailure method.
+     * @param callback            the result of the election (success or failure) will be communicated by calling methods on this
+     *                            object
+     **/
+    public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final Callback callback) {
+        assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins";
+
+        final CountDownLatch done = new CountDownLatch(1);
+        final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) {
+            @Override
+            void onClose() {
+                if (electionContext.compareAndSet(this, null)) {
+                    stopAccumulatingJoins();
+                } else {
+                    assert false : "failed to remove current election context";
+                }
+                done.countDown();
+            }
+        };
+
+        if (electionContext.compareAndSet(null, newContext) == false) {
+            // should never happen, but be conservative
+            callback.onFailure(new IllegalStateException("double waiting for election"));
+            return;
+        }
+        try {
+            // check what we have so far..
+            checkPendingJoinsAndElectIfNeeded();
+
+            try {
+                if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
+                    // callback handles everything
+                    return;
+                }
+            } catch (InterruptedException e) {
+
+            }
+            if (logger.isTraceEnabled()) {
+                final int pendingNodes;
+                synchronized (pendingJoinRequests) {
+                    pendingNodes = pendingJoinRequests.size();
+                }
+                logger.trace("timed out waiting to be elected. waited [{}]. pending node joins [{}]", timeValue, pendingNodes);
+            }
+            // callback will clear the context, if it's active
+            newContext.onFailure(new ElasticsearchTimeoutException("timed out waiting to be elected"));
+        } catch (Throwable t) {
+            logger.error("unexpected failure while waiting for incoming joins", t);
+            newContext.onFailure(t);
+        }
+    }
+
+    /**
+     * Accumulates any future incoming join request. Pending join requests will be processed in the final steps of becoming a
+     * master or when {@link #stopAccumulatingJoins()} is called.
+     */
+    public void startAccumulatingJoins() {
+        logger.trace("starting to accumulate joins");
+        boolean b = accumulateJoins.getAndSet(true);
+        assert b == false : "double startAccumulatingJoins() calls";
+        assert electionContext.get() == null : "startAccumulatingJoins() called, but there is an ongoing election context";
+    }
+
+    /** Stopped accumulating joins. All pending joins will be processed. Future joins will be processed immediately */
+    public void stopAccumulatingJoins() {
+        logger.trace("stopping join accumulation");
+        assert electionContext.get() == null : "stopAccumulatingJoins() called, but there is an ongoing election context";
+        boolean b = accumulateJoins.getAndSet(false);
+        assert b : "stopAccumulatingJoins() called but not accumulating";
+        synchronized (pendingJoinRequests) {
+            if (pendingJoinRequests.size() > 0) {
+                processJoins("stopping to accumulate joins");
+            }
+        }
+    }
+
+    /**
+     * processes or queues an incoming join request.
+     * <p/>
+     * Note: doesn't do any validation. This should have been done before.
+     */
+    public void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
+        synchronized (pendingJoinRequests) {
+            List<MembershipAction.JoinCallback> nodeCallbacks = pendingJoinRequests.get(node);
+            if (nodeCallbacks == null) {
+                nodeCallbacks = new ArrayList<>();
+                pendingJoinRequests.put(node, nodeCallbacks);
+            }
+            nodeCallbacks.add(callback);
+        }
+        if (accumulateJoins.get() == false) {
+            processJoins("join from node[" + node + "]");
+        } else {
+            checkPendingJoinsAndElectIfNeeded();
+        }
+    }
+
+    /**
+     * checks if there is an on going request to become master and if it has enough pending joins. If so, the node will
+     * become master via a ClusterState update task.
+     */
+    private void checkPendingJoinsAndElectIfNeeded() {
+        assert accumulateJoins.get() : "election check requested but we are not accumulating joins";
+        final ElectionContext context = electionContext.get();
+        if (context == null) {
+            return;
+        }
+
+        int pendingMasterJoins=0;
+        synchronized (pendingJoinRequests) {
+            for (DiscoveryNode node : pendingJoinRequests.keySet()) {
+                if (node.isMasterNode()) {
+                    pendingMasterJoins++;
+                }
+            }
+        }
+        if (pendingMasterJoins < context.requiredMasterJoins) {
+            logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins, context.requiredMasterJoins);
+            return;
+        }
+        if (context.pendingSetAsMasterTask.getAndSet(true)) {
+            logger.trace("elected as master task already submitted, ignoring...");
+            return;
+        }
+
+        final String source = "zen-disco-join(elected_as_master, [" + pendingMasterJoins + "] joins received)";
+        clusterService.submitStateUpdateTask(source, Priority.IMMEDIATE, new ProcessJoinsTask() {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                // Take into account the previous known nodes, if they happen not to be available
+                // then fault detection will remove these nodes.
+
+                if (currentState.nodes().masterNode() != null) {
+                    // TODO can we tie break here? we don't have a remote master cluster state version to decide on
+                    logger.trace("join thread elected local node as master, but there is already a master in place: {}", currentState.nodes().masterNode());
+                    throw new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request");
+                }
+
+                DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
+                // update the fact that we are the master...
+                ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
+                currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
+
+                // reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
+                RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState);
+                if (result.changed()) {
+                    currentState = ClusterState.builder(currentState).routingResult(result).build();
+                }
+
+                // Add the incoming join requests.
+                // Note: we only do this now (after the reroute) to avoid assigning shards to these nodes.
+                return super.execute(currentState);
+            }
+
+            @Override
+            public boolean runOnlyOnMaster() {
+                return false;
+            }
+
+            @Override
+            public void onFailure(String source, Throwable t) {
+                super.onFailure(source, t);
+                context.onFailure(t);
+            }
+
+            @Override
+            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                super.clusterStateProcessed(source, oldState, newState);
+                context.onElectedAsMaster(newState);
+            }
+        });
+    }
+
+    /** process all pending joins */
+    private void processJoins(String reason) {
+        clusterService.submitStateUpdateTask("zen-disco-join(" + reason + ")", Priority.URGENT, new ProcessJoinsTask());
+    }
+
+
+    public interface Callback {
+        void onElectedAsMaster(ClusterState state);
+
+        void onFailure(Throwable t);
+    }
+
+    static abstract class ElectionContext implements Callback {
+        private final Callback callback;
+        private final int requiredMasterJoins;
+
+        /** set to true after enough joins have been seen and a cluster update task is submitted to become master */
+        final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean();
+        final AtomicBoolean closed = new AtomicBoolean();
+
+        ElectionContext(Callback callback, int requiredMasterJoins) {
+            this.callback = callback;
+            this.requiredMasterJoins = requiredMasterJoins;
+        }
+
+        abstract void onClose();
+
+        @Override
+        public void onElectedAsMaster(ClusterState state) {
+            assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set";
+            if (closed.compareAndSet(false, true)) {
+                try {
+                    onClose();
+                } finally {
+                    callback.onElectedAsMaster(state);
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+            if (closed.compareAndSet(false, true)) {
+                try {
+                    onClose();
+                } finally {
+                    callback.onFailure(t);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Processes any pending joins via a ClusterState update task.
+     * Note: this task automatically fails (and fails all pending joins) if the current node is not marked as master
+     */
+    class ProcessJoinsTask extends ProcessedClusterStateUpdateTask {
+
+        private final List<MembershipAction.JoinCallback> joinCallbacksToRespondTo = new ArrayList<>();
+        private boolean nodeAdded = false;
+
+        @Override
+        public ClusterState execute(ClusterState currentState) {
+            DiscoveryNodes.Builder nodesBuilder;
+            synchronized (pendingJoinRequests) {
+                if (pendingJoinRequests.isEmpty()) {
+                    return currentState;
+                }
+
+                nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
+                Iterator<Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>>> iterator = pendingJoinRequests.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>> entry = iterator.next();
+                    final DiscoveryNode node = entry.getKey();
+                    joinCallbacksToRespondTo.addAll(entry.getValue());
+                    iterator.remove();
+                    if (currentState.nodes().nodeExists(node.id())) {
+                        logger.debug("received a join request for an existing node [{}]", node);
+                    } else {
+                        nodeAdded = true;
+                        nodesBuilder.put(node);
+                        for (DiscoveryNode existingNode : currentState.nodes()) {
+                            if (node.address().equals(existingNode.address())) {
+                                nodesBuilder.remove(existingNode.id());
+                                logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
+                            }
+                        }
+                    }
+                }
+            }
+
+            // we must return a new cluster state instance to force publishing. This is important
+            // for the joining node to finalize it's join and set us as a master
+            final ClusterState.Builder newState = ClusterState.builder(currentState);
+            if (nodeAdded) {
+                newState.nodes(nodesBuilder);
+            }
+
+            return newState.build();
+        }
+
+        @Override
+        public void onNoLongerMaster(String source) {
+            // we are rejected, so drain all pending task (execute never run)
+            synchronized (pendingJoinRequests) {
+                Iterator<Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>>> iterator = pendingJoinRequests.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<DiscoveryNode, List<MembershipAction.JoinCallback>> entry = iterator.next();
+                    joinCallbacksToRespondTo.addAll(entry.getValue());
+                    iterator.remove();
+                }
+            }
+            Exception e = new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request");
+            innerOnFailure(e);
+        }
+
+        void innerOnFailure(Throwable t) {
+            for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) {
+                try {
+                    callback.onFailure(t);
+                } catch (Exception e) {
+                    logger.error("error during task failure", e);
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(String source, Throwable t) {
+            logger.error("unexpected failure during [{}]", t, source);
+            innerOnFailure(t);
+        }
+
+        @Override
+        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+            if (nodeAdded) {
+                // we reroute not in the same cluster state update since in certain areas we rely on
+                // the node to be in the cluster state (sampled from ClusterService#state) to be there, also
+                // shard transitions need to better be handled in such cases
+                routingService.reroute("post_node_add");
+            }
+            for (MembershipAction.JoinCallback callback : joinCallbacksToRespondTo) {
+                try {
+                    callback.onSuccess();
+                } catch (Exception e) {
+                    logger.error("unexpected error during [{}]", e, source);
+                }
+            }
+        }
+    }
+}

+ 35 - 128
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -38,7 +38,6 @@ import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
 import org.elasticsearch.cluster.settings.DynamicSettings;
 import org.elasticsearch.cluster.settings.Validator;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.inject.Inject;
@@ -93,6 +92,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
     public final static String SETTING_MAX_PINGS_FROM_ANOTHER_MASTER = "discovery.zen.max_pings_from_another_master";
     public final static String SETTING_SEND_LEAVE_REQUEST = "discovery.zen.send_leave_request";
     public final static String SETTING_MASTER_ELECTION_FILTER_CLIENT = "discovery.zen.master_election.filter_client";
+    public final static String SETTING_MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT = "discovery.zen.master_election.wait_for_joins_timeout";
     public final static String SETTING_MASTER_ELECTION_FILTER_DATA = "discovery.zen.master_election.filter_data";
 
     public static final String DISCOVERY_REJOIN_ACTION_NAME = "internal:discovery/zen/rejoin";
@@ -126,6 +126,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
 
     private final boolean masterElectionFilterClientNodes;
     private final boolean masterElectionFilterDataNodes;
+    private final TimeValue masterElectionWaitForJoinsTimeout;
 
 
     private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<>();
@@ -142,7 +143,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
     @Nullable
     private NodeService nodeService;
 
-    private final BlockingQueue<Tuple<DiscoveryNode, MembershipAction.JoinCallback>> processJoinRequests = ConcurrentCollections.newBlockingQueue();
+
+    // must initialized in doStart(), when we have the routingService set
+    private volatile NodeJoinController nodeJoinController;
 
     @Inject
     public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
@@ -169,6 +172,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
 
         this.masterElectionFilterClientNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_CLIENT, true);
         this.masterElectionFilterDataNodes = settings.getAsBoolean(SETTING_MASTER_ELECTION_FILTER_DATA, false);
+        this.masterElectionWaitForJoinsTimeout = settings.getAsTime(SETTING_MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT, TimeValue.timeValueMillis(joinTimeout.millis() / 2));
         this.rejoinOnMasterGone = settings.getAsBoolean(SETTING_REJOIN_ON_MASTER_GONE, true);
 
         if (this.joinRetryAttempts < 1) {
@@ -230,6 +234,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         nodesFD.setLocalNode(clusterService.localNode());
         joinThreadControl.start();
         pingService.start();
+        this.nodeJoinController = new NodeJoinController(clusterService, routingService, discoverySettings, settings);
 
         // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
         clusterService.submitStateUpdateTask("initial_join", new ClusterStateNonMasterUpdateTask() {
@@ -353,6 +358,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
     private void innerJoinCluster() {
         DiscoveryNode masterNode = null;
         final Thread currentThread = Thread.currentThread();
+        nodeJoinController.startAccumulatingJoins();
         while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
             masterNode = findMaster();
         }
@@ -363,52 +369,32 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         }
 
         if (clusterService.localNode().equals(masterNode)) {
-            clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-                    // Take into account the previous known nodes, if they happen not to be available
-                    // then fault detection will remove these nodes.
-
-                    if (currentState.nodes().masterNode() != null) {
-                        // TODO can we tie break here? we don't have a remote master cluster state version to decide on
-                        logger.trace("join thread elected local node as master, but there is already a master in place: {}", currentState.nodes().masterNode());
-                        return currentState;
-                    }
-
-                    DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
-                    // update the fact that we are the master...
-                    ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
-                    currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
-
-                    // eagerly run reroute to remove dead nodes from routing table
-                    RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState);
-                    return ClusterState.builder(currentState).routingResult(result).build();
-                }
-
-                @Override
-                public void onFailure(String source, Throwable t) {
-                    logger.error("unexpected failure during [{}]", t, source);
-                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
-                }
+            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
+            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
+            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
+                    new NodeJoinController.Callback() {
+                        @Override
+                        public void onElectedAsMaster(ClusterState state) {
+                            joinThreadControl.markThreadAsDone(currentThread);
+                            // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
+                            nodesFD.updateNodesAndPing(state); // start the nodes FD
+                            sendInitialStateEventIfNeeded();
+                            long count = clusterJoinsCounter.incrementAndGet();
+                            logger.trace("cluster joins counter set to [{}] (elected as master)", count);
+                        }
 
-                @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    if (newState.nodes().localNodeMaster()) {
-                        // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
-                        joinThreadControl.markThreadAsDone(currentThread);
-                        nodesFD.updateNodesAndPing(newState); // start the nodes FD
-                    } else {
-                        // if we're not a master it means another node published a cluster state while we were pinging
-                        // make sure we go through another pinging round and actively join it
-                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
+                        @Override
+                        public void onFailure(Throwable t) {
+                            logger.trace("failed while waiting for nodes to join, rejoining", t);
+                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
+                        }
                     }
-                    sendInitialStateEventIfNeeded();
-                    long count = clusterJoinsCounter.incrementAndGet();
-                    logger.trace("cluster joins counter set to [{}] (elected as master)", count);
 
-                }
-            });
+            );
         } else {
+            // process any incoming joins (they will fail because we are not the master)
+            nodeJoinController.stopAccumulatingJoins();
+
             // send join request
             final boolean success = joinElectedMaster(masterNode);
 
@@ -878,7 +864,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         }
         if (!currentState.nodes().masterNodeId().equals(newClusterState.nodes().masterNodeId())) {
             logger.warn("received a cluster state from a different master then the current one, rejecting (received {}, current {})", newClusterState.nodes().masterNode(), currentState.nodes().masterNode());
-            throw new IllegalStateException("cluster state from a different master then the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentState.nodes().masterNode() + ")");
+            throw new IllegalStateException("cluster state from a different master than the current one, rejecting (received " + newClusterState.nodes().masterNode() + ", current " + currentState.nodes().masterNode() + ")");
         } else if (newClusterState.version() < currentState.version()) {
             // if the new state has a smaller version, and it has the same master node, then no need to process it
             logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
@@ -893,6 +879,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         if (!transportService.addressSupported(node.address().getClass())) {
             // TODO, what should we do now? Maybe inform that node that its crap?
             logger.warn("received a wrong address type from [{}], ignoring...", node);
+        } else if (nodeJoinController == null) {
+            throw new IllegalStateException("discovery module is not yet started");
         } else {
             // The minimum supported version for a node joining a master:
             Version minimumNodeJoinVersion = localNode().getVersion().minimumCompatibilityVersion();
@@ -910,88 +898,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
             // validate the join request, will throw a failure if it fails, which will get back to the
             // node calling the join request
             membership.sendValidateJoinRequestBlocking(node, joinTimeout);
-            processJoinRequests.add(new Tuple<>(node, callback));
-            clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
-
-                private final List<Tuple<DiscoveryNode, MembershipAction.JoinCallback>> drainedJoinRequests = new ArrayList<>();
-                private boolean nodeAdded = false;
-
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-                    processJoinRequests.drainTo(drainedJoinRequests);
-                    if (drainedJoinRequests.isEmpty()) {
-                        return currentState;
-                    }
-
-                    DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
-                    for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> task : drainedJoinRequests) {
-                        DiscoveryNode node = task.v1();
-                        if (currentState.nodes().nodeExists(node.id())) {
-                            logger.debug("received a join request for an existing node [{}]", node);
-                        } else {
-                            nodeAdded = true;
-                            nodesBuilder.put(node);
-                            for (DiscoveryNode existingNode : currentState.nodes()) {
-                                if (node.address().equals(existingNode.address())) {
-                                    nodesBuilder.remove(existingNode.id());
-                                    logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
-                                }
-                            }
-                        }
-                    }
-
-
-                    // we must return a new cluster state instance to force publishing. This is important
-                    // for the joining node to finalize it's join and set us as a master
-                    final ClusterState.Builder newState = ClusterState.builder(currentState);
-                    if (nodeAdded) {
-                        newState.nodes(nodesBuilder);
-                    }
-
-                    return newState.build();
-                }
-
-                @Override
-                public void onNoLongerMaster(String source) {
-                    // we are rejected, so drain all pending task (execute never run)
-                    processJoinRequests.drainTo(drainedJoinRequests);
-                    Exception e = new NotMasterException("Node [" + clusterService.localNode() + "] not master for join request from [" + node + "]");
-                    innerOnFailure(e);
-                }
-
-                void innerOnFailure(Throwable t) {
-                    for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> drainedTask : drainedJoinRequests) {
-                        try {
-                            drainedTask.v2().onFailure(t);
-                        } catch (Exception e) {
-                            logger.error("error during task failure", e);
-                        }
-                    }
-                }
-
-                @Override
-                public void onFailure(String source, Throwable t) {
-                    logger.error("unexpected failure during [{}]", t, source);
-                    innerOnFailure(t);
-                }
-
-                @Override
-                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    if (nodeAdded) {
-                        // we reroute not in the same cluster state update since in certain areas we rely on
-                        // the node to be in the cluster state (sampled from ClusterService#state) to be there, also
-                        // shard transitions need to better be handled in such cases
-                        routingService.reroute("post_node_add");
-                    }
-                    for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> drainedTask : drainedJoinRequests) {
-                        try {
-                            drainedTask.v2().onSuccess();
-                        } catch (Exception e) {
-                            logger.error("unexpected error during [{}]", e, source);
-                        }
-                    }
-                }
-            });
+            nodeJoinController.handleJoinRequest(node, callback);
         }
     }
 
@@ -1404,4 +1311,4 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         }
 
     }
-}
+}

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java

@@ -142,7 +142,7 @@ public class RoutingServiceTests extends ElasticsearchAllocationTestCase {
         }
 
         @Override
-        void performReroute(String reason) {
+        protected void performReroute(String reason) {
             rerouted.set(true);
         }
     }

+ 10 - 5
core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsTests.java

@@ -240,12 +240,16 @@ public class DiscoveryWithServiceDisruptionsTests extends ElasticsearchIntegrati
     /** Verify that nodes fault detection works after master (re) election */
     @Test
     public void testNodesFDAfterMasterReelection() throws Exception {
-        startCluster(3);
+        startCluster(4);
 
-        logger.info("stopping current master");
+        logger.info("--> stopping current master");
         internalCluster().stopCurrentMasterNode();
 
-        ensureStableCluster(2);
+        ensureStableCluster(3);
+
+        logger.info("--> reducing min master nodes to 2");
+        assertAcked(client().admin().cluster().prepareUpdateSettings()
+                .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2)).get());
 
         String master = internalCluster().getMasterName();
         String nonMaster = null;
@@ -259,7 +263,7 @@ public class DiscoveryWithServiceDisruptionsTests extends ElasticsearchIntegrati
         addRandomIsolation(nonMaster).startDisrupting();
 
         logger.info("--> waiting for master to remove it");
-        ensureStableCluster(1, master);
+        ensureStableCluster(2, master);
     }
 
     /**
@@ -703,12 +707,13 @@ public class DiscoveryWithServiceDisruptionsTests extends ElasticsearchIntegrati
     }
 
     /**
-     * Test that a document which is indexed on the majority side of a partition, is available from the minory side,
+     * Test that a document which is indexed on the majority side of a partition, is available from the minority side,
      * once the partition is healed
      *
      * @throws Exception
      */
     @Test
+    @TestLogging(value = "cluster.service:TRACE")
     public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
         List<String> nodes = startCluster(3);
 

+ 568 - 0
core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java

@@ -0,0 +1,568 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.discovery.zen;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.BaseFuture;
+import org.elasticsearch.discovery.DiscoverySettings;
+import org.elasticsearch.discovery.zen.membership.MembershipAction;
+import org.elasticsearch.node.settings.NodeSettingsService;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.elasticsearch.test.cluster.TestClusterService;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.junit.Before;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+
+@TestLogging("discovery.zen:TRACE")
+public class NodeJoinControllerTests extends ElasticsearchTestCase {
+
+    private TestClusterService clusterService;
+    private NodeJoinController nodeJoinController;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        clusterService = new TestClusterService();
+        final DiscoveryNodes initialNodes = clusterService.state().nodes();
+        final DiscoveryNode localNode = initialNodes.localNode();
+        // make sure we have a master
+        clusterService.setState(ClusterState.builder(clusterService.state()).nodes(DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.id())));
+        nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY),
+                new DiscoverySettings(Settings.EMPTY, new NodeSettingsService(Settings.EMPTY)), Settings.EMPTY);
+    }
+
+    public void testSimpleJoinAccumulation() throws InterruptedException, ExecutionException {
+        List<DiscoveryNode> nodes = new ArrayList<>();
+        nodes.add(clusterService.localNode());
+
+        int nodeId = 0;
+        for (int i = randomInt(5); i > 0; i--) {
+            DiscoveryNode node = newNode(nodeId++);
+            nodes.add(node);
+            joinNode(node);
+        }
+        nodeJoinController.startAccumulatingJoins();
+        ArrayList<Future<Void>> pendingJoins = new ArrayList<>();
+        for (int i = randomInt(5); i > 0; i--) {
+            DiscoveryNode node = newNode(nodeId++);
+            nodes.add(node);
+            pendingJoins.add(joinNodeAsync(node));
+        }
+        nodeJoinController.stopAccumulatingJoins();
+        for (int i = randomInt(5); i > 0; i--) {
+            DiscoveryNode node = newNode(nodeId++);
+            nodes.add(node);
+            joinNode(node);
+        }
+        assertNodesInCurrentState(nodes);
+        for (Future<Void> joinFuture : pendingJoins) {
+            assertThat(joinFuture.isDone(), equalTo(true));
+        }
+    }
+
+    public void testFailingJoinsWhenNotMaster() throws ExecutionException, InterruptedException {
+        // remove current master flag
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null);
+        clusterService.setState(ClusterState.builder(clusterService.state()).nodes(nodes));
+        int nodeId = 0;
+        try {
+            joinNode(newNode(nodeId++));
+            fail("failed to fail node join when not a master");
+        } catch (ExecutionException e) {
+            assertThat(e.getCause(), instanceOf(NotMasterException.class));
+        }
+
+        logger.debug("--> testing joins fail post accumulation");
+        ArrayList<Future<Void>> pendingJoins = new ArrayList<>();
+        nodeJoinController.startAccumulatingJoins();
+        for (int i = 1 + randomInt(5); i > 0; i--) {
+            DiscoveryNode node = newNode(nodeId++);
+            final Future<Void> future = joinNodeAsync(node);
+            pendingJoins.add(future);
+            assertThat(future.isDone(), equalTo(false));
+        }
+        nodeJoinController.stopAccumulatingJoins();
+        for (Future<Void> future : pendingJoins) {
+            try {
+                future.get();
+                fail("failed to fail accumulated node join when not a master");
+            } catch (ExecutionException e) {
+                assertThat(e.getCause(), instanceOf(NotMasterException.class));
+            }
+        }
+    }
+
+    public void testSimpleMasterElectionWithoutRequiredJoins() throws InterruptedException, ExecutionException {
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null);
+        clusterService.setState(ClusterState.builder(clusterService.state()).nodes(nodes));
+        int nodeId = 0;
+        final int requiredJoins = 0;
+        logger.debug("--> using requiredJoins [{}]", requiredJoins);
+        // initial (failing) joins shouldn't count
+        for (int i = randomInt(5); i > 0; i--) {
+            try {
+                joinNode(newNode(nodeId++));
+                fail("failed to fail node join when not a master");
+            } catch (ExecutionException e) {
+                assertThat(e.getCause(), instanceOf(NotMasterException.class));
+            }
+        }
+
+        nodeJoinController.startAccumulatingJoins();
+        final SimpleFuture electionFuture = new SimpleFuture("master election");
+        final Thread masterElection = new Thread(new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                logger.error("unexpected error from waitToBeElectedAsMaster", t);
+                electionFuture.markAsFailed(t);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
+                    @Override
+                    public void onElectedAsMaster(ClusterState state) {
+                        assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
+                        electionFuture.markAsDone();
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        logger.error("unexpected error while waiting to be elected as master", t);
+                        electionFuture.markAsFailed(t);
+                    }
+                });
+            }
+        });
+        masterElection.start();
+
+            logger.debug("--> requiredJoins is set to 0. verifying election finished");
+            electionFuture.get();
+    }
+
+    public void testSimpleMasterElection() throws InterruptedException, ExecutionException {
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null);
+        clusterService.setState(ClusterState.builder(clusterService.state()).nodes(nodes));
+        int nodeId = 0;
+        final int requiredJoins = 1 + randomInt(5);
+        logger.debug("--> using requiredJoins [{}]", requiredJoins);
+        // initial (failing) joins shouldn't count
+        for (int i = randomInt(5); i > 0; i--) {
+            try {
+                joinNode(newNode(nodeId++));
+                fail("failed to fail node join when not a master");
+            } catch (ExecutionException e) {
+                assertThat(e.getCause(), instanceOf(NotMasterException.class));
+            }
+        }
+
+        nodeJoinController.startAccumulatingJoins();
+        final SimpleFuture electionFuture = new SimpleFuture("master election");
+        final Thread masterElection = new Thread(new AbstractRunnable() {
+            @Override
+            public void onFailure(Throwable t) {
+                logger.error("unexpected error from waitToBeElectedAsMaster", t);
+                electionFuture.markAsFailed(t);
+            }
+
+            @Override
+            protected void doRun() throws Exception {
+                nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
+                    @Override
+                    public void onElectedAsMaster(ClusterState state) {
+                        assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
+                        electionFuture.markAsDone();
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        logger.error("unexpected error while waiting to be elected as master", t);
+                        electionFuture.markAsFailed(t);
+                    }
+                });
+            }
+        });
+        masterElection.start();
+        assertThat("election finished immediately but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false));
+
+        final int initialJoins = randomIntBetween(0, requiredJoins - 1);
+        final ArrayList<SimpleFuture> pendingJoins = new ArrayList<>();
+        ArrayList<DiscoveryNode> nodesToJoin = new ArrayList<>();
+        for (int i = 0; i < initialJoins; i++) {
+            DiscoveryNode node = newNode(nodeId++, true);
+            for (int j = 1 + randomInt(3); j > 0; j--) {
+                nodesToJoin.add(node);
+            }
+        }
+
+        // data nodes shouldn't count
+        for (int i = 0; i < requiredJoins; i++) {
+            DiscoveryNode node = newNode(nodeId++, false);
+            for (int j = 1 + randomInt(3); j > 0; j--) {
+                nodesToJoin.add(node);
+            }
+        }
+
+        // add
+
+        Collections.shuffle(nodesToJoin);
+        logger.debug("--> joining [{}] unique master nodes. Total of [{}] join requests", initialJoins, nodesToJoin.size());
+        for (DiscoveryNode node : nodesToJoin) {
+            pendingJoins.add(joinNodeAsync(node));
+        }
+
+        logger.debug("--> asserting master election didn't finish yet");
+        assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false));
+
+        final int finalJoins = requiredJoins - initialJoins + randomInt(5);
+        nodesToJoin.clear();
+        for (int i = 0; i < finalJoins; i++) {
+            DiscoveryNode node = newNode(nodeId++, true);
+            for (int j = 1 + randomInt(3); j > 0; j--) {
+                nodesToJoin.add(node);
+            }
+        }
+
+        for (int i = 0; i < requiredJoins; i++) {
+            DiscoveryNode node = newNode(nodeId++, false);
+            for (int j = 1 + randomInt(3); j > 0; j--) {
+                nodesToJoin.add(node);
+            }
+        }
+
+        Collections.shuffle(nodesToJoin);
+        logger.debug("--> joining [{}] nodes, with repetition a total of [{}]", finalJoins, nodesToJoin.size());
+        for (DiscoveryNode node : nodesToJoin) {
+            pendingJoins.add(joinNodeAsync(node));
+        }
+        logger.debug("--> waiting for master election to with no exception");
+        electionFuture.get();
+
+        logger.debug("--> waiting on all joins to be processed");
+        for (SimpleFuture future : pendingJoins) {
+            logger.debug("waiting on {}", future);
+            future.get(); // throw any exception
+        }
+
+        logger.debug("--> testing accumulation stopped");
+        nodeJoinController.startAccumulatingJoins();
+        nodeJoinController.stopAccumulatingJoins();
+
+    }
+
+
+    public void testMasterElectionTimeout() throws InterruptedException {
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null);
+        clusterService.setState(ClusterState.builder(clusterService.state()).nodes(nodes));
+        int nodeId = 0;
+        final int requiredJoins = 1 + randomInt(5);
+        logger.debug("--> using requiredJoins [{}]", requiredJoins);
+        // initial (failing) joins shouldn't count
+        for (int i = randomInt(5); i > 0; i--) {
+            try {
+                joinNode(newNode(nodeId++));
+                fail("failed to fail node join when not a master");
+            } catch (ExecutionException e) {
+                assertThat(e.getCause(), instanceOf(NotMasterException.class));
+            }
+        }
+
+        nodeJoinController.startAccumulatingJoins();
+        final int initialJoins = randomIntBetween(0, requiredJoins - 1);
+        final ArrayList<SimpleFuture> pendingJoins = new ArrayList<>();
+        ArrayList<DiscoveryNode> nodesToJoin = new ArrayList<>();
+        for (int i = 0; i < initialJoins; i++) {
+            DiscoveryNode node = newNode(nodeId++);
+            for (int j = 1 + randomInt(3); j > 0; j--) {
+                nodesToJoin.add(node);
+            }
+        }
+        Collections.shuffle(nodesToJoin);
+        logger.debug("--> joining [{}] nodes, with repetition a total of [{}]", initialJoins, nodesToJoin.size());
+        for (DiscoveryNode node : nodesToJoin) {
+            pendingJoins.add(joinNodeAsync(node));
+        }
+
+        final AtomicReference<Throwable> failure = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.Callback() {
+            @Override
+            public void onElectedAsMaster(ClusterState state) {
+                assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                failure.set(t);
+                latch.countDown();
+            }
+        });
+        latch.await();
+        logger.debug("--> verifying election timed out");
+        assertThat(failure.get(), instanceOf(ElasticsearchTimeoutException.class));
+
+        logger.debug("--> verifying all joins are failed");
+        for (SimpleFuture future : pendingJoins) {
+            logger.debug("waiting on {}", future);
+            try {
+                future.get(); // throw any exception
+                fail("failed to fail node join [" + future + "]");
+            } catch (ExecutionException e) {
+                assertThat(e.getCause(), instanceOf(NotMasterException.class));
+            }
+        }
+    }
+
+    public void testNewClusterStateOnExistingNodeJoin() throws InterruptedException, ExecutionException {
+        ClusterState state = clusterService.state();
+        final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(state.nodes());
+        final DiscoveryNode other_node = new DiscoveryNode("other_node", DummyTransportAddress.INSTANCE, Version.CURRENT);
+        nodesBuilder.put(other_node);
+        clusterService.setState(ClusterState.builder(state).nodes(nodesBuilder));
+
+        state = clusterService.state();
+        joinNode(other_node);
+        assertTrue("failed to publish a new state upon existing join", clusterService.state() != state);
+    }
+
+    public void testNormalConcurrentJoins() throws InterruptedException {
+        Thread[] threads = new Thread[3 + randomInt(5)];
+        ArrayList<DiscoveryNode> nodes = new ArrayList<>();
+        nodes.add(clusterService.localNode());
+        final CyclicBarrier barrier = new CyclicBarrier(threads.length);
+        final List<Throwable> backgroundExceptions = new CopyOnWriteArrayList<>();
+        for (int i = 0; i < threads.length; i++) {
+            final DiscoveryNode node = newNode(i);
+            final int iterations = rarely() ? randomIntBetween(1, 4) : 1;
+            nodes.add(node);
+            threads[i] = new Thread(new AbstractRunnable() {
+                @Override
+                public void onFailure(Throwable t) {
+                    logger.error("unexpected error in join thread", t);
+                    backgroundExceptions.add(t);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    barrier.await();
+                    for (int i = 0; i < iterations; i++) {
+                        logger.debug("{} joining", node);
+                        joinNode(node);
+                    }
+                }
+            }, "t_" + i);
+            threads[i].start();
+        }
+
+        logger.info("--> waiting for joins to complete");
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        assertNodesInCurrentState(nodes);
+    }
+
+    public void testElectionWithConcurrentJoins() throws InterruptedException, BrokenBarrierException {
+        DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterService.state().nodes()).masterNodeId(null);
+        clusterService.setState(ClusterState.builder(clusterService.state()).nodes(nodesBuilder));
+
+        nodeJoinController.startAccumulatingJoins();
+
+        Thread[] threads = new Thread[3 + randomInt(5)];
+        final int requiredJoins = randomInt(threads.length);
+        ArrayList<DiscoveryNode> nodes = new ArrayList<>();
+        nodes.add(clusterService.localNode());
+        final CyclicBarrier barrier = new CyclicBarrier(threads.length + 1);
+        final List<Throwable> backgroundExceptions = new CopyOnWriteArrayList<>();
+        for (int i = 0; i < threads.length; i++) {
+            final DiscoveryNode node = newNode(i, true);
+            final int iterations = rarely() ? randomIntBetween(1, 4) : 1;
+            nodes.add(node);
+            threads[i] = new Thread(new AbstractRunnable() {
+                @Override
+                public void onFailure(Throwable t) {
+                    logger.error("unexpected error in join thread", t);
+                    backgroundExceptions.add(t);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    barrier.await();
+                    for (int i = 0; i < iterations; i++) {
+                        logger.debug("{} joining", node);
+                        joinNode(node);
+                    }
+                }
+            }, "t_" + i);
+            threads[i].start();
+        }
+
+        barrier.await();
+        logger.info("--> waiting to be elected as master (required joins [{}])", requiredJoins);
+        final AtomicReference<Throwable> failure = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
+            @Override
+            public void onElectedAsMaster(ClusterState state) {
+                assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.error("unexpected error while waiting to be elected as master", t);
+                failure.set(t);
+                latch.countDown();
+            }
+        });
+        latch.await();
+        ExceptionsHelper.reThrowIfNotNull(failure.get());
+
+
+        logger.info("--> waiting for joins to complete");
+        for (Thread thread : threads) {
+            thread.join();
+        }
+
+        assertNodesInCurrentState(nodes);
+    }
+
+
+    static class NoopRoutingService extends RoutingService {
+
+        public NoopRoutingService(Settings settings) {
+            super(settings, null, null, new NoopAllocationService(settings));
+        }
+
+        @Override
+        protected void performReroute(String reason) {
+
+        }
+    }
+
+    static class NoopAllocationService extends AllocationService {
+
+        public NoopAllocationService(Settings settings) {
+            super(settings, null, null, null);
+        }
+
+        @Override
+        public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) {
+            return new RoutingAllocation.Result(false, clusterState.routingTable());
+        }
+
+        @Override
+        public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
+            return new RoutingAllocation.Result(false, clusterState.routingTable());
+        }
+
+        @Override
+        public RoutingAllocation.Result reroute(ClusterState clusterState, boolean debug) {
+            return new RoutingAllocation.Result(false, clusterState.routingTable());
+        }
+    }
+
+    protected void assertNodesInCurrentState(List<DiscoveryNode> expectedNodes) {
+        DiscoveryNodes discoveryNodes = clusterService.state().nodes();
+        assertThat(discoveryNodes.prettyPrint() + "\nexpected: " + expectedNodes.toString(), discoveryNodes.size(), equalTo(expectedNodes.size()));
+        for (DiscoveryNode node : expectedNodes) {
+            assertThat("missing " + node + "\n" + discoveryNodes.prettyPrint(), discoveryNodes.get(node.id()), equalTo(node));
+        }
+    }
+
+    static class SimpleFuture extends BaseFuture<Void> {
+        final String description;
+
+        SimpleFuture(String description) {
+            this.description = description;
+        }
+
+        public void markAsDone() {
+            set(null);
+        }
+
+        public void markAsFailed(Throwable t) {
+            setException(t);
+        }
+
+        @Override
+        public String toString() {
+            return "future [" + description + "]";
+        }
+    }
+
+    final static AtomicInteger joinId = new AtomicInteger();
+
+    private SimpleFuture joinNodeAsync(final DiscoveryNode node) throws InterruptedException {
+        final SimpleFuture future = new SimpleFuture("join of " + node + " (id [" + joinId.incrementAndGet() + "]");
+        logger.debug("starting {}", future);
+        nodeJoinController.handleJoinRequest(node, new MembershipAction.JoinCallback() {
+            @Override
+            public void onSuccess() {
+                logger.debug("{} completed", future);
+                future.markAsDone();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                logger.error("unexpected error for {}", t, future);
+                future.markAsFailed(t);
+            }
+        });
+        return future;
+    }
+
+    private void joinNode(final DiscoveryNode node) throws InterruptedException, ExecutionException {
+        joinNodeAsync(node).get();
+    }
+
+    protected DiscoveryNode newNode(int i) {
+        return newNode(i, randomBoolean());
+    }
+
+    protected DiscoveryNode newNode(int i, boolean master) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("master", Boolean.toString(master));
+        final String prefix = master ? "master_" : "data_";
+        return new DiscoveryNode(prefix + i, i + "", new LocalTransportAddress("test_" + i), attributes, Version.CURRENT);
+    }
+}

+ 1 - 1
core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java

@@ -216,7 +216,7 @@ public class ZenDiscoveryTests extends ElasticsearchIntegrationTest {
         });
         latch.await();
         assertThat(reference.get(), notNullValue());
-        assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master then the current one, rejecting "));
+        assertThat(ExceptionsHelper.detailedMessage(reference.get()), containsString("cluster state from a different master than the current one, rejecting"));
     }
 
     @Test

+ 1 - 1
core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTest.java

@@ -75,7 +75,7 @@ public class ZenDiscoveryUnitTest extends ElasticsearchTestCase {
             shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build());
             fail("should ignore, because current state's master is not equal to new state's master");
         } catch (IllegalStateException e) {
-            assertThat(e.getMessage(), containsString("cluster state from a different master then the current one, rejecting"));
+            assertThat(e.getMessage(), containsString("cluster state from a different master than the current one, rejecting"));
         }
 
         currentNodes = DiscoveryNodes.builder();

+ 43 - 16
core/src/test/java/org/elasticsearch/test/cluster/TestClusterService.java

@@ -30,6 +30,10 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.component.LifecycleListener;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.ESLoggerFactory;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -50,6 +54,7 @@ public class TestClusterService implements ClusterService {
     private final Collection<ClusterStateListener> listeners = new CopyOnWriteArrayList<>();
     private final Queue<NotifyTimeout> onGoingTimeouts = ConcurrentCollections.newQueue();
     private final ThreadPool threadPool;
+    private final ESLogger logger = Loggers.getLogger(getClass(), Settings.EMPTY);
 
     public TestClusterService() {
         this(ClusterState.builder(new ClusterName("test")).build());
@@ -67,8 +72,8 @@ public class TestClusterService implements ClusterService {
         if (state.getNodes().size() == 0) {
             state = ClusterState.builder(state).nodes(
                     DiscoveryNodes.builder()
-                            .put(new DiscoveryNode("test_id", DummyTransportAddress.INSTANCE, Version.CURRENT))
-                            .localNodeId("test_id")).build();
+                            .put(new DiscoveryNode("test_node", DummyTransportAddress.INSTANCE, Version.CURRENT))
+                            .localNodeId("test_node")).build();
         }
 
         assert state.getNodes().localNode() != null;
@@ -78,21 +83,26 @@ public class TestClusterService implements ClusterService {
     }
 
 
-    /** set the current state and trigger any registered listeners about the change */
-    public void setState(ClusterState state) {
+    /** set the current state and trigger any registered listeners about the change, mimicking an update task */
+    synchronized public ClusterState setState(ClusterState state) {
         assert state.getNodes().localNode() != null;
         // make sure we have a version increment
         state = ClusterState.builder(state).version(this.state.version() + 1).build();
+        return setStateAndNotifyListeners(state);
+    }
+
+    private ClusterState setStateAndNotifyListeners(ClusterState state) {
         ClusterChangedEvent event = new ClusterChangedEvent("test", state, this.state);
         this.state = state;
         for (ClusterStateListener listener : listeners) {
             listener.clusterChanged(event);
         }
+        return state;
     }
 
     /** set the current state and trigger any registered listeners about the change */
-    public void setState(ClusterState.Builder state) {
-        setState(state.build());
+    public ClusterState setState(ClusterState.Builder state) {
+        return setState(state.build());
     }
 
     @Override
@@ -172,28 +182,45 @@ public class TestClusterService implements ClusterService {
     }
 
     @Override
-    public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
-        throw new UnsupportedOperationException();
+    synchronized public void submitStateUpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
+        logger.debug("processing [{}]", source);
+        if (state().nodes().localNodeMaster() == false && updateTask.runOnlyOnMaster()) {
+            updateTask.onNoLongerMaster(source);
+            logger.debug("failed [{}], no longer master", source);
+            return;
+        }
+        ClusterState newState;
+        ClusterState previousClusterState = state;
+        try {
+            newState = updateTask.execute(previousClusterState);
+        } catch (Exception e) {
+            throw new ElasticsearchException("failed to process cluster state update task [" + source + "]", e);
+        }
+        setStateAndNotifyListeners(newState);
+        if (updateTask instanceof ProcessedClusterStateUpdateTask) {
+            ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newState);
+        }
+        logger.debug("finished [{}]", source);
     }
 
     @Override
     public void submitStateUpdateTask(String source, ClusterStateUpdateTask updateTask) {
-        throw new UnsupportedOperationException();
+        submitStateUpdateTask(source, Priority.NORMAL, updateTask);
     }
 
     @Override
-    public List<PendingClusterTask> pendingTasks() {
+    public TimeValue getMaxTaskWaitTime() {
         throw new UnsupportedOperationException();
-
     }
 
     @Override
-    public int numberOfPendingTasks() {
+    public List<PendingClusterTask> pendingTasks() {
         throw new UnsupportedOperationException();
+
     }
 
     @Override
-    public TimeValue getMaxTaskWaitTime() {
+    public int numberOfPendingTasks() {
         throw new UnsupportedOperationException();
     }
 
@@ -213,17 +240,17 @@ public class TestClusterService implements ClusterService {
     }
 
     @Override
-    public ClusterService start() {
+    public ClusterService start() throws ElasticsearchException {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public ClusterService stop() {
+    public ClusterService stop() throws ElasticsearchException {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public void close() {
+    public void close() throws ElasticsearchException {
         throw new UnsupportedOperationException();
     }
 

+ 7 - 3
docs/reference/modules/discovery/zen.asciidoc

@@ -88,10 +88,14 @@ Nodes can be excluded from becoming a master by setting `node.master` to
 automatically set to `false`).
 
 The `discovery.zen.minimum_master_nodes` sets the minimum
-number of master eligible nodes a node should "see" in order to win a master election.
-It must be set to a quorum of your master eligible nodes. It is recommended to avoid
+number of master eligible nodes that need to join a newly elected master in order for an election to
+complete and for the elected node to accept it's mastership. The same setting controls the minimum number of
+active master eligible nodes that should be a part of any active cluster. If this requirement is not met the
+active master node will step down and a new mastser election will be begin.
+
+This setting must be set to a quorum of your master eligible nodes. It is recommended to avoid
 having only two master eligible nodes, since a quorum of two is two. Therefore, a loss
-of either master node will result in an inoperable cluster
+of either master node will result in an inoperable cluster.
 
 [float]
 [[fault-detection]]

+ 13 - 0
docs/resiliency/index.asciidoc

@@ -88,6 +88,19 @@ Further issues remain with the retry mechanism:
 
 See {GIT}9967[#9967]. (STATUS: ONGOING)
 
+[float]
+=== Wait on incoming joins before electing local node as master (STATUS: ONGOING)
+
+During master election each node pings in order to discover other nodes and validate the liveness of existing
+nodes. Based on this information the node either discovers an existing master or, if enough nodes are found
+(see https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html#master-election[`discovery.zen.minimum_master_nodes`]) a new master will be elected. Currently, the node that is
+elected as master will update the cluster state to indicate the result of the election. Other nodes will submit
+a join request to the newly elected master node. Instead of immediately processing the election result, the elected master
+node should wait for the incoming joins from other nodes, thus validating that the result of the election is properly applied. As soon as enough
+nodes have sent their joins request (based on the `minimum_master_nodes` settings) the cluster state is updated.
+{GIT}12161[#12161]
+
+
 [float]
 === Write index metadata on data nodes where shards allocated (STATUS: ONGOING)