瀏覽代碼

An improvement to unicast discovery to also ping nodes the node itself received a ping from.
Also moved the unicast tests all in a single package.

Closes #5508

Martijn van Groningen 11 年之前
父節點
當前提交
3b73209123

+ 8 - 1
src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java

@@ -240,7 +240,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
         DiscoveryNodes discoNodes = nodesProvider.nodes();
         pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);
 
-        List<DiscoveryNode> nodesToPing = newArrayList(nodes);
+        HashSet<DiscoveryNode> nodesToPing = new HashSet<>(Arrays.asList(nodes));
+        for (PingResponse temporalResponse : temporalResponses) {
+            // Only send pings to nodes that have the same cluster name.
+            if (clusterName.equals(temporalResponse.clusterName())) {
+                nodesToPing.add(temporalResponse.target());
+            }
+        }
+
         for (UnicastHostsProvider provider : hostsProviders) {
             nodesToPing.addAll(provider.buildDynamicNodes());
         }

+ 4 - 8
src/test/java/org/elasticsearch/discovery/DiscoveryTests.java → src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java

@@ -29,19 +29,15 @@ import org.junit.Test;
 
 import static org.hamcrest.Matchers.equalTo;
 
-@ClusterScope(scope=Scope.SUITE, numNodes=2)
-public class DiscoveryTests extends ElasticsearchIntegrationTest {
+@ClusterScope(scope=Scope.TEST, numNodes=2)
+public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
 
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
         return ImmutableSettings.settingsBuilder()
                 .put("discovery.zen.ping.multicast.enabled", false)
-                        // Can't use this, b/c at the moment all node will only ping localhost:9300 and the shared
-                        // cluster will be running there, which leads of no node joining, because the cluster name
-                        // isn't equal.
-//                .put("discovery.zen.ping.unicast.hosts", "localhost")
-                .put("discovery.zen.ping.unicast.hosts", "localhost:25300,localhost:25301")
-                .put("transport.tcp.port", "25300-25400")
+                .put("discovery.zen.ping.unicast.hosts", "localhost")
+                .put("transport.tcp.port", "25300-25400") // Need to use custom tcp port range otherwise we collide with the shared cluster
                 .put(super.nodeSettings(nodeOrdinal)).build();
     }
     

+ 106 - 0
src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsMinimumMasterNodes.java

@@ -0,0 +1,106 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.discovery;
+
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+import static org.hamcrest.Matchers.equalTo;
+
+@ClusterScope(scope=Scope.TEST, numNodes=0)
+public class ZenUnicastDiscoveryTestsMinimumMasterNodes extends ElasticsearchIntegrationTest {
+
+    @Test
+    // Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this
+    // test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=3
+    // can't be satisfied.
+    public void testUnicastDiscovery() throws Exception {
+        final Settings settings = ImmutableSettings.settingsBuilder()
+                .put("discovery.zen.ping.multicast.enabled", false)
+                .put("discovery.zen.minimum_master_nodes", 2)
+                .put("discovery.zen.ping.unicast.hosts", "localhost")
+                .put("transport.tcp.port", "25400-25500") // Need to use custom tcp port range otherwise we collide with the shared cluster
+                .build();
+
+        final CountDownLatch latch = new CountDownLatch(3);
+        final AtomicArray<String> nodes = new AtomicArray<>(3);
+        Runnable r1 = new Runnable() {
+
+            @Override
+            public void run() {
+                logger.info("--> start first node");
+                nodes.set(0, cluster().startNode(settings));
+                latch.countDown();
+            }
+        };
+        new Thread(r1).start();
+
+        sleep(between(500, 3000));
+        Runnable r2 = new Runnable() {
+
+            @Override
+            public void run() {
+                logger.info("--> start second node");
+                nodes.set(1, cluster().startNode(settings));
+                latch.countDown();
+            }
+        };
+        new Thread(r2).start();
+
+
+        sleep(between(500, 3000));
+        Runnable r3 = new Runnable() {
+
+            @Override
+            public void run() {
+                logger.info("--> start third node");
+                nodes.set(2, cluster().startNode(settings));
+                latch.countDown();
+            }
+        };
+        new Thread(r3).start();
+        latch.await();
+
+        ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").execute().actionGet();
+        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+
+        DiscoveryNode masterDiscoNode = null;
+        for (String node : nodes.toArray(new String[3])) {
+            ClusterState state = cluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
+            assertThat(state.nodes().size(), equalTo(3));
+            if (masterDiscoNode == null) {
+                masterDiscoNode = state.nodes().masterNode();
+            } else {
+                assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true));
+            }
+        }
+    }
+}

+ 3 - 2
src/test/java/org/elasticsearch/cluster/ZenUnicastDiscoveryTests.java → src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsSpecificNodes.java

@@ -17,10 +17,11 @@
  * under the License.
  */
 
-package org.elasticsearch.cluster;
+package org.elasticsearch.discovery;
 
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
@@ -38,7 +39,7 @@ import static org.hamcrest.Matchers.equalTo;
  */
 @LuceneTestCase.Slow
 @ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0)
-public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
+public class ZenUnicastDiscoveryTestsSpecificNodes extends ElasticsearchIntegrationTest {
 
     @Test
     @TestLogging("discovery.zen:TRACE")