Browse Source

Publish to self through transport (#43994)

This commit ensures that cluster state publications to self also go through the transport layer. This
allows voting-only nodes to intercept the publication to self.

Fixes an issue discovered by a test failure where a voting-only node, which was the only
bootstrapped node, would not step down as master after state transfer because publishing to self
would succeed.

Closes #43631
Yannick Welsch 6 years ago
parent
commit
50e0347028

+ 42 - 27
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

@@ -38,7 +38,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BytesTransportRequest;
@@ -71,6 +70,13 @@ public class PublicationTransportHandler {
 
     private AtomicReference<ClusterState> lastSeenClusterState = new AtomicReference<>();
 
+    // the master needs the original non-serialized state as the cluster state contains some volatile information that we
+    // don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
+    // because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
+    // snapshot code).
+    // TODO: look into these and check how to get rid of them
+    private AtomicReference<PublishRequest> currentPublishRequestToSelf = new AtomicReference<>();
+
     private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
     private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
     private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
@@ -153,32 +159,32 @@ public class PublicationTransportHandler {
         return new PublicationContext() {
             @Override
             public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
-                                           ActionListener<PublishWithJoinResponse> responseActionListener) {
+                                           ActionListener<PublishWithJoinResponse> originalListener) {
                 assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
+                final ActionListener<PublishWithJoinResponse> responseActionListener;
                 if (destination.equals(nodes.getLocalNode())) {
-                    // the master needs the original non-serialized state as the cluster state contains some volatile information that we
-                    // don't want to be replicated because it's not usable on another node (e.g. UnassignedInfo.unassignedTimeNanos) or
-                    // because it's mostly just debugging info that would unnecessarily blow up CS updates (I think there was one in
-                    // snapshot code).
-                    // TODO: look into these and check how to get rid of them
-                    transportService.getThreadPool().generic().execute(new AbstractRunnable() {
-                        @Override
-                        public void onFailure(Exception e) {
-                            // wrap into fake TransportException, as that's what we expect in Publication
-                            responseActionListener.onFailure(new TransportException(e));
-                        }
-
+                    // if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
+                    final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(publishRequest);
+                    assert previousRequest == null;
+                    responseActionListener = new ActionListener<PublishWithJoinResponse>() {
                         @Override
-                        protected void doRun() {
-                            responseActionListener.onResponse(handlePublishRequest.apply(publishRequest));
+                        public void onResponse(PublishWithJoinResponse publishWithJoinResponse) {
+                            final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(null);
+                            assert previousRequest == publishRequest;
+                            originalListener.onResponse(publishWithJoinResponse);
                         }
 
                         @Override
-                        public String toString() {
-                            return "publish to self of " + publishRequest;
+                        public void onFailure(Exception e) {
+                            final PublishRequest previousRequest = currentPublishRequestToSelf.getAndSet(null);
+                            assert previousRequest == publishRequest;
+                            originalListener.onFailure(e);
                         }
-                    });
-                } else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
+                    };
+                } else {
+                    responseActionListener = originalListener;
+                }
+                if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
                     logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
                     PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
                 } else {
@@ -267,10 +273,6 @@ public class PublicationTransportHandler {
                                                     Map<Version, BytesReference> serializedDiffs) {
         Diff<ClusterState> diff = null;
         for (DiscoveryNode node : discoveryNodes) {
-            if (node.equals(discoveryNodes.getLocalNode())) {
-                // ignore, see newPublicationContext
-                continue;
-            }
             try {
                 if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                     if (serializedStates.containsKey(node.getVersion()) == false) {
@@ -356,7 +358,7 @@ public class PublicationTransportHandler {
                 fullClusterStateReceivedCount.incrementAndGet();
                 logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
                     request.bytes().length());
-                final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
+                final PublishWithJoinResponse response = acceptState(incomingState);
                 lastSeenClusterState.set(incomingState);
                 return response;
             } else {
@@ -366,7 +368,7 @@ public class PublicationTransportHandler {
                     incompatibleClusterStateDiffReceivedCount.incrementAndGet();
                     throw new IncompatibleClusterStateVersionException("have no local cluster state");
                 } else {
-                    final ClusterState incomingState;
+                    ClusterState incomingState;
                     try {
                         Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
                         incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
@@ -380,7 +382,7 @@ public class PublicationTransportHandler {
                     compatibleClusterStateDiffReceivedCount.incrementAndGet();
                     logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
                         incomingState.version(), incomingState.stateUUID(), request.bytes().length());
-                    final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
+                    final PublishWithJoinResponse response = acceptState(incomingState);
                     lastSeenClusterState.compareAndSet(lastSeen, incomingState);
                     return response;
                 }
@@ -389,4 +391,17 @@ public class PublicationTransportHandler {
             IOUtils.close(in);
         }
     }
+
+    private PublishWithJoinResponse acceptState(ClusterState incomingState) {
+        // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
+        if (transportService.getLocalNode().equals(incomingState.nodes().getMasterNode())) {
+            final PublishRequest publishRequest = currentPublishRequestToSelf.get();
+            if (publishRequest == null || publishRequest.getAcceptedState().stateUUID().equals(incomingState.stateUUID()) == false) {
+                throw new IllegalStateException("publication to self failed for " + publishRequest);
+            } else {
+                return handlePublishRequest.apply(publishRequest);
+            }
+        }
+        return handlePublishRequest.apply(new PublishRequest(incomingState));
+    }
 }

+ 6 - 17
server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

@@ -770,23 +770,12 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
 
         for (ClusterNode cn : cluster.clusterNodes) {
             assertThat(value(cn.getLastAppliedClusterState()), is(finalValue));
-            if (cn == leader) {
-                // leader does not update publish stats as it's not using the serialized state
-                assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
-                    postPublishStats.get(cn).getFullClusterStateReceivedCount());
-                assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
-                    postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
-                assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
-                    postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
-            } else {
-                // followers receive a diff
-                assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
-                    postPublishStats.get(cn).getFullClusterStateReceivedCount());
-                assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
-                    postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
-                assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
-                    postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
-            }
+            assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
+                postPublishStats.get(cn).getFullClusterStateReceivedCount());
+            assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
+                postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
+            assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
+                postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
         }
     }
 

+ 3 - 20
server/src/test/java/org/elasticsearch/cluster/coordination/ZenDiscoveryIT.java

@@ -28,7 +28,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -49,6 +48,7 @@ import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -150,21 +150,6 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
     }
 
     public void testDiscoveryStats() throws Exception {
-        String expectedStatsJsonResponse = "{\n" +
-                "  \"discovery\" : {\n" +
-                "    \"cluster_state_queue\" : {\n" +
-                "      \"total\" : 0,\n" +
-                "      \"pending\" : 0,\n" +
-                "      \"committed\" : 0\n" +
-                "    },\n" +
-                "    \"published_cluster_states\" : {\n" +
-                "      \"full_states\" : 0,\n" +
-                "      \"incompatible_diffs\" : 0,\n" +
-                "      \"compatible_diffs\" : 0\n" +
-                "    }\n" +
-                "  }\n" +
-                "}";
-
         internalCluster().startNode();
         ensureGreen(); // ensures that all events are processed (in particular state recovery fully completed)
         assertBusy(() ->
@@ -182,15 +167,13 @@ public class ZenDiscoveryIT extends ESIntegTestCase {
         assertThat(stats.getQueueStats().getPending(), equalTo(0));
 
         assertThat(stats.getPublishStats(), notNullValue());
-        assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), equalTo(0L));
+        assertThat(stats.getPublishStats().getFullClusterStateReceivedCount(), greaterThanOrEqualTo(0L));
         assertThat(stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount(), equalTo(0L));
-        assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), equalTo(0L));
+        assertThat(stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount(), greaterThanOrEqualTo(0L));
 
         XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
         builder.startObject();
         stats.toXContent(builder, ToXContent.EMPTY_PARAMS);
         builder.endObject();
-
-        assertThat(Strings.toString(builder), equalTo(expectedStatsJsonResponse));
     }
 }

+ 11 - 0
x-pack/plugin/voting-only-node/src/test/java/org/elasticsearch/cluster/coordination/VotingOnlyNodePluginTests.java

@@ -80,6 +80,17 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {
             equalTo(false));
     }
 
+    public void testBootstrapOnlySingleVotingOnlyNode() throws Exception {
+        internalCluster().setBootstrapMasterNodeIndex(0);
+        internalCluster().startNode(Settings.builder().put(VotingOnlyNodePlugin.VOTING_ONLY_NODE_SETTING.getKey(), true)
+            .put(Node.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build());
+        internalCluster().startNode();
+        assertBusy(() -> assertThat(client().admin().cluster().prepareState().get().getState().getNodes().getSize(), equalTo(2)));
+        assertThat(
+            VotingOnlyNodePlugin.isVotingOnlyNode(client().admin().cluster().prepareState().get().getState().nodes().getMasterNode()),
+            equalTo(false));
+    }
+
     public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exception {
         internalCluster().setBootstrapMasterNodeIndex(0);
         internalCluster().startNode();