Browse Source

Publishing a cluster state should clear the pending states queue (#21259)

The pending cluster state queue is used to hold incoming cluster states from the master. Currently the elected master doesn't publish to itself and thus the queue is not used. Sometimes, depending on the timing of disruptions, a pending cluster state can be put on the queue (but not committed) by another master before being isolated. If this happens concurrently with a master election, the elected master can have a pending cluster state in its queue. This is not really a problem but it does confuse our assertions during tests as we check to see everything was processed correctly.

This commit takes a temporary step to clear (and fail) any pending cluster state on the master after it has successfully published a CS. Most notably this will happen when the master publishes the cluster state indicating it has just become the master.

Long term we are working to change the publishing mechanism to make the master use the pending queue just like other nodes, which will make this a non-issue.

See https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+5.x+java9-periodic/509 for example.
Boaz Leskes 9 years ago
parent
commit
7276737a03

+ 9 - 0
core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

@@ -322,6 +322,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
 
         // update the set of nodes to ping after the new cluster state has been published
         nodesFD.updateNodesAndPing(clusterChangedEvent.state());
+
+        // clean the pending cluster queue - we are currently master, so any pending cluster state should be failed
+        // note that we also clean the queue on master failure (see handleMasterGone) but a delayed cluster state publish
+        // from a stale master can still make it in the queue during the election (but not be committed)
+        publishClusterState.pendingStatesQueue().failAllStatesAndClear(new ElasticsearchException("elected as master"));
     }
 
     /**
@@ -362,6 +367,10 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
         return publishClusterState.pendingStatesQueue().pendingClusterStates();
     }
 
+    PendingClusterStatesQueue pendingClusterStatesQueue() { // package-private accessor to the queue itself, e.g. for tests
+        return publishClusterState.pendingStatesQueue();
+    }
+
     /**
      * the main function of a join thread. This function is guaranteed to join the cluster
      * or spawn a new join thread upon failure to do so.

+ 55 - 1
core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

@@ -55,7 +55,9 @@ import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_M
 import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
 import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
 import static org.elasticsearch.test.ClusterServiceUtils.setState;
+import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.emptyArray;
 import static org.hamcrest.Matchers.equalTo;
 
 public class ZenDiscoveryUnitTests extends ESTestCase {
@@ -182,7 +184,6 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
             toClose.add(otherZen);
             otherTransport.acceptIncomingRequests();
 
-
             masterTransport.connectToNode(otherNode);
             otherTransport.connectToNode(masterNode);
 
@@ -213,6 +214,59 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
         }
     }
 
+    public void testPendingCSQueueIsClearedWhenClusterStatePublished() throws Exception {
+        ThreadPool threadPool = new TestThreadPool(getClass().getName());
+        // randomly make minimum_master_nodes a value higher than we have nodes for, so it will force failure
+        int minMasterNodes =  randomBoolean() ? 3 : 1;
+        Settings settings = Settings.builder()
+            .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build();
+
+        ArrayList<Closeable> toClose = new ArrayList<>();
+        try {
+            final MockTransportService masterTransport = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null);
+            masterTransport.start();
+            DiscoveryNode masterNode = new DiscoveryNode("master",  masterTransport.boundAddress().publishAddress(), Version.CURRENT);
+            toClose.add(masterTransport);
+            masterTransport.setLocalNode(masterNode);
+            ClusterState state = ClusterStateCreationUtils.state(masterNode, null, masterNode);
+            // build the zen discovery and cluster service
+            ClusterService masterClusterService = createClusterService(threadPool, masterNode);
+            toClose.add(masterClusterService);
+            state = ClusterState.builder(masterClusterService.getClusterName()).nodes(state.nodes()).build();
+            setState(masterClusterService, state);
+            ZenDiscovery masterZen = buildZenDiscovery(settings, masterTransport, masterClusterService, threadPool);
+            toClose.add(masterZen);
+            masterTransport.acceptIncomingRequests();
+
+            // inject a pending cluster state carrying a "foreign" cluster name so it can be recognised later
+            masterZen.pendingClusterStatesQueue().addPending(ClusterState.builder(new ClusterName("foreign")).build());
+
+            // a new cluster state that bumps the version and marks the local node as master; publishing it
+            // is what should trigger the clearing of the pending queue (when the publish succeeds)
+            ClusterState newState = ClusterState.builder(masterClusterService.state()).incrementVersion().nodes(
+                DiscoveryNodes.builder(masterClusterService.state().nodes()).masterNodeId(masterNode.getId())
+            ).build();
+
+
+            try {
+                // publishing a new cluster state
+                ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state);
+                AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1);
+                masterZen.publish(clusterChangedEvent, listener);
+                listener.await(1, TimeUnit.HOURS);
+                // publish was a success, check that queue was cleared
+                assertThat(masterZen.pendingClusterStates(), emptyArray());
+            } catch (Discovery.FailedToCommitClusterStateException e) {
+                // publish failed to commit, so the injected pending state must still be in the queue
+                assertThat(masterZen.pendingClusterStates(), arrayWithSize(1));
+                assertThat(masterZen.pendingClusterStates()[0].getClusterName().value(), equalTo("foreign"));
+            }
+        } finally {
+            IOUtils.close(toClose);
+            terminate(threadPool);
+        }
+    }
+
     private ZenDiscovery buildZenDiscovery(Settings settings, TransportService service, ClusterService clusterService, ThreadPool threadPool) {
         ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet());