Browse Source

Merge pull request #17038 from jasontedor/enable_acked

Prepare for enabling acked indexing
Jason Tedor 9 years ago
parent
commit
0a69985153

+ 36 - 24
core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -79,6 +79,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -679,7 +680,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                 return;
             }
             // closed in finishAsFailed(e) in the case of error
-            indexShardReference = getIndexShardReferenceOnPrimary(shardId);
+            indexShardReference = getIndexShardReferenceOnPrimary(shardId, request);
             if (indexShardReference.isRelocated() == false) {
                 executeLocally();
             } else {
@@ -797,7 +798,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
      * returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
      * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}).
      */
-    protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
+    protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
         // we may end up here if the cluster state used to route the primary is so stale that the underlying
@@ -816,7 +817,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
     protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
-        return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
+        IndexShardReference ref = IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
+        return ref;
     }
 
     /**
@@ -997,30 +999,38 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                                 String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node);
                                 logger.warn("[{}] {}", exp, shardId, message);
                                 shardStateAction.shardFailed(
-                                    shard,
-                                    indexShardReference.routingEntry(),
-                                    message,
-                                    exp,
-                                    new ShardStateAction.Listener() {
-                                        @Override
-                                        public void onSuccess() {
-                                            onReplicaFailure(nodeId, exp);
-                                        }
-
-                                        @Override
-                                        public void onFailure(Throwable shardFailedError) {
-                                            if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
-                                                ShardRouting primaryShard = indexShardReference.routingEntry();
-                                                String message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
-                                                // we are no longer the primary, fail ourselves and start over
-                                                indexShardReference.failShard(message, shardFailedError);
-                                                forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
-                                            } else {
-                                                assert false : shardFailedError;
+                                        shard,
+                                        indexShardReference.routingEntry(),
+                                        message,
+                                        exp,
+                                        new ShardStateAction.Listener() {
+                                            @Override
+                                            public void onSuccess() {
                                                 onReplicaFailure(nodeId, exp);
                                             }
+
+                                            @Override
+                                            public void onFailure(Throwable shardFailedError) {
+                                                if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
+                                                    String message = "unknown";
+                                                    try {
+                                                        ShardRouting primaryShard = indexShardReference.routingEntry();
+                                                        message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp);
+                                                        // we are no longer the primary, fail ourselves and start over
+                                                        indexShardReference.failShard(message, shardFailedError);
+                                                    } catch (Throwable t) {
+                                                        shardFailedError.addSuppressed(t);
+                                                    }
+                                                    forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError));
+                                                } else {
+                                                    // these can occur if the node is shutting down and are okay
+                                                    // any other exception here is not expected and merits investigation
+                                                    assert shardFailedError instanceof TransportException ||
+                                                            shardFailedError instanceof NodeClosedException : shardFailedError;
+                                                    onReplicaFailure(nodeId, exp);
+                                                }
+                                            }
                                         }
-                                    }
                                 );
                             }
                         }
@@ -1108,7 +1118,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
 
     interface IndexShardReference extends Releasable {
         boolean isRelocated();
+
         void failShard(String reason, @Nullable Throwable e);
+
         ShardRouting routingEntry();
 
         /** returns the primary term of the current operation */

+ 2 - 1
core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -53,6 +53,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.EmptyTransportResponseHandler;
 import org.elasticsearch.transport.NodeDisconnectedException;
+import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -111,7 +112,7 @@ public class ShardStateAction extends AbstractComponent {
                             waitForNewMasterAndRetry(actionName, observer, shardRoutingEntry, listener);
                         } else {
                             logger.warn("{} unexpected failure while sending request [{}] to [{}] for shard [{}]", exp, shardRoutingEntry.getShardRouting().shardId(), actionName, masterNode, shardRoutingEntry);
-                            listener.onFailure(exp.getCause());
+                            listener.onFailure(exp instanceof RemoteTransportException ? exp.getCause() : exp);
                         }
                     }
                 });

+ 7 - 0
core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

@@ -210,6 +210,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
     @Override
     synchronized protected void doStop() {
         for (NotifyTimeout onGoingTimeout : onGoingTimeouts) {
+            onGoingTimeout.cancel();
             try {
                 onGoingTimeout.cancel();
                 onGoingTimeout.listener.onClose();
@@ -218,6 +219,12 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
             }
         }
         ThreadPool.terminate(updateTasksExecutor, 10, TimeUnit.SECONDS);
+        // close timeout listeners that did not have an ongoing timeout
+        postAppliedListeners
+                .stream()
+                .filter(listener -> listener instanceof TimeoutClusterStateListener)
+                .map(listener -> (TimeoutClusterStateListener)listener)
+                .forEach(TimeoutClusterStateListener::onClose);
         remove(localNodeMasterListeners);
     }
 

+ 22 - 5
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -36,6 +36,7 @@ import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.inject.Inject;
@@ -188,7 +189,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
         this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, clusterName);
         this.nodesFD.addListener(new NodeFaultDetectionListener());
 
-        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewPendingClusterStateListener(), discoverySettings, clusterName);
+        this.publishClusterState =
+                new PublishClusterStateAction(
+                        settings,
+                        transportService,
+                        clusterService::state,
+                        new NewPendingClusterStateListener(),
+                        discoverySettings,
+                        clusterName);
         this.pingService.setPingContextProvider(this);
         this.membership = new MembershipAction(settings, clusterService, transportService, this, new MembershipListener());
 
@@ -766,15 +774,24 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
      * If the first condition fails we reject the cluster state and throw an error.
      * If the second condition fails we ignore the cluster state.
      */
-    static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
+    @SuppressForbidden(reason = "debug")
+    public static boolean shouldIgnoreOrRejectNewClusterState(ESLogger logger, ClusterState currentState, ClusterState newClusterState) {
         validateStateIsFromCurrentMaster(logger, currentState.nodes(), newClusterState);
-        if (currentState.supersedes(newClusterState)) {
+
+        // reject cluster states that are not new from the same master
+        if (currentState.supersedes(newClusterState) ||
+                (newClusterState.nodes().getMasterNodeId().equals(currentState.nodes().getMasterNodeId()) && currentState.version() == newClusterState.version())) {
             // if the new state has a smaller version, and it has the same master node, then no need to process it
+            logger.debug("received a cluster state that is not newer than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
+            return true;
+        }
+
+        // reject older cluster states if we are following a master
+        if (currentState.nodes().getMasterNodeId() != null && newClusterState.version() < currentState.version()) {
             logger.debug("received a cluster state that has a lower version than the current one, ignoring (received {}, current {})", newClusterState.version(), currentState.version());
             return true;
-        } else {
-            return false;
         }
+        return false;
     }
 
     /**

+ 8 - 6
core/src/main/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueue.java

@@ -164,16 +164,18 @@ public class PendingClusterStatesQueue {
                             currentMaster
                     );
                 }
-            } else if (state.supersedes(pendingState) && pendingContext.committed()) {
-                logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
-                        pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
-                );
-                contextsToRemove.add(pendingContext);
-                pendingContext.listener.onNewClusterStateProcessed();
             } else if (pendingState.stateUUID().equals(state.stateUUID())) {
                 assert pendingContext.committed() : "processed cluster state is not committed " + state;
                 contextsToRemove.add(pendingContext);
                 pendingContext.listener.onNewClusterStateProcessed();
+            } else if (state.version() >= pendingState.version()) {
+                logger.trace("processing pending state uuid[{}]/v[{}] together with state uuid[{}]/v[{}]",
+                        pendingState.stateUUID(), pendingState.version(), state.stateUUID(), state.version()
+                );
+                contextsToRemove.add(pendingContext);
+                if (pendingContext.committed()) {
+                    pendingContext.listener.onNewClusterStateProcessed();
+                }
             }
         }
         // now ack the processed state

+ 28 - 11
core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java

@@ -41,7 +41,6 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
 import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoverySettings;
-import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
 import org.elasticsearch.discovery.zen.ZenDiscovery;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BytesTransportRequest;
@@ -58,11 +57,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 /**
  *
@@ -81,17 +82,22 @@ public class PublishClusterStateAction extends AbstractComponent {
     }
 
     private final TransportService transportService;
-    private final DiscoveryNodesProvider nodesProvider;
+    private final Supplier<ClusterState> clusterStateSupplier;
     private final NewPendingClusterStateListener newPendingClusterStatelistener;
     private final DiscoverySettings discoverySettings;
     private final ClusterName clusterName;
     private final PendingClusterStatesQueue pendingStatesQueue;
 
-    public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
-                                     NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
+    public PublishClusterStateAction(
+            Settings settings,
+            TransportService transportService,
+            Supplier<ClusterState> clusterStateSupplier,
+            NewPendingClusterStateListener listener,
+            DiscoverySettings discoverySettings,
+            ClusterName clusterName) {
         super(settings);
         this.transportService = transportService;
-        this.nodesProvider = nodesProvider;
+        this.clusterStateSupplier = clusterStateSupplier;
         this.newPendingClusterStatelistener = listener;
         this.discoverySettings = discoverySettings;
         this.clusterName = clusterName;
@@ -363,7 +369,7 @@ public class PublishClusterStateAction extends AbstractComponent {
             final ClusterState incomingState;
             // If true we received full cluster state - otherwise diffs
             if (in.readBoolean()) {
-                incomingState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
+                incomingState = ClusterState.Builder.readFrom(in, clusterStateSupplier.get().nodes().getLocalNode());
                 logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
             } else if (lastSeenClusterState != null) {
                 Diff<ClusterState> diff = lastSeenClusterState.readDiffFrom(in);
@@ -394,14 +400,25 @@ public class PublishClusterStateAction extends AbstractComponent {
             logger.warn("received cluster state from [{}] which is also master but with a different cluster name [{}]", incomingState.nodes().getMasterNode(), incomingClusterName);
             throw new IllegalStateException("received state from a node that is not part of the cluster");
         }
-        final DiscoveryNodes currentNodes = nodesProvider.nodes();
+        final ClusterState clusterState = clusterStateSupplier.get();
 
-        if (currentNodes.getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
+        if (clusterState.nodes().getLocalNode().equals(incomingState.nodes().getLocalNode()) == false) {
             logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", incomingState.nodes().getMasterNode());
-            throw new IllegalStateException("received state from a node that is not part of the cluster");
+            throw new IllegalStateException("received state with a local node that does not match the current local node");
+        }
+
+        if (ZenDiscovery.shouldIgnoreOrRejectNewClusterState(logger, clusterState, incomingState)) {
+            String message = String.format(
+                    Locale.ROOT,
+                    "rejecting cluster state version [%d] uuid [%s] received from [%s]",
+                    incomingState.version(),
+                    incomingState.stateUUID(),
+                    incomingState.nodes().getMasterNodeId()
+            );
+            logger.warn(message);
+            throw new IllegalStateException(message);
         }
 
-        ZenDiscovery.validateStateIsFromCurrentMaster(logger, currentNodes, incomingState);
     }
 
     protected void handleCommitRequest(CommitClusterStateRequest request, final TransportChannel channel) {
@@ -518,7 +535,7 @@ public class PublishClusterStateAction extends AbstractComponent {
             }
 
             if (timedout) {
-                markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "]");
+                markAsFailed("timed out waiting for commit (commit timeout [" + commitTimeout + "])");
             }
             if (isCommitted() == false) {
                 throw new Discovery.FailedToCommitClusterStateException("{} enough masters to ack sent cluster state. [{}] left",

+ 4 - 1
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -1062,6 +1062,9 @@ public class TransportReplicationActionTests extends ESTestCase {
             @Override
             public void failShard(String reason, @Nullable Throwable e) {
                 isShardFailed.set(true);
+                if (randomBoolean()) {
+                    throw new ElasticsearchException("simulated");
+                }
             }
 
             @Override
@@ -1173,7 +1176,7 @@ public class TransportReplicationActionTests extends ESTestCase {
 
 
         @Override
-        protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
+        protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
             final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
             return getOrCreateIndexShardOperationsCounter(indexMetaData.primaryTerm(shardId.id()));
         }

+ 116 - 74
core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java

@@ -20,7 +20,6 @@
 package org.elasticsearch.discovery;
 
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -138,6 +137,13 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         return 1;
     }
 
+    @Override
+    protected void beforeIndexDeletion() {
+        // some test may leave operations in flight
+        // this is because the disruption schemes swallow requests by design
+        // as such, these operations will never be marked as finished
+    }
+
     private List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {
         return startCluster(numberOfNodes, -1);
     }
@@ -146,7 +152,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         return startCluster(numberOfNodes, minimumMasterNode, null);
     }
 
-    private List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws ExecutionException, InterruptedException {
+    private List<String> startCluster(int numberOfNodes, int minimumMasterNode, @Nullable int[] unicastHostsOrdinals) throws
+            ExecutionException, InterruptedException {
         configureUnicastCluster(numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
         List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
         ensureStableCluster(numberOfNodes);
@@ -175,11 +182,20 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         return pluginList(MockTransportService.TestPlugin.class);
     }
 
-    private void configureUnicastCluster(int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
+    private void configureUnicastCluster(
+        int numberOfNodes,
+        @Nullable int[] unicastHostsOrdinals,
+        int minimumMasterNode
+    ) throws ExecutionException, InterruptedException {
         configureUnicastCluster(DEFAULT_SETTINGS, numberOfNodes, unicastHostsOrdinals, minimumMasterNode);
     }
 
-    private void configureUnicastCluster(Settings settings, int numberOfNodes, @Nullable int[] unicastHostsOrdinals, int minimumMasterNode) throws ExecutionException, InterruptedException {
+    private void configureUnicastCluster(
+        Settings settings,
+        int numberOfNodes,
+        @Nullable int[] unicastHostsOrdinals,
+        int minimumMasterNode
+    ) throws ExecutionException, InterruptedException {
         if (minimumMasterNode < 0) {
             minimumMasterNode = numberOfNodes / 2 + 1;
         }
@@ -253,7 +269,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         logger.info("--> reducing min master nodes to 2");
         assertAcked(client().admin().cluster().prepareUpdateSettings()
-                .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)).get());
+                .setTransientSettings(Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2))
+                .get());
 
         String master = internalCluster().getMasterName();
         String nonMaster = null;
@@ -278,8 +295,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         // Makes sure that the get request can be executed on each node locally:
         assertAcked(prepareCreate("test").setSettings(Settings.builder()
-                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
         ));
 
         // Everything is stable now, it is now time to simulate evil...
@@ -359,8 +376,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         assertAcked(prepareCreate("test")
                 .setSettings(Settings.builder()
-                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
-                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
                 ));
 
         ensureGreen();
@@ -380,7 +397,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         networkPartition.stopDisrupting();
 
         for (String node : nodes) {
-            ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()), true, node);
+            ensureStableCluster(3, new TimeValue(DISRUPTION_HEALING_OVERHEAD.millis() + networkPartition.expectedTimeToHeal().millis()),
+                    true, node);
         }
 
         logger.info("issue a reroute");
@@ -421,17 +439,20 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
      * <p>
      * This test is a superset of tests run in the Jepsen test suite, with the exception of versioned updates
      */
-    // NOTE: if you remove the awaitFix, make sure to port the test to the 1.x branch
-    @LuceneTestCase.AwaitsFix(bugUrl = "needs some more work to stabilize")
-    @TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
+    @TestLogging("_root:DEBUG,action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,"
+            + "indices.recovery:TRACE,indices.cluster:TRACE")
     public void testAckedIndexing() throws Exception {
+
+        final int seconds = !(TEST_NIGHTLY && rarely()) ? 1 : 5;
+        final String timeout = seconds + "s";
+
         // TODO: add node count randomizaion
         final List<String> nodes = startCluster(3);
 
         assertAcked(prepareCreate("test")
                 .setSettings(Settings.builder()
-                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
-                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
                 ));
         ensureGreen();
 
@@ -455,36 +476,34 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                 final Client client = client(node);
                 final String name = "indexer_" + indexers.size();
                 final int numPrimaries = getNumShards("test").numPrimaries;
-                Thread thread = new Thread(new Runnable() {
-                    @Override
-                    public void run() {
-                        while (!stop.get()) {
-                            String id = null;
+                Thread thread = new Thread(() -> {
+                    while (!stop.get()) {
+                        String id = null;
+                        try {
+                            if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
+                                continue;
+                            }
+                            logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits());
                             try {
-                                if (!semaphore.tryAcquire(10, TimeUnit.SECONDS)) {
-                                    continue;
-                                }
-                                logger.info("[{}] Acquired semaphore and it has {} permits left", name, semaphore.availablePermits());
-                                try {
-                                    id = Integer.toString(idGenerator.incrementAndGet());
-                                    int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
-                                    logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
-                                    IndexResponse response = client.prepareIndex("test", "type", id).setSource("{}").setTimeout("1s").get();
-                                    assertThat(response.getVersion(), equalTo(1L));
-                                    ackedDocs.put(id, node);
-                                    logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
-                                } catch (ElasticsearchException e) {
-                                    exceptedExceptions.add(e);
-                                    logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node);
-                                } finally {
-                                    countDownLatchRef.get().countDown();
-                                    logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
-                                }
-                            } catch (InterruptedException e) {
-                                // fine - semaphore interrupt
-                            } catch (Throwable t) {
-                                logger.info("unexpected exception in background thread of [{}]", t, node);
+                                id = Integer.toString(idGenerator.incrementAndGet());
+                                int shard = Math.floorMod(Murmur3HashFunction.hash(id), numPrimaries);
+                                logger.trace("[{}] indexing id [{}] through node [{}] targeting shard [{}]", name, id, node, shard);
+                                IndexResponse response =
+                                        client.prepareIndex("test", "type", id).setSource("{}").setTimeout(timeout).get(timeout);
+                                assertTrue("doc [" + id + "] should have been created", response.isCreated());
+                                ackedDocs.put(id, node);
+                                logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
+                            } catch (ElasticsearchException e) {
+                                exceptedExceptions.add(e);
+                                logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node);
+                            } finally {
+                                countDownLatchRef.get().countDown();
+                                logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
                             }
+                        } catch (InterruptedException e) {
+                            // fine - semaphore interrupt
+                        } catch (Throwable t) {
+                            logger.info("unexpected exception in background thread of [{}]", t, node);
                         }
                     }
                 });
@@ -514,11 +533,15 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                     assertThat(semaphore.availablePermits(), equalTo(0));
                     semaphore.release(docsPerIndexer);
                 }
-                assertTrue(countDownLatchRef.get().await(60000 + disruptionScheme.expectedTimeToHeal().millis() * (docsPerIndexer * indexers.size()), TimeUnit.MILLISECONDS));
+                logger.info("waiting for indexing requests to complete");
+                assertTrue(countDownLatchRef.get().await(docsPerIndexer * seconds * 1000 + 2000, TimeUnit.MILLISECONDS));
 
                 logger.info("stopping disruption");
                 disruptionScheme.stopDisrupting();
-                ensureStableCluster(3, TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + DISRUPTION_HEALING_OVERHEAD.millis()));
+                for (String node : internalCluster().getNodeNames()) {
+                    ensureStableCluster(3, TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() +
+                            DISRUPTION_HEALING_OVERHEAD.millis()), true, node);
+                }
                 ensureGreen("test");
 
                 logger.info("validating successful docs");
@@ -615,7 +638,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         majoritySide.remove(oldMasterNode);
 
         // Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
-        final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String, String>>>());
+        final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<String, List<Tuple<String,
+                String>>>());
         for (final String node : majoritySide) {
             masters.put(node, new ArrayList<Tuple<String, String>>());
             internalCluster().getInstance(ClusterService.class, node).add(new ClusterStateListener() {
@@ -624,7 +648,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                     DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode();
                     DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
                     if (!Objects.equals(previousMaster, currentMaster)) {
-                        logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(), event.previousState());
+                        logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
+                                event.previousState());
                         String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null;
                         String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null;
                         masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
@@ -656,7 +681,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         // but will be queued and once the old master node un-freezes it gets executed.
         // The old master node will send this update + the cluster state where he is flagged as master to the other
         // nodes that follow the new master. These nodes should ignore this update.
-        internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
+        internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
+                ClusterStateUpdateTask(Priority.IMMEDIATE) {
             @Override
             public ClusterState execute(ClusterState currentState) throws Exception {
                 return ClusterState.builder(currentState).build();
@@ -693,11 +719,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         for (Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
             String nodeName = entry.getKey();
             List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
-            assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(), equalTo(2));
-            assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(), equalTo(oldMasterNode));
-            assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(0).v2(), nullValue());
-            assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(), nullValue());
-            assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]", recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
+            assertThat("[" + nodeName + "] Each node should only record two master node transitions", recordedMasterTransition.size(),
+                    equalTo(2));
+            assertThat("[" + nodeName + "] First transition's previous master should be [null]", recordedMasterTransition.get(0).v1(),
+                    equalTo(oldMasterNode));
+            assertThat("[" + nodeName + "] First transition's current master should be [" + newMasterNode + "]", recordedMasterTransition
+                    .get(0).v2(), nullValue());
+            assertThat("[" + nodeName + "] Second transition's previous master should be [null]", recordedMasterTransition.get(1).v1(),
+                    nullValue());
+            assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]",
+                    recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
         }
     }
 
@@ -710,8 +741,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         assertAcked(prepareCreate("test")
                 .setSettings(Settings.builder()
-                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
                 )
                 .get());
         ensureGreen("test");
@@ -727,7 +758,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         assertFalse(client(notIsolatedNode).admin().cluster().prepareHealth("test").setWaitForYellowStatus().get().isTimedOut());
 
 
-        IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get();
+        IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value")
+                .get();
         assertThat(indexResponse.getVersion(), equalTo(1L));
 
         logger.info("Verifying if document exists via node[{}]", notIsolatedNode);
@@ -845,17 +877,21 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         DiscoveryNodes discoveryNodes = internalCluster().getInstance(ClusterService.class, nonMasterNode).state().nodes();
 
-        TransportService masterTranspotService = internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
+        TransportService masterTranspotService =
+                internalCluster().getInstance(TransportService.class, discoveryNodes.getMasterNode().getName());
 
         logger.info("blocking requests from non master [{}] to master [{}]", nonMasterNode, masterNode);
-        MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode);
+        MockTransportService nonMasterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
+                nonMasterNode);
         nonMasterTransportService.addFailToSendNoConnectRule(masterTranspotService);
 
         assertNoMaster(nonMasterNode);
 
         logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
-        MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
-        TransportService localTransportService = internalCluster().getInstance(TransportService.class, discoveryNodes.getLocalNode().getName());
+        MockTransportService masterTransportService =
+            (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
+        TransportService localTransportService =
+            internalCluster().getInstance(TransportService.class, discoveryNodes.getLocalNode().getName());
         if (randomBoolean()) {
             masterTransportService.addFailToSendNoConnectRule(localTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
         } else {
@@ -864,9 +900,11 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
 
         logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
         final CountDownLatch countDownLatch = new CountDownLatch(2);
-        nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService.original()) {
+        nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
+                .original()) {
             @Override
-            public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+            public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions
+                    options) throws IOException, TransportException {
                 if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
                     countDownLatch.countDown();
                 }
@@ -894,16 +932,16 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         List<String> nonMasterNodes = nodes.stream().filter(node -> !node.equals(masterNode)).collect(Collectors.toList());
         String nonMasterNode = randomFrom(nonMasterNodes);
         assertAcked(prepareCreate("test")
-            .setSettings(Settings.builder()
-                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
-            ));
+                .setSettings(Settings.builder()
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 3)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
+                ));
         ensureGreen();
         String nonMasterNodeId = internalCluster().clusterService(nonMasterNode).localNode().getId();
 
         // fail a random shard
         ShardRouting failedShard =
-            randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
+                randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED));
         ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode);
         CountDownLatch latch = new CountDownLatch(1);
         AtomicBoolean success = new AtomicBoolean();
@@ -912,7 +950,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
         networkPartition.startDisrupting();
 
-        service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() {
+        service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new
+                ShardStateAction.Listener() {
             @Override
             public void onSuccess() {
                 success.set(true);
@@ -989,7 +1028,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         }
 
         logger.info("blocking request from master [{}] to [{}]", masterNode, nonMasterNode);
-        MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
+        MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class,
+                masterNode);
         if (randomBoolean()) {
             masterTransportService.addUnresponsiveRule(internalCluster().getInstance(TransportService.class, nonMasterNode));
         } else {
@@ -1021,9 +1061,9 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         final String masterNode = masterNodeFuture.get();
         logger.info("--> creating index [test] with one shard and on replica");
         assertAcked(prepareCreate("test").setSettings(
-                        Settings.builder().put(indexSettings())
-                                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
+                Settings.builder().put(indexSettings())
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
         );
         ensureGreen("test");
 
@@ -1040,7 +1080,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
         MockTransportService transportServiceNode2 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_2);
         CountDownLatch beginRelocationLatch = new CountDownLatch(1);
         CountDownLatch endRelocationLatch = new CountDownLatch(1);
-        transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch, endRelocationLatch));
+        transportServiceNode2.addTracer(new IndicesStoreIntegrationIT.ReclocationStartEndTracer(logger, beginRelocationLatch,
+                endRelocationLatch));
         internalCluster().client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node_1, node_2)).get();
         // wait for relocation to start
         beginRelocationLatch.await();
@@ -1177,7 +1218,8 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
                 assertNull("node [" + node + "] still has [" + state.nodes().getMasterNode() + "] as master", state.nodes().getMasterNode());
                 if (expectedBlocks != null) {
                     for (ClusterBlockLevel level : expectedBlocks.levels()) {
-                        assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock(level));
+                        assertTrue("node [" + node + "] does have level [" + level + "] in it's blocks", state.getBlocks().hasGlobalBlock
+                                (level));
                     }
                 }
             }

+ 2 - 1
core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
 import org.elasticsearch.discovery.zen.ping.ZenPing;
 import org.elasticsearch.test.ESTestCase;
@@ -64,7 +65,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
         assertTrue("should ignore, because new state's version is lower to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
         currentState.version(1);
         newState.version(1);
-        assertFalse("should not ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
+        assertTrue("should ignore, because new state's version is equal to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));
         currentState.version(1);
         newState.version(2);
         assertFalse("should not ignore, because new state's version is higher to current state's version", shouldIgnoreOrRejectNewClusterState(logger, currentState.build(), newState.build()));

+ 3 - 2
core/src/test/java/org/elasticsearch/discovery/zen/publish/PendingClusterStatesQueueTests.java

@@ -195,10 +195,11 @@ public class PendingClusterStatesQueueTests extends ESTestCase {
                 highestCommitted = context.state;
             }
         }
+        assert highestCommitted != null;
 
         queue.markAsProcessed(highestCommitted);
-        assertThat(queue.stats().getTotal(), equalTo(states.size() - committedContexts.size()));
-        assertThat(queue.stats().getPending(), equalTo(states.size() - committedContexts.size()));
+        assertThat((long)queue.stats().getTotal(), equalTo(states.size() - (1 + highestCommitted.version())));
+        assertThat((long)queue.stats().getPending(), equalTo(states.size() - (1 + highestCommitted.version())));
         assertThat(queue.stats().getCommitted(), equalTo(0));
     }
 

+ 53 - 17
core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java

@@ -63,16 +63,20 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -159,7 +163,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
         DiscoveryNodeService discoveryNodeService = new DiscoveryNodeService(settings, version);
         DiscoveryNode discoveryNode = discoveryNodeService.buildLocalNode(service.boundAddress().publishAddress());
         MockNode node = new MockNode(discoveryNode, service, listener, logger);
-        node.action = buildPublishClusterStateAction(settings, service, node, node);
+        node.action = buildPublishClusterStateAction(settings, service, () -> node.clusterState, node);
         final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1);
         TransportConnectionListener waitForConnection = new TransportConnectionListener() {
             @Override
@@ -231,10 +235,21 @@ public class PublishClusterStateActionTests extends ESTestCase {
         return transportService;
     }
 
-    protected MockPublishAction buildPublishClusterStateAction(Settings settings, MockTransportService transportService, DiscoveryNodesProvider nodesProvider,
-                                                               PublishClusterStateAction.NewPendingClusterStateListener listener) {
-        DiscoverySettings discoverySettings = new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
-        return new MockPublishAction(settings, transportService, nodesProvider, listener, discoverySettings, ClusterName.DEFAULT);
+    protected MockPublishAction buildPublishClusterStateAction(
+            Settings settings,
+            MockTransportService transportService,
+            Supplier<ClusterState> clusterStateSupplier,
+            PublishClusterStateAction.NewPendingClusterStateListener listener
+    ) {
+        DiscoverySettings discoverySettings =
+                new DiscoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+        return new MockPublishAction(
+                settings,
+                transportService,
+                clusterStateSupplier,
+                listener,
+                discoverySettings,
+                ClusterName.DEFAULT);
     }
 
     public void testSimpleClusterStatePublishing() throws Exception {
@@ -596,11 +611,12 @@ public class PublishClusterStateActionTests extends ESTestCase {
             node.action.validateIncomingState(state, node.clusterState);
             fail("node accepted state from another master");
         } catch (IllegalStateException OK) {
+            assertThat(OK.toString(), containsString("cluster state from a different master than the current one, rejecting"));
         }
 
         logger.info("--> test state from the current master is accepted");
         node.action.validateIncomingState(ClusterState.builder(node.clusterState)
-                .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).build(), node.clusterState);
+                .nodes(DiscoveryNodes.builder(node.nodes()).masterNodeId("master")).incrementVersion().build(), node.clusterState);
 
 
         logger.info("--> testing rejection of another cluster name");
@@ -608,6 +624,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
             node.action.validateIncomingState(ClusterState.builder(new ClusterName(randomAsciiOfLength(10))).nodes(node.nodes()).build(), node.clusterState);
             fail("node accepted state with another cluster name");
         } catch (IllegalStateException OK) {
+            assertThat(OK.toString(), containsString("received state from a node that is not part of the cluster"));
         }
 
         logger.info("--> testing rejection of a cluster state with wrong local node");
@@ -618,6 +635,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
             node.action.validateIncomingState(state, node.clusterState);
             fail("node accepted state with non-existence local node");
         } catch (IllegalStateException OK) {
+            assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node"));
         }
 
         try {
@@ -628,12 +646,22 @@ public class PublishClusterStateActionTests extends ESTestCase {
             node.action.validateIncomingState(state, node.clusterState);
             fail("node accepted state with existent but wrong local node");
         } catch (IllegalStateException OK) {
+            assertThat(OK.toString(), containsString("received state with a local node that does not match the current local node"));
         }
 
         logger.info("--> testing acceptance of an old cluster state");
-        state = node.clusterState;
+        final ClusterState incomingState = node.clusterState;
         node.clusterState = ClusterState.builder(node.clusterState).incrementVersion().build();
-        node.action.validateIncomingState(state, node.clusterState);
+        final IllegalStateException e =
+                expectThrows(IllegalStateException.class, () -> node.action.validateIncomingState(incomingState, node.clusterState));
+        final String message = String.format(
+                Locale.ROOT,
+                "rejecting cluster state version [%d] uuid [%s] received from [%s]",
+                incomingState.version(),
+                incomingState.stateUUID(),
+                incomingState.nodes().getMasterNodeId()
+        );
+        assertThat(e, hasToString("java.lang.IllegalStateException: " + message));
 
         // an older version from a *new* master is also OK!
         ClusterState previousState = ClusterState.builder(node.clusterState).incrementVersion().build();
@@ -646,18 +674,17 @@ public class PublishClusterStateActionTests extends ESTestCase {
         node.action.validateIncomingState(state, previousState);
     }
 
-    public void testInterleavedPublishCommit() throws Throwable {
+    public void testOutOfOrderCommitMessages() throws Throwable {
         MockNode node = createMockNode("node").setAsMaster();
         final CapturingTransportChannel channel = new CapturingTransportChannel();
 
         List<ClusterState> states = new ArrayList<>();
-        final int numOfStates = scaledRandomIntBetween(3, 10);
+        final int numOfStates = scaledRandomIntBetween(3, 25);
         for (int i = 1; i <= numOfStates; i++) {
             states.add(ClusterState.builder(node.clusterState).version(i).stateUUID(ClusterState.UNKNOWN_UUID).build());
         }
 
         final ClusterState finalState = states.get(numOfStates - 1);
-        Collections.shuffle(states, random());
 
         logger.info("--> publishing states");
         for (ClusterState state : states) {
@@ -667,19 +694,28 @@ public class PublishClusterStateActionTests extends ESTestCase {
             assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
             assertThat(channel.error.get(), nullValue());
             channel.clear();
+
         }
 
         logger.info("--> committing states");
 
+        long largestVersionSeen = Long.MIN_VALUE;
         Randomness.shuffle(states);
         for (ClusterState state : states) {
             node.action.handleCommitRequest(new PublishClusterStateAction.CommitClusterStateRequest(state.stateUUID()), channel);
-            assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
-            if (channel.error.get() != null) {
-                throw channel.error.get();
+            if (largestVersionSeen < state.getVersion()) {
+                assertThat(channel.response.get(), equalTo((TransportResponse) TransportResponse.Empty.INSTANCE));
+                if (channel.error.get() != null) {
+                    throw channel.error.get();
+                }
+                largestVersionSeen = state.getVersion();
+            } else {
+                // older cluster states will be rejected
+                assertNotNull(channel.error.get());
+                assertThat(channel.error.get(), instanceOf(IllegalStateException.class));
             }
+            channel.clear();
         }
-        channel.clear();
 
         //now check the last state held
         assertSameState(node.clusterState, finalState);
@@ -817,8 +853,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
         AtomicBoolean timeoutOnCommit = new AtomicBoolean();
         AtomicBoolean errorOnCommit = new AtomicBoolean();
 
-        public MockPublishAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
-            super(settings, transportService, nodesProvider, listener, discoverySettings, clusterName);
+        public MockPublishAction(Settings settings, TransportService transportService, Supplier<ClusterState> clusterStateSupplier, NewPendingClusterStateListener listener, DiscoverySettings discoverySettings, ClusterName clusterName) {
+            super(settings, transportService, clusterStateSupplier, listener, discoverySettings, clusterName);
         }
 
         @Override

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -309,8 +309,8 @@ public final class InternalTestCluster extends TestCluster {
             builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10));
             builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 5, 10));
         } else if (random.nextInt(100) <= 90) {
-                builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5));
-                builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5));
+            builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5));
+            builder.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), RandomInts.randomIntBetween(random, 2, 5));
         }
         // always reduce this - it can make tests really slow
         builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));
@@ -544,7 +544,7 @@ public final class InternalTestCluster extends TestCluster {
         logger.info("changing cluster size from {} to {}, {} data nodes", size(), n + numShareCoordOnlyNodes, n);
         Set<NodeAndClient> nodesToRemove = new HashSet<>();
         int numNodesAndClients = 0;
-        while (values.hasNext() && numNodesAndClients++ < size-n) {
+        while (values.hasNext() && numNodesAndClients++ < size - n) {
             NodeAndClient next = values.next();
             nodesToRemove.add(next);
             removeDisruptionSchemeFromNode(next);