Browse Source

Java TransportClient: By default, don't sniff other nodes and use addresses provided as is, closes #565.

kimchy 15 years ago
parent
commit
415bb5d7f3

+ 0 - 10
modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java

@@ -144,11 +144,6 @@ public class TransportClient extends AbstractClient {
         injector = modules.createInjector();
 
         injector.getInstance(TransportService.class).start();
-        try {
-            injector.getInstance(TransportClientClusterService.class).start();
-        } catch (Exception e) {
-            // ignore
-        }
 
         nodesService = injector.getInstance(TransportClientNodesService.class);
         internalClient = injector.getInstance(InternalTransportClient.class);
@@ -197,11 +192,6 @@ public class TransportClient extends AbstractClient {
      * Closes the client.
      */
     @Override public void close() {
-        try {
-            injector.getInstance(TransportClientClusterService.class).close();
-        } catch (Exception e) {
-            // ignore
-        }
         injector.getInstance(TransportClientNodesService.class).close();
         injector.getInstance(TransportService.class).close();
         try {

+ 0 - 60
modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientClusterService.java

@@ -1,60 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.client.transport;
-
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.discovery.DiscoveryService;
-
-/**
- * @author kimchy (Shay Banon)
- */
-public class TransportClientClusterService extends AbstractComponent {
-
-    private final ClusterService clusterService;
-
-    private final TransportClientNodesService nodesService;
-
-    private final DiscoveryService discoveryService;
-
-    @Inject public TransportClientClusterService(Settings settings, ClusterService clusterService, TransportClientNodesService nodesService,
-                                                 DiscoveryService discoveryService) {
-        super(settings);
-        this.clusterService = clusterService;
-        this.nodesService = nodesService;
-        this.discoveryService = discoveryService;
-
-        clusterService.add(nodesService);
-    }
-
-    public void start() {
-        clusterService.add(nodesService);
-        clusterService.start();
-        discoveryService.start();
-    }
-
-    public void close() {
-        clusterService.remove(nodesService);
-        clusterService.close();
-        discoveryService.close();
-    }
-}

+ 33 - 34
modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

@@ -21,13 +21,11 @@ package org.elasticsearch.client.transport;
 
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.action.TransportActions;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.client.Requests;
-import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.collect.ImmutableList;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
@@ -51,7 +49,7 @@ import static org.elasticsearch.common.unit.TimeValue.*;
 /**
  * @author kimchy (shay.banon)
  */
-public class TransportClientNodesService extends AbstractComponent implements ClusterStateListener {
+public class TransportClientNodesService extends AbstractComponent {
 
     private final TimeValue nodesSamplerInterval;
 
@@ -68,11 +66,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
 
     private volatile ImmutableList<DiscoveryNode> nodes = ImmutableList.of();
 
-    private volatile DiscoveryNodes discoveredNodes;
-
     private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
 
-    private final ScheduledNodesSampler nodesSampler = new ScheduledNodesSampler();
+    private final Runnable nodesSampler;
 
     private final ScheduledFuture nodesSamplerFuture;
 
@@ -85,12 +81,17 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
         this.transportService = transportService;
         this.threadPool = threadPool;
 
-        this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(1));
+        this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5));
 
         if (logger.isDebugEnabled()) {
             logger.debug("node_sampler_interval[" + nodesSamplerInterval + "]");
         }
 
+        if (componentSettings.getAsBoolean("sniff", false)) {
+            this.nodesSampler = new ScheduledSniffNodesSampler();
+        } else {
+            this.nodesSampler = new ScheduledConnectNodeSampler();
+        }
         this.nodesSamplerFuture = threadPool.scheduleWithFixedDelay(nodesSampler, nodesSamplerInterval);
 
         // we want the transport service to throw connect exceptions, so we can retry
@@ -112,7 +113,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
     public TransportClientNodesService addTransportAddress(TransportAddress transportAddress) {
         synchronized (transportMutex) {
             ImmutableList.Builder<DiscoveryNode> builder = ImmutableList.builder();
-            listedNodes = builder.addAll(listedNodes).add(new DiscoveryNode("#temp#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress)).build();
+            listedNodes = builder.addAll(listedNodes).add(new DiscoveryNode("#transport#-" + tempNodeIdGenerator.incrementAndGet(), transportAddress)).build();
         }
         nodesSampler.run();
         return this;
@@ -155,24 +156,26 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
             transportService.disconnectFromNode(listedNode);
     }
 
-    @Override public void clusterChanged(ClusterChangedEvent event) {
-        for (DiscoveryNode node : event.nodesDelta().addedNodes()) {
-            try {
-                transportService.connectToNode(node);
-            } catch (Exception e) {
-                logger.warn("Failed to connect to discovered node [" + node + "]", e);
+    private class ScheduledConnectNodeSampler implements Runnable {
+        @Override public synchronized void run() {
+            HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>();
+            for (DiscoveryNode node : listedNodes) {
+                if (!transportService.nodeConnected(node)) {
+                    try {
+                        transportService.connectToNode(node);
+                        newNodes.add(node);
+                    } catch (Exception e) {
+                        logger.debug("Failed to connect to node " + node + ", removed from nodes list", e);
+                    }
+                } else {
+                    newNodes.add(node);
+                }
             }
-        }
-        this.discoveredNodes = event.state().nodes();
-        HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>(nodes);
-        newNodes.addAll(discoveredNodes.nodes().values());
-        nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
-        for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
-            transportService.disconnectFromNode(node);
+            nodes = new ImmutableList.Builder<DiscoveryNode>().addAll(newNodes).build();
         }
     }
 
-    private class ScheduledNodesSampler implements Runnable {
+    private class ScheduledSniffNodesSampler implements Runnable {
 
         @Override public synchronized void run() {
             ImmutableList<DiscoveryNode> listedNodes = TransportClientNodesService.this.listedNodes;
@@ -183,7 +186,7 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
                     @Override public void run() {
                         try {
                             transportService.connectToNode(listedNode); // make sure we are connected to it
-                            transportService.sendRequest(listedNode, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfoRequest("_local"), new BaseTransportResponseHandler<NodesInfoResponse>() {
+                            transportService.sendRequest(listedNode, TransportActions.Admin.Cluster.Node.INFO, Requests.nodesInfoRequest("_all"), new BaseTransportResponseHandler<NodesInfoResponse>() {
 
                                 @Override public NodesInfoResponse newInstance() {
                                     return new NodesInfoResponse();
@@ -215,26 +218,22 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
 
             HashSet<DiscoveryNode> newNodes = new HashSet<DiscoveryNode>();
             for (NodesInfoResponse nodesInfoResponse : nodesInfoResponses) {
-                if (nodesInfoResponse.nodes().length > 0) {
-                    DiscoveryNode node = nodesInfoResponse.nodes()[0].node();
+                for (NodeInfo nodeInfo : nodesInfoResponse) {
                     if (!clusterName.equals(nodesInfoResponse.clusterName())) {
-                        logger.warn("Node {} not part of the cluster {}, ignoring...", node, clusterName);
+                        logger.warn("Node {} not part of the cluster {}, ignoring...", nodeInfo.node(), clusterName);
                     } else {
-                        newNodes.add(node);
+                        if (nodeInfo.node().dataNode()) { // only add data nodes to connect to
+                            newNodes.add(nodeInfo.node());
+                        }
                     }
-                } else {
-                    // should not really happen....
-                    logger.debug("No info returned from node...");
                 }
             }
-            if (discoveredNodes != null) {
-                newNodes.addAll(discoveredNodes.nodes().values());
-            }
             // now, make sure we are connected to all the updated nodes
             for (DiscoveryNode node : newNodes) {
                 try {
                     transportService.connectToNode(node);
                 } catch (Exception e) {
+                    newNodes.remove(node);
                     logger.debug("Failed to connect to discovered node [" + node + "]", e);
                 }
             }

+ 3 - 3
modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientDocumentActionsTests.java

@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -35,14 +35,14 @@ public class TransportClientDocumentActionsTests extends DocumentActionsTests {
 
     @Override protected Client getClient1() {
         TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
-        TransportClient client = new TransportClient(settingsBuilder().put("discovery.enabled", false).build());
+        TransportClient client = new TransportClient(settingsBuilder().put("client.transport.sniff", false).build());
         client.addTransportAddress(server1Address);
         return client;
     }
 
     @Override protected Client getClient2() {
         TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
-        TransportClient client = new TransportClient(settingsBuilder().put("discovery.enabled", false).build());
+        TransportClient client = new TransportClient(settingsBuilder().put("client.transport.sniff", false).build());
         client.addTransportAddress(server2Address);
         return client;
     }

+ 49 - 0
modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.java

@@ -0,0 +1,49 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.integration.client.transport;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.test.integration.document.DocumentActionsTests;
+import org.elasticsearch.transport.TransportService;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class TransportClientSniffDocumentActionsTests extends DocumentActionsTests {
+
+    @Override protected Client getClient1() {
+        TransportAddress server1Address = ((InternalNode) node("server1")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
+        TransportClient client = new TransportClient(settingsBuilder().put("client.transport.sniff", true).build());
+        client.addTransportAddress(server1Address);
+        return client;
+    }
+
+    @Override protected Client getClient2() {
+        TransportAddress server2Address = ((InternalNode) node("server2")).injector().getInstance(TransportService.class).boundAddress().publishAddress();
+        TransportClient client = new TransportClient(settingsBuilder().put("client.transport.sniff", true).build());
+        client.addTransportAddress(server2Address);
+        return client;
+    }
+}

+ 12 - 0
modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/TransportClientSniffDocumentActionsTests.yml

@@ -0,0 +1,12 @@
+cluster:
+  routing:
+    schedule: 100ms
+index:
+  number_of_shards: 5
+  number_of_replicas: 1
+
+# use large interval node sampler
+client:
+  transport:
+    nodes_sampler_interval: 30s
+